/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence.collectors;

import com.sap.cds.CdsLockTimeoutException;
import com.sap.cds.Result;
import com.sap.cds.ql.Delete;
import com.sap.cds.ql.Select;
import com.sap.cds.ql.Update;
import com.sap.cds.ql.cqn.CqnDelete;
import com.sap.cds.ql.cqn.CqnPredicate;
import com.sap.cds.ql.cqn.CqnSelect;
import com.sap.cds.ql.cqn.CqnUpdate;
import com.sap.cds.reflect.CdsModel;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox;
import com.sap.cds.services.mt.TenantInfo;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.outbox.OutboxService;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.outbox.OutboxUtils;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionCollector
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionCollector.class);
    private final CdsRuntime runtime;
    private final PersistenceService db;
    private final OutboxService outboxService;
    private final int chunkSize;
    private final int partition;
    private final Object pauseMonitor = new Object();
    private final AtomicInteger pauseCount = new AtomicInteger(5);
    private volatile boolean pause = false;
    private final long maxPauseMillis;
    private final long emitTimeoutSeconds;
    private final int maxPublishAttempts;
    private final Supplier<List<TenantInfo>> tenantSupplier;

    public PartitionCollector(CdsRuntime runtime, PersistentOutbox outboxService, Supplier<List<TenantInfo>> tenantSupplier, int partition) {
        this.runtime = runtime;
        this.db = (PersistenceService)runtime.getServiceCatalog().getService(PersistenceService.class, "PersistenceService$Default");
        this.outboxService = outboxService;
        this.partition = partition;
        CdsProperties.Outbox.Persistent persistent = runtime.getEnvironment().getCdsProperties().getOutbox().getPersistent();
        this.chunkSize = persistent.getChunkSize();
        this.maxPauseMillis = persistent.getMaxPause().getSeconds() * 1000L;
        this.emitTimeoutSeconds = persistent.getEmitTimeout().getSeconds();
        this.maxPublishAttempts = persistent.getMaxAttempts();
        this.tenantSupplier = tenantSupplier;
    }

    @Override
    public void run() {
        this.processPartition();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void pause() {
        Object object = this.pauseMonitor;
        synchronized (object) {
            this.pause = true;
            try {
                long pauseInMillis = PartitionCollector.getPauseMillis(this.pauseCount.get(), this.maxPauseMillis);
                LOG.debug("Pausing partition collector {} for {} ms", (Object)this.partition, (Object)pauseInMillis);
                this.pauseMonitor.wait(pauseInMillis);
            }
            catch (InterruptedException e) {
                LOG.warn("Partition collector thread '{}' interrupted", (Object)Thread.currentThread().getName(), (Object)e);
                Thread.currentThread().interrupt();
            }
            this.pause = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unpause() {
        this.pauseCount.set(0);
        if (this.pause) {
            Object object = this.pauseMonitor;
            synchronized (object) {
                if (this.pause) {
                    this.pause = false;
                    this.pauseMonitor.notifyAll();
                    LOG.debug("Notified paused partition collector {}", (Object)this.partition);
                }
            }
        }
    }

    private void processPartition() {
        while (true) {
            try {
                while (true) {
                    LOG.debug("Executing partition collector {}", (Object)this.partition);
                    AtomicBoolean doPause = new AtomicBoolean(true);
                    for (TenantInfo tenantInfo : this.tenantSupplier.get()) {
                        try {
                            LOG.debug("Processing tenant '{}' in partition collector {}", (Object)tenantInfo.getTenant(), (Object)this.partition);
                            this.runtime.requestContext().clearUser().modifyUser(user -> user.setTenant(tenantInfo.getTenant())).run(req -> {
                                if (OutboxUtils.hasOutboxModel((CdsModel)req.getModel())) {
                                    this.runtime.changeSetContext().run(ctx -> {
                                        Select select = Select.from(Messages_.class).where(e -> e.partition().eq((Object)this.partition).and((CqnPredicate)e.attempts().lt((Object)this.maxPublishAttempts), new CqnPredicate[0])).orderBy(new Function[]{e -> e.timestamp().asc()}).limit((long)this.chunkSize).lock(0);
                                        Result res = this.db.run((CqnSelect)select, new Object[0]);
                                        if (res.rowCount() >= select.top()) {
                                            doPause.set(false);
                                        }
                                        if (res.rowCount() > 0L) {
                                            Instant startOfDispatch = Instant.now();
                                            for (Messages msg : res.listOf(Messages.class)) {
                                                if (this.publish(msg)) {
                                                    this.db.run((CqnDelete)Delete.from(Messages_.class).where(e -> e.ID().eq((Object)msg.getId())), new Object[0]);
                                                }
                                                if (Duration.between(startOfDispatch, Instant.now()).getSeconds() <= this.emitTimeoutSeconds) continue;
                                                break;
                                            }
                                        }
                                    });
                                } else {
                                    LOG.debug("The outbox model is not available for the tenant '{}'", (Object)tenantInfo.getTenant());
                                }
                            });
                        }
                        catch (Exception e) {
                            if (PartitionCollector.isLockTimeoutException(e)) {
                                LOG.debug("Partition collector {} timed out waiting for table lock for tenant '{}'", (Object)this.partition, (Object)tenantInfo.getTenant());
                                doPause.set(true);
                                break;
                            }
                            LOG.warn("Exception occurred for tenant '{}' in partition collector {}", new Object[]{tenantInfo.getTenant(), this.partition, e});
                        }
                    }
                    if (doPause.get()) {
                        this.pause();
                        if (this.pauseCount.get() >= 20) continue;
                        this.pauseCount.addAndGet(2);
                        continue;
                    }
                    this.pauseCount.set(0);
                }
            }
            catch (Exception e) {
                LOG.warn("Unexpected exception occured in partition collector {}", (Object)this.partition, (Object)e);
                continue;
            }
            break;
        }
    }

    private boolean publish(Messages msg) {
        LOG.debug("Publishing outbox message with target event '{}'", (Object)msg.getTarget());
        OutboxMessageEventContext ctx = OutboxMessageEventContext.create((String)msg.getTarget());
        ctx.setIsInbound(true);
        ctx.setTimestamp(msg.getTimestamp());
        ctx.setMessage(msg.getMsg());
        while (true) {
            try {
                this.outboxService.emit((EventContext)ctx);
                return true;
            }
            catch (Exception e) {
                LOG.warn("Failed to emit Outbox message with id '{}' and target '{}'", new Object[]{msg.getId(), msg.getTarget(), e});
                int currentAttempts = msg.getAttempts();
                if (currentAttempts < this.maxPublishAttempts) {
                    msg.setAttempts(++currentAttempts);
                    this.db.run((CqnUpdate)Update.entity(Messages_.class).data("attempts", (Object)currentAttempts).where(m -> m.ID().eq((Object)msg.getId())), new Object[0]);
                    try {
                        long pauseInMillis = PartitionCollector.getPauseMillis(currentAttempts, Integer.MAX_VALUE);
                        TimeUnit.MILLISECONDS.sleep(pauseInMillis);
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                    continue;
                }
                LOG.warn("Reached maximum number of attempts to emit Outbox message with id '{}' and target '{}'", (Object)msg.getId(), (Object)msg.getTarget());
                return false;
            }
            break;
        }
    }

    private static long getPauseMillis(int pauseCount, long maxTimeoutMillis) {
        long retryInMillis = Math.round(Math.pow(2.0, pauseCount) * 1000.0 + (double)RandomUtils.nextLong((long)0L, (long)1001L));
        return Math.min(retryInMillis, maxTimeoutMillis);
    }

    private static boolean isLockTimeoutException(Throwable t) {
        while (t != null) {
            if (t instanceof CdsLockTimeoutException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}

