/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.report;

import co.elastic.apm.agent.impl.MetaData;
import co.elastic.apm.agent.report.ApmServerClient;
import co.elastic.apm.agent.report.ApmServerReporter;
import co.elastic.apm.agent.report.ReporterConfiguration;
import co.elastic.apm.agent.report.ReportingEvent;
import co.elastic.apm.agent.report.ReportingEventHandler;
import co.elastic.apm.agent.report.processor.ProcessorEventHandler;
import co.elastic.apm.agent.report.serialize.PayloadSerializer;
import co.elastic.apm.agent.shaded.slf4j.Logger;
import co.elastic.apm.agent.shaded.slf4j.LoggerFactory;
import co.elastic.apm.agent.shaded.stagemonitor.util.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import javax.annotation.Nullable;

public class IntakeV2ReportingEventHandler
implements ReportingEventHandler {
    public static final String INTAKE_V2_URL = "/intake/v2/events";
    private static final Logger logger = LoggerFactory.getLogger(IntakeV2ReportingEventHandler.class);
    private static final int GZIP_COMPRESSION_LEVEL = 1;
    private static final Object WAIT_LOCK = new Object();
    private final ReporterConfiguration reporterConfiguration;
    private final ProcessorEventHandler processorEventHandler;
    private final MetaData metaData;
    private final PayloadSerializer payloadSerializer;
    private final Timer timeoutTimer;
    private final ApmServerClient apmServerClient;
    private Deflater deflater;
    private long currentlyTransmitting = 0L;
    private long reported = 0L;
    private long dropped = 0L;
    @Nullable
    private HttpURLConnection connection;
    @Nullable
    private OutputStream os;
    @Nullable
    private ApmServerReporter reporter;
    @Nullable
    private TimerTask timeoutTask;
    private int errorCount;
    private volatile boolean shutDown;

    public IntakeV2ReportingEventHandler(ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler, PayloadSerializer payloadSerializer, MetaData metaData, ApmServerClient apmServerClient) {
        this.reporterConfiguration = reporterConfiguration;
        this.processorEventHandler = processorEventHandler;
        this.payloadSerializer = payloadSerializer;
        this.metaData = metaData;
        this.apmServerClient = apmServerClient;
        this.deflater = new Deflater(1);
        this.timeoutTimer = new Timer("apm-request-timeout-timer", true);
    }

    static long getRandomJitter(long backoffTimeMillis) {
        long tenPercentOfBackoffTimeMillis = (long)((double)backoffTimeMillis * 0.1);
        return (long)((double)(tenPercentOfBackoffTimeMillis * 2L) * Math.random()) - tenPercentOfBackoffTimeMillis;
    }

    static long getBackoffTimeSeconds(long errorCount) {
        return (long)Math.pow(Math.min(errorCount, 6L), 2.0);
    }

    @Override
    public void init(ApmServerReporter reporter) {
        this.reporter = reporter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onEvent(ReportingEvent event, long sequence, boolean endOfBatch) {
        if (logger.isDebugEnabled()) {
            logger.debug("Receiving {} event (sequence {})", (Object)event.getType(), (Object)sequence);
        }
        try {
            if (!this.shutDown) {
                this.handleEvent(event, sequence, endOfBatch);
            }
        }
        finally {
            event.resetState();
        }
    }

    private void handleEvent(ReportingEvent event, long sequence, boolean endOfBatch) {
        if (event.getType() == null) {
            return;
        }
        if (event.getType() == ReportingEvent.ReportingEventType.FLUSH) {
            this.flush();
            return;
        }
        if (event.getType() == ReportingEvent.ReportingEventType.SHUTDOWN) {
            this.shutDown = true;
            this.flush();
            return;
        }
        this.processorEventHandler.onEvent(event, sequence, endOfBatch);
        try {
            if (this.connection == null) {
                this.connection = this.startRequest();
                this.payloadSerializer.serializeMetaDataNdJson(this.metaData);
            }
            this.writeEvent(event);
        }
        catch (Exception e) {
            logger.error("Failed to handle event of type {} with this error: {}", (Object)event.getType(), (Object)e.getMessage());
            logger.debug("Event handling failure", e);
            this.flush();
            this.onConnectionError(null, this.currentlyTransmitting + 1L, 0L);
        }
        if (this.shouldFlush()) {
            this.flush();
        }
    }

    int getBufferSize() {
        return this.payloadSerializer.getBufferSize();
    }

    private void writeEvent(ReportingEvent event) {
        if (event.getTransaction() != null) {
            ++this.currentlyTransmitting;
            this.payloadSerializer.serializeTransactionNdJson(event.getTransaction());
            event.getTransaction().decrementReferences();
        } else if (event.getSpan() != null) {
            ++this.currentlyTransmitting;
            this.payloadSerializer.serializeSpanNdJson(event.getSpan());
            event.getSpan().decrementReferences();
        } else if (event.getError() != null) {
            ++this.currentlyTransmitting;
            this.payloadSerializer.serializeErrorNdJson(event.getError());
            event.getError().recycle();
        } else if (event.getMetricRegistry() != null) {
            this.payloadSerializer.serializeMetrics(event.getMetricRegistry());
        }
    }

    private boolean shouldFlush() {
        boolean flush;
        long written = this.deflater.getBytesWritten() + 16384L;
        boolean bl = flush = written >= this.reporterConfiguration.getApiRequestSize();
        if (flush && logger.isDebugEnabled()) {
            logger.debug("Flushing, because request size limit exceeded {}/{}", (Object)written, (Object)this.reporterConfiguration.getApiRequestSize());
        }
        return flush;
    }

    private HttpURLConnection startRequest() throws IOException {
        HttpURLConnection connection = this.apmServerClient.startRequest(INTAKE_V2_URL);
        if (logger.isDebugEnabled()) {
            logger.debug("Starting new request to {}", (Object)connection.getURL());
        }
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setChunkedStreamingMode(16384);
        connection.setRequestProperty("Content-Encoding", "deflate");
        connection.setRequestProperty("Content-Type", "application/x-ndjson");
        connection.setUseCaches(false);
        connection.connect();
        this.os = new DeflaterOutputStream(connection.getOutputStream(), this.deflater);
        this.payloadSerializer.setOutputStream(this.os);
        if (this.reporter != null) {
            this.timeoutTask = new FlushOnTimeoutTimerTask(this.reporter);
            if (logger.isDebugEnabled()) {
                logger.debug("Scheduling request timeout in {}", (Object)this.reporterConfiguration.getApiRequestTime());
            }
            this.timeoutTimer.schedule(this.timeoutTask, this.reporterConfiguration.getApiRequestTime().getMillis());
        }
        return connection;
    }

    void flush() {
        this.cancelTimeout();
        if (this.connection != null) {
            try {
                this.payloadSerializer.flush();
                if (this.os != null) {
                    this.os.close();
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Flushing {} uncompressed {} compressed bytes", (Object)this.deflater.getBytesRead(), (Object)this.deflater.getBytesWritten());
                }
                InputStream inputStream = this.connection.getInputStream();
                int responseCode = this.connection.getResponseCode();
                if (responseCode >= 400) {
                    this.onFlushError(responseCode, inputStream, null);
                } else {
                    this.onFlushSuccess(inputStream);
                }
            }
            catch (IOException e) {
                try {
                    this.onFlushError(this.connection.getResponseCode(), this.connection.getErrorStream(), e);
                }
                catch (IOException e1) {
                    this.onFlushError(-1, this.connection.getErrorStream(), e);
                }
            }
            finally {
                this.connection.disconnect();
                this.connection = null;
                this.deflater.reset();
                this.currentlyTransmitting = 0L;
            }
        }
    }

    private void cancelTimeout() {
        if (this.timeoutTask != null) {
            this.timeoutTask.cancel();
            this.timeoutTask = null;
        }
    }

    private void onFlushSuccess(InputStream inputStream) {
        this.errorCount = 0;
        this.reported += this.currentlyTransmitting;
        IOUtils.consumeAndClose(inputStream);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onFlushError(Integer responseCode, InputStream inputStream, @Nullable IOException e) {
        this.onConnectionError(responseCode, this.currentlyTransmitting, 0L);
        if (e != null) {
            logger.error("Error sending data to APM server: {}, response code is {}", (Object)e.getMessage(), (Object)responseCode);
            logger.debug("Sending payload to APM server failed", e);
        }
        if (logger.isWarnEnabled()) {
            try {
                logger.warn(IOUtils.toString(inputStream));
            }
            catch (IOException e1) {
                logger.warn(e1.getMessage(), e);
            }
            finally {
                IOUtils.closeQuietly(inputStream);
            }
        } else {
            IOUtils.consumeAndClose(inputStream);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onConnectionError(@Nullable Integer responseCode, long droppedEvents, long reportedEvents) {
        this.dropped += droppedEvents;
        this.reported += reportedEvents;
        if (responseCode == null || responseCode > 429) {
            this.apmServerClient.onConnectionError();
        } else if (responseCode == 404) {
            logger.warn("It seems like you are using a version of the APM Server which is not compatible with this agent. Please use APM Server 6.5.0 or newer.");
        }
        long backoffTimeSeconds = IntakeV2ReportingEventHandler.getBackoffTimeSeconds(this.errorCount++);
        logger.info("Backing off for {} seconds (+/-10%)", (Object)backoffTimeSeconds);
        long backoffTimeMillis = TimeUnit.SECONDS.toMillis(backoffTimeSeconds);
        if (backoffTimeMillis > 0L) {
            try {
                Object object = WAIT_LOCK;
                synchronized (object) {
                    WAIT_LOCK.wait(backoffTimeMillis + IntakeV2ReportingEventHandler.getRandomJitter(backoffTimeMillis));
                }
            }
            catch (InterruptedException e) {
                logger.info("APM Agent ReportingEventHandler had been interrupted", e);
            }
        }
    }

    @Override
    public long getReported() {
        return this.reported;
    }

    @Override
    public long getDropped() {
        return this.dropped;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        this.shutDown = true;
        this.timeoutTimer.cancel();
        Object object = WAIT_LOCK;
        synchronized (object) {
            WAIT_LOCK.notifyAll();
        }
    }

    private static class FlushOnTimeoutTimerTask
    extends TimerTask {
        private final ApmServerReporter reporter;
        @Nullable
        private volatile Future<Void> flush;

        private FlushOnTimeoutTimerTask(ApmServerReporter reporter) {
            this.reporter = reporter;
        }

        @Override
        public void run() {
            logger.debug("Request flush because the request timeout occurred");
            try {
                this.flush = this.reporter.flush();
            }
            catch (Exception e) {
                logger.info("Failed to register a Flush event to the disruptor: {}", (Object)e.getMessage());
            }
        }

        @Override
        public boolean cancel() {
            boolean cancel = super.cancel();
            Future<Void> flush = this.flush;
            if (flush != null) {
                flush.cancel(false);
            }
            return cancel;
        }
    }
}

