/*
 * Decompiled with CFR 0.152.
 */
package ddtrot.dd.trace.core.datastreams;

import datadog.trace.api.Config;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.experimental.DataStreamsContextCarrier;
import ddtrot.dd.communication.ddagent.DDAgentFeaturesDiscovery;
import ddtrot.dd.communication.ddagent.SharedCommunicationObjects;
import ddtrot.dd.context.propagation.Propagator;
import ddtrot.dd.trace.api.datastreams.Backlog;
import ddtrot.dd.trace.api.datastreams.DataStreamsContext;
import ddtrot.dd.trace.api.datastreams.DataStreamsTags;
import ddtrot.dd.trace.api.datastreams.InboxItem;
import ddtrot.dd.trace.api.datastreams.NoopPathwayContext;
import ddtrot.dd.trace.api.datastreams.PathwayContext;
import ddtrot.dd.trace.api.datastreams.StatsPoint;
import ddtrot.dd.trace.api.time.TimeSource;
import ddtrot.dd.trace.bootstrap.instrumentation.api.AgentSpan;
import ddtrot.dd.trace.bootstrap.instrumentation.api.AgentTracer;
import ddtrot.dd.trace.bootstrap.instrumentation.api.Schema;
import ddtrot.dd.trace.bootstrap.instrumentation.api.SchemaIterator;
import ddtrot.dd.trace.common.metrics.EventListener;
import ddtrot.dd.trace.common.metrics.OkHttpSink;
import ddtrot.dd.trace.common.metrics.Sink;
import ddtrot.dd.trace.core.DDSpan;
import ddtrot.dd.trace.core.DDTraceCoreInfo;
import ddtrot.dd.trace.core.datastreams.DataStreamsContextCarrierAdapter;
import ddtrot.dd.trace.core.datastreams.DataStreamsMonitoring;
import ddtrot.dd.trace.core.datastreams.DataStreamsPropagator;
import ddtrot.dd.trace.core.datastreams.DatastreamsPayloadWriter;
import ddtrot.dd.trace.core.datastreams.DefaultPathwayContext;
import ddtrot.dd.trace.core.datastreams.MsgPackDatastreamsPayloadWriter;
import ddtrot.dd.trace.core.datastreams.SchemaBuilder;
import ddtrot.dd.trace.core.datastreams.SchemaSampler;
import ddtrot.dd.trace.core.datastreams.StatsBucket;
import ddtrot.dd.trace.util.AgentTaskScheduler;
import ddtrot.dd.trace.util.AgentThreadFactory;
import ddtrot.org.jctools.queues.MpscArrayQueue;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultDataStreamsMonitoring
implements DataStreamsMonitoring,
EventListener {
    private static final Logger log = LoggerFactory.getLogger(DefaultDataStreamsMonitoring.class);

    /** Offset added to the current time to compute {@link #nextFeatureCheck}. */
    static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5L);

    // REPORT and POISON_PILL are control messages pushed through the inbox. InboxProcessor
    // recognises them by reference (==), so they must stay two distinct instances even though
    // they are constructed with identical arguments.
    private static final StatsPoint REPORT = new StatsPoint(DataStreamsTags.EMPTY, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null);
    private static final StatsPoint POISON_PILL = new StatsPoint(DataStreamsTags.EMPTY, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null);

    /**
     * Aggregation buckets, keyed by bucket start (see {@code currentBucket}) and then by the
     * service name override carried by the inbox item. Plain {@link HashMap}: not thread-safe.
     */
    private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();

    /** Queue drained by the InboxProcessor thread; constructed with 1024. */
    private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);

    private final DatastreamsPayloadWriter payloadWriter;
    private final DDAgentFeaturesDiscovery features;
    private final TimeSource timeSource;
    private final Supplier<TraceConfig> traceConfigSupplier;

    /** Width of one aggregation bucket; also the period of the scheduled report task. */
    private final long bucketDurationNanos;

    /** Thread running the InboxProcessor. */
    private final Thread thread;
    private final DataStreamsPropagator propagator;

    /** Handle of the scheduled report task; assigned in {@code start()}, null before that. */
    private AgentTaskScheduler.Scheduled<DefaultDataStreamsMonitoring> cancellation;

    /** Time (nanoseconds, from {@code timeSource}) after which agent features are re-checked. */
    private volatile long nextFeatureCheck;

    /** Always written as {@code agentSupportsDataStreams && configSupportsDataStreams}. */
    private volatile boolean supportsDataStreams = false;

    /** Last value returned by {@code features.supportsDataStreams()}. */
    private volatile boolean agentSupportsDataStreams = false;

    /** Last value returned by {@code traceConfigSupplier.get().isDataStreamsEnabled()}. */
    private volatile boolean configSupportsDataStreams = false;

    /** One sampler per topic, created lazily by the schema sampling methods. */
    private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;

    /** Per-thread service name override; unset (null) unless a thread calls setThreadServiceName. */
    private static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();

    /**
     * Creates a monitor backed by an {@link OkHttpSink} built from the shared HTTP client and
     * agent URL, using the path {@code "v0.1/pipeline_stats"}, and the features discovery
     * obtained from {@code sharedCommunicationObjects} for the given config.
     *
     * @param config configuration handed to the features discovery and the delegate constructor
     * @param sharedCommunicationObjects source of the HTTP client, agent URL and features discovery
     * @param timeSource clock used for timestamps
     * @param traceConfigSupplier supplier of the current dynamic trace configuration
     */
    public DefaultDataStreamsMonitoring(Config config, SharedCommunicationObjects sharedCommunicationObjects, TimeSource timeSource, Supplier<TraceConfig> traceConfigSupplier) {
        this(
            new OkHttpSink(
                sharedCommunicationObjects.okHttpClient,
                sharedCommunicationObjects.agentUrl.toString(),
                "v0.1/pipeline_stats",
                false,
                true,
                Collections.emptyMap()),
            sharedCommunicationObjects.featuresDiscovery(config),
            timeSource,
            traceConfigSupplier,
            config);
    }

    /**
     * Creates a monitor that writes MsgPack payloads to {@code sink}.
     *
     * <p>Well-known tags, the primary tag and the bucket duration are all taken from the
     * {@code config} argument, so a caller-supplied configuration is honoured consistently
     * instead of mixing it with the global {@code Config.get()} instance.
     *
     * @param sink destination of the serialized payloads
     * @param features agent features discovery
     * @param timeSource clock used for timestamps
     * @param traceConfigSupplier supplier of the current dynamic trace configuration
     * @param config configuration providing tags and the bucket duration; must not be null
     */
    public DefaultDataStreamsMonitoring(Sink sink, DDAgentFeaturesDiscovery features, TimeSource timeSource, Supplier<TraceConfig> traceConfigSupplier, Config config) {
        this(
            sink,
            features,
            timeSource,
            traceConfigSupplier,
            new MsgPackDatastreamsPayloadWriter(sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag()),
            config.getDataStreamsBucketDurationNanoseconds());
    }

    /**
     * Primary constructor: stores the collaborators, creates (but does not start) the inbox
     * processing thread, and registers this instance as a listener on {@code sink}.
     *
     * @param sink sink on which this monitor registers itself as an event listener
     * @param features agent features discovery
     * @param timeSource clock used for timestamps
     * @param traceConfigSupplier supplier of the current dynamic trace configuration
     * @param payloadWriter writer that receives flushed buckets
     * @param bucketDurationNanos width of one aggregation bucket, in nanoseconds
     */
    public DefaultDataStreamsMonitoring(Sink sink, DDAgentFeaturesDiscovery features, TimeSource timeSource, Supplier<TraceConfig> traceConfigSupplier, DatastreamsPayloadWriter payloadWriter, long bucketDurationNanos) {
        this.features = features;
        this.timeSource = timeSource;
        this.traceConfigSupplier = traceConfigSupplier;
        this.payloadWriter = payloadWriter;
        this.bucketDurationNanos = bucketDurationNanos;
        this.thread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.DATA_STREAMS_MONITORING, new InboxProcessor());
        this.schemaSamplers = new ConcurrentHashMap<>();
        this.propagator = new DataStreamsPropagator(this, this.timeSource, serviceNameOverride);
        DataStreamsTags.setServiceNameOverride(serviceNameOverride);
        // Register last: the sink may call back into onEvent from another thread, so it must
        // not receive a reference to this object before every final field has been assigned.
        sink.register(this);
    }

    /**
     * Refreshes the config-derived flags, schedules a {@code ReportTask} at a fixed rate of one
     * bucket duration (initial delay and period), then starts the inbox processing thread.
     */
    @Override
    public void start() {
        checkDynamicConfig();
        cancellation =
            AgentTaskScheduler.get()
                .scheduleAtFixedRate(
                    new ReportTask(),
                    this,
                    bucketDurationNanos,
                    bucketDurationNanos,
                    TimeUnit.NANOSECONDS);
        thread.start();
    }

    /**
     * Enqueues a stats point for aggregation. The point is dropped when the processing thread
     * is not alive (not started yet, or already terminated).
     *
     * @param statsPoint point to enqueue
     */
    @Override
    public void add(StatsPoint statsPoint) {
        if (!thread.isAlive()) {
            return;
        }
        inbox.offer(statsPoint);
    }

    /**
     * Delegates to the topic's {@link SchemaSampler#trySample}, creating the sampler on first
     * use, with the current time in milliseconds.
     *
     * @param topic topic whose sampler is consulted
     * @return the value returned by the sampler's {@code trySample}
     */
    @Override
    public int trySampleSchema(String topic) {
        return schemaSamplers
            .computeIfAbsent(topic, key -> new SchemaSampler())
            .trySample(timeSource.getCurrentTimeMillis());
    }

    /**
     * Delegates to the topic's {@link SchemaSampler#canSample}, creating the sampler on first
     * use, with the current time in milliseconds.
     *
     * @param topic topic whose sampler is consulted
     * @return the value returned by the sampler's {@code canSample}
     */
    @Override
    public boolean canSampleSchema(String topic) {
        return schemaSamplers
            .computeIfAbsent(topic, key -> new SchemaSampler())
            .canSample(timeSource.getCurrentTimeMillis());
    }

    /**
     * Builds a schema by delegating to {@link SchemaBuilder#getSchema}.
     *
     * @param schemaName name passed through to the builder
     * @param iterator iterator passed through to the builder
     * @return whatever the builder returns
     */
    @Override
    public Schema getSchema(String schemaName, SchemaIterator iterator) {
        final Schema built = SchemaBuilder.getSchema(schemaName, iterator);
        return built;
    }

    /**
     * Sets a non-manual produce checkpoint using the no-op carrier.
     *
     * @param type checkpoint type
     * @param target checkpoint target
     */
    @Override
    public void setProduceCheckpoint(String type, String target) {
        final DataStreamsContextCarrier noopCarrier = DataStreamsContextCarrier.NoOp.INSTANCE;
        setProduceCheckpoint(type, target, noopCarrier, false);
    }

    /**
     * Sets the service name override for the calling thread; a null name clears it instead.
     *
     * @param serviceName override to install, or null to remove the current one
     */
    @Override
    public void setThreadServiceName(String serviceName) {
        if (serviceName != null) {
            serviceNameOverride.set(serviceName);
        } else {
            clearThreadServiceName();
        }
    }

    /** Removes the calling thread's service name override, if any. */
    @Override
    public void clearThreadServiceName() {
        DefaultDataStreamsMonitoring.serviceNameOverride.remove();
    }

    /**
     * @return the calling thread's service name override, or null when none has been set
     */
    private static String getThreadServiceName() {
        final String override = serviceNameOverride.get();
        return override;
    }

    /**
     * Creates a pathway context for the calling thread.
     *
     * @return a {@link DefaultPathwayContext} carrying the thread's service name override when
     *     data streams is enabled by config, otherwise the shared no-op context
     */
    @Override
    public PathwayContext newPathwayContext() {
        // Only the config flag is consulted here, not the combined supportsDataStreams flag.
        if (!configSupportsDataStreams) {
            return NoopPathwayContext.INSTANCE;
        }
        return new DefaultPathwayContext(timeSource, getThreadServiceName());
    }

    /**
     * @return the {@link DataStreamsPropagator} created in the constructor
     */
    @Override
    public Propagator propagator() {
        return propagator;
    }

    /**
     * Extracts a pathway context from {@code carrier} and merges it into the span's context.
     * Spans that are not {@link DDSpan} instances are left untouched.
     *
     * @param span span receiving the extracted pathway context
     * @param carrier carrier the pathway context is extracted from
     */
    @Override
    public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrier carrier) {
        if (!(span instanceof DDSpan)) {
            return;
        }
        final DefaultPathwayContext extracted =
            DefaultPathwayContext.extract(
                carrier,
                DataStreamsContextCarrierAdapter.INSTANCE,
                timeSource,
                getThreadServiceName());
        final DDSpan ddSpan = (DDSpan) span;
        ddSpan.context().mergePathwayContext(extracted);
    }

    /**
     * Enqueues a {@link Backlog} item stamped with the current time in nanoseconds and the
     * calling thread's service name override.
     *
     * @param tags tags describing the backlog
     * @param value backlog value
     */
    @Override
    public void trackBacklog(DataStreamsTags tags, long value) {
        final long nowNanos = timeSource.getCurrentTimeNanos();
        final String serviceName = getThreadServiceName();
        inbox.offer(new Backlog(tags, value, nowNanos, serviceName));
    }

    /**
     * Sets a checkpoint on the span's pathway context, routing the produced stats points to
     * {@link #add}. Does nothing when the span has no pathway context.
     *
     * @param span span whose pathway context is checkpointed
     * @param context data streams context describing the checkpoint
     */
    @Override
    public void setCheckpoint(AgentSpan span, DataStreamsContext context) {
        final PathwayContext spanPathway = span.context().getPathwayContext();
        if (spanPathway == null) {
            return;
        }
        spanPathway.setCheckpoint(context, this::add);
    }

    /**
     * Sets a manual consume checkpoint; shorthand for the four-argument overload with
     * {@code isManual = true}.
     *
     * @param type checkpoint type
     * @param source checkpoint source
     * @param carrier carrier the upstream pathway context is extracted from
     */
    public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {
        setConsumeCheckpoint(type, source, carrier, Boolean.TRUE);
    }

    /**
     * Sets an inbound checkpoint on the active span after merging the pathway context found in
     * {@code carrier} into it.
     *
     * <p>The call is ignored (with a warning) when {@code type} or {@code source} is null or
     * empty, or when there is no active span.
     *
     * @param type checkpoint type; must be non-empty
     * @param source checkpoint source; must be non-empty
     * @param carrier carrier the upstream pathway context is extracted from
     * @param isManual whether to build manual tags; {@code null} is treated as {@code true},
     *     matching the default used by the three-argument overload, instead of failing with a
     *     NullPointerException on unboxing
     */
    public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier, Boolean isManual) {
        if (type == null || type.isEmpty() || source == null || source.isEmpty()) {
            log.warn("setConsumeCheckpoint should be called with non-empty type and source");
            return;
        }
        AgentSpan span = AgentTracer.activeSpan();
        if (span == null) {
            log.warn("SetConsumeCheckpoint is called with no active span");
            return;
        }
        this.mergePathwayContextIntoSpan(span, carrier);
        // Guard the unboxing: a null Boolean falls back to the manual default.
        final boolean manual = isManual == null || isManual;
        final DataStreamsTags tags;
        if (manual) {
            tags = DataStreamsTags.createManual(type, DataStreamsTags.Direction.INBOUND, source);
        } else {
            tags = DataStreamsTags.create(type, DataStreamsTags.Direction.INBOUND, source);
        }
        this.setCheckpoint(span, DataStreamsContext.fromTags(tags));
    }

    /**
     * Builds outbound tags for the active span and injects the span, combined with the
     * resulting data streams context, into {@code carrier} through the propagator.
     *
     * <p>The call is ignored (with a warning) when {@code type} or {@code target} is null or
     * empty, or when there is no active span.
     *
     * @param type checkpoint type; must be non-empty
     * @param target checkpoint target; must be non-empty
     * @param carrier carrier receiving the injected context
     * @param manualCheckpoint whether to build manual tags
     */
    public void setProduceCheckpoint(String type, String target, DataStreamsContextCarrier carrier, boolean manualCheckpoint) {
        final boolean missingType = type == null || type.isEmpty();
        if (missingType || target == null || target.isEmpty()) {
            log.warn("SetProduceCheckpoint should be called with non-empty type and target");
            return;
        }
        final AgentSpan activeSpan = AgentTracer.activeSpan();
        if (activeSpan == null) {
            log.warn("SetProduceCheckpoint is called with no active span");
            return;
        }
        final DataStreamsTags outboundTags;
        if (manualCheckpoint) {
            outboundTags = DataStreamsTags.createManual(type, DataStreamsTags.Direction.OUTBOUND, target);
        } else {
            outboundTags = DataStreamsTags.create(type, DataStreamsTags.Direction.OUTBOUND, target);
        }
        final DataStreamsContext checkpointContext = DataStreamsContext.fromTags(outboundTags);
        propagator.inject(activeSpan.with(checkpointContext), carrier, DataStreamsContextCarrierAdapter.INSTANCE);
    }

    /**
     * Sets a manual produce checkpoint; shorthand for the four-argument overload with
     * {@code manualCheckpoint = true}.
     *
     * @param type checkpoint type
     * @param target checkpoint target
     * @param carrier carrier receiving the injected context
     */
    public void setProduceCheckpoint(String type, String target, DataStreamsContextCarrier carrier) {
        final boolean manual = true;
        setProduceCheckpoint(type, target, carrier, manual);
    }

    /**
     * Stops monitoring: cancels the scheduled report task if one was scheduled, enqueues the
     * poison pill for the processing thread and waits up to 800 ms for that thread to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status is restored so callers further up the stack can still observe it.
     */
    @Override
    public void close() {
        if (cancellation != null) {
            cancellation.cancel();
        }
        // NOTE(review): the result of offer() is ignored; if the queue can reject the item when
        // full, the processor would never see the poison pill — confirm this is acceptable.
        inbox.offer(POISON_PILL);
        try {
            thread.join(800L);
        } catch (InterruptedException e) {
            // join() clears the interrupt flag when it throws; set it again rather than
            // silently discarding the interruption request.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Maps a timestamp to the start of its bucket by subtracting the remainder of the timestamp
     * divided by the bucket duration.
     *
     * @param timestampNanos timestamp in nanoseconds
     * @return {@code timestampNanos} minus {@code timestampNanos % bucketDurationNanos}
     */
    private long currentBucket(long timestampNanos) {
        final long offsetIntoBucket = timestampNanos % bucketDurationNanos;
        return timestampNanos - offsetIntoBucket;
    }

    /**
     * Removes every bucket that started before the bucket containing {@code timestampNanos},
     * regroups the removed buckets by service name override, and hands each group to the
     * payload writer.
     *
     * @param timestampNanos reference time; buckets whose start is at or after this time's
     *     bucket are kept for a later flush
     */
    private void flush(long timestampNanos) {
        final long cutoffBucket = currentBucket(timestampNanos);

        // HashMap on purpose: the service-name key is whatever the inbox items carried and a
        // HashMap tolerates a null key.
        final Map<String, List<StatsBucket>> bucketsByService = new HashMap<>();
        final Iterator<Map.Entry<Long, Map<String, StatsBucket>>> timeIterator = timeToBucket.entrySet().iterator();
        while (timeIterator.hasNext()) {
            final Map.Entry<Long, Map<String, StatsBucket>> timeEntry = timeIterator.next();
            if (timeEntry.getKey() >= cutoffBucket) {
                // Bucket may still receive points; leave it in place.
                continue;
            }
            timeIterator.remove();
            for (Map.Entry<String, StatsBucket> serviceEntry : timeEntry.getValue().entrySet()) {
                bucketsByService
                    .computeIfAbsent(serviceEntry.getKey(), key -> new LinkedList<>())
                    .add(serviceEntry.getValue());
            }
        }

        // Every list was created together with its first element, so none can be empty here.
        for (Map.Entry<String, List<StatsBucket>> group : bucketsByService.entrySet()) {
            log.debug("Flushing {} buckets ({})", group.getValue(), group.getKey());
            payloadWriter.writePayload(group.getValue(), group.getKey());
        }
    }

    /**
     * Discards all aggregated buckets and all schema samplers.
     *
     * <p>NOTE(review): {@code timeToBucket} is a plain HashMap that the InboxProcessor thread
     * also mutates; confirm callers only invoke this when that thread is not processing.
     */
    @Override
    public void clear() {
        timeToBucket.clear();
        schemaSamplers.clear();
    }

    /** Enqueues the {@code REPORT} sentinel so the processing thread handles a report cycle. */
    void report() {
        inbox.offer(REPORT);
    }

    /**
     * Reacts to sink events: a downgrade triggers an agent feature re-check; bad-payload and
     * error events are only logged at debug level. Other event types are ignored.
     *
     * @param eventType kind of event reported by the sink
     * @param message detail message logged for bad-payload and error events
     */
    @Override
    public void onEvent(EventListener.EventType eventType, String message) {
        switch (eventType) {
            case DOWNGRADED:
                log.debug("Agent downgrade was detected");
                checkFeatures();
                break;
            case BAD_PAYLOAD:
                log.debug("bad metrics payload sent to trace agent: {}", message);
                break;
            case ERROR:
                log.debug("trace agent errored receiving metrics payload: {}", message);
                break;
            default:
                break;
        }
    }

    /**
     * Re-reads the data streams flag from the current trace config and recomputes the combined
     * {@code supportsDataStreams} flag (agent support AND config support).
     */
    private void checkDynamicConfig() {
        final TraceConfig currentConfig = traceConfigSupplier.get();
        configSupportsDataStreams = currentConfig.isDataStreamsEnabled();
        supportsDataStreams = agentSupportsDataStreams && configSupportsDataStreams;
    }

    /**
     * Refreshes agent feature discovery, updates {@code agentSupportsDataStreams}, logs when
     * agent support changed, recomputes {@code supportsDataStreams} and pushes the next
     * feature check {@link #FEATURE_CHECK_INTERVAL_NANOS} into the future.
     */
    private void checkFeatures() {
        final boolean supportedBefore = agentSupportsDataStreams;
        features.discoverIfOutdated();
        final boolean supportedNow = features.supportsDataStreams();
        agentSupportsDataStreams = supportedNow;

        if (supportedBefore != supportedNow) {
            final boolean enabledByConfig = configSupportsDataStreams;
            if (!supportedNow) {
                // Support was lost; only worth mentioning when the feature was in use.
                if (enabledByConfig) {
                    log.info("Disabling data streams reporting because it is not supported by the agent");
                }
            } else if (enabledByConfig) {
                log.info("Agent upgrade detected. Enabling data streams because it is now supported");
            } else {
                log.info("Agent upgrade detected. Not enabling data streams because it is disabled by config");
            }
        }

        supportsDataStreams = agentSupportsDataStreams && configSupportsDataStreams;
        nextFeatureCheck = timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;
    }

    /**
     * Scheduler task that asks the monitor it is given to enqueue a report request. It holds no
     * state of its own; the monitor is supplied by the scheduler on each run.
     */
    private static final class ReportTask implements AgentTaskScheduler.Task<DefaultDataStreamsMonitoring> {

        /**
         * @param target monitor whose {@code report()} is invoked
         */
        @Override
        public void run(DefaultDataStreamsMonitoring target) {
            target.report();
        }
    }

    private class InboxProcessor
    implements Runnable {
        private InboxProcessor() {
        }

        private StatsBucket getStatsBucket(long timestamp, String serviceNameOverride) {
            long bucket = DefaultDataStreamsMonitoring.this.currentBucket(timestamp);
            Map statsBucketMap = DefaultDataStreamsMonitoring.this.timeToBucket.computeIfAbsent(bucket, startTime -> new HashMap(1));
            return statsBucketMap.computeIfAbsent(serviceNameOverride, s -> new StatsBucket(bucket, DefaultDataStreamsMonitoring.this.bucketDurationNanos));
        }

        @Override
        public void run() {
            if (DefaultDataStreamsMonitoring.this.features.getDataStreamsEndpoint() == null) {
                DefaultDataStreamsMonitoring.this.features.discoverIfOutdated();
            }
            DefaultDataStreamsMonitoring.this.agentSupportsDataStreams = DefaultDataStreamsMonitoring.this.features.supportsDataStreams();
            DefaultDataStreamsMonitoring.this.checkDynamicConfig();
            if (!DefaultDataStreamsMonitoring.this.configSupportsDataStreams) {
                log.debug("Data streams is disabled");
            } else if (!DefaultDataStreamsMonitoring.this.agentSupportsDataStreams) {
                log.debug("Data streams is disabled or not supported by agent");
            }
            DefaultDataStreamsMonitoring.this.nextFeatureCheck = DefaultDataStreamsMonitoring.this.timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;
            Thread currentThread = Thread.currentThread();
            while (!currentThread.isInterrupted()) {
                try {
                    StatsBucket statsBucket;
                    InboxItem payload = (InboxItem)DefaultDataStreamsMonitoring.this.inbox.poll();
                    if (payload == null) {
                        Thread.sleep(10L);
                        continue;
                    }
                    if (payload == REPORT) {
                        DefaultDataStreamsMonitoring.this.checkDynamicConfig();
                        if (DefaultDataStreamsMonitoring.this.supportsDataStreams) {
                            DefaultDataStreamsMonitoring.this.flush(DefaultDataStreamsMonitoring.this.timeSource.getCurrentTimeNanos());
                            continue;
                        }
                        if (DefaultDataStreamsMonitoring.this.timeSource.getCurrentTimeNanos() < DefaultDataStreamsMonitoring.this.nextFeatureCheck) continue;
                        DefaultDataStreamsMonitoring.this.checkFeatures();
                        continue;
                    }
                    if (payload == POISON_PILL) {
                        if (!DefaultDataStreamsMonitoring.this.supportsDataStreams) break;
                        DefaultDataStreamsMonitoring.this.flush(Long.MAX_VALUE);
                        break;
                    }
                    if (!DefaultDataStreamsMonitoring.this.supportsDataStreams) continue;
                    if (payload instanceof StatsPoint) {
                        StatsPoint statsPoint = (StatsPoint)payload;
                        statsBucket = this.getStatsBucket(statsPoint.getTimestampNanos(), statsPoint.getServiceNameOverride());
                        statsBucket.addPoint(statsPoint);
                        continue;
                    }
                    if (!(payload instanceof Backlog)) continue;
                    Backlog backlog = (Backlog)payload;
                    statsBucket = this.getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride());
                    statsBucket.addBacklog(backlog);
                }
                catch (Exception e) {
                    log.debug("Error monitoring data streams", (Throwable)e);
                }
            }
        }
    }
}

