/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer.ddagent;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import datadog.trace.common.writer.ddagent.DisruptorUtils;
import datadog.trace.common.writer.ddagent.Monitor;
import datadog.trace.common.writer.ddagent.TraceMapper;
import datadog.trace.core.DDSpan;
import datadog.trace.core.processor.TraceProcessor;
import datadog.trace.core.serialization.msgpack.ByteBufferConsumer;
import datadog.trace.core.serialization.msgpack.Packer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Ring-buffer front end for trace serialization.
 *
 * <p>Completed traces are published into an LMAX {@link Disruptor}; the one
 * {@link TraceSerializingHandler} registered by the constructor consumes them,
 * packs them through a {@link Packer} and sends the packed bytes via the
 * {@link DDAgentApi}.
 *
 * <p>Besides trace data, two control events travel through the same ring buffer:
 * <ul>
 *   <li><b>flush</b> events, published by {@link #flush(long, TimeUnit)}, which carry a latch the
 *       handler counts down after flushing the packer;</li>
 *   <li><b>heartbeat</b> events, published every 100 ms when enabled, which carry no data and
 *       give the handler an opportunity to perform its time-based flush.</li>
 * </ul>
 */
public class TraceProcessingDisruptor implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingDisruptor.class);

    /** Size in bytes (0x200000 = 2 MiB) of the buffer the handler allocates for its {@link Packer}. */
    static final int DEFAULT_BUFFER_SIZE = 0x200000;

    private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
    private final DisruptorEvent.DataTranslator<List<DDSpan>> dataTranslator;
    private final DisruptorEvent.FlushTranslator<List<DDSpan>> flushTranslator;
    private final DisruptorEvent.HeartbeatTranslator<List<DDSpan>> heartbeatTranslator =
            new DisruptorEvent.HeartbeatTranslator<>();
    private final boolean doHeartbeat;
    /** Handle of the scheduled heartbeat task; stays {@code null} unless {@link #start()} scheduled one. */
    private volatile ScheduledFuture<?> heartbeat;

    /**
     * Creates the disruptor and registers the serializing handler. Nothing is processed until
     * {@link #start()} is called.
     *
     * @param disruptorSize size passed to {@code DisruptorUtils.create} for the ring buffer
     * @param monitor receives serialize/send success and failure callbacks
     * @param writer the writer on whose behalf the monitor callbacks are reported
     * @param api destination for the serialized traces
     * @param flushInterval interval between time-based flushes; a value {@code <= 0} disables them
     * @param timeUnit unit of {@code flushInterval}
     * @param heartbeat whether {@link #start()} should schedule the periodic heartbeat event
     */
    public TraceProcessingDisruptor(int disruptorSize, Monitor monitor, DDAgentWriter writer, DDAgentApi api, long flushInterval, TimeUnit timeUnit, boolean heartbeat) {
        // NOTE(review): Factory is left raw; its type parameters are not visible from this file.
        this.disruptor = DisruptorUtils.create(
                new DisruptorEvent.Factory(),
                disruptorSize,
                DaemonThreadFactory.TRACE_PROCESSOR,
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        this.disruptor.handleEventsWith(
                new TraceSerializingHandler(monitor, writer, flushInterval, timeUnit, api));
        this.dataTranslator = new DisruptorEvent.DataTranslator<>();
        this.flushTranslator = new DisruptorEvent.FlushTranslator<>();
        this.doHeartbeat = heartbeat;
    }

    /**
     * Schedules the heartbeat task (if enabled) at a fixed 100 ms rate and then starts the
     * disruptor.
     */
    public void start() {
        if (this.doHeartbeat) {
            this.heartbeat = CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(
                    new HeartbeatTask(), this, 100L, 100L, TimeUnit.MILLISECONDS, "disruptor heartbeat");
        }
        this.disruptor.start();
    }

    /**
     * Publishes a flush event and waits for the handler to acknowledge it.
     *
     * <p>NOTE(review): the timeout only bounds the wait on the latch; the event is published
     * with {@code publishEvent} before that wait begins.
     *
     * @param timeout maximum time to wait for the acknowledgement
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the handler counted the latch down within the timeout;
     *     {@code false} if the wait timed out or the calling thread was interrupted (in which
     *     case the interrupt flag is restored)
     */
    public boolean flush(long timeout, TimeUnit timeUnit) {
        final CountDownLatch latch = new CountDownLatch(1);
        this.disruptor.publishEvent(this.flushTranslator, 0, latch);
        try {
            return latch.await(timeout, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Cancels the heartbeat task, if one was scheduled, and halts the disruptor. No flush is
     * performed here.
     */
    @Override
    public void close() {
        // Read the volatile field once so the null check and the call see the same reference.
        final ScheduledFuture<?> scheduled = this.heartbeat;
        if (scheduled != null) {
            scheduled.cancel(true);
        }
        this.disruptor.halt();
    }

    /**
     * Tries to enqueue a trace for serialization without blocking.
     *
     * @param data the spans of the trace
     * @param representativeCount count accumulated by the handler and reported alongside the
     *     payload when it is sent
     * @return the result of {@code RingBuffer.tryPublishEvent}
     */
    public boolean publish(List<DDSpan> data, int representativeCount) {
        return this.disruptor.getRingBuffer().tryPublishEvent(this.dataTranslator, data, representativeCount);
    }

    /** Tries to enqueue a heartbeat event; the outcome of the attempt is deliberately ignored. */
    void heartbeat() {
        this.disruptor.getRingBuffer().tryPublishEvent(this.heartbeatTranslator);
    }

    /** @return the ring buffer's size */
    public int getDisruptorCapacity() {
        return this.disruptor.getRingBuffer().getBufferSize();
    }

    /** @return the ring buffer's remaining capacity */
    public long getDisruptorRemainingCapacity() {
        return this.disruptor.getRingBuffer().remainingCapacity();
    }

    /** Scheduled task that publishes a heartbeat event on the disruptor it is handed. */
    private static final class HeartbeatTask
            implements CommonTaskExecutor.Task<TraceProcessingDisruptor> {
        private HeartbeatTask() {
        }

        @Override
        public void run(TraceProcessingDisruptor traceProcessor) {
            traceProcessor.heartbeat();
        }
    }

    /**
     * Disruptor event handler that serializes traces through a {@link Packer} and, acting as
     * the packer's {@link ByteBufferConsumer}, sends each packed buffer to the {@link DDAgentApi}.
     *
     * <p>The mutable fields ({@code representativeCount}, {@code nextFlushMillis},
     * {@code packer}) are accessed without synchronization.
     */
    public static class TraceSerializingHandler
            implements EventHandler<DisruptorEvent<List<DDSpan>>>, ByteBufferConsumer {
        private final TraceProcessor processor = new TraceProcessor();
        private final Monitor monitor;
        private final DDAgentWriter writer;
        private final long flushIntervalMillis;
        private final boolean doTimeFlush;
        private final DDAgentApi api;
        /** Sum of the representative counts serialized since the last non-empty send. */
        private int representativeCount = 0;
        /** Earliest time, on the {@link #millisecondTime()} clock, of the next time-based flush. */
        private long nextFlushMillis;
        private final TraceMapper traceMapper = new TraceMapper();
        /** Created lazily on the first event. */
        private Packer packer;

        /**
         * @param monitor receives serialize/send success and failure callbacks
         * @param writer passed through to the monitor callbacks
         * @param flushInterval interval between time-based flushes; {@code <= 0} disables them
         * @param timeUnit unit of {@code flushInterval}
         * @param api destination for the serialized traces
         */
        public TraceSerializingHandler(Monitor monitor, DDAgentWriter writer, long flushInterval, TimeUnit timeUnit, DDAgentApi api) {
            this.monitor = monitor;
            this.writer = writer;
            this.doTimeFlush = flushInterval > 0L;
            this.api = api;
            if (this.doTimeFlush) {
                this.flushIntervalMillis = timeUnit.toMillis(flushInterval);
                this.scheduleNextTimeFlush();
            } else {
                this.flushIntervalMillis = Long.MAX_VALUE;
            }
        }

        /**
         * Handles one ring-buffer event. In order:
         * <ol>
         *   <li>on an event without data, flushes the packer if time-based flushing is enabled,
         *       something has been serialized since the last send, and the flush deadline has
         *       passed;</li>
         *   <li>serializes the event's trace, if it has one;</li>
         *   <li>if the event carries a flush latch, flushes the packer and counts the latch
         *       down.</li>
         * </ol>
         * Any {@link Throwable} raised by these steps is reported to the monitor instead of
         * being propagated, and the event is always reset afterwards.
         */
        @Override
        public void onEvent(DisruptorEvent<List<DDSpan>> event, long sequence, boolean endOfBatch) {
            if (null == this.packer) {
                this.packer = new Packer(this, ByteBuffer.allocate(DEFAULT_BUFFER_SIZE));
            }
            try {
                if (this.representativeCount > 0
                        && event.data == null
                        && this.doTimeFlush
                        && this.millisecondTime() > this.nextFlushMillis) {
                    this.packer.flush();
                    this.scheduleNextTimeFlush();
                }
                if (event.data != null) {
                    this.serialize(event.data, event.representativeCount);
                }
                if (null != event.flushLatch) {
                    this.packer.flush();
                    event.flushLatch.countDown();
                }
            } catch (Throwable e) {
                // NOTE(review): when the failure happens while handling a flush event the latch
                // is never counted down, so the flush() caller waits out its full timeout.
                if (log.isDebugEnabled()) {
                    log.debug("Error while serializing trace", e);
                }
                this.monitor.onFailedSerialize(this.writer, event.data, e);
            } finally {
                event.reset();
            }
        }

        /** Runs the trace through the processor, packs it, and adds its representative count to the running total. */
        private void serialize(List<DDSpan> trace, int representativeCount) {
            this.packer.format(this.processor.onTraceComplete(trace), this.traceMapper);
            this.representativeCount += representativeCount;
        }

        /** Moves the time-based flush deadline one interval past the current time; no-op when disabled. */
        private void scheduleNextTimeFlush() {
            if (this.doTimeFlush) {
                this.nextFlushMillis = this.millisecondTime() + this.flushIntervalMillis;
            }
        }

        /**
         * @return {@link System#nanoTime()} converted to milliseconds; only meaningful for
         *     comparing against other values from this method, not as a wall-clock timestamp
         */
        private long millisecondTime() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        }

        /**
         * Sends a packed buffer to the API and reports the outcome to the monitor. Does nothing
         * when {@code messageCount} is not positive.
         *
         * <p>Presumably invoked by the {@link Packer} this handler registers itself with
         * (TODO confirm against {@code Packer}).
         *
         * @param messageCount number of messages contained in {@code buffer}
         * @param buffer the packed payload; its size is taken as {@code limit() - position()}
         */
        @Override
        public void accept(int messageCount, ByteBuffer buffer) {
            if (messageCount > 0) {
                int sizeInBytes = buffer.limit() - buffer.position();
                this.monitor.onSerialize(sizeInBytes);
                DDAgentApi.Response response = this.api.sendSerializedTraces(messageCount, this.representativeCount, buffer);
                if (response.success()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Successfully sent {} traces to the API", messageCount);
                    }
                    this.monitor.onSend(this.writer, this.representativeCount, sizeInBytes, response);
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Failed to send {} traces (representing {}) of size {} bytes to the API", new Object[]{messageCount, this.representativeCount, sizeInBytes});
                    }
                    this.monitor.onFailedSend(this.writer, this.representativeCount, sizeInBytes, response);
                }
                // NOTE(review): not reached if sendSerializedTraces throws, in which case the
                // count carries over into the next send.
                this.representativeCount = 0;
            }
        }
    }
}

