/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.writer.ddagent;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceConsumer
implements EventHandler<DisruptorEvent<List<DDSpan>>> {
    private static final Logger log = LoggerFactory.getLogger(TraceConsumer.class);
    private static final int FLUSH_PAYLOAD_BYTES = 5000000;
    private final AtomicInteger traceCount;
    private final Semaphore senderSemaphore;
    private final DDAgentWriter writer;
    private List<byte[]> serializedTraces = new ArrayList<byte[]>();
    private int payloadSize = 0;

    public TraceConsumer(AtomicInteger traceCount, int senderQueueSize, DDAgentWriter writer) {
        this.traceCount = traceCount;
        this.senderSemaphore = new Semaphore(senderQueueSize);
        this.writer = writer;
    }

    public void onEvent(DisruptorEvent<List<DDSpan>> event, long sequence, boolean endOfBatch) {
        List trace = (List)event.data;
        event.data = null;
        if (trace != null) {
            this.traceCount.incrementAndGet();
            try {
                byte[] serializedTrace = this.writer.getApi().serializeTrace(trace);
                this.payloadSize += serializedTrace.length;
                this.serializedTraces.add(serializedTrace);
                this.writer.monitor.onSerialize(this.writer, trace, serializedTrace);
            }
            catch (JsonProcessingException e) {
                log.warn("Error serializing trace", (Throwable)e);
                this.writer.monitor.onFailedSerialize(this.writer, trace, e);
            }
            catch (Throwable e) {
                log.debug("Error while serializing trace", e);
                this.writer.monitor.onFailedSerialize(this.writer, trace, e);
            }
        }
        if (event.shouldFlush || this.payloadSize >= 5000000) {
            boolean early = this.payloadSize >= 5000000;
            this.reportTraces(early);
            event.shouldFlush = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reportTraces(boolean early) {
        try {
            if (this.serializedTraces.isEmpty()) {
                this.writer.monitor.onFlush(this.writer, early);
                this.writer.apiPhaser.arrive();
                return;
            }
            if (this.writer.scheduledWriterExecutor.isShutdown()) {
                this.writer.monitor.onFailedSend(this.writer, this.traceCount.get(), this.payloadSize, DDAgentApi.Response.failed(-1));
                this.writer.apiPhaser.arrive();
                return;
            }
            final List<byte[]> toSend = this.serializedTraces;
            this.serializedTraces = new ArrayList<byte[]>(toSend.size());
            final int representativeCount = this.traceCount.getAndSet(0);
            final int sizeInBytes = this.payloadSize;
            this.writer.monitor.onFlush(this.writer, early);
            try {
                this.senderSemaphore.acquire();
            }
            catch (InterruptedException e) {
                this.writer.monitor.onFailedSend(this.writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
                this.payloadSize = 0;
                this.writer.disruptor.scheduleFlush();
                return;
            }
            try {
                this.writer.scheduledWriterExecutor.execute(new Runnable(){

                    @Override
                    public void run() {
                        TraceConsumer.this.senderSemaphore.release();
                        try {
                            DDAgentApi.Response response = TraceConsumer.this.writer.getApi().sendSerializedTraces(representativeCount, sizeInBytes, toSend);
                            if (response.success()) {
                                log.debug("Successfully sent {} traces to the API", (Object)toSend.size());
                                ((TraceConsumer)TraceConsumer.this).writer.monitor.onSend(TraceConsumer.this.writer, representativeCount, sizeInBytes, response);
                            } else {
                                log.debug("Failed to send {} traces (representing {}) of size {} bytes to the API", new Object[]{toSend.size(), representativeCount, sizeInBytes});
                                ((TraceConsumer)TraceConsumer.this).writer.monitor.onFailedSend(TraceConsumer.this.writer, representativeCount, sizeInBytes, response);
                            }
                        }
                        catch (Throwable e) {
                            log.debug("Failed to send traces to the API: {}", (Object)e.getMessage());
                            ((TraceConsumer)TraceConsumer.this).writer.monitor.onFailedSend(TraceConsumer.this.writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
                        }
                        finally {
                            ((TraceConsumer)TraceConsumer.this).writer.apiPhaser.arrive();
                        }
                    }
                });
            }
            catch (RejectedExecutionException ex) {
                this.writer.monitor.onFailedSend(this.writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
                this.writer.apiPhaser.arrive();
            }
        }
        finally {
            this.payloadSize = 0;
            this.writer.disruptor.scheduleFlush();
        }
    }
}

