/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence;

import com.sap.cds.impl.parser.token.Jsonizer;
import com.sap.cds.ql.Insert;
import com.sap.cds.ql.cqn.CqnInsert;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.changeset.ChangeSetContext;
import com.sap.cds.services.changeset.ChangeSetListener;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.outbox.AbstractOutboxService;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.collectors.PartitionCollector;
import com.sap.cds.services.impl.utils.CdsServiceUtils;
import com.sap.cds.services.mt.TenantInfo;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outbox service that writes every submitted message into the {@code Messages} entity through
 * the default persistence service and runs a {@link PartitionCollector} on a dedicated daemon
 * thread.
 *
 * <p>When the trigger schedule is enabled in the outbox configuration, the collector is
 * un-paused after a change set that stored at least one message has been closed successfully.
 */
public class PersistentOutbox
extends AbstractOutboxService {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentOutbox.class);

    /** Key under which the event name is stored in the serialized message JSON. */
    public static final String ATTR_EVENT = "event";

    /** Key under which the message payload is stored in the serialized message JSON. */
    public static final String ATTR_MESSAGE = "message";

    /** Thread running the collector; {@code null} until {@link #init()} has been called. */
    private Thread job;
    private final PartitionCollector collector;
    private final CdsProperties.Outbox.OutboxConfig config;

    /** Change set contexts for which an after-close listener is currently registered. */
    private final Set<ChangeSetContext> changeSetContextCache = ConcurrentHashMap.newKeySet();

    /**
     * Creates the outbox together with its collector.
     *
     * @param name name of the outbox; also used as the {@code target} of stored messages
     * @param config configuration of this outbox
     * @param runtime runtime handed to the base service and to the collector
     * @param tenantSupplier supplier of tenants, passed through to the collector
     */
    public PersistentOutbox(String name, CdsProperties.Outbox.OutboxConfig config, CdsRuntime runtime, Supplier<List<TenantInfo>> tenantSupplier) {
        super(name, runtime);
        this.config = config;
        this.collector = new PartitionCollector(runtime, this, config, tenantSupplier);
    }

    /**
     * Starts the collector on a daemon thread named {@code <outbox name>-collector}. Does nothing
     * if a thread has already been created.
     *
     * <p>NOTE(review): {@link #stop()} does not reset the thread reference, so calling this
     * method after {@code stop()} will not start a new collector thread.
     */
    void init() {
        if (this.job != null) {
            return;
        }
        LOG.debug("Initializing collector of outbox '{}'", this.getName());
        this.job = new Thread(this.collector, this.getName() + "-collector");
        this.job.setDaemon(true);
        this.job.start();
    }

    /**
     * Interrupts the collector thread, if one was created by {@link #init()}. How the collector
     * reacts to the interrupt is decided by {@link PartitionCollector}.
     */
    void stop() {
        if (this.job == null) {
            return;
        }
        LOG.debug("Stopping collector of the outbox '{}'", this.getName());
        this.job.interrupt();
    }

    /**
     * Persists the message and, if the trigger schedule is enabled, makes sure that exactly one
     * listener per change set un-pauses the collector once that change set has been closed
     * successfully.
     *
     * @param context the outbox message event context to store
     */
    @Override
    protected void submit(OutboxMessageEventContext context) {
        LOG.debug("Submitting outbox message for target '{}' with event '{}'.", this.getName(), context.getEvent());
        this.persist(context);
        LOG.debug("Stored outbox message for target '{}' with event '{}'.", this.getName(), context.getEvent());

        final ChangeSetContext changeSetContext = context.getChangeSetContext();
        // Set.add reports whether the element was newly inserted, so a single atomic call replaces
        // the former contains-then-add pair. The && keeps the cache untouched when triggering is
        // disabled, exactly as before.
        if (this.config.getTriggerSchedule().isEnabled() && this.changeSetContextCache.add(changeSetContext)) {
            changeSetContext.register(new ChangeSetListener(){

                @Override
                public void afterClose(boolean completed) {
                    // Always drop the cache entry, whether or not the change set completed.
                    PersistentOutbox.this.changeSetContextCache.remove(changeSetContext);
                    if (completed) {
                        // The stored messages are only worth collecting after a successful close.
                        PersistentOutbox.this.collector.unpause();
                    }
                }
            });
        }
    }

    /**
     * Serializes event name and message payload into one JSON string and inserts it, together
     * with the outbox name as target and the context's timestamp, into the {@code Messages}
     * entity using the default persistence service.
     *
     * @param context the outbox message event context providing event, message and timestamp
     */
    private void persist(OutboxMessageEventContext context) {
        Map<String, Object> data = new HashMap<>();
        data.put(ATTR_MESSAGE, context.getMessage());
        data.put(ATTR_EVENT, context.getEvent());

        Messages message = Messages.create();
        message.setMsg(Jsonizer.json(data));
        message.setTarget(this.getName());
        message.setTimestamp(context.getTimestamp());

        PersistenceService db = CdsServiceUtils.getDefaultPersistenceService(context);
        // The message object is handed to the insert as the row's element map.
        Insert insert = Insert.into(Messages_.class).entry((Map<String, Object>) message);
        db.run(insert);
    }
}

