/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDApi;
import datadog.trace.common.writer.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DDAgentWriter
implements Writer {
    private static final Logger log = LoggerFactory.getLogger(DDAgentWriter.class);
    private static final int DISRUPTOR_BUFFER_SIZE = 8192;
    private static final int FLUSH_PAYLOAD_BYTES = 5000000;
    private static final int FLUSH_PAYLOAD_DELAY = 1;
    private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR = new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>(){

        public void translateTo(Event<List<DDSpan>> event, long sequence, List<DDSpan> trace) {
            ((Event)event).data = trace;
        }
    };
    private static final EventTranslator<Event<List<DDSpan>>> FLUSH_TRANSLATOR = new EventTranslator<Event<List<DDSpan>>>(){

        public void translateTo(Event<List<DDSpan>> event, long sequence) {
            ((Event)event).shouldFlush = true;
        }
    };
    private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-disruptor");
    private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-writer");
    private final DDApi api;
    private final int flushFrequencySeconds;
    private final Disruptor<Event<List<DDSpan>>> disruptor;
    private final ScheduledExecutorService scheduledWriterExecutor;
    private final AtomicInteger traceCount = new AtomicInteger(0);
    private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference();
    private final Phaser apiPhaser;
    private volatile boolean running = false;
    private final Runnable flushTask = new FlushTask();

    public DDAgentWriter() {
        this(new DDApi("localhost", 8126, Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET));
    }

    public DDAgentWriter(DDApi api) {
        this(api, 8192, 1);
    }

    private DDAgentWriter(DDApi api, int disruptorSize, int flushFrequencySeconds) {
        this.api = api;
        this.flushFrequencySeconds = flushFrequencySeconds;
        this.disruptor = new Disruptor(new DisruptorEventFactory(), Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), DISRUPTOR_THREAD_FACTORY, ProducerType.MULTI, (WaitStrategy)new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5L)));
        this.disruptor.handleEventsWith(new EventHandler[]{new TraceConsumer()});
        this.scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
        this.apiPhaser = new Phaser();
        this.apiPhaser.register();
    }

    @Override
    public void write(List<DDSpan> trace) {
        if (this.running) {
            boolean published = this.disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
            if (!published) {
                this.traceCount.incrementAndGet();
                log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
            }
        } else {
            log.debug("Trace written after shutdown. Ignoring trace: {}", trace);
        }
    }

    @Override
    public void incrementTraceCount() {
        this.traceCount.incrementAndGet();
    }

    public DDApi getApi() {
        return this.api;
    }

    @Override
    public void start() {
        this.disruptor.start();
        this.running = true;
        this.scheduleFlush();
    }

    @Override
    public void close() {
        this.running = false;
        this.flush();
        this.disruptor.shutdown();
        this.scheduledWriterExecutor.shutdown();
    }

    public void flush() {
        log.info("Flushing any remaining traces.");
        this.apiPhaser.register();
        this.disruptor.publishEvent(FLUSH_TRANSLATOR);
        try {
            this.apiPhaser.awaitAdvanceInterruptibly(this.apiPhaser.arriveAndDeregister());
        }
        catch (InterruptedException e) {
            log.warn("Waiting for flush interrupted.", (Throwable)e);
        }
    }

    public String toString() {
        return "DDAgentWriter { api=" + this.api + " }";
    }

    private void scheduleFlush() {
        ScheduledFuture<?> previous;
        if (this.flushFrequencySeconds > 0 && (previous = this.flushSchedule.getAndSet(this.scheduledWriterExecutor.schedule(this.flushTask, (long)this.flushFrequencySeconds, TimeUnit.SECONDS))) != null) {
            previous.cancel(true);
        }
    }

    private static class DisruptorEventFactory<T>
    implements EventFactory<Event<T>> {
        private DisruptorEventFactory() {
        }

        public Event<T> newInstance() {
            return new Event();
        }
    }

    private static class Event<T> {
        private volatile boolean shouldFlush = false;
        private volatile T data = null;

        private Event() {
        }
    }

    private class TraceConsumer
    implements EventHandler<Event<List<DDSpan>>> {
        private List<byte[]> serializedTraces = new ArrayList<byte[]>();
        private int payloadSize = 0;

        private TraceConsumer() {
        }

        public void onEvent(Event<List<DDSpan>> event, long sequence, boolean endOfBatch) {
            List trace = (List)((Event)event).data;
            ((Event)event).data = null;
            if (trace != null) {
                DDAgentWriter.this.traceCount.incrementAndGet();
                try {
                    byte[] serializedTrace = DDAgentWriter.this.api.serializeTrace(trace);
                    this.payloadSize += serializedTrace.length;
                    this.serializedTraces.add(serializedTrace);
                }
                catch (JsonProcessingException e) {
                    log.warn("Error serializing trace", (Throwable)e);
                }
                catch (Throwable e) {
                    log.debug("Error while serializing trace", e);
                }
            }
            if (((Event)event).shouldFlush || this.payloadSize >= 5000000) {
                this.reportTraces();
                ((Event)event).shouldFlush = false;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void reportTraces() {
            try {
                if (this.serializedTraces.isEmpty()) {
                    DDAgentWriter.this.apiPhaser.arrive();
                    return;
                }
                final List<byte[]> toSend = this.serializedTraces;
                this.serializedTraces = new ArrayList<byte[]>(toSend.size());
                final int representativeCount = DDAgentWriter.this.traceCount.getAndSet(0);
                final int sizeInBytes = this.payloadSize;
                DDAgentWriter.this.scheduledWriterExecutor.execute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            boolean sent = DDAgentWriter.this.api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
                            if (sent) {
                                log.debug("Successfully sent {} traces to the API", (Object)toSend.size());
                            } else {
                                log.debug("Failed to send {} traces (representing {}) of size {} bytes to the API", new Object[]{toSend.size(), representativeCount, sizeInBytes});
                            }
                        }
                        catch (Throwable e) {
                            log.debug("Failed to send traces to the API: {}", (Object)e.getMessage());
                        }
                        finally {
                            DDAgentWriter.this.apiPhaser.arrive();
                        }
                    }
                });
            }
            finally {
                this.payloadSize = 0;
                DDAgentWriter.this.scheduleFlush();
            }
        }
    }

    private class FlushTask
    implements Runnable {
        private FlushTask() {
        }

        @Override
        public void run() {
            DDAgentWriter.this.disruptor.publishEvent(FLUSH_TRANSLATOR);
        }
    }
}

