/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.api.Config;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDApi;
import datadog.trace.common.writer.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DDAgentWriter
implements Writer {
    private static final Logger log = LoggerFactory.getLogger(DDAgentWriter.class);
    private static final int DISRUPTOR_BUFFER_SIZE = 1024;
    private static final int SENDER_QUEUE_SIZE = 16;
    private static final int FLUSH_PAYLOAD_BYTES = 5000000;
    private static final int FLUSH_PAYLOAD_DELAY = 1;
    private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR = new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>(){

        public void translateTo(Event<List<DDSpan>> event, long sequence, List<DDSpan> trace) {
            ((Event)event).data = trace;
        }
    };
    private static final EventTranslator<Event<List<DDSpan>>> FLUSH_TRANSLATOR = new EventTranslator<Event<List<DDSpan>>>(){

        public void translateTo(Event<List<DDSpan>> event, long sequence) {
            ((Event)event).shouldFlush = true;
        }
    };
    private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-disruptor");
    private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-writer");
    private final Runnable flushTask = new FlushTask();
    private final DDApi api;
    private final int flushFrequencySeconds;
    private final Disruptor<Event<List<DDSpan>>> disruptor;
    private final Semaphore senderSemaphore;
    private final ScheduledExecutorService scheduledWriterExecutor;
    private final AtomicInteger traceCount = new AtomicInteger(0);
    private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference();
    private final Phaser apiPhaser;
    private volatile boolean running = false;
    private final Monitor monitor;

    public DDAgentWriter() {
        this(new DDApi("localhost", 8126, Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), new NoopMonitor());
    }

    public DDAgentWriter(DDApi api, Monitor monitor) {
        this(api, monitor, 1024, 16, 1);
    }

    private DDAgentWriter(DDApi api) {
        this(api, new NoopMonitor());
    }

    private DDAgentWriter(DDApi api, int disruptorSize, int senderQueueSize, int flushFrequencySeconds) {
        this(api, new NoopMonitor(), disruptorSize, senderQueueSize, flushFrequencySeconds);
    }

    private DDAgentWriter(DDApi api, Monitor monitor, int disruptorSize, int flushFrequencySeconds) {
        this(api, monitor, disruptorSize, 16, flushFrequencySeconds);
    }

    private DDAgentWriter(DDApi api, int disruptorSize, int flushFrequencySeconds) {
        this(api, new NoopMonitor(), disruptorSize, 16, flushFrequencySeconds);
    }

    private DDAgentWriter(DDApi api, Monitor monitor, int disruptorSize, int senderQueueSize, int flushFrequencySeconds) {
        this.api = api;
        this.monitor = monitor;
        this.disruptor = new Disruptor(new DisruptorEventFactory(), Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), DISRUPTOR_THREAD_FACTORY, ProducerType.MULTI, (WaitStrategy)new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5L)));
        this.disruptor.handleEventsWith(new EventHandler[]{new TraceConsumer()});
        this.flushFrequencySeconds = flushFrequencySeconds;
        this.senderSemaphore = new Semaphore(senderQueueSize);
        this.scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
        this.apiPhaser = new Phaser();
        this.apiPhaser.register();
    }

    public final long getDisruptorCapacity() {
        return this.disruptor.getRingBuffer().getBufferSize();
    }

    public final long getDisruptorUtilizedCapacity() {
        return this.getDisruptorCapacity() - this.getDisruptorRemainingCapacity();
    }

    public final long getDisruptorRemainingCapacity() {
        return this.disruptor.getRingBuffer().remainingCapacity();
    }

    @Override
    public void write(List<DDSpan> trace) {
        if (this.running) {
            boolean published = this.disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
            if (published) {
                this.monitor.onPublish(this, trace);
            } else {
                this.traceCount.incrementAndGet();
                log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
                this.monitor.onFailedPublish(this, trace);
            }
        } else {
            log.debug("Trace written after shutdown. Ignoring trace: {}", trace);
            this.monitor.onFailedPublish(this, trace);
        }
    }

    @Override
    public void incrementTraceCount() {
        this.traceCount.incrementAndGet();
    }

    public DDApi getApi() {
        return this.api;
    }

    @Override
    public void start() {
        this.disruptor.start();
        this.running = true;
        this.scheduleFlush();
        this.monitor.onStart(this);
    }

    @Override
    public void close() {
        this.running = false;
        boolean flushSuccess = true;
        this.scheduledWriterExecutor.shutdown();
        try {
            this.scheduledWriterExecutor.awaitTermination(this.flushFrequencySeconds, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            log.warn("Waiting for flush executor shutdown interrupted.", (Throwable)e);
            flushSuccess = false;
        }
        this.disruptor.shutdown();
        this.monitor.onShutdown(this, flushSuccess |= this.flush());
    }

    public boolean flush() {
        if (this.running) {
            log.info("Flushing any remaining traces.");
            this.apiPhaser.register();
            this.disruptor.publishEvent(FLUSH_TRANSLATOR);
            try {
                this.apiPhaser.awaitAdvanceInterruptibly(this.apiPhaser.arriveAndDeregister());
                return true;
            }
            catch (InterruptedException e) {
                log.warn("Waiting for flush interrupted.", (Throwable)e);
                return false;
            }
        }
        return false;
    }

    public String toString() {
        String str = "DDAgentWriter { api=" + this.api;
        if (!(this.monitor instanceof NoopMonitor)) {
            str = str + ", monitor=" + this.monitor;
        }
        str = str + " }";
        return str;
    }

    private void scheduleFlush() {
        if (this.flushFrequencySeconds > 0 && !this.scheduledWriterExecutor.isShutdown()) {
            boolean previousIncomplete;
            ScheduledFuture<?> previous = this.flushSchedule.getAndSet(this.scheduledWriterExecutor.schedule(this.flushTask, (long)this.flushFrequencySeconds, TimeUnit.SECONDS));
            boolean bl = previousIncomplete = previous != null;
            if (previousIncomplete) {
                previous.cancel(true);
            }
            this.monitor.onScheduleFlush(this, previousIncomplete);
        }
    }

    public static final class StatsDMonitor
    implements Monitor {
        public static final String PREFIX = "datadog.tracer";
        public static final String LANG_TAG = "lang";
        public static final String LANG_VERSION_TAG = "lang_version";
        public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
        public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
        public static final String TRACER_VERSION_TAG = "tracer_version";
        private final String hostInfo;
        private final StatsDClient statsd;

        public StatsDMonitor(String host, int port) {
            this.hostInfo = host + ":" + port;
            this.statsd = new NonBlockingStatsDClient(PREFIX, host, port, StatsDMonitor.getDefaultTags());
        }

        private StatsDMonitor(StatsDClient statsd) {
            this.hostInfo = null;
            this.statsd = statsd;
        }

        protected static final String[] getDefaultTags() {
            return new String[]{StatsDMonitor.tag(LANG_TAG, "java"), StatsDMonitor.tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION), StatsDMonitor.tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME), StatsDMonitor.tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR), StatsDMonitor.tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)};
        }

        private static String tag(String tagPrefix, String tagValue) {
            return tagPrefix + ":" + tagValue;
        }

        @Override
        public void onStart(DDAgentWriter agentWriter) {
            this.statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity(), new String[0]);
        }

        @Override
        public void onShutdown(DDAgentWriter agentWriter, boolean flushSuccess) {
        }

        @Override
        public void onPublish(DDAgentWriter agentWriter, List<DDSpan> trace) {
            this.statsd.incrementCounter("queue.accepted", new String[0]);
            this.statsd.count("queue.accepted_lengths", (long)trace.size(), new String[0]);
        }

        @Override
        public void onFailedPublish(DDAgentWriter agentWriter, List<DDSpan> trace) {
            this.statsd.incrementCounter("queue.dropped", new String[0]);
        }

        @Override
        public void onScheduleFlush(DDAgentWriter agentWriter, boolean previousIncomplete) {
        }

        @Override
        public void onFlush(DDAgentWriter agentWriter, boolean early) {
        }

        @Override
        public void onSerialize(DDAgentWriter agentWriter, List<DDSpan> trace, byte[] serializedTrace) {
            this.statsd.count("queue.accepted_size", (long)serializedTrace.length, new String[0]);
        }

        @Override
        public void onFailedSerialize(DDAgentWriter agentWriter, List<DDSpan> trace, Throwable optionalCause) {
        }

        @Override
        public void onSend(DDAgentWriter agentWriter, int representativeCount, int sizeInBytes, DDApi.Response response) {
            this.onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
        }

        @Override
        public void onFailedSend(DDAgentWriter agentWriter, int representativeCount, int sizeInBytes, DDApi.Response response) {
            this.onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
        }

        private void onSendAttempt(DDAgentWriter agentWriter, int representativeCount, int sizeInBytes, DDApi.Response response) {
            this.statsd.incrementCounter("api.requests", new String[0]);
            this.statsd.recordGaugeValue("queue.length", (long)representativeCount, new String[0]);
            this.statsd.recordGaugeValue("queue.size", (long)sizeInBytes, new String[0]);
            if (response.exception() != null) {
                this.statsd.incrementCounter("api.errors", new String[0]);
            }
            if (response.status() != null) {
                this.statsd.incrementCounter("api.responses", new String[]{"status: " + response.status()});
            }
        }

        public String toString() {
            if (this.hostInfo == null) {
                return "StatsD";
            }
            return "StatsD { host=" + this.hostInfo + " }";
        }
    }

    public static final class NoopMonitor
    implements Monitor {
        @Override
        public void onStart(DDAgentWriter agentWriter) {
        }

        @Override
        public void onShutdown(DDAgentWriter agentWriter, boolean flushSuccess) {
        }

        @Override
        public void onPublish(DDAgentWriter agentWriter, List<DDSpan> trace) {
        }

        @Override
        public void onFailedPublish(DDAgentWriter agentWriter, List<DDSpan> trace) {
        }

        @Override
        public void onFlush(DDAgentWriter agentWriter, boolean early) {
        }

        @Override
        public void onScheduleFlush(DDAgentWriter agentWriter, boolean previousIncomplete) {
        }

        @Override
        public void onSerialize(DDAgentWriter agentWriter, List<DDSpan> trace, byte[] serializedTrace) {
        }

        @Override
        public void onFailedSerialize(DDAgentWriter agentWriter, List<DDSpan> trace, Throwable optionalCause) {
        }

        @Override
        public void onSend(DDAgentWriter agentWriter, int representativeCount, int sizeInBytes, DDApi.Response response) {
        }

        @Override
        public void onFailedSend(DDAgentWriter agentWriter, int representativeCount, int sizeInBytes, DDApi.Response response) {
        }

        public String toString() {
            return "NoOp";
        }
    }

    public static interface Monitor {
        public void onStart(DDAgentWriter var1);

        public void onShutdown(DDAgentWriter var1, boolean var2);

        public void onPublish(DDAgentWriter var1, List<DDSpan> var2);

        public void onFailedPublish(DDAgentWriter var1, List<DDSpan> var2);

        public void onFlush(DDAgentWriter var1, boolean var2);

        public void onScheduleFlush(DDAgentWriter var1, boolean var2);

        public void onSerialize(DDAgentWriter var1, List<DDSpan> var2, byte[] var3);

        public void onFailedSerialize(DDAgentWriter var1, List<DDSpan> var2, Throwable var3);

        public void onSend(DDAgentWriter var1, int var2, int var3, DDApi.Response var4);

        public void onFailedSend(DDAgentWriter var1, int var2, int var3, DDApi.Response var4);
    }

    private static class DisruptorEventFactory<T>
    implements EventFactory<Event<T>> {
        private DisruptorEventFactory() {
        }

        public Event<T> newInstance() {
            return new Event();
        }
    }

    private static class Event<T> {
        private volatile boolean shouldFlush = false;
        private volatile T data = null;

        private Event() {
        }
    }

    private class TraceConsumer
    implements EventHandler<Event<List<DDSpan>>> {
        private List<byte[]> serializedTraces = new ArrayList<byte[]>();
        private int payloadSize = 0;

        private TraceConsumer() {
        }

        public void onEvent(Event<List<DDSpan>> event, long sequence, boolean endOfBatch) {
            List trace = (List)((Event)event).data;
            ((Event)event).data = null;
            if (trace != null) {
                DDAgentWriter.this.traceCount.incrementAndGet();
                try {
                    byte[] serializedTrace = DDAgentWriter.this.api.serializeTrace(trace);
                    this.payloadSize += serializedTrace.length;
                    this.serializedTraces.add(serializedTrace);
                    DDAgentWriter.this.monitor.onSerialize(DDAgentWriter.this, trace, serializedTrace);
                }
                catch (JsonProcessingException e) {
                    log.warn("Error serializing trace", (Throwable)e);
                    DDAgentWriter.this.monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
                }
                catch (Throwable e) {
                    log.debug("Error while serializing trace", e);
                    DDAgentWriter.this.monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
                }
            }
            if (((Event)event).shouldFlush || this.payloadSize >= 5000000) {
                boolean early = this.payloadSize >= 5000000;
                this.reportTraces(early);
                ((Event)event).shouldFlush = false;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void reportTraces(boolean early) {
            try {
                if (this.serializedTraces.isEmpty()) {
                    DDAgentWriter.this.monitor.onFlush(DDAgentWriter.this, early);
                    DDAgentWriter.this.apiPhaser.arrive();
                    return;
                }
                if (DDAgentWriter.this.scheduledWriterExecutor.isShutdown()) {
                    DDAgentWriter.this.monitor.onFailedSend(DDAgentWriter.this, DDAgentWriter.this.traceCount.get(), this.payloadSize, DDApi.Response.failed(-1));
                    DDAgentWriter.this.apiPhaser.arrive();
                    return;
                }
                final List<byte[]> toSend = this.serializedTraces;
                this.serializedTraces = new ArrayList<byte[]>(toSend.size());
                final int representativeCount = DDAgentWriter.this.traceCount.getAndSet(0);
                final int sizeInBytes = this.payloadSize;
                DDAgentWriter.this.monitor.onFlush(DDAgentWriter.this, early);
                try {
                    DDAgentWriter.this.senderSemaphore.acquire();
                }
                catch (InterruptedException e) {
                    DDAgentWriter.this.monitor.onFailedSend(DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e));
                    this.payloadSize = 0;
                    DDAgentWriter.this.scheduleFlush();
                    return;
                }
                try {
                    DDAgentWriter.this.scheduledWriterExecutor.execute(new Runnable(){

                        @Override
                        public void run() {
                            DDAgentWriter.this.senderSemaphore.release();
                            try {
                                DDApi.Response response = DDAgentWriter.this.api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
                                if (response.success()) {
                                    log.debug("Successfully sent {} traces to the API", (Object)toSend.size());
                                    DDAgentWriter.this.monitor.onSend(DDAgentWriter.this, representativeCount, sizeInBytes, response);
                                } else {
                                    log.debug("Failed to send {} traces (representing {}) of size {} bytes to the API", new Object[]{toSend.size(), representativeCount, sizeInBytes});
                                    DDAgentWriter.this.monitor.onFailedSend(DDAgentWriter.this, representativeCount, sizeInBytes, response);
                                }
                            }
                            catch (Throwable e) {
                                log.debug("Failed to send traces to the API: {}", (Object)e.getMessage());
                                DDAgentWriter.this.monitor.onFailedSend(DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e));
                            }
                            finally {
                                DDAgentWriter.this.apiPhaser.arrive();
                            }
                        }
                    });
                }
                catch (RejectedExecutionException ex) {
                    DDAgentWriter.this.monitor.onFailedSend(DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
                    DDAgentWriter.this.apiPhaser.arrive();
                }
            }
            finally {
                this.payloadSize = 0;
                DDAgentWriter.this.scheduleFlush();
            }
        }
    }

    private class FlushTask
    implements Runnable {
        private FlushTask() {
        }

        @Override
        public void run() {
            DDAgentWriter.this.disruptor.publishEvent(FLUSH_TRANSLATOR);
        }
    }
}

