/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence.collectors;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.util.VersionUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.sap.cds.CdsLockTimeoutException;
import com.sap.cds.Result;
import com.sap.cds.Struct;
import com.sap.cds.impl.parser.JsonParser;
import com.sap.cds.ql.CQL;
import com.sap.cds.ql.Delete;
import com.sap.cds.ql.Predicate;
import com.sap.cds.ql.Select;
import com.sap.cds.ql.Update;
import com.sap.cds.ql.cqn.CqnDelete;
import com.sap.cds.ql.cqn.CqnPredicate;
import com.sap.cds.ql.cqn.CqnSelect;
import com.sap.cds.ql.cqn.CqnUpdate;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.model.DynamicModelProvider;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox;
import com.sap.cds.services.impl.outbox.persistence.TelemetryData;
import com.sap.cds.services.impl.outbox.persistence.collectors.OutboxCollector;
import com.sap.cds.services.outbox.OutboxMessage;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.runtime.RequestContextRunner;
import com.sap.cds.services.utils.OpenTelemetryUtils;
import com.sap.cds.services.utils.lib.mt.TenantUtils;
import com.sap.cds.services.utils.outbox.OutboxUtils;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.context.Scope;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background collector for a single persistent outbox service.
 *
 * <p>The collector runs on its own daemon thread (see {@link #start()}). In a loop it asks the
 * tenant supplier for the tenants to serve, and for every tenant with a non-empty outbox it
 * selects a chunk of stored {@code Messages} rows for this collector's target, re-emits each of
 * them on the outbox service and deletes the rows that were emitted successfully. Rows that fail
 * get their {@code attempts}, {@code lastAttemptTimestamp} and (optionally) {@code lastError}
 * columns updated and are retried with an exponentially growing delay.
 *
 * <p>When an iteration found no full chunk of work the collector pauses; {@link #unpause()} (and
 * therefore {@link #schedule(String, long, boolean)}) wakes it up early.
 */
public class PartitionCollector implements Runnable, OutboxCollector {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionCollector.class);

    /** Exponent used for the very first idle pause (2^5 seconds plus jitter, capped by the configured maximum). */
    private static final int INITIAL_PAUSE_COUNT = 5;
    /** Once the idle exponent reaches this value it is no longer increased. */
    private static final int MAX_PAUSE_COUNT = 20;
    /** Amount by which the idle exponent grows after every idle pause. */
    private static final int PAUSE_COUNT_STEP = 2;

    private static final String DEFAULT_UNORDERED_TARGET = "DefaultOutboxUnordered";
    private static final String DEFAULT_ORDERED_TARGET = "DefaultOutboxOrdered";
    private static final String AUDITLOG_TARGET_PREFIX = "auditlog/";
    private static final String MESSAGING_TARGET_PREFIX = "messaging/";

    /** Resolved from the service catalog at the beginning of {@link #run()}. */
    private PersistenceService db;
    private final CdsRuntime runtime;
    private final PersistentOutbox outboxService;
    private final TelemetryData telemetryData;
    private final int chunkSize;
    /** Name of the outbox service; used as the value of the {@code target} column. */
    private final String target;
    private volatile boolean shutdown;
    private volatile boolean isRunning;
    private Thread job;
    /** Monitor the collector thread waits on while paused. */
    private final Object pauseMonitor = new Object();
    /** Exponent of the next idle pause; reset to 0 by {@link #unpause()} and after a busy iteration. */
    private final AtomicInteger pauseCount = new AtomicInteger(INITIAL_PAUSE_COUNT);
    private volatile boolean pause = false;
    private final long maxPauseMillis;
    private final long emitTimeoutSeconds;
    private final int maxPublishAttempts;
    private final boolean storeLastError;
    private final boolean ordered;
    private final Supplier<List<String>> tenantSupplier;
    private final boolean checkVersion;
    private final String appVersion;
    /** Parsed form of {@link #appVersion}; {@code null} when version checking is disabled. */
    private final Version appVersionParsed;
    /** Tenants for which a message written by a newer application version was found. */
    private final Set<String> suspendedTenants = new HashSet<>();

    /**
     * Creates a collector for the given outbox service.
     *
     * @param runtime        runtime used to open request and change set contexts
     * @param outboxService  outbox service whose name is the collected target and on which messages are emitted
     * @param config         configuration providing chunk size, pause/timeout limits, attempts and flags
     * @param appVersion     version of the running application, compared against message versions
     * @param tenantSupplier supplies the tenants to process in each iteration
     * @param telemetryData  sink for outgoing-message counters and statistics
     */
    public PartitionCollector(CdsRuntime runtime, PersistentOutbox outboxService, CdsProperties.Outbox.OutboxServiceConfig config, String appVersion, Supplier<List<String>> tenantSupplier, TelemetryData telemetryData) {
        this.runtime = runtime;
        this.outboxService = outboxService;
        this.telemetryData = telemetryData;
        this.target = outboxService.getName();
        this.chunkSize = config.getChunkSize();
        this.maxPauseMillis = config.getMaxPause().getSeconds() * 1000L;
        this.emitTimeoutSeconds = config.getEmitTimeout().getSeconds();
        this.maxPublishAttempts = config.getMaxAttempts();
        this.storeLastError = config.getStoreLastError().isEnabled();
        this.ordered = config.isOrdered();
        this.tenantSupplier = tenantSupplier;
        this.checkVersion = config.isCheckVersion();
        this.appVersion = appVersion;
        this.appVersionParsed = this.checkVersion ? VersionUtil.parseVersion(appVersion, null, null) : null;
        if (!this.storeLastError) {
            LOG.debug("Storing errors for outbox '{}' is disabled.", outboxService.getName());
        }
    }

    /**
     * Collector thread body: resolves the default persistence service and processes the
     * partition until shutdown is requested or the thread is interrupted. The running and
     * shutdown flags are reset when the method exits, however it exits.
     */
    @Override
    public void run() {
        this.shutdown = false;
        this.isRunning = true;
        try {
            this.db = (PersistenceService) this.runtime.getServiceCatalog().getService(PersistenceService.class, "PersistenceService$Default");
            this.processPartition();
        } finally {
            this.isRunning = false;
            this.shutdown = false;
        }
    }

    /** Starts the collector on a new daemon thread unless it is already running. */
    @Override
    public void start() {
        if (!this.isRunning) {
            LOG.debug("Starting collector of the outbox '{}'", this.outboxService.getName());
            this.job = new Thread(this, this.outboxService.getName() + "-collector");
            this.job.setDaemon(true);
            this.job.start();
        }
    }

    /**
     * Requests shutdown of a running collector, wakes it up if it is paused and waits for the
     * collector thread to finish.
     *
     * @param waitMillis maximum time to wait for the collector thread, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void stop(long waitMillis) throws InterruptedException {
        if (!this.shutdown && this.isRunning) {
            LOG.debug("Stopping collector of the outbox '{}'", this.outboxService.getName());
            this.shutdown = true;
            this.unpause();
            // run() is public: when it was invoked directly there is no thread created by start() to join.
            Thread collectorThread = this.job;
            if (collectorThread != null) {
                collectorThread.join(waitMillis);
            }
            this.job = null;
        }
    }

    /** @return {@code true} while {@link #run()} is executing */
    @Override
    public boolean isRunning() {
        return this.isRunning;
    }

    /** @return {@code true} if shutdown was requested or the current thread is interrupted */
    private boolean isShutdown() {
        return this.shutdown || Thread.currentThread().isInterrupted();
    }

    /**
     * Blocks the calling thread on the pause monitor for at most the given time, unless shutdown
     * was already requested. An interrupt ends the wait and is re-asserted on the thread.
     *
     * <p>NOTE(review): {@code Object.wait(0)} waits until notified, so a pause of 0 ms (possible
     * when the configured maximum pause is 0) blocks until {@link #unpause()} is called.
     *
     * @param pauseInMillis maximum time to wait, in milliseconds
     */
    private void pause(long pauseInMillis) {
        synchronized (this.pauseMonitor) {
            if (this.isShutdown()) {
                return;
            }
            this.pause = true;
            try {
                LOG.debug("Pausing collector '{}' for {} ms", this.target, pauseInMillis);
                this.pauseMonitor.wait(pauseInMillis);
            } catch (InterruptedException e) {
                LOG.debug("Collector '{}' interrupted", this.target);
                Thread.currentThread().interrupt();
            }
            this.pause = false;
        }
    }

    /**
     * Wakes the collector up. All three arguments are ignored by this implementation.
     */
    @Override
    public void schedule(String tenant, long delay, boolean withEmptyCheck) {
        this.unpause();
    }

    /**
     * Resets the idle back-off and, if the collector is currently paused, notifies it so that it
     * continues immediately.
     */
    public void unpause() {
        this.pauseCount.set(0);
        if (this.pause) {
            synchronized (this.pauseMonitor) {
                // Re-check under the monitor: the collector may have woken up on its own meanwhile.
                if (this.pause) {
                    this.pause = false;
                    this.pauseMonitor.notifyAll();
                    LOG.debug("Notified paused collector '{}'", this.target);
                }
            }
        }
    }

    /**
     * Checks, as system user of the given tenant, whether the outbox service reports entries.
     *
     * @param tenant tenant to check
     * @return {@code true} if the outbox service is not empty for that tenant
     */
    private boolean isNotEmptyOutbox(String tenant) {
        LOG.debug("Checking tenant '{}' for outbox entries in collector '{}'", tenant, this.target);
        return this.runtime.requestContext()
                .featureToggles(DynamicModelProvider.STATIC_MODEL_ACCESS_FEATURE)
                .systemUser(tenant)
                .run(req -> !this.outboxService.isEmpty());
    }

    /**
     * Main loop of the collector. Each iteration processes all non-suspended tenants in random
     * order and afterwards either pauses with a growing back-off (nothing suggested more work)
     * or resets the back-off and continues immediately. Any throwable escaping an iteration is
     * logged and the loop continues.
     */
    private void processPartition() {
        while (!this.isShutdown()) {
            try {
                LOG.debug("Executing collector '{}'", this.target);
                AtomicBoolean doPause = new AtomicBoolean(true);
                List<String> tenants = new ArrayList<>(this.tenantSupplier.get());
                tenants.removeAll(this.suspendedTenants);
                Collections.shuffle(tenants);
                for (String tenant : tenants) {
                    if (this.processTenant(tenant, doPause)) {
                        break;
                    }
                }
                if (doPause.get()) {
                    this.pause(getPauseMillis(this.pauseCount.get(), this.maxPauseMillis));
                    if (this.pauseCount.get() < MAX_PAUSE_COUNT) {
                        this.pauseCount.addAndGet(PAUSE_COUNT_STEP);
                    }
                } else {
                    this.pauseCount.set(0);
                }
            } catch (Throwable e) {
                LOG.warn("Unexpected exception occurred in collector '{}'", this.target, e);
            }
        }
    }

    /**
     * Processes the outbox of one tenant inside its own telemetry span.
     *
     * <p>Exceptions are recorded on the span and logged; lock timeouts and unknown tenants are
     * only logged at debug level. The span is always ended.
     *
     * @param tenant  tenant to process
     * @param doPause set to {@code false} when the collector should not pause after this iteration
     * @return {@code true} if the tenant loop must stop because the collector was interrupted or shut down
     */
    private boolean processTenant(String tenant, AtomicBoolean doPause) {
        // 'var' keeps whatever Optional type createSpan declares (presumably Optional<Span>, given
        // the updateName/setAttribute calls below) without requiring an additional import.
        var span = OpenTelemetryUtils.createSpan(OpenTelemetryUtils.CdsSpanType.OUTBOX, SpanKind.SERVER);
        // try-with-resources tolerates a null resource, so no explicit null check is needed. The
        // scope must be closed without any control-flow statement in a finally block: a
        // 'continue' there would discard pending exceptions and the early exit on shutdown.
        try (Scope scope = span.map(ImplicitContextKeyed::makeCurrent).orElse(null)) {
            span.ifPresent(s -> {
                s.updateName("Outbox Collector (" + this.target + ")");
                s.setAttribute(OpenTelemetryUtils.CDS_TENANT, tenant);
                s.setAttribute(OpenTelemetryUtils.CDS_OUTBOX_TARGET, this.target);
            });
            if (!this.isNotEmptyOutbox(tenant)) {
                LOG.debug("The outbox for the tenant '{}' in collector '{}' is empty", tenant, this.target);
                return false;
            }
            LOG.debug("Processing tenant '{}' in collector '{}'", tenant, this.target);
            // Block lambdas with explicit 'return' so that only the value-returning run(...) overload applies.
            boolean interrupted = this.runtime.requestContext().systemUser(tenant).run(req -> {
                return this.runtime.changeSetContext().run(ctx -> {
                    return this.processChunk(tenant, doPause);
                });
            });
            if (interrupted || this.isShutdown()) {
                doPause.set(false);
                return true;
            }
            this.telemetryData.recordStatistics(this.runtime, this.db, tenant);
            return false;
        } catch (Exception e) {
            OpenTelemetryUtils.recordException(span, e);
            if (isLockTimeoutException(e)) {
                LOG.debug("Collector '{}' timed out waiting for table lock for tenant '{}'", this.target, tenant);
            } else if (TenantUtils.isUnknownTenant(e)) {
                LOG.debug("Unknown tenant '{}' for the outbox collector", tenant);
            } else {
                LOG.warn("Exception occurred for tenant '{}' in collector '{}'", tenant, this.target, e);
            }
            return false;
        } finally {
            OpenTelemetryUtils.endSpan(span);
        }
    }

    /**
     * Selects and locks one chunk of messages of the current tenant and publishes them one by
     * one. Must be called inside the tenant's request and change set context.
     *
     * @param tenant  tenant being processed (used for telemetry and suspension)
     * @param doPause set to {@code false} if a random offset was used or the chunk was full
     * @return {@code true} if publishing was interrupted by shutdown, {@code false} otherwise
     */
    private boolean processChunk(String tenant, AtomicBoolean doPause) {
        Predicate where = this.buildSelectionPredicate();
        long skip = this.calculateOffset(where);
        Select<Messages_> select = Select.from(Messages_.class)
                .where(where)
                .orderBy(e -> e.timestamp().asc(), e -> e.ID().asc())
                .limit(this.chunkSize, skip)
                .lock(0);
        Result res = this.db.run(select);
        if (skip != 0L || res.rowCount() >= select.top()) {
            // There are (probably) more messages than this chunk: go on without pausing.
            doPause.set(false);
        }
        if (res.rowCount() <= 0L) {
            return false;
        }
        Instant startOfDispatch = Instant.now();
        for (Messages msg : res.listOf(Messages.class)) {
            PublishState state = this.publish(msg, startOfDispatch);
            if (state == PublishState.SUCCESS) {
                this.db.run(Delete.from(Messages_.class).where(e -> e.ID().eq(msg.getId())));
                this.telemetryData.recordOutgoingMessages(tenant, 1L);
            } else if (state == PublishState.TIMEOUT) {
                break;
            } else if (state == PublishState.INVALID_VERSION) {
                this.suspendedTenants.add(tenant);
                break;
            } else if (state == PublishState.INTERRUPTED) {
                return true;
            }
            // Stop dispatching once the emit timeout is used up so the change set (and its locks) ends.
            if (Duration.between(startOfDispatch, Instant.now()).getSeconds() > this.emitTimeoutSeconds) {
                break;
            }
        }
        return false;
    }

    /**
     * Builds the filter for the rows this collector handles: rows whose {@code target} equals
     * this collector's target (the two default outboxes additionally pick up targets starting
     * with {@code auditlog/} respectively {@code messaging/}) and whose {@code attempts} are
     * below the configured maximum.
     */
    private Predicate buildSelectionPredicate() {
        Predicate where = CQL.get("target").eq(this.target);
        if (DEFAULT_UNORDERED_TARGET.equals(this.target)) {
            where = CQL.or(where, CQL.get("target").startsWith(AUDITLOG_TARGET_PREFIX));
        } else if (DEFAULT_ORDERED_TARGET.equals(this.target)) {
            where = CQL.or(where, CQL.get("target").startsWith(MESSAGING_TARGET_PREFIX));
        }
        return CQL.and(where, CQL.get("attempts").lt(this.maxPublishAttempts));
    }

    /**
     * Determines how many rows to skip when selecting a chunk. Ordered outboxes always start at
     * 0; otherwise, if at least two full chunks match, a random chunk boundary is chosen.
     *
     * @param whereClause filter identifying the rows of this collector
     * @return number of rows to skip (a multiple of the chunk size)
     */
    private long calculateOffset(Predicate whereClause) {
        if (this.ordered) {
            return 0L;
        }
        Select<Messages_> select = Select.from(Messages_.class)
                .columns(c -> CQL.count().as("count"))
                .where(whereClause);
        Result res = this.db.run(select);
        long count = ((Number) res.single().get("count")).longValue();
        long chunks = count / (long) this.chunkSize;
        long offset = chunks < 2L ? 0L : ThreadLocalRandom.current().nextLong(chunks) * (long) this.chunkSize;
        LOG.debug("Calculated offset for unordered processing of outbox collector '{}' is {}", this.target, offset);
        return offset;
    }

    /**
     * Publishes one stored message on the outbox service.
     *
     * <p>A message that already failed is skipped ({@link PublishState#TIMEOUT}) while its retry
     * delay has not elapsed. For targets prefixed with {@code messaging/} or {@code auditlog/}
     * the event is taken from the target and the whole stored JSON is the message; otherwise the
     * JSON's {@code event}, {@code version} and {@code message} entries are used and, with version
     * checking enabled, a message with a version newer than the application's yields
     * {@link PublishState#INVALID_VERSION}.
     *
     * @param msg             stored outbox row
     * @param startOfDispatch time at which dispatching of the current chunk started
     * @return outcome of the publish attempt
     */
    private PublishState publish(Messages msg, Instant startOfDispatch) {
        if (msg.getAttempts() != 0 && msg.getLastAttemptTimestamp() != null
                && Duration.between(msg.getLastAttemptTimestamp(), Instant.now()).toMillis() < getPauseMillis(msg.getAttempts(), Integer.MAX_VALUE)) {
            LOG.debug("Message '{}' cannot be republished until the retry waiting time is reached", msg.getId());
            return PublishState.TIMEOUT;
        }
        Map<String, Object> envelope = JsonParser.map(JsonParser.parseJson(msg.getMsg()));
        String messageTarget = msg.getTarget();
        String outboxEvent;
        Map<String, Object> payload;
        if (messageTarget.startsWith(MESSAGING_TARGET_PREFIX) || messageTarget.startsWith(AUDITLOG_TARGET_PREFIX)) {
            outboxEvent = messageTarget.substring(messageTarget.indexOf('/') + 1);
            payload = envelope;
        } else {
            outboxEvent = (String) envelope.get("event");
            String messageVersion = (String) envelope.get("version");
            // The nested "message" entry is the parsed JSON object of the payload.
            @SuppressWarnings("unchecked")
            Map<String, Object> nested = (Map<String, Object>) envelope.get("message");
            payload = nested;
            if (this.checkVersion && !Objects.equals(this.appVersion, messageVersion)
                    && this.appVersionParsed.compareTo(VersionUtil.parseVersion(messageVersion, null, null)) < 0) {
                LOG.debug("Found newer version '{}' in outbox message. Suspending collector with version '{}'.", messageVersion, this.appVersion);
                return PublishState.INVALID_VERSION;
            }
        }
        OutboxMessageEventContext ctx = OutboxMessageEventContext.create(outboxEvent);
        ctx.setIsInbound(true);
        ctx.setTimestamp(msg.getTimestamp());
        ctx.setMessage(Struct.access(payload).as(OutboxMessage.class));
        RequestContextRunner runner = this.runtime.requestContext();
        OutboxUtils.restoreRequestContext(runner, ctx.getMessage());
        // Block lambda with explicit 'return' so that only the value-returning run(...) overload applies.
        return runner.run(requestContext -> {
            return this.emitWithRetry(msg, ctx, startOfDispatch);
        });
    }

    /**
     * Emits the prepared event context, retrying on failure until it succeeds, the maximum
     * number of attempts is reached, the next retry delay would exceed the emit timeout, or the
     * collector is shut down. Every failure is persisted on the message row.
     *
     * @param msg             stored outbox row; its attempts and last-attempt timestamp are updated on failure
     * @param ctx             event context to emit
     * @param startOfDispatch time at which dispatching of the current chunk started
     * @return outcome of the emit loop
     */
    private PublishState emitWithRetry(Messages msg, OutboxMessageEventContext ctx, Instant startOfDispatch) {
        LOG.debug("Publishing outbox message for target '{}' with event '{}'", msg.getTarget(), ctx.getEvent());
        while (!this.isShutdown()) {
            try {
                this.outboxService.emit(ctx);
                return PublishState.SUCCESS;
            } catch (Throwable e) {
                LOG.warn("Failed to emit Outbox message with id '{}' for target '{}' with event '{}'", msg.getId(), msg.getTarget(), ctx.getEvent(), e);
                int currentAttempts = msg.getAttempts() + 1;
                msg.setAttempts(currentAttempts);
                msg.setLastAttemptTimestamp(Instant.now());
                Map<String, Object> data = new HashMap<>();
                data.put("attempts", msg.getAttempts());
                data.put("lastAttemptTimestamp", msg.getLastAttemptTimestamp());
                if (this.storeLastError) {
                    StringWriter stringWriter = new StringWriter();
                    e.printStackTrace(new PrintWriter(stringWriter));
                    data.put("lastError", stringWriter.toString());
                }
                this.db.run(Update.entity(Messages_.class).data(data).where(m -> m.ID().eq(msg.getId())));
                if (currentAttempts >= this.maxPublishAttempts) {
                    LOG.warn("Reached maximum number of attempts to emit Outbox message with id '{}' to target '{}' with event '{}'", msg.getId(), msg.getTarget(), ctx.getEvent());
                    return PublishState.FAILED;
                }
                long pauseInMillis = getPauseMillis(currentAttempts, Integer.MAX_VALUE);
                if (Duration.between(startOfDispatch, Instant.now().plusMillis(pauseInMillis)).getSeconds() > this.emitTimeoutSeconds) {
                    LOG.debug("The retry waiting time of message '{}' would exceed the emit timeout, therefore release lock and commit transaction", msg.getId());
                    return PublishState.TIMEOUT;
                }
                this.pause(pauseInMillis);
            }
        }
        return PublishState.INTERRUPTED;
    }

    /**
     * Computes an exponential back-off of {@code 2^pauseCount} seconds plus up to one second of
     * random jitter, capped at the given maximum.
     *
     * @param pauseCount       exponent of the back-off
     * @param maxTimeoutMillis upper bound of the result, in milliseconds
     * @return delay in milliseconds
     */
    private static long getPauseMillis(int pauseCount, long maxTimeoutMillis) {
        long retryInMillis = Math.round(Math.pow(2.0, pauseCount) * 1000.0 + (double) ThreadLocalRandom.current().nextLong(1001L));
        return Math.min(retryInMillis, maxTimeoutMillis);
    }

    /** @return {@code true} if the throwable or any of its causes is a {@link CdsLockTimeoutException} */
    private static boolean isLockTimeoutException(Throwable t) {
        while (t != null) {
            if (t instanceof CdsLockTimeoutException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    /** Outcome of a single attempt to publish a stored message. */
    private static enum PublishState {
        /** The message was emitted; its row can be deleted. */
        SUCCESS,
        /** The maximum number of attempts was reached. */
        FAILED,
        /** The message must wait for its retry delay, or retrying would exceed the emit timeout. */
        TIMEOUT,
        /** The collector was shut down or interrupted while publishing. */
        INTERRUPTED,
        /** The message was written by a newer application version. */
        INVALID_VERSION;

    }
}

