/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence;

import com.sap.cds.impl.parser.token.Jsonizer;
import com.sap.cds.ql.Insert;
import com.sap.cds.ql.cqn.CqnInsert;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.changeset.ChangeSetContext;
import com.sap.cds.services.changeset.ChangeSetListener;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.outbox.AbstractOutboxService;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.OutboxStatistics;
import com.sap.cds.services.impl.outbox.persistence.TelemetryData;
import com.sap.cds.services.impl.outbox.persistence.TelemetryDataImpl;
import com.sap.cds.services.impl.outbox.persistence.collectors.PartitionCollector;
import com.sap.cds.services.impl.utils.CdsServiceUtils;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentOutbox
extends AbstractOutboxService {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentOutbox.class);
    public static final String ATTR_EVENT = "event";
    public static final String ATTR_MESSAGE = "message";
    private final TelemetryData telemetryData;
    private final PartitionCollector collector;
    private final CdsProperties.Outbox.OutboxServiceConfig config;
    private Map<ChangeSetContext, Long> changeSetContextCache = new ConcurrentHashMap<ChangeSetContext, Long>();

    public PersistentOutbox(String name, CdsProperties.Outbox.OutboxServiceConfig config, CdsRuntime runtime, Supplier<List<String>> tenantSupplier) {
        super(name, runtime);
        this.config = config;
        this.telemetryData = config.isObservable() ? new TelemetryDataImpl(name, config.getMaxAttempts()) : TelemetryData.NOOP;
        this.collector = new PartitionCollector(runtime, this, config, tenantSupplier, this.telemetryData);
    }

    Collection<OutboxStatistics> getStatistics() {
        return this.telemetryData.getStatistics();
    }

    void init() {
        if (this.config.isStartCollector()) {
            this.start();
        }
    }

    public void start() {
        this.collector.start();
    }

    public void stop() {
        try {
            this.stop(0L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop(long millis) throws InterruptedException {
        this.collector.stop(millis);
    }

    public boolean isCollectorRunning() {
        return this.collector.isRunning();
    }

    @Override
    protected void submit(OutboxMessageEventContext context) {
        LOG.debug("Submitting outbox message for target '{}' with event '{}'.", (Object)this.getName(), (Object)context.getEvent());
        this.persist(context);
        LOG.debug("Stored outbox message for target '{}' with event '{}'.", (Object)this.getName(), (Object)context.getEvent());
        final ChangeSetContext changeSetContext = context.getChangeSetContext();
        this.changeSetContextCache.computeIfPresent(changeSetContext, (k, val) -> val + 1L);
        if (!this.changeSetContextCache.containsKey(changeSetContext)) {
            this.changeSetContextCache.put(changeSetContext, 1L);
            final String tenant = context.getUserInfo().getTenant();
            changeSetContext.register(new ChangeSetListener(){

                public void afterClose(boolean completed) {
                    long incomingCount = PersistentOutbox.this.changeSetContextCache.remove(changeSetContext);
                    if (completed) {
                        PersistentOutbox.this.telemetryData.recordIncomingMessages(tenant, incomingCount);
                        if (PersistentOutbox.this.config.getTriggerSchedule().isEnabled().booleanValue()) {
                            PersistentOutbox.this.collector.unpause();
                        }
                    }
                }
            });
        }
    }

    private void persist(OutboxMessageEventContext context) {
        Messages message = Messages.create();
        HashMap<String, Object> data = new HashMap<String, Object>();
        data.put(ATTR_MESSAGE, context.getMessage());
        data.put(ATTR_EVENT, context.getEvent());
        message.setMsg(Jsonizer.json(data));
        message.setTarget(this.getName());
        message.setTimestamp(context.getTimestamp());
        PersistenceService db = CdsServiceUtils.getDefaultPersistenceService((EventContext)context);
        Insert insert = Insert.into(Messages_.class).entry((Map)((Object)message));
        db.run((CqnInsert)insert);
    }
}

