/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.core.datastreams;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import datadog.trace.common.metrics.EventListener;
import datadog.trace.common.metrics.OkHttpSink;
import datadog.trace.common.metrics.Sink;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.core.datastreams.DataStreamsCheckpointer;
import datadog.trace.core.datastreams.DatastreamsPayloadWriter;
import datadog.trace.core.datastreams.DefaultPathwayContext;
import datadog.trace.core.datastreams.MsgPackDatastreamsPayloadWriter;
import datadog.trace.core.datastreams.StatsBucket;
import datadog.trace.util.AgentTaskScheduler;
import datadog.trace.util.AgentThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultDataStreamsCheckpointer
implements DataStreamsCheckpointer,
AutoCloseable,
EventListener {
    private static final Logger log = LoggerFactory.getLogger(DefaultDataStreamsCheckpointer.class);
    static final long DEFAULT_BUCKET_DURATION_NANOS = TimeUnit.SECONDS.toNanos(10L);
    static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5L);
    private static final StatsPoint REPORT = new StatsPoint(Collections.emptyList(), 0L, 0L, 0L, 0L, 0L);
    private static final StatsPoint POISON_PILL = new StatsPoint(Collections.emptyList(), 0L, 0L, 0L, 0L, 0L);
    private final Map<Long, StatsBucket> timeToBucket = new HashMap<Long, StatsBucket>();
    private final BlockingQueue<StatsPoint> inbox = new MpscBlockingConsumerArrayQueue(1024);
    private final DatastreamsPayloadWriter payloadWriter;
    private final DDAgentFeaturesDiscovery features;
    private final TimeSource timeSource;
    private final WellKnownTags wellKnownTags;
    private final long bucketDurationNanos;
    private final Thread thread;
    private AgentTaskScheduler.Scheduled<DefaultDataStreamsCheckpointer> cancellation;
    private volatile long nextFeatureCheck;
    private volatile boolean supportsDataStreams = false;

    public DefaultDataStreamsCheckpointer(Config config, SharedCommunicationObjects sharedCommunicationObjects, TimeSource timeSource) {
        this(new OkHttpSink(sharedCommunicationObjects.okHttpClient, sharedCommunicationObjects.agentUrl.toString(), "v0.1/pipeline_stats", false, true, Collections.emptyMap()), sharedCommunicationObjects.featuresDiscovery, timeSource, config);
    }

    public DefaultDataStreamsCheckpointer(Sink sink, DDAgentFeaturesDiscovery features, TimeSource timeSource, Config config) {
        this(sink, features, timeSource, config.getWellKnownTags(), new MsgPackDatastreamsPayloadWriter(sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag()), DEFAULT_BUCKET_DURATION_NANOS);
    }

    public DefaultDataStreamsCheckpointer(Sink sink, DDAgentFeaturesDiscovery features, TimeSource timeSource, WellKnownTags wellKnownTags, DatastreamsPayloadWriter payloadWriter, long bucketDurationNanos) {
        this.features = features;
        this.timeSource = timeSource;
        this.wellKnownTags = wellKnownTags;
        this.payloadWriter = payloadWriter;
        this.bucketDurationNanos = bucketDurationNanos;
        this.thread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.DATA_STREAMS_MONITORING, new InboxProcessor());
        sink.register(this);
    }

    @Override
    public void start() {
        if (this.features.getDataStreamsEndpoint() == null) {
            this.features.discover();
        }
        if (this.features.supportsDataStreams()) {
            this.supportsDataStreams = true;
        } else {
            this.supportsDataStreams = false;
            log.debug("Data streams is disabled or not supported by agent");
        }
        this.nextFeatureCheck = this.timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;
        this.cancellation = AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(new ReportTask(), this, this.bucketDurationNanos, this.bucketDurationNanos, TimeUnit.NANOSECONDS);
        this.thread.start();
    }

    @Override
    public void accept(StatsPoint statsPoint) {
        if (this.thread.isAlive()) {
            this.inbox.offer(statsPoint);
        }
    }

    @Override
    public PathwayContext newPathwayContext() {
        return new DefaultPathwayContext(this.timeSource, this.wellKnownTags);
    }

    @Override
    public <C> PathwayContext extractPathwayContext(C carrier, AgentPropagation.BinaryContextVisitor<C> getter) {
        return DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.wellKnownTags);
    }

    @Override
    public void close() {
        if (null != this.cancellation) {
            this.cancellation.cancel();
        }
        this.inbox.offer(POISON_PILL);
        try {
            this.thread.join(800L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.inbox.clear();
    }

    private long currentBucket(long timestampNanos) {
        return timestampNanos - timestampNanos % this.bucketDurationNanos;
    }

    private void flush(long timestampNanos) {
        long currentBucket = this.currentBucket(timestampNanos);
        ArrayList<StatsBucket> includedBuckets = new ArrayList<StatsBucket>();
        Iterator<Map.Entry<Long, StatsBucket>> mapIterator = this.timeToBucket.entrySet().iterator();
        while (mapIterator.hasNext()) {
            Map.Entry<Long, StatsBucket> entry = mapIterator.next();
            if (entry.getKey() >= currentBucket) continue;
            mapIterator.remove();
            includedBuckets.add(entry.getValue());
        }
        if (!includedBuckets.isEmpty()) {
            log.debug("Flushing {} buckets", (Object)includedBuckets.size());
            this.payloadWriter.writePayload(includedBuckets);
        }
    }

    void report() {
        this.inbox.offer(REPORT);
    }

    @Override
    public void onEvent(EventListener.EventType eventType, String message) {
        switch (eventType) {
            case DOWNGRADED: {
                log.debug("Agent downgrade was detected");
                this.checkFeatures();
                break;
            }
            case BAD_PAYLOAD: {
                log.debug("bad metrics payload sent to trace agent: {}", (Object)message);
                break;
            }
            case ERROR: {
                log.debug("trace agent errored receiving metrics payload: {}", (Object)message);
                break;
            }
        }
    }

    private void checkFeatures() {
        boolean oldValue = this.supportsDataStreams;
        this.features.discover();
        this.supportsDataStreams = this.features.supportsDataStreams();
        if (oldValue && !this.supportsDataStreams) {
            log.info("Disabling data streams reporting because it is not supported by the agent");
        } else if (!oldValue && this.supportsDataStreams) {
            log.info("Agent upgrade detected. Enabling data streams because it is now supported");
        }
        this.nextFeatureCheck = this.timeSource.getCurrentTimeNanos() + FEATURE_CHECK_INTERVAL_NANOS;
    }

    private static final class ReportTask
    implements AgentTaskScheduler.Task<DefaultDataStreamsCheckpointer> {
        private ReportTask() {
        }

        @Override
        public void run(DefaultDataStreamsCheckpointer target) {
            target.report();
        }
    }

    private class InboxProcessor
    implements Runnable {
        private InboxProcessor() {
        }

        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            while (!currentThread.isInterrupted()) {
                try {
                    StatsPoint statsPoint = (StatsPoint)DefaultDataStreamsCheckpointer.this.inbox.take();
                    if (statsPoint == REPORT) {
                        if (DefaultDataStreamsCheckpointer.this.supportsDataStreams) {
                            DefaultDataStreamsCheckpointer.this.flush(DefaultDataStreamsCheckpointer.this.timeSource.getCurrentTimeNanos());
                            continue;
                        }
                        if (DefaultDataStreamsCheckpointer.this.timeSource.getCurrentTimeNanos() < DefaultDataStreamsCheckpointer.this.nextFeatureCheck) continue;
                        DefaultDataStreamsCheckpointer.this.checkFeatures();
                        continue;
                    }
                    if (statsPoint == POISON_PILL) {
                        if (!DefaultDataStreamsCheckpointer.this.supportsDataStreams) break;
                        DefaultDataStreamsCheckpointer.this.flush(Long.MAX_VALUE);
                        break;
                    }
                    if (!DefaultDataStreamsCheckpointer.this.supportsDataStreams) continue;
                    Long bucket = DefaultDataStreamsCheckpointer.this.currentBucket(statsPoint.getTimestampNanos());
                    StatsBucket statsBucket = (StatsBucket)DefaultDataStreamsCheckpointer.this.timeToBucket.get(bucket);
                    if (statsBucket == null) {
                        statsBucket = new StatsBucket(bucket, DefaultDataStreamsCheckpointer.this.bucketDurationNanos);
                        DefaultDataStreamsCheckpointer.this.timeToBucket.put(bucket, statsBucket);
                    }
                    statsBucket.addPoint(statsPoint);
                }
                catch (InterruptedException e) {
                    currentThread.interrupt();
                }
                catch (Exception e) {
                    log.debug("Error monitoring data streams", (Throwable)e);
                }
            }
        }
    }
}

