/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence;

import com.sap.cds.ql.Insert;
import com.sap.cds.ql.cqn.CqnInsert;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.changeset.ChangeSetContext;
import com.sap.cds.services.changeset.ChangeSetListener;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.collectors.PartitionCollectorCoordinator;
import com.sap.cds.services.impl.utils.CdsServiceUtils;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.outbox.AbstractOutboxService;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outbox service that stores each outbox message as a {@code Messages} row via the default
 * persistence service and, when the trigger schedule is enabled, asks the
 * {@link PartitionCollectorCoordinator} to process the affected partitions once the surrounding
 * change set has closed successfully.
 */
public class PersistentOutbox
extends AbstractOutboxService {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentOutbox.class);

    /** Target event that is always stored in {@link #AUDITLOG_PARTITION}. */
    private static final String AUDITLOG_TARGET = "auditlog/AuditlogService$Default";
    /** Target events starting with this prefix are always stored in {@link #MESSAGING_PARTITION}. */
    private static final String MESSAGING_TARGET_PREFIX = "messaging/";
    private static final int MESSAGING_PARTITION = 0;
    private static final int AUDITLOG_PARTITION = 1;
    /** Number of partitions a message with any other target is randomly distributed over. */
    private static final int PARTITION_COUNT = 2;

    private final CdsRuntime runtime;
    private PartitionCollectorCoordinator collector;
    /** Partitions written per open change set; an entry is removed when its change set closes. */
    private final Map<ChangeSetContext, Set<Integer>> changeSetCache = new ConcurrentHashMap<>();

    /**
     * @param name the name passed on to {@link AbstractOutboxService}
     * @param runtime the runtime handed to the partition collector coordinator created by {@link #init()}
     */
    public PersistentOutbox(String name, CdsRuntime runtime) {
        super(name);
        this.runtime = runtime;
    }

    /** Creates the partition collector coordinator unless one already exists. */
    void init() {
        if (this.collector == null) {
            this.collector = new PartitionCollectorCoordinator(this.runtime);
        }
    }

    /** Stops the partition collector coordinator, if one was created. */
    void stop() {
        if (this.collector != null) {
            this.collector.stop();
        }
    }

    /**
     * Persists the outbox message and, if the trigger schedule is enabled, registers a change set
     * listener that schedules the written partitions after the change set completed.
     *
     * @param context the outbox message event context to enroll
     */
    protected void enroll(OutboxMessageEventContext context) {
        LOG.debug("Enrolling outbox message with target event '{}'.", context.getEvent());
        int partition = this.persist(context);
        LOG.debug("Stored outbox message with target event '{}' in partition '{}'.", context.getEvent(), partition);

        boolean triggerEnabled = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox().getPersistent().getTriggerSchedule().isEnabled();
        if (!triggerEnabled) {
            return;
        }

        final ChangeSetContext changeSetContext = context.getChangeSetContext();
        if (this.changeSetCache.getOrDefault(changeSetContext, Collections.emptySet()).contains(partition)) {
            // This partition is already tracked for the change set; nothing more to register.
            return;
        }

        this.changeSetCache.compute(changeSetContext, (key, partitions) -> {
            Set<Integer> tracked = partitions != null ? partitions : new HashSet<>();
            tracked.add(partition);
            return tracked;
        });

        // A listener is registered for every partition first seen in a change set, so one
        // change set can carry several listeners that all share the same cache entry.
        changeSetContext.register(new ChangeSetListener(){

            public void afterClose(boolean completed) {
                Set<Integer> partitions = PersistentOutbox.this.changeSetCache.remove(changeSetContext);
                // Only the first listener to run finds the entry; it schedules all partitions.
                // Later listeners of the same change set get null and must do nothing
                // (previously this dereferenced null and threw a NullPointerException).
                if (partitions != null && completed && PersistentOutbox.this.collector != null) {
                    partitions.forEach(PersistentOutbox.this.collector::schedule);
                }
            }
        });
    }

    /**
     * Writes the message of the given context as a new {@code Messages} entry using the default
     * persistence service.
     *
     * @param context the outbox message event context providing message, target event and timestamp
     * @return the partition the message was stored in
     */
    private int persist(OutboxMessageEventContext context) {
        String target = context.getEvent();
        int partition = PersistentOutbox.selectPartition(target);

        Messages message = Messages.create();
        message.setMsg(context.getMessage());
        message.setTarget(target);
        message.setTimestamp(context.getTimestamp());
        message.setPartition(partition);

        PersistenceService db = CdsServiceUtils.getDefaultPersistenceService((EventContext)context);
        // NOTE(review): the casts below are kept from the decompiled output because the type
        // hierarchy of Messages / Insert is not visible in this file - confirm and drop if redundant.
        Insert insert = Insert.into(Messages_.class).entry((Map)((Object)message));
        db.run((CqnInsert)insert);
        return partition;
    }

    /**
     * Chooses the partition for a target event: the audit log target and messaging targets have
     * fixed partitions, every other target gets a random one.
     */
    private static int selectPartition(String target) {
        if (target.equals(AUDITLOG_TARGET)) {
            return AUDITLOG_PARTITION;
        }
        if (target.startsWith(MESSAGING_TARGET_PREFIX)) {
            return MESSAGING_PARTITION;
        }
        return ThreadLocalRandom.current().nextInt(PARTITION_COUNT);
    }
}

