/*
 * Decompiled with CFR 0.152.
 */
package ddtrot.dd.trace.common.writer;

import ddtrot.dd.communication.ddagent.DroppingPolicy;
import ddtrot.dd.trace.common.writer.PayloadDispatcher;
import ddtrot.dd.trace.common.writer.ddagent.FlushEvent;
import ddtrot.dd.trace.common.writer.ddagent.Prioritization;
import ddtrot.dd.trace.common.writer.ddagent.PrioritizationStrategy;
import ddtrot.dd.trace.core.CoreSpan;
import ddtrot.dd.trace.core.monitor.HealthMetrics;
import ddtrot.dd.trace.util.AgentThreadFactory;
import ddtrot.org.jctools.queues.MessagePassingQueue;
import ddtrot.org.jctools.queues.MpscBlockingConsumerArrayQueue;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceProcessingWorker
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingWorker.class);
    private final PrioritizationStrategy prioritizationStrategy;
    private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
    private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
    private final TraceSerializingHandler serializingHandler;
    private final Thread serializerThread;
    private final int capacity;

    public TraceProcessingWorker(int capacity, HealthMetrics healthMetrics, PayloadDispatcher dispatcher, DroppingPolicy droppingPolicy, Prioritization prioritization, long flushInterval, TimeUnit timeUnit) {
        this.capacity = capacity;
        this.primaryQueue = TraceProcessingWorker.createQueue(capacity);
        this.secondaryQueue = TraceProcessingWorker.createQueue(capacity);
        this.prioritizationStrategy = prioritization.create(this.primaryQueue, this.secondaryQueue, droppingPolicy);
        this.serializingHandler = new TraceSerializingHandler(this.primaryQueue, this.secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit);
        this.serializerThread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.TRACE_PROCESSOR, this.serializingHandler);
    }

    public void start() {
        this.serializerThread.start();
    }

    public boolean flush(long timeout, TimeUnit timeUnit) {
        boolean offered;
        CountDownLatch latch = new CountDownLatch(1);
        FlushEvent flush = new FlushEvent(latch);
        while (!(offered = this.primaryQueue.offer(flush)) && this.serializerThread.isAlive()) {
        }
        try {
            return latch.await(timeout, timeUnit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void close() {
        this.serializerThread.interrupt();
        try {
            this.serializerThread.join(800L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public <T extends CoreSpan<T>> boolean publish(T root, int samplingPriority, List<T> trace) {
        return this.prioritizationStrategy.publish(root, samplingPriority, trace);
    }

    public int getCapacity() {
        return this.capacity;
    }

    public long getRemainingCapacity() {
        return this.primaryQueue.remainingCapacity();
    }

    private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity) {
        return new MpscBlockingConsumerArrayQueue<Object>(capacity);
    }

    public static class TraceSerializingHandler
    implements Runnable,
    MessagePassingQueue.Consumer<Object> {
        private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
        private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
        private final HealthMetrics healthMetrics;
        private final long ticksRequiredToFlush;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;
        private long lastTicks;

        public TraceSerializingHandler(MpscBlockingConsumerArrayQueue<Object> primaryQueue, MpscBlockingConsumerArrayQueue<Object> secondaryQueue, HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long flushInterval, TimeUnit timeUnit) {
            this.primaryQueue = primaryQueue;
            this.secondaryQueue = secondaryQueue;
            this.healthMetrics = healthMetrics;
            this.doTimeFlush = flushInterval > 0L;
            this.payloadDispatcher = payloadDispatcher;
            if (this.doTimeFlush) {
                this.lastTicks = System.nanoTime();
                this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
            } else {
                this.ticksRequiredToFlush = Long.MAX_VALUE;
            }
        }

        public void onEvent(Object event) {
            try {
                if (event instanceof List) {
                    List trace = (List)event;
                    this.payloadDispatcher.addTrace(trace);
                } else if (event instanceof FlushEvent) {
                    this.payloadDispatcher.flush();
                    ((FlushEvent)event).sync();
                }
            }
            catch (Throwable e) {
                if (log.isDebugEnabled()) {
                    log.debug("Error while serializing trace", e);
                }
                List data = event instanceof List ? (List)event : null;
                this.healthMetrics.onFailedSerialize(data, e);
            }
        }

        @Override
        public void run() {
            try {
                this.runDutyCycle();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.debug("Datadog trace processor exited. Publishing traces stopped");
        }

        private void runDutyCycle() throws InterruptedException {
            Thread thread = Thread.currentThread();
            while (!thread.isInterrupted()) {
                this.consumeFromPrimaryQueue();
                this.consumeFromSecondaryQueue();
                this.flushIfNecessary();
            }
        }

        private void consumeFromPrimaryQueue() throws InterruptedException {
            Object event = this.primaryQueue.poll(100L, TimeUnit.MILLISECONDS);
            if (null != event) {
                this.onEvent(event);
                this.consumeBatch(this.primaryQueue);
            }
        }

        private void consumeFromSecondaryQueue() {
            Object event = this.secondaryQueue.poll();
            if (null != event) {
                this.onEvent(event);
                this.consumeBatch(this.secondaryQueue);
            }
        }

        private void flushIfNecessary() {
            if (this.shouldFlush()) {
                this.payloadDispatcher.flush();
            }
        }

        private boolean shouldFlush() {
            long nanoTime;
            long ticks;
            if (this.doTimeFlush && (ticks = (nanoTime = System.nanoTime()) - this.lastTicks) > this.ticksRequiredToFlush) {
                this.lastTicks = nanoTime;
                return true;
            }
            return false;
        }

        private void consumeBatch(MessagePassingQueue<Object> queue) {
            queue.drain(this, queue.size());
        }

        @Override
        public void accept(Object event) {
            this.onEvent(event);
        }
    }
}

