/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.impl.outbox.persistence;

import com.google.common.annotations.VisibleForTesting;
import com.sap.cds.Result;
import com.sap.cds.Row;
import com.sap.cds.ql.CQL;
import com.sap.cds.ql.Select;
import com.sap.cds.ql.cqn.CqnPredicate;
import com.sap.cds.ql.cqn.CqnSelect;
import com.sap.cds.services.impl.model.DynamicModelProvider;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.OutboxStatistics;
import com.sap.cds.services.impl.outbox.persistence.TelemetryData;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.OpenTelemetryUtils;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableMeasurement;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TelemetryDataImpl
implements TelemetryData {
    private static final Logger logger = LoggerFactory.getLogger(TelemetryDataImpl.class);
    private static final Object NULL_TENANT = new Object();
    private static final String OUTBOX_INFO_INSTRUMENTATION_SCOPE = "com.sap.cds.outbox";
    private static final List<OutboxMetric> OUTBOX_METRICS = List.of(new OutboxMetric("com.sap.cds.outbox.coldEntries", OutboxStatistics::coldEntries, "Number of entries that could not be delivered after repeated attempts and will not be retried anymore.", true), new OutboxMetric("com.sap.cds.outbox.remainingEntries", OutboxStatistics::remainingEntries, "Number of entries which are pending for delivery.", true), new OutboxMetric("com.sap.cds.outbox.maxStorageTimeSeconds", OutboxStatistics::maxStorageTime, "Maximum time in seconds an entry was residing in the outbox.", true), new OutboxMetric("com.sap.cds.outbox.medStorageTimeSeconds", OutboxStatistics::medianStorageTime, "Median time in seconds of an entry stored in the outbox.", true), new OutboxMetric("com.sap.cds.outbox.minStorageTimeSeconds", OutboxStatistics::minStorageTime, "Minimal time in seconds an entry was stored in the outbox.", true), new OutboxMetric("com.sap.cds.outbox.incomingMessages", OutboxStatistics::incomingMessages, "Number of incoming messages of the outbox.", false), new OutboxMetric("com.sap.cds.outbox.outgoingMessages", OutboxStatistics::outgoingMessages, "Number of outgoing messages of the outbox.", false));
    private final Map<Object, OutboxStatistics> statistics = new ConcurrentHashMap<Object, OutboxStatistics>();
    private final List<ObservableLongMeasurement> observers = new ArrayList<ObservableLongMeasurement>();
    private final String outboxName;
    private final int maxAttempts;

    public TelemetryDataImpl(String outboxName, int maxAttempts) {
        this.outboxName = outboxName;
        this.maxAttempts = maxAttempts;
        this.initializeOtel();
    }

    @Override
    public Collection<OutboxStatistics> getStatistics() {
        return this.statistics.values();
    }

    @VisibleForTesting
    OutboxStatistics getStatistics(String tenant) {
        return this.statistics.computeIfAbsent(tenant == null ? NULL_TENANT : tenant, t -> new OutboxStatistics(tenant));
    }

    @Override
    public void recordIncomingMessages(String tenant, long count) {
        this.getStatistics(tenant).increaseIncomingMessages(count);
    }

    @Override
    public void recordOutgoingMessages(String tenant, long count) {
        this.getStatistics(tenant).increaseOutgoingMessages(count);
    }

    @Override
    public void recordStatistics(CdsRuntime runtime, PersistenceService db, String tenant) {
        logger.debug("Collecting statistics for outbox '{}' and tenant '{}'", (Object)this.outboxName, (Object)tenant);
        runtime.requestContext().featureToggles(DynamicModelProvider.STATIC_MODEL_ACCESS_FEATURE).systemUser(tenant).run(req -> {
            try {
                Select selectCold = Select.from(Messages_.class).columns(new Function[]{c -> CQL.count().as("count_cold")}).where(e -> e.target().eq((Object)this.outboxName).and((CqnPredicate)e.attempts().ge((Object)this.maxAttempts), new CqnPredicate[0]));
                Result coldCountResult = db.run((CqnSelect)selectCold, new Object[0]);
                long coldCount = ((Number)((Row)coldCountResult.single()).get((Object)"count_cold")).longValue();
                Select selectRemaining = Select.from(Messages_.class).columns(new Function[]{c -> CQL.count().as("count_hot"), c -> CQL.min(c.timestamp()).as("maxTimestamp"), c -> CQL.max(c.timestamp()).as("minTimestamp")}).where(e -> e.target().eq((Object)this.outboxName).and((CqnPredicate)e.attempts().lt((Object)this.maxAttempts), new CqnPredicate[0]));
                Row remainingRow = (Row)db.run((CqnSelect)selectRemaining, new Object[0]).single();
                long remainingCount = ((Number)remainingRow.get((Object)"count_hot")).longValue();
                Instant maxTimestamp = (Instant)remainingRow.get((Object)"maxTimestamp");
                Instant minTimestamp = (Instant)remainingRow.get((Object)"minTimestamp");
                long medianIndex = remainingCount / 2L;
                Select selectMedian = Select.from(Messages_.class).columns(new Function[]{c -> c.timestamp().as("medTimestamp")}).where(e -> e.target().eq((Object)this.outboxName).and((CqnPredicate)e.attempts().lt((Object)this.maxAttempts), new CqnPredicate[0])).orderBy(new Function[]{c -> c.timestamp().asc()}).limit(1L, medianIndex);
                Instant medianTimestamp = db.run((CqnSelect)selectMedian, new Object[0]).first().map(row -> (Instant)row.get((Object)"medTimestamp")).orElse(null);
                Instant now = Instant.now();
                long maxStorageTimeSeconds = maxTimestamp != null ? Duration.between(maxTimestamp, now).toSeconds() : 0L;
                long medianStorageTimeSeconds = medianTimestamp != null ? Duration.between(medianTimestamp, now).toSeconds() : 0L;
                long minStorageTimeSeconds = minTimestamp != null ? Duration.between(minTimestamp, now).toSeconds() : 0L;
                OutboxStatistics stats = this.getStatistics(tenant);
                stats.setColdEntries(coldCount);
                stats.setRemainingEntries(remainingCount);
                stats.setMaxStorageTime(maxStorageTimeSeconds);
                stats.setMedianStorageTime(medianStorageTimeSeconds);
                stats.setMinStorageTime(minStorageTimeSeconds);
                logger.debug("Finished collecting outbox statistics for outbox '{}' and tenant '{}'", (Object)this.outboxName, (Object)tenant);
            }
            catch (Exception e2) {
                logger.warn("Failed to collect statistics for outbox '{}' in tenant '{}'", new Object[]{this.outboxName, tenant, e2});
            }
        });
    }

    private void initializeOtel() {
        Meter meter = OpenTelemetryUtils.getMeter((String)OUTBOX_INFO_INSTRUMENTATION_SCOPE);
        OUTBOX_METRICS.forEach(info -> {
            if (info.isGauge()) {
                this.observers.add(meter.gaugeBuilder(info.name()).setDescription(info.description()).ofLongs().buildObserver());
            } else {
                this.observers.add(meter.counterBuilder(info.name()).setDescription(info.description()).buildObserver());
            }
        });
        meter.batchCallback(this::recordAll, (ObservableMeasurement)this.observers.get(0), (ObservableMeasurement[])this.observers.subList(1, this.observers.size()).toArray(new ObservableLongMeasurement[this.observers.size() - 1]));
    }

    private void recordAll() {
        logger.debug("Recording measurements for outbox '{}'", (Object)this.outboxName);
        ArrayList measureLogs = new ArrayList(0);
        this.statistics.values().forEach(stats -> {
            Attributes attr = Attributes.of((AttributeKey)OpenTelemetryUtils.CDS_TENANT, (Object)stats.getTenant(), (AttributeKey)OpenTelemetryUtils.CDS_OUTBOX_TARGET, (Object)this.outboxName);
            for (int i = 0; i < this.observers.size(); ++i) {
                OutboxMetric metric = OUTBOX_METRICS.get(i);
                long value = metric.provider().apply((OutboxStatistics)stats);
                if (value == -1L) continue;
                this.observers.get(i).record(value, attr);
                if (!logger.isTraceEnabled()) continue;
                measureLogs.add(String.format("%s(%s)=%d", metric.name(), stats.getTenant(), value));
            }
        });
        if (logger.isTraceEnabled()) {
            logger.trace("Recorded measurements for outbox '{}': {}", (Object)this.outboxName, (Object)String.join((CharSequence)", ", measureLogs));
        }
    }

    private record OutboxMetric(String name, Function<OutboxStatistics, Long> provider, String description, boolean isGauge) {
    }
}

