/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.metrics;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.Functions;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.metrics.Aggregator;
import datadog.trace.common.metrics.Batch;
import datadog.trace.common.metrics.EventListener;
import datadog.trace.common.metrics.MetricKey;
import datadog.trace.common.metrics.MetricWriter;
import datadog.trace.common.metrics.MetricsAggregator;
import datadog.trace.common.metrics.OkHttpSink;
import datadog.trace.common.metrics.SerializingMetricWriter;
import datadog.trace.common.metrics.Sink;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.util.AgentTaskScheduler;
import datadog.trace.util.AgentThreadFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jctools.maps.NonBlockingHashMap;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.jctools.queues.SpmcArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MetricsAggregator} that conflates span measurements into per-{@link MetricKey}
 * {@link Batch}es before handing them to an {@link Aggregator}.
 *
 * <p>Publishing threads add {@code (tag, duration)} pairs to the batch currently registered in
 * {@link #pending} for the span's key. When no batch is registered, or the registered batch
 * refuses the value, a fresh batch is registered and offered to the {@link #inbox}. The
 * {@link Aggregator} is constructed with the same inbox, pending map and batch pool and is run on
 * a dedicated agent thread that is started by {@link #start()}.
 *
 * <p>The class also listens to {@link Sink} events so that it can shut itself down when the trace
 * agent no longer supports metrics.
 */
public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener {
    private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class);

    /** Headers handed to the {@link OkHttpSink} created by the public constructor. */
    private static final Map<String, String> DEFAULT_HEADERS = Collections.singletonMap("Datadog-Meta-Tracer-Version", DDTraceCoreInfo.VERSION);

    /** Cache of UTF-8 encoded service names, so that each name is encoded via {@link Functions#UTF8_ENCODE} at most once while cached. */
    private static final DDCache<String, UTF8BytesString> SERVICE_NAMES = DDCaches.newFixedSizeCache(32);

    /** Marker batch offered to the inbox by {@link #stop()}; presumably tells the aggregator thread to exit (handled in {@link Aggregator}, not visible here). */
    static final Batch POISON_PILL = Batch.NULL;

    /** Path of the stats endpoint passed to the {@link OkHttpSink}. */
    private static final String STATS_ENDPOINT = "v0.6/stats";

    /** Reporting interval used by the constructors that do not take one explicitly. */
    private static final long DEFAULT_REPORTING_INTERVAL = 10L;
    private static final TimeUnit DEFAULT_REPORTING_INTERVAL_UNIT = TimeUnit.SECONDS;

    /** Maximum number of times {@link #report()} offers the report signal before giving up. */
    private static final int MAX_REPORT_ATTEMPTS = 10;

    /** Maximum time, in milliseconds, {@link #close()} waits for the aggregator thread to finish. */
    private static final long THREAD_JOIN_TIMEOUT_MILLIS = 800L;

    /** Bit set in the tag passed to {@link Batch#add} when {@code span.getError() > 0} (the sign bit). */
    private static final long ERROR_TAG = Long.MIN_VALUE;

    /** Bit set in the tag passed to {@link Batch#add} when the span is top level (bit 62). */
    private static final long TOP_LEVEL_TAG = 0x4000000000000000L;

    /** Resource names whose presence in a trace stops metric publication for that trace. */
    private final Set<String> ignoredResources;
    /** Pool polled by {@link #newBatch(MetricKey)} for batches to reuse; shared with the aggregator. */
    private final Queue<Batch> batchPool;
    /** The batch most recently registered for each key; shared with the aggregator. */
    private final NonBlockingHashMap<MetricKey, Batch> pending;
    /** Canonical instance of every key seen so far (each key maps to itself). */
    private final NonBlockingHashMap<MetricKey, MetricKey> keys;
    /** Thread that runs {@link #aggregator}. */
    private final Thread thread;
    /** Queue of batches and control markers; shared with the aggregator. */
    private final BlockingQueue<Batch> inbox;
    private final Sink sink;
    private final Aggregator aggregator;
    private final long reportingInterval;
    private final TimeUnit reportingIntervalTimeUnit;
    private final DDAgentFeaturesDiscovery features;
    /** Handle of the periodic report task; {@code null} until {@link #start()} schedules it. */
    private volatile AgentTaskScheduler.Scheduled<?> cancellation;

    /**
     * Creates an aggregator configured from {@code config} that sends its payloads through an
     * {@link OkHttpSink} targeting the agent URL.
     *
     * @param config tracer configuration supplying tags, ignored resources and sizing
     * @param sharedCommunicationObjects source of the feature discovery and HTTP client
     */
    public ConflatingMetricsAggregator(Config config, SharedCommunicationObjects sharedCommunicationObjects) {
        this(config.getWellKnownTags(), config.getMetricsIgnoredResources(), sharedCommunicationObjects.featuresDiscovery, new OkHttpSink(sharedCommunicationObjects.okHttpClient, config.getAgentUrl(), STATS_ENDPOINT, config.isTracerMetricsBufferingEnabled(), false, DEFAULT_HEADERS), config.getTracerMetricsMaxAggregates(), config.getTracerMetricsMaxPending());
    }

    /** Same as the full constructor, using the default reporting interval of 10 seconds. */
    ConflatingMetricsAggregator(WellKnownTags wellKnownTags, Set<String> ignoredResources, DDAgentFeaturesDiscovery features, Sink sink, int maxAggregates, int queueSize) {
        this(wellKnownTags, ignoredResources, features, sink, maxAggregates, queueSize, DEFAULT_REPORTING_INTERVAL, DEFAULT_REPORTING_INTERVAL_UNIT);
    }

    /** Same as the full constructor, writing metrics through a {@link SerializingMetricWriter} bound to {@code sink}. */
    ConflatingMetricsAggregator(WellKnownTags wellKnownTags, Set<String> ignoredResources, DDAgentFeaturesDiscovery features, Sink sink, int maxAggregates, int queueSize, long reportingInterval, TimeUnit timeUnit) {
        this(ignoredResources, features, sink, new SerializingMetricWriter(wellKnownTags, sink), maxAggregates, queueSize, reportingInterval, timeUnit);
    }

    /**
     * Wires up the queues, maps, aggregator and aggregator thread. Nothing is started here; see
     * {@link #start()}.
     *
     * @param ignoredResources resource names that stop metric publication for a trace
     * @param features agent feature discovery used to check for metrics support
     * @param sink sink this aggregator registers itself with as an {@link EventListener}
     * @param metricWriter writer handed to the {@link Aggregator}
     * @param maxAggregates capacity of the batch pool, also passed to the {@link Aggregator}
     * @param queueSize capacity of the inbox
     * @param reportingInterval period between scheduled reports
     * @param timeUnit unit of {@code reportingInterval}
     */
    ConflatingMetricsAggregator(Set<String> ignoredResources, DDAgentFeaturesDiscovery features, Sink sink, MetricWriter metricWriter, int maxAggregates, int queueSize, long reportingInterval, TimeUnit timeUnit) {
        this.ignoredResources = ignoredResources;
        this.inbox = new MpscBlockingConsumerArrayQueue<>(queueSize);
        this.batchPool = new SpmcArrayQueue<>(maxAggregates);
        this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
        this.keys = new NonBlockingHashMap<>();
        this.features = features;
        this.sink = sink;
        this.aggregator = new Aggregator(metricWriter, this.batchPool, this.inbox, this.pending, this.keys.keySet(), maxAggregates, reportingInterval, timeUnit);
        this.thread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.METRICS_AGGREGATOR, this.aggregator);
        this.reportingInterval = reportingInterval;
        this.reportingIntervalTimeUnit = timeUnit;
    }

    /**
     * Starts the aggregator thread and schedules periodic reporting, provided the trace agent
     * supports metrics. Feature discovery is triggered first when no metrics endpoint is known
     * yet. When metrics are not supported nothing is started.
     */
    @Override
    public void start() {
        if (this.features.getMetricsEndpoint() == null) {
            this.features.discover();
        }
        if (!this.features.supportsMetrics()) {
            log.debug("metrics not supported by trace agent");
            return;
        }
        this.sink.register(this);
        this.thread.start();
        this.cancellation = AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(new ReportTask(), this, this.reportingInterval, this.reportingInterval, this.reportingIntervalTimeUnit);
        log.debug("started metrics aggregator");
    }

    /**
     * Offers the {@link Batch#REPORT} marker to the inbox, retrying immediately up to
     * {@value #MAX_REPORT_ATTEMPTS} times in total while the queue rejects it.
     *
     * @return {@code true} if the marker was enqueued, {@code false} if every attempt failed
     */
    @Override
    public boolean report() {
        for (int attempt = 0; attempt < MAX_REPORT_ATTEMPTS; attempt++) {
            if (this.inbox.offer(Batch.REPORT)) {
                return true;
            }
        }
        log.debug("Skipped metrics reporting because the queue is full");
        return false;
    }

    /**
     * Publishes metrics for every top-level or measured span of {@code trace}, as long as the
     * agent supports metrics.
     *
     * <p>If an eligible span carries an ignored resource name, processing stops immediately and
     * {@code false} is returned, regardless of what earlier spans of the same trace reported.
     *
     * @param trace spans to publish metrics for
     * @return {@code true} if at least one published span requested to be force kept (see
     *     {@link #publish(CoreSpan, boolean)}); {@code false} otherwise
     */
    @Override
    public boolean publish(List<? extends CoreSpan<?>> trace) {
        boolean forceKeep = false;
        if (this.features.supportsMetrics()) {
            for (CoreSpan<?> span : trace) {
                boolean isTopLevel = span.isTopLevel();
                if (!isTopLevel && !span.isMeasured()) {
                    // only top-level and measured spans contribute to metrics
                    continue;
                }
                if (this.ignoredResources.contains(span.getResourceName().toString())) {
                    // an ignored resource ends publication for the rest of the trace
                    return false;
                }
                forceKeep |= this.publish(span, isTopLevel);
            }
        }
        return forceKeep;
    }

    /**
     * Records one span's duration under its metric key.
     *
     * @param span span to record
     * @param isTopLevel whether the span is top level; encoded into the tag
     * @return {@code false} if the value was added to an already pending batch; otherwise
     *     {@code true} when the key was newly registered by this call or the span has an error
     */
    private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
        MetricKey newKey = new MetricKey(span.getResourceName(), SERVICE_NAMES.computeIfAbsent(span.getServiceName(), Functions.UTF8_ENCODE), span.getOperationName(), span.getType(), span.getHttpStatusCode());
        // canonicalize the key: reuse the instance already registered, if any
        MetricKey key = this.keys.putIfAbsent(newKey, newKey);
        boolean isNewKey = false;
        if (null == key) {
            key = newKey;
            isNewKey = true;
        }
        boolean isError = span.getError() > 0;
        long tag = (isError ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);
        long durationNanos = span.getDurationNano();
        Batch batch = this.pending.get(key);
        if (null != batch) {
            if (batch.add(tag, durationNanos)) {
                // the pending batch accepted the value, so nothing needs to be enqueued
                return false;
            }
            // the pending batch refused the value: replace it, reusing the key it holds
            key = batch.getKey();
            isNewKey = false;
        }
        batch = this.newBatch(key);
        batch.add(tag, durationNanos);
        // register the batch before offering it to the inbox
        this.pending.put(key, batch);
        // the offer result is ignored: if the inbox rejects the batch it is simply not enqueued
        this.inbox.offer(batch);
        return isNewKey || isError;
    }

    /**
     * Obtains a batch for {@code key}, reusing one from the pool when available.
     *
     * @param key key the batch is bound to
     * @return a pooled batch reset to {@code key}, or a new batch when the pool is empty
     */
    private Batch newBatch(MetricKey key) {
        Batch pooled = this.batchPool.poll();
        return null == pooled ? new Batch(key) : pooled.reset(key);
    }

    /**
     * Cancels the periodic report task, if one was scheduled, and offers the
     * {@link #POISON_PILL} to the inbox. Does not wait for the aggregator thread; see
     * {@link #close()}.
     */
    public void stop() {
        // read the volatile field once so the null check and the call see the same value
        AgentTaskScheduler.Scheduled<?> scheduled = this.cancellation;
        if (null != scheduled) {
            scheduled.cancel();
        }
        if (!this.inbox.offer(POISON_PILL)) {
            log.debug("Unable to enqueue the metrics aggregator shutdown marker because the queue is full");
        }
    }

    /**
     * Stops the aggregator and waits up to {@value #THREAD_JOIN_TIMEOUT_MILLIS} ms for its thread
     * to finish. If the calling thread is interrupted while waiting, the wait is abandoned and
     * the thread's interrupt status is restored.
     */
    @Override
    public void close() {
        this.stop();
        try {
            this.thread.join(THREAD_JOIN_TIMEOUT_MILLIS);
        }
        catch (InterruptedException ignored) {
            // give up waiting, but keep the interruption visible to the caller
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reacts to events raised by the sink: an agent downgrade may disable metric reporting, while
     * bad payloads and errors are only logged.
     *
     * @param eventType kind of event
     * @param message detail logged for bad payload and error events
     */
    @Override
    public void onEvent(EventListener.EventType eventType, String message) {
        switch (eventType) {
            case DOWNGRADED:
                log.debug("Agent downgrade was detected");
                this.disable();
                break;
            case BAD_PAYLOAD:
                log.debug("bad metrics payload sent to trace agent: {}", message);
                break;
            case ERROR:
                log.debug("trace agent errored receiving metrics payload: {}", message);
                break;
            default:
                // any other event type requires no action
                break;
        }
    }

    /**
     * Re-runs feature discovery and, if the agent no longer supports metrics, cancels the
     * periodic report task, interrupts the aggregator thread and discards all buffered state.
     */
    private void disable() {
        this.features.discover();
        if (!this.features.supportsMetrics()) {
            log.debug("Disabling metric reporting because an agent downgrade was detected");
            AgentTaskScheduler.Scheduled<?> cancellation = this.cancellation;
            if (null != cancellation) {
                cancellation.cancel();
            }
            this.thread.interrupt();
            this.pending.clear();
            this.batchPool.clear();
            this.inbox.clear();
            this.aggregator.clearAggregates();
        }
    }

    /** Scheduled task that asks its target aggregator to report. */
    private static final class ReportTask implements AgentTaskScheduler.Task<ConflatingMetricsAggregator> {
        private ReportTask() {
        }

        @Override
        public void run(ConflatingMetricsAggregator target) {
            target.report();
        }
    }
}

