/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence.collectors;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.util.VersionUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.sap.cds.CdsLockTimeoutException;
import com.sap.cds.Result;
import com.sap.cds.Struct;
import com.sap.cds.impl.parser.JsonParser;
import com.sap.cds.ql.CQL;
import com.sap.cds.ql.Delete;
import com.sap.cds.ql.Predicate;
import com.sap.cds.ql.Select;
import com.sap.cds.ql.Selectable;
import com.sap.cds.ql.Update;
import com.sap.cds.ql.Value;
import com.sap.cds.ql.cqn.CqnDelete;
import com.sap.cds.ql.cqn.CqnPredicate;
import com.sap.cds.ql.cqn.CqnSelect;
import com.sap.cds.ql.cqn.CqnUpdate;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.model.DynamicModelProvider;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox;
import com.sap.cds.services.impl.outbox.persistence.TelemetryData;
import com.sap.cds.services.impl.outbox.persistence.collectors.OutboxCollector;
import com.sap.cds.services.impl.scheduler.TaskScheduler;
import com.sap.cds.services.outbox.OutboxMessage;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.runtime.RequestContextRunner;
import com.sap.cds.services.utils.OpenTelemetryUtils;
import com.sap.cds.services.utils.lib.mt.TenantUtils;
import com.sap.cds.services.utils.outbox.OutboxUtils;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.context.Scope;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskBasedCollector
implements OutboxCollector {
    private static final Logger LOG = LoggerFactory.getLogger(TaskBasedCollector.class);
    private final CdsRuntime runtime;
    private final PersistentOutbox outboxService;
    private final TaskScheduler taskScheduler;
    private final TelemetryData telemetryData;
    private final String target;
    private final boolean storeLastError;
    private final int maxPublishAttempts;
    private final boolean ordered;
    private final long maxFailPause;
    private final boolean checkVersion;
    private final String appVersion;
    private final Version appVersionParsed;
    private final Set<String> suspendedTenants = ConcurrentHashMap.newKeySet();
    private PersistenceService db;
    private boolean isStarted;

    public TaskBasedCollector(CdsRuntime runtime, PersistentOutbox outboxService, CdsProperties.Outbox.OutboxServiceConfig config, String appVersion, TaskScheduler taskScheduler, TelemetryData telemetryData) {
        this.runtime = runtime;
        this.outboxService = outboxService;
        this.taskScheduler = taskScheduler;
        this.telemetryData = telemetryData;
        this.target = outboxService.getName();
        this.storeLastError = config.getStoreLastError().isEnabled();
        this.maxPublishAttempts = config.getMaxAttempts();
        this.ordered = config.isOrdered();
        this.maxFailPause = config.getMaxPause().toMillis();
        this.checkVersion = config.isCheckVersion();
        this.appVersion = appVersion;
        Version version = this.appVersionParsed = this.checkVersion ? VersionUtil.parseVersion((String)appVersion, null, null) : null;
        if (!this.storeLastError) {
            LOG.debug("Storing errors for outbox '{}' is disabled.", (Object)outboxService.getName());
        }
    }

    @Override
    public void start() {
        LOG.debug("Starting collector of the outbox '{}'", (Object)this.outboxService.getName());
        this.db = (PersistenceService)this.runtime.getServiceCatalog().getService(PersistenceService.class, "PersistenceService$Default");
        this.isStarted = true;
    }

    @Override
    public void stop(long waitMillis) {
        LOG.debug("Stopping collector of the outbox '{}'", (Object)this.outboxService.getName());
        this.isStarted = false;
    }

    @Override
    public boolean isRunning() {
        return this.isStarted;
    }

    @Override
    public void schedule(String tenant, long delay, boolean withEmptyCheck) {
        if (!this.suspendedTenants.contains(tenant)) {
            this.taskScheduler.scheduleTask(this.target + "/" + tenant, new CollectorTask(tenant, withEmptyCheck), delay);
        }
    }

    private long calculateOffset(Predicate whereClause) {
        if (this.ordered) {
            return 0L;
        }
        Select select = Select.from(Messages_.class).columns(new Selectable[]{CQL.count().as("count")}).where((CqnPredicate)whereClause);
        Result res = this.db.run((CqnSelect)select, new Object[0]);
        long count = ((Number)res.single().get((Object)"count")).longValue();
        long offset = count < 2L ? 0L : ThreadLocalRandom.current().nextLong(count);
        LOG.debug("Calculated offset for unordered processing of outbox collector '{}' is {}", (Object)this.target, (Object)offset);
        return offset;
    }

    private PublishState publish(Messages msg) {
        Version messageVersionParsed;
        Map message = JsonParser.map((JsonNode)JsonParser.parseJson((String)msg.getMsg()));
        String outboxEvent = (String)message.get("event");
        String messageVersion = (String)message.get("version");
        message = (Map)message.get("message");
        if (this.checkVersion && !Objects.equals(this.appVersion, messageVersion) && this.appVersionParsed.compareTo(messageVersionParsed = VersionUtil.parseVersion((String)messageVersion, null, null)) < 0) {
            LOG.debug("Found newer version '{}' in outbox message. Suspending collector '{}' with version '{}'.", new Object[]{messageVersion, this.target, this.appVersion});
            return PublishState.INVALID_VERSION;
        }
        OutboxMessageEventContext ctx = OutboxMessageEventContext.create((String)outboxEvent);
        ctx.setIsInbound(true);
        ctx.setTimestamp(msg.getTimestamp());
        ctx.setMessage((OutboxMessage)Struct.access((Map)message).as(OutboxMessage.class));
        RequestContextRunner runner = this.runtime.requestContext();
        OutboxUtils.restoreRequestContext((RequestContextRunner)runner, (OutboxMessage)ctx.getMessage());
        return (PublishState)((Object)runner.run(requestContext -> {
            try {
                LOG.debug("Publishing outbox message to outbox '{}' with event '{}'", (Object)this.target, (Object)outboxEvent);
                this.outboxService.emit((EventContext)ctx);
                return PublishState.SUCCESS;
            }
            catch (Throwable e) {
                LOG.warn("Failed to emit outbox message with id '{}' to outbox '{}' with event '{}'", new Object[]{msg.getId(), this.target, outboxEvent, e});
                HashMap<String, Object> data = new HashMap<String, Object>();
                data.put("attempts", msg.getAttempts() + 1);
                data.put("lastAttemptTimestamp", Instant.now());
                if (this.storeLastError) {
                    StringWriter stringWriter = new StringWriter();
                    e.printStackTrace(new PrintWriter(stringWriter));
                    data.put("lastError", stringWriter.toString());
                }
                this.db.run((CqnUpdate)Update.entity(Messages_.class).data(data).byId((Object)msg.getId()), new Object[0]);
                return PublishState.FAILED;
            }
        }));
    }

    private long getPauseMillis(int attempt) {
        long retryInMillis = Math.round(Math.pow(2.0, attempt) * 1000.0 + (double)ThreadLocalRandom.current().nextLong(1001L));
        return Math.min(retryInMillis, this.maxFailPause);
    }

    private static boolean isLockTimeoutException(Throwable t) {
        while (t != null) {
            if (t instanceof CdsLockTimeoutException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    private class CollectorTask
    implements TaskScheduler.Task {
        private final String tenant;
        private final boolean withEmptyCheck;

        public CollectorTask(String tenant, boolean withEmptyCheck) {
            this.tenant = tenant;
            this.withEmptyCheck = withEmptyCheck;
        }

        @Override
        public boolean isReady() {
            return TaskBasedCollector.this.isStarted;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public TaskScheduler.TaskSchedule run() {
            if (TaskBasedCollector.this.suspendedTenants.contains(this.tenant)) {
                return null;
            }
            Optional span = OpenTelemetryUtils.createSpan((OpenTelemetryUtils.CdsSpanType)OpenTelemetryUtils.CdsSpanType.OUTBOX, (SpanKind)SpanKind.SERVER);
            try {
                TaskScheduler.TaskSchedule taskSchedule;
                block22: {
                    Scope scope;
                    block20: {
                        TaskScheduler.TaskSchedule taskSchedule2;
                        block21: {
                            scope = span.map(ImplicitContextKeyed::makeCurrent).orElse(null);
                            try {
                                boolean isEmpty;
                                span.ifPresent(s -> {
                                    s.updateName("Outbox Collector (" + TaskBasedCollector.this.target + ")");
                                    s.setAttribute(OpenTelemetryUtils.CDS_TENANT, (Object)this.tenant);
                                    s.setAttribute(OpenTelemetryUtils.CDS_OUTBOX_TARGET, (Object)TaskBasedCollector.this.target);
                                });
                                if (!this.withEmptyCheck || !(isEmpty = ((Boolean)TaskBasedCollector.this.runtime.requestContext().featureToggles(DynamicModelProvider.STATIC_MODEL_ACCESS_FEATURE).systemUser(this.tenant).run(req -> TaskBasedCollector.this.outboxService.isEmpty())).booleanValue())) break block20;
                                LOG.debug("The outbox '{}' for tenant '{}' is empty", (Object)TaskBasedCollector.this.target, (Object)this.tenant);
                                taskSchedule2 = null;
                                if (scope == null) break block21;
                            }
                            catch (Throwable throwable) {
                                try {
                                    if (scope != null) {
                                        try {
                                            scope.close();
                                        }
                                        catch (Throwable throwable2) {
                                            throwable.addSuppressed(throwable2);
                                        }
                                    }
                                    throw throwable;
                                }
                                catch (Exception e) {
                                    OpenTelemetryUtils.recordException((Optional)span, (Exception)e);
                                    if (TaskBasedCollector.isLockTimeoutException(e)) {
                                        LOG.debug("Collector '{}' timed out waiting for table lock for tenant '{}'", (Object)TaskBasedCollector.this.target, (Object)this.tenant);
                                        if (!TaskBasedCollector.this.ordered) {
                                            TaskScheduler.TaskSchedule taskSchedule3 = new TaskScheduler.TaskSchedule(new CollectorTask(this.tenant, false));
                                            return taskSchedule3;
                                        }
                                    } else if (TenantUtils.isUnknownTenant((Throwable)e)) {
                                        LOG.debug("Unknown tenant '{}' for the outbox collector '{}'", (Object)this.tenant, (Object)TaskBasedCollector.this.target);
                                    } else {
                                        LOG.warn("Exception occurred for tenant '{}' in collector '{}'", new Object[]{this.tenant, TaskBasedCollector.this.target, e});
                                    }
                                    TaskScheduler.TaskSchedule taskSchedule4 = null;
                                    return taskSchedule4;
                                }
                            }
                            scope.close();
                        }
                        return taskSchedule2;
                    }
                    LOG.debug("Processing tenant '{}' in collector '{}'", (Object)this.tenant, (Object)TaskBasedCollector.this.target);
                    TaskScheduler.TaskSchedule result = (TaskScheduler.TaskSchedule)TaskBasedCollector.this.runtime.requestContext().systemUser(this.tenant).run(req -> (TaskScheduler.TaskSchedule)TaskBasedCollector.this.runtime.changeSetContext().run(ctx -> {
                        Predicate where = CQL.get((String)"target").eq((Object)TaskBasedCollector.this.target).and((CqnPredicate)CQL.get((String)"attempts").lt((Value)CQL.constant((Object)TaskBasedCollector.this.maxPublishAttempts)), new CqnPredicate[0]);
                        long skip = TaskBasedCollector.this.calculateOffset(where);
                        Select select = Select.from(Messages_.class).where((CqnPredicate)where).orderBy(new Function[]{e -> e.timestamp().asc(), e -> e.ID().asc()}).limit(1L, skip).lock(0);
                        Messages message = TaskBasedCollector.this.db.run((CqnSelect)select, new Object[0]).first(Messages.class).orElse(null);
                        if (message != null) {
                            int attempt = message.getAttempts();
                            switch (TaskBasedCollector.this.publish(message)) {
                                case SUCCESS: {
                                    TaskBasedCollector.this.db.run((CqnDelete)Delete.from(Messages_.class).byId((Object)message.getId()), new Object[0]);
                                    TaskBasedCollector.this.telemetryData.recordOutgoingMessages(this.tenant, 1L);
                                    return new TaskScheduler.TaskSchedule(new CollectorTask(this.tenant, false));
                                }
                                case INVALID_VERSION: {
                                    TaskBasedCollector.this.suspendedTenants.add(this.tenant);
                                    return null;
                                }
                                case FAILED: {
                                    return new TaskScheduler.TaskSchedule(new CollectorTask(this.tenant, false), TaskBasedCollector.this.getPauseMillis(attempt), false);
                                }
                            }
                        }
                        return null;
                    }));
                    TaskBasedCollector.this.telemetryData.recordStatistics(TaskBasedCollector.this.runtime, TaskBasedCollector.this.db, this.tenant);
                    taskSchedule = result;
                    if (scope == null) break block22;
                    scope.close();
                }
                return taskSchedule;
            }
            finally {
                OpenTelemetryUtils.endSpan((Optional)span);
            }
        }
    }

    private static enum PublishState {
        SUCCESS,
        FAILED,
        INVALID_VERSION;

    }
}

