/*
 * Decompiled with CFR 0.152.
 */
package ddtrot.dd.trace.common.writer;

import datadog.trace.api.Config;
import ddtrot.dd.communication.ddagent.DroppingPolicy;
import ddtrot.dd.trace.common.sampling.SingleSpanSampler;
import ddtrot.dd.trace.common.writer.PayloadDispatcher;
import ddtrot.dd.trace.common.writer.SpanSamplingWorker;
import ddtrot.dd.trace.common.writer.ddagent.FlushEvent;
import ddtrot.dd.trace.common.writer.ddagent.Prioritization;
import ddtrot.dd.trace.common.writer.ddagent.PrioritizationStrategy;
import ddtrot.dd.trace.core.CoreSpan;
import ddtrot.dd.trace.core.DDSpan;
import ddtrot.dd.trace.core.DDSpanContext;
import ddtrot.dd.trace.core.monitor.HealthMetrics;
import ddtrot.dd.trace.core.postprocessor.SpanPostProcessor;
import ddtrot.dd.trace.util.AgentThreadFactory;
import ddtrot.org.jctools.queues.MessagePassingQueue;
import ddtrot.org.jctools.queues.MpscBlockingConsumerArrayQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands published traces over to a single serializer thread through two queues
 * (primary and secondary) and lets callers request a synchronous flush.
 *
 * <p>The worker owns the serializer thread and a {@link SpanSamplingWorker}; both are
 * started by {@link #start()} and stopped by {@link #close()}.
 */
public class TraceProcessingWorker implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);

    /** Maximum time, in milliseconds, that {@link #close()} waits for the serializer thread to exit. */
    private static final long THREAD_JOIN_TIMEOUT_MS = 800L;

    private final PrioritizationStrategy prioritizationStrategy;
    private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
    private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
    private final TraceSerializingHandler serializingHandler;
    private final Thread serializerThread;
    private final int capacity;
    private final SpanSamplingWorker spanSamplingWorker;

    /**
     * Wires up the queues, the span sampling worker, the prioritization strategy and the
     * serializer thread. Nothing is started until {@link #start()} is called.
     *
     * @param capacity capacity used to create both queues; also passed to the span sampling worker
     * @param healthMetrics sink notified when handling an event fails
     * @param dispatcher receives traces and flush requests from the serializer thread
     * @param droppingPolicy passed through to the span sampling worker and the prioritization strategy
     * @param prioritization factory for the strategy that {@link #publish} delegates to
     * @param flushInterval interval between time-based flushes; a value {@code <= 0} disables them
     * @param timeUnit unit of {@code flushInterval}
     * @param singleSpanSampler passed through to the span sampling worker
     * @param spanPostProcessor applied to spans whose context requires post-processing
     */
    public TraceProcessingWorker(int capacity, HealthMetrics healthMetrics, PayloadDispatcher dispatcher, DroppingPolicy droppingPolicy, Prioritization prioritization, long flushInterval, TimeUnit timeUnit, SingleSpanSampler singleSpanSampler, SpanPostProcessor spanPostProcessor) {
        this.capacity = capacity;
        this.primaryQueue = createQueue(capacity);
        this.secondaryQueue = createQueue(capacity);
        this.spanSamplingWorker = SpanSamplingWorker.build(capacity, this.primaryQueue, this.secondaryQueue, singleSpanSampler, healthMetrics, droppingPolicy);
        this.prioritizationStrategy = prioritization.create(this.primaryQueue, this.secondaryQueue, this.spanSamplingWorker.getSpanSamplingQueue(), droppingPolicy);
        this.serializingHandler = new TraceSerializingHandler(this.primaryQueue, this.secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit, spanPostProcessor);
        this.serializerThread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.TRACE_PROCESSOR, this.serializingHandler);
    }

    /** Starts the serializer thread and then the span sampling worker. */
    public void start() {
        serializerThread.start();
        spanSamplingWorker.start();
    }

    /**
     * Enqueues a {@link FlushEvent} on the primary queue and waits for the serializer thread to
     * handle it. The handler flushes the dispatcher and then calls {@code FlushEvent.sync()},
     * which is expected to release the latch awaited here.
     *
     * <p>The whole call (enqueueing plus waiting) is bounded by the given timeout. If the event
     * cannot be enqueued, because the queue stays full until the timeout elapses or the
     * serializer thread is not alive, the method returns {@code false} without waiting on a
     * latch that nothing could release.
     *
     * @param timeout maximum time to spend in this call
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the latch was released within the timeout; {@code false} on
     *     timeout, if the event could not be enqueued, or if the calling thread was interrupted
     *     (its interrupt status is restored)
     */
    public boolean flush(long timeout, TimeUnit timeUnit) {
        CountDownLatch latch = new CountDownLatch(1);
        FlushEvent flush = new FlushEvent(latch);
        long timeoutNanos = timeUnit.toNanos(timeout);
        long start = System.nanoTime();

        // Always try at least once; retry only while a live consumer can still free up space.
        boolean offered = primaryQueue.offer(flush);
        while (!offered && serializerThread.isAlive() && System.nanoTime() - start < timeoutNanos) {
            // Give the consumer a chance to run instead of spinning hot on a full queue.
            Thread.yield();
            offered = primaryQueue.offer(flush);
        }
        if (!offered) {
            // The event never reached the queue, so the latch can never be released.
            return false;
        }

        try {
            // Subtracting elapsed time (rather than comparing against start + timeout)
            // stays correct even when timeoutNanos saturates at Long.MAX_VALUE.
            long remainingNanos = timeoutNanos - (System.nanoTime() - start);
            return latch.await(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Closes the span sampling worker, interrupts the serializer thread and waits up to
     * {@value #THREAD_JOIN_TIMEOUT_MS} ms for it to exit. If the calling thread is interrupted
     * while waiting, the wait is abandoned and the interrupt status is restored.
     */
    @Override
    public void close() {
        spanSamplingWorker.close();
        serializerThread.interrupt();
        try {
            serializerThread.join(THREAD_JOIN_TIMEOUT_MS);
        } catch (InterruptedException e) {
            // Shutdown is best-effort, but the caller must still be able to observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes a trace by delegating to the configured prioritization strategy.
     *
     * @param root root span of the trace
     * @param samplingPriority sampling priority passed to the strategy
     * @param trace spans making up the trace
     * @return the result reported by the prioritization strategy
     */
    public <T extends CoreSpan<T>> PrioritizationStrategy.PublishResult publish(T root, int samplingPriority, List<T> trace) {
        return prioritizationStrategy.publish(root, samplingPriority, trace);
    }

    /** Returns the capacity this worker was constructed with. */
    public int getCapacity() {
        return capacity;
    }

    /** Returns the remaining capacity of the primary queue only; the secondary queue is not considered. */
    public long getRemainingCapacity() {
        return primaryQueue.remainingCapacity();
    }

    /** Creates a queue with the given capacity. */
    private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity) {
        return new MpscBlockingConsumerArrayQueue<>(capacity);
    }

    /**
     * Body of the serializer thread: repeatedly consumes events from the primary and secondary
     * queues, forwards traces to the {@link PayloadDispatcher}, and flushes the dispatcher when
     * the configured interval has elapsed.
     */
    public static class TraceSerializingHandler implements Runnable {
        /** How long, in milliseconds, one duty cycle blocks waiting for an event on the primary queue. */
        private static final long PRIMARY_POLL_TIMEOUT_MS = 100L;

        private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
        private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
        private final HealthMetrics healthMetrics;
        private final long ticksRequiredToFlush;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;
        private final SpanPostProcessor spanPostProcessor;
        /** {@link System#nanoTime()} reading taken at the last time-based flush (or at construction). */
        private long lastTicks;

        /**
         * @param primaryQueue queue polled with a blocking wait on every duty cycle
         * @param secondaryQueue queue polled without blocking after the primary queue
         * @param healthMetrics sink notified when handling an event throws
         * @param payloadDispatcher receives traces and flush requests
         * @param flushInterval interval between time-based flushes; a value {@code <= 0} disables them
         * @param timeUnit unit of {@code flushInterval}
         * @param spanPostProcessor applied to spans whose context requires post-processing
         */
        public TraceSerializingHandler(MpscBlockingConsumerArrayQueue<Object> primaryQueue, MpscBlockingConsumerArrayQueue<Object> secondaryQueue, HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long flushInterval, TimeUnit timeUnit, SpanPostProcessor spanPostProcessor) {
            this.primaryQueue = primaryQueue;
            this.secondaryQueue = secondaryQueue;
            this.healthMetrics = healthMetrics;
            this.payloadDispatcher = payloadDispatcher;
            this.spanPostProcessor = spanPostProcessor;
            this.doTimeFlush = flushInterval > 0L;
            if (this.doTimeFlush) {
                this.lastTicks = System.nanoTime();
                this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
            } else {
                this.ticksRequiredToFlush = Long.MAX_VALUE;
            }
        }

        /**
         * Runs the duty cycle until the thread is interrupted, then logs whether any events were
         * left in the queues. The interrupt status is restored before returning.
         */
        @Override
        public void run() {
            try {
                runDutyCycle();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Parameterised logging avoids building the message when debug logging is disabled.
            log.debug("Datadog trace processor exited. Publishing traces stopped. Unpublished traces left: {}", !queuesAreEmpty());
        }

        /** Loops over primary queue, secondary queue and time-based flush until interrupted. */
        private void runDutyCycle() throws InterruptedException {
            Thread current = Thread.currentThread();
            while (!current.isInterrupted()) {
                consumeFromPrimaryQueue();
                consumeFromSecondaryQueue();
                flushIfNecessary();
            }
        }

        /**
         * Handles one queue event. A {@link List} is treated as a trace: it is post-processed if
         * needed and added to the dispatcher. A {@link FlushEvent} flushes the dispatcher and is
         * then synced. Events of any other type are ignored.
         *
         * <p>Any {@link Throwable} raised while handling the event is reported to
         * {@link HealthMetrics} (with the trace, or {@code null} for non-trace events) and is
         * not rethrown. Note that a failing flush therefore leaves the {@link FlushEvent} unsynced.
         *
         * @param event the object taken from one of the queues
         */
        public void onEvent(Object event) {
            try {
                if (event instanceof List) {
                    // Queue elements are untyped; lists are assumed to be traces of DDSpan,
                    // which is what maybeTracePostProcessing requires.
                    @SuppressWarnings("unchecked")
                    List<DDSpan> trace = (List<DDSpan>) event;
                    maybeTracePostProcessing(trace);
                    payloadDispatcher.addTrace(trace);
                } else if (event instanceof FlushEvent) {
                    payloadDispatcher.flush();
                    ((FlushEvent) event).sync();
                }
            } catch (Throwable e) {
                if (log.isDebugEnabled()) {
                    log.debug("Error while serializing trace", e);
                }
                @SuppressWarnings("unchecked")
                List<DDSpan> data = event instanceof List ? (List<DDSpan>) event : null;
                healthMetrics.onFailedSerialize(data, e);
            }
        }

        /**
         * Waits up to {@value #PRIMARY_POLL_TIMEOUT_MS} ms for an event on the primary queue; if
         * one arrives, handles it and then drains whatever else is currently queued there.
         *
         * @throws InterruptedException if interrupted while waiting for an event
         */
        protected void consumeFromPrimaryQueue() throws InterruptedException {
            Object event = primaryQueue.poll(PRIMARY_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (event == null) {
                return;
            }
            onEvent(event);
            consumeBatch(primaryQueue);
        }

        /**
         * Takes an event from the secondary queue without blocking; if one is present, handles it
         * and then drains whatever else is currently queued there.
         */
        protected void consumeFromSecondaryQueue() {
            Object event = secondaryQueue.poll();
            if (event == null) {
                return;
            }
            onEvent(event);
            consumeBatch(secondaryQueue);
        }

        /** Flushes the dispatcher if time-based flushing is enabled and the interval has elapsed. */
        protected void flushIfNecessary() {
            if (shouldFlush()) {
                payloadDispatcher.flush();
            }
        }

        /**
         * Returns {@code true}, and resets the reference time, when time-based flushing is enabled
         * and strictly more than the configured interval has passed since the last reset.
         */
        private boolean shouldFlush() {
            if (!doTimeFlush) {
                return false;
            }
            long now = System.nanoTime();
            if (now - lastTicks > ticksRequiredToFlush) {
                lastTicks = now;
                return true;
            }
            return false;
        }

        /** Drains at most the number of events currently reported by the queue into {@link #onEvent}. */
        private void consumeBatch(MessagePassingQueue<Object> queue) {
            queue.drain(this::onEvent, queue.size());
        }

        /** Returns {@code true} only if both the primary and the secondary queue are empty. */
        protected boolean queuesAreEmpty() {
            return primaryQueue.isEmpty() && secondaryQueue.isEmpty();
        }

        /**
         * Runs the span post-processor on every span of the trace whose context requires it.
         *
         * <p>Processing of the whole trace shares one deadline derived from
         * {@code Config.get().getTracePostProcessingTimeout()}, which is added to
         * {@link System#currentTimeMillis()} and is therefore treated as milliseconds. Processing
         * stops at the first span for which the processor returns {@code false}. Any
         * {@link Throwable} is logged at debug level and otherwise ignored, so that a
         * post-processing failure does not prevent the trace from being dispatched.
         *
         * @param trace the trace to inspect; {@code null} or empty traces are ignored
         */
        private void maybeTracePostProcessing(List<DDSpan> trace) {
            if (trace == null || trace.isEmpty()) {
                return;
            }

            // Allocate lazily: most traces are expected to need no post-processing at all.
            List<DDSpan> spansToPostProcess = null;
            for (DDSpan span : trace) {
                DDSpanContext context = span.context();
                if (context != null && context.isRequiresPostProcessing()) {
                    if (spansToPostProcess == null) {
                        spansToPostProcess = new ArrayList<>();
                    }
                    spansToPostProcess.add(span);
                }
            }
            if (spansToPostProcess == null) {
                return;
            }

            try {
                long timeout = Config.get().getTracePostProcessingTimeout();
                long deadline = System.currentTimeMillis() + timeout;
                BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline;
                for (DDSpan span : spansToPostProcess) {
                    if (!spanPostProcessor.process(span, timeoutCheck)) {
                        log.debug("Span post-processing interrupted due to timeout.");
                        break;
                    }
                }
            } catch (Throwable e) {
                if (log.isDebugEnabled()) {
                    log.debug("Error while trace post-processing", e);
                }
            }
        }
    }
}

