/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer.ddagent;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import datadog.trace.common.writer.ddagent.DisruptorUtils;
import datadog.trace.common.writer.ddagent.PayloadDispatcher;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.Monitor;
import datadog.trace.core.processor.TraceProcessor;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceProcessingDisruptor
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingDisruptor.class);
    private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
    private final DisruptorEvent.DataTranslator<List<DDSpan>> dataTranslator;
    private final DisruptorEvent.FlushTranslator<List<DDSpan>> flushTranslator;
    private final DisruptorEvent.HeartbeatTranslator<List<DDSpan>> heartbeatTranslator = new DisruptorEvent.HeartbeatTranslator();
    private final boolean doHeartbeat;
    private volatile ScheduledFuture<?> heartbeat;

    public TraceProcessingDisruptor(int disruptorSize, Monitor monitor, DDAgentApi api, long flushInterval, TimeUnit timeUnit, boolean heartbeat) {
        this.disruptor = DisruptorUtils.create(new DisruptorEvent.Factory(), disruptorSize, DaemonThreadFactory.TRACE_PROCESSOR, ProducerType.MULTI, (WaitStrategy)new BlockingWaitStrategy());
        this.disruptor.handleEventsWith(new EventHandler[]{new TraceSerializingHandler(monitor, flushInterval, timeUnit, new PayloadDispatcher(api, monitor))});
        this.dataTranslator = new DisruptorEvent.DataTranslator();
        this.flushTranslator = new DisruptorEvent.FlushTranslator();
        this.doHeartbeat = heartbeat;
    }

    public void start() {
        if (this.doHeartbeat) {
            this.heartbeat = CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(new HeartbeatTask(), this, 1000L, 1000L, TimeUnit.MILLISECONDS, "disruptor heartbeat");
        }
        this.disruptor.start();
    }

    public boolean flush(long timeout, TimeUnit timeUnit) {
        CountDownLatch latch = new CountDownLatch(1);
        this.disruptor.publishEvent(this.flushTranslator, (Object)0, (Object)latch);
        try {
            return latch.await(timeout, timeUnit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void close() {
        if (null != this.heartbeat) {
            this.heartbeat.cancel(true);
        }
        this.disruptor.halt();
    }

    public boolean publish(List<DDSpan> data, int representativeCount) {
        return this.disruptor.getRingBuffer().tryPublishEvent(this.dataTranslator, data, (Object)representativeCount);
    }

    void heartbeat() {
        this.disruptor.getRingBuffer().publishEvent(this.heartbeatTranslator);
    }

    public int getDisruptorCapacity() {
        return this.disruptor.getRingBuffer().getBufferSize();
    }

    public long getDisruptorRemainingCapacity() {
        return this.disruptor.getRingBuffer().remainingCapacity();
    }

    private static final class HeartbeatTask
    implements CommonTaskExecutor.Task<TraceProcessingDisruptor> {
        private HeartbeatTask() {
        }

        @Override
        public void run(TraceProcessingDisruptor traceProcessor) {
            traceProcessor.heartbeat();
        }
    }

    public static class TraceSerializingHandler
    implements EventHandler<DisruptorEvent<List<DDSpan>>> {
        private final TraceProcessor processor = new TraceProcessor();
        private final Monitor monitor;
        private final long flushIntervalMillis;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;
        private long nextFlushMillis;

        public TraceSerializingHandler(Monitor monitor, long flushInterval, TimeUnit timeUnit, PayloadDispatcher payloadDispatcher) {
            this.monitor = monitor;
            this.doTimeFlush = flushInterval > 0L;
            this.payloadDispatcher = payloadDispatcher;
            if (this.doTimeFlush) {
                this.flushIntervalMillis = timeUnit.toMillis(flushInterval);
                this.scheduleNextTimeFlush();
            } else {
                this.flushIntervalMillis = Long.MAX_VALUE;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onEvent(DisruptorEvent<List<DDSpan>> event, long sequence, boolean endOfBatch) {
            try {
                if (event.data == null && this.doTimeFlush && this.millisecondTime() > this.nextFlushMillis) {
                    this.payloadDispatcher.flush();
                    this.scheduleNextTimeFlush();
                }
                if (event.data != null) {
                    this.payloadDispatcher.addTrace(this.processor.onTraceComplete((List)event.data));
                }
                if (null != event.flushLatch) {
                    this.payloadDispatcher.flush();
                    event.flushLatch.countDown();
                }
            }
            catch (Throwable e) {
                if (log.isDebugEnabled()) {
                    log.debug("Error while serializing trace", e);
                }
                this.monitor.onFailedSerialize((List)event.data, e);
            }
            finally {
                event.reset();
            }
        }

        private void scheduleNextTimeFlush() {
            if (this.doTimeFlush) {
                this.nextFlushMillis = this.millisecondTime() + this.flushIntervalMillis;
            }
        }

        private long millisecondTime() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        }
    }
}

