/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer.ddagent;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.DispatchingDisruptor;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import datadog.trace.common.writer.ddagent.DisruptorUtils;
import datadog.trace.common.writer.ddagent.Monitor;
import datadog.trace.common.writer.ddagent.StatefulSerializer;
import datadog.trace.common.writer.ddagent.TraceBuffer;
import datadog.trace.core.DDSpan;
import datadog.trace.core.processor.TraceProcessor;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceProcessingDisruptor
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingDisruptor.class);
    private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
    private final DisruptorEvent.DataTranslator<List<DDSpan>> dataTranslator;
    private final DisruptorEvent.FlushTranslator<List<DDSpan>> flushTranslator;
    private final DisruptorEvent.HeartbeatTranslator<List<DDSpan>> heartbeatTranslator = new DisruptorEvent.HeartbeatTranslator();
    private final boolean doHeartbeat;
    private volatile ScheduledFuture<?> heartbeat;

    public TraceProcessingDisruptor(int disruptorSize, DispatchingDisruptor dispatchingDisruptor, Monitor monitor, DDAgentWriter writer, StatefulSerializer serializer, long flushInterval, TimeUnit timeUnit, boolean heartbeat) {
        this.disruptor = DisruptorUtils.create(new DisruptorEvent.Factory(), disruptorSize, DaemonThreadFactory.TRACE_PROCESSOR, ProducerType.MULTI, (WaitStrategy)new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(10L)));
        this.disruptor.handleEventsWith(new EventHandler[]{new TraceSerializingHandler(dispatchingDisruptor, monitor, writer, serializer, flushInterval, timeUnit)});
        this.dataTranslator = new DisruptorEvent.DataTranslator();
        this.flushTranslator = new DisruptorEvent.FlushTranslator();
        this.doHeartbeat = heartbeat;
    }

    public void start() {
        if (this.doHeartbeat) {
            this.heartbeat = CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(new HeartbeatTask(), this, 100L, 100L, TimeUnit.MILLISECONDS, "disruptor heartbeat");
        }
        this.disruptor.start();
    }

    public boolean flush(long timeout, TimeUnit timeUnit) {
        CountDownLatch latch = new CountDownLatch(1);
        this.disruptor.publishEvent(this.flushTranslator, (Object)0, (Object)latch);
        try {
            return latch.await(timeout, timeUnit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void close() {
        if (null != this.heartbeat) {
            this.heartbeat.cancel(true);
        }
        this.disruptor.halt();
    }

    public boolean publish(List<DDSpan> data, int representativeCount) {
        return this.disruptor.getRingBuffer().tryPublishEvent(this.dataTranslator, data, (Object)representativeCount);
    }

    void heartbeat() {
        this.disruptor.getRingBuffer().tryPublishEvent(this.heartbeatTranslator);
    }

    public int getDisruptorCapacity() {
        return this.disruptor.getRingBuffer().getBufferSize();
    }

    public long getDisruptorRemainingCapacity() {
        return this.disruptor.getRingBuffer().remainingCapacity();
    }

    private static final class HeartbeatTask
    implements CommonTaskExecutor.Task<TraceProcessingDisruptor> {
        private HeartbeatTask() {
        }

        @Override
        public void run(TraceProcessingDisruptor traceProcessor) {
            traceProcessor.heartbeat();
        }
    }

    public static class TraceSerializingHandler
    implements EventHandler<DisruptorEvent<List<DDSpan>>> {
        private final TraceProcessor processor = new TraceProcessor();
        private final DispatchingDisruptor dispatchingDisruptor;
        private final Monitor monitor;
        private final DDAgentWriter writer;
        private final StatefulSerializer serializer;
        private final long flushIntervalMillis;
        private final boolean doTimeFlush;
        private long publicationTxn = -1L;
        private int representativeCount = 0;
        private long nextFlushMillis;

        public TraceSerializingHandler(DispatchingDisruptor dispatchingDisruptor, Monitor monitor, DDAgentWriter writer, StatefulSerializer serializer, long flushInterval, TimeUnit timeUnit) {
            this.dispatchingDisruptor = dispatchingDisruptor;
            this.monitor = monitor;
            this.writer = writer;
            this.serializer = serializer;
            boolean bl = this.doTimeFlush = flushInterval > 0L;
            if (this.doTimeFlush) {
                this.flushIntervalMillis = timeUnit.toMillis(flushInterval);
                this.scheduleNextTimeFlush();
            } else {
                this.flushIntervalMillis = Long.MAX_VALUE;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onEvent(DisruptorEvent<List<DDSpan>> event, long sequence, boolean endOfBatch) {
            if (-1L == this.publicationTxn) {
                this.beginTransaction();
            }
            try {
                if ((this.representativeCount > 0 || event.flushLatch != null) && (this.serializer.isAtCapacity() || this.doTimeFlush && this.millisecondTime() > this.nextFlushMillis || event.flushLatch != null)) {
                    this.commitTransaction(event.flushLatch);
                }
                if (event.data != null) {
                    this.serialize((List)event.data, event.representativeCount);
                }
            }
            catch (Throwable e) {
                if (log.isDebugEnabled()) {
                    log.debug("Error while serializing trace", e);
                }
                this.monitor.onFailedSerialize(this.writer, (List)event.data, e);
            }
            finally {
                event.reset();
            }
        }

        private void serialize(List<DDSpan> trace, int representativeCount) throws IOException {
            this.representativeCount += representativeCount;
            trace = this.processor.onTraceComplete(trace);
            int sizeInBytes = this.serializer.serialize(trace);
            this.monitor.onSerialize(this.writer, trace, sizeInBytes);
        }

        private void commitTransaction(final CountDownLatch flushLatch) throws IOException {
            this.serializer.dropBuffer();
            TraceBuffer buffer = this.dispatchingDisruptor.getTraceBuffer(this.publicationTxn);
            if (null != flushLatch) {
                buffer.setDispatchRunnable(new Runnable(){

                    @Override
                    public void run() {
                        flushLatch.countDown();
                    }
                });
            }
            buffer.setRepresentativeCount(this.representativeCount);
            if (log.isDebugEnabled()) {
                log.debug("publish id={}, rc={}, tc={}", new Object[]{buffer.id(), buffer.representativeCount(), buffer.traceCount()});
            }
            this.dispatchingDisruptor.commit(this.publicationTxn);
            this.beginTransaction();
        }

        private void beginTransaction() {
            this.publicationTxn = this.dispatchingDisruptor.beginTransaction();
            this.representativeCount = 0;
            this.serializer.reset(this.dispatchingDisruptor.getTraceBuffer(this.publicationTxn));
            this.scheduleNextTimeFlush();
        }

        private void scheduleNextTimeFlush() {
            if (this.doTimeFlush) {
                this.nextFlushMillis = this.millisecondTime() + this.flushIntervalMillis;
            }
        }

        private long millisecondTime() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        }
    }
}

