/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer.ddagent;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorTwoArg;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.AbstractDisruptor;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import datadog.trace.common.writer.ddagent.Monitor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchWritingDisruptor
extends AbstractDisruptor<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(BatchWritingDisruptor.class);
    private static final int FLUSH_PAYLOAD_BYTES = 5000000;
    private final DisruptorEvent.HeartbeatTranslator<byte[]> heartbeatTranslator = new DisruptorEvent.HeartbeatTranslator();

    public BatchWritingDisruptor(int disruptorSize, int flushFrequencySeconds, DDAgentApi api, Monitor monitor, DDAgentWriter writer) {
        super(disruptorSize, new BatchWritingHandler(flushFrequencySeconds, api, monitor, writer));
        if (0 < flushFrequencySeconds) {
            CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(new HeartbeatTask(), this, 100L, 100L, TimeUnit.MILLISECONDS, "disruptor heartbeat");
        }
    }

    @Override
    protected DaemonThreadFactory getThreadFactory() {
        return DaemonThreadFactory.TRACE_WRITER;
    }

    @Override
    public boolean publish(byte[] data, int representativeCount) {
        this.disruptor.getRingBuffer().publishEvent((EventTranslatorTwoArg)this.dataTranslator, (Object)data, (Object)representativeCount);
        return true;
    }

    private void heartbeat() {
        if (this.running && this.getCurrentCount() == 0L) {
            this.disruptor.getRingBuffer().tryPublishEvent(this.heartbeatTranslator);
        }
    }

    private static final class HeartbeatTask
    implements CommonTaskExecutor.Task<BatchWritingDisruptor> {
        private HeartbeatTask() {
        }

        @Override
        public void run(BatchWritingDisruptor target) {
            target.heartbeat();
        }
    }

    private static class BatchWritingHandler
    implements EventHandler<DisruptorEvent<byte[]>> {
        private final long flushFrequencyNanos;
        private final DDAgentApi api;
        private final Monitor monitor;
        private final DDAgentWriter writer;
        private final List<byte[]> serializedTraces = new ArrayList<byte[]>();
        private int representativeCount = 0;
        private int sizeInBytes = 0;
        private long nextScheduledFlush;

        private BatchWritingHandler(int flushFrequencySeconds, DDAgentApi api, Monitor monitor, DDAgentWriter writer) {
            this.flushFrequencyNanos = TimeUnit.SECONDS.toNanos(flushFrequencySeconds);
            this.scheduleNextFlush();
            this.api = api;
            this.monitor = monitor;
            this.writer = writer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onEvent(DisruptorEvent<byte[]> event, long sequence, boolean endOfBatch) {
            try {
                if (event.data != null) {
                    this.sizeInBytes += ((byte[])event.data).length;
                    this.serializedTraces.add((byte[])event.data);
                }
                this.representativeCount += event.representativeCount;
                if (event.flushLatch != null || 5000000 <= this.sizeInBytes || this.nextScheduledFlush <= System.nanoTime()) {
                    this.flush(event.flushLatch, 5000000 <= this.sizeInBytes);
                }
            }
            finally {
                event.reset();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void flush(CountDownLatch flushLatch, boolean early) {
            try {
                if (this.serializedTraces.isEmpty()) {
                    return;
                }
                DDAgentApi.Response response = this.api.sendSerializedTraces(this.representativeCount, this.sizeInBytes, this.serializedTraces);
                this.monitor.onFlush(this.writer, early);
                if (response.success()) {
                    log.debug("Successfully sent {} traces to the API", (Object)this.serializedTraces.size());
                    this.monitor.onSend(this.writer, this.representativeCount, this.sizeInBytes, response);
                } else {
                    log.debug("Failed to send {} traces (representing {}) of size {} bytes to the API", new Object[]{this.serializedTraces.size(), this.representativeCount, this.sizeInBytes});
                    this.monitor.onFailedSend(this.writer, this.representativeCount, this.sizeInBytes, response);
                }
            }
            catch (Throwable e) {
                log.debug("Failed to send traces to the API: {}", (Object)e.getMessage());
                this.monitor.onFailedSend(this.writer, this.representativeCount, this.sizeInBytes, DDAgentApi.Response.failed(e));
            }
            finally {
                this.serializedTraces.clear();
                this.sizeInBytes = 0;
                this.representativeCount = 0;
                this.scheduleNextFlush();
                if (flushLatch != null) {
                    flushLatch.countDown();
                }
            }
        }

        private void scheduleNextFlush() {
            this.nextScheduledFlush = 0L < this.flushFrequencyNanos ? System.nanoTime() + this.flushFrequencyNanos : Long.MAX_VALUE;
        }
    }
}

