/*
 * Decompiled with CFR 0.152.
 */
package com.metamx.emitter.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.Batch;
import com.metamx.emitter.core.BatchingStrategy;
import com.metamx.emitter.core.ContentEncoding;
import com.metamx.emitter.core.EmittedBatchCounter;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.core.HttpEmitterConfig;
import com.metamx.emitter.core.ZeroCopyByteArrayOutputStream;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.zip.GZIPOutputStream;
import org.jboss.netty.handler.codec.http.HttpMethod;

public class HttpPostEmitter
implements Flushable,
Closeable,
Emitter {
    private static final int MAX_EVENT_SIZE = 1047552;
    private static final int MAX_SEND_RETRIES = 3;
    private static final byte[] LARGE_EVENTS_STOP = new byte[0];
    private static final Logger log = new Logger(HttpPostEmitter.class);
    private static final AtomicInteger instanceCounter = new AtomicInteger();
    final BatchingStrategy batchingStrategy;
    final HttpEmitterConfig config;
    private final int bufferSize;
    final int maxBufferWatermark;
    private final int largeEventThreshold;
    private final HttpClient client;
    private final ObjectMapper jsonMapper;
    private final URL url;
    private final ConcurrentLinkedQueue<byte[]> buffersToReuse = new ConcurrentLinkedQueue();
    private final AtomicReference<Batch> concurrentBatch = new AtomicReference();
    private final ConcurrentLinkedQueue<Batch> buffersToEmit = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<byte[]> largeEventsToEmit = new ConcurrentLinkedQueue();
    private final EmittedBatchCounter emittedBatchCounter = new EmittedBatchCounter();
    private final EmittingThread emittingThread = new EmittingThread();
    private final AtomicLong totalEmittedEvents = new AtomicLong();
    private final AtomicInteger allocatedBuffers = new AtomicInteger();
    private final Object startLock = new Object();
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private boolean running = false;

    public HttpPostEmitter(HttpEmitterConfig config, HttpClient client) {
        this(config, client, new ObjectMapper());
    }

    public HttpPostEmitter(HttpEmitterConfig config, HttpClient client, ObjectMapper jsonMapper) {
        this.batchingStrategy = config.getBatchingStrategy();
        int batchOverhead = this.batchingStrategy.batchStartLength() + this.batchingStrategy.batchEndLength();
        Preconditions.checkArgument((config.getMaxBatchSize() >= 1047552 + batchOverhead ? 1 : 0) != 0, (Object)String.format("maxBatchSize must be greater than MAX_EVENT_SIZE[%,d] + overhead[%,d].", 1047552, batchOverhead));
        Preconditions.checkArgument((config.getMaxBufferSize() >= 1047552L ? 1 : 0) != 0, (Object)String.format("maxBufferSize must be greater than MAX_EVENT_SIZE[%,d].", 1047552));
        this.config = config;
        this.bufferSize = config.getMaxBatchSize();
        this.maxBufferWatermark = this.bufferSize - this.batchingStrategy.batchEndLength();
        this.largeEventThreshold = (this.bufferSize - batchOverhead - this.batchingStrategy.separatorLength()) / 2;
        this.client = client;
        this.jsonMapper = jsonMapper;
        try {
            this.url = new URL(config.getRecipientBaseUrl());
        }
        catch (MalformedURLException e) {
            throw new ISE((Throwable)e, "Bad URL: %s", new Object[]{config.getRecipientBaseUrl()});
        }
        this.concurrentBatch.set(new Batch(this, this.acquireBuffer(), 0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @LifecycleStart
    public void start() {
        Object object = this.startLock;
        synchronized (object) {
            if (!this.running) {
                if (this.startLatch.getCount() == 0L) {
                    throw new IllegalStateException("Already started.");
                }
                this.running = true;
                this.startLatch.countDown();
                this.emittingThread.start();
            }
        }
    }

    private void awaitStarted() {
        try {
            if (!this.startLatch.await(1L, TimeUnit.SECONDS)) {
                throw new RejectedExecutionException("Service is not started.");
            }
            if (this.isTerminated()) {
                throw new RejectedExecutionException("Service is closed.");
            }
        }
        catch (InterruptedException e) {
            log.debug("Interrupted waiting for start", new Object[0]);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private boolean isTerminated() {
        return this.concurrentBatch.get() == null;
    }

    @Override
    public void emit(Event event) {
        this.emitAndReturnBatch(event);
    }

    @VisibleForTesting
    Batch emitAndReturnBatch(Event event) {
        Batch batch;
        this.awaitStarted();
        byte[] eventBytes = this.eventToBytes(event);
        if (eventBytes.length > 1047552) {
            log.error("Event too large to emit (%,d > %,d): %s ...", new Object[]{eventBytes.length, 1047552, StringUtils.fromUtf8((ByteBuffer)ByteBuffer.wrap(eventBytes), (int)1024)});
            return null;
        }
        if (eventBytes.length > this.largeEventThreshold) {
            this.writeLargeEvent(eventBytes);
            return null;
        }
        do {
            if ((batch = this.concurrentBatch.get()) != null) continue;
            throw new RejectedExecutionException("Service is closed.");
        } while (!batch.tryAddEvent(eventBytes));
        return batch;
    }

    private byte[] eventToBytes(Event event) {
        try {
            return this.jsonMapper.writeValueAsBytes((Object)event);
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    private void writeLargeEvent(byte[] eventBytes) {
        this.largeEventsToEmit.add(eventBytes);
        this.wakeUpEmittingThread();
    }

    void onSealExclusive(Batch batch) {
        this.buffersToEmit.add(batch);
        this.wakeUpEmittingThread();
        if (!this.isTerminated()) {
            int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(batch.batchNumber);
            if (!this.concurrentBatch.compareAndSet(batch, new Batch(this, this.acquireBuffer(), nextBatchNumber))) {
                Preconditions.checkState((boolean)this.isTerminated());
            }
        }
    }

    private void wakeUpEmittingThread() {
        LockSupport.unpark(this.emittingThread);
    }

    @Override
    public void flush() throws IOException {
        this.awaitStarted();
        this.flush(this.concurrentBatch.get());
    }

    private void flush(Batch batch) throws IOException {
        if (batch == null) {
            return;
        }
        batch.seal();
        try {
            this.emittedBatchCounter.awaitBatchEmitted(batch.batchNumber, this.config.getFlushTimeOut(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            String message = String.format("Timed out after [%d] millis during flushing", this.config.getFlushTimeOut());
            throw new IOException(message, e);
        }
        catch (InterruptedException e) {
            log.debug("Thread Interrupted", new Object[0]);
            Thread.currentThread().interrupt();
            throw new IOException("Thread Interrupted while flushing", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @LifecycleStop
    public void close() throws IOException {
        Object object = this.startLock;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                Batch lastBatch = this.concurrentBatch.getAndSet(null);
                this.flush(lastBatch);
                this.emittingThread.shuttingDown = true;
                this.emittingThread.interrupt();
            }
        }
    }

    public String toString() {
        return "HttpPostEmitter{config=" + this.config + '}';
    }

    private byte[] acquireBuffer() {
        byte[] buffer = this.buffersToReuse.poll();
        if (buffer == null) {
            buffer = new byte[this.bufferSize];
            this.allocatedBuffers.incrementAndGet();
        }
        return buffer;
    }

    private void drainBuffersToReuse() {
        while (this.buffersToReuse.poll() != null) {
        }
    }

    @VisibleForTesting
    int getAllocatedBuffers() {
        return this.allocatedBuffers.get();
    }

    @VisibleForTesting
    long getTotalEmittedEvents() {
        return this.totalEmittedEvents.get();
    }

    @VisibleForTesting
    void waitForEmission(int batchNumber) throws Exception {
        this.emittedBatchCounter.awaitBatchEmitted(batchNumber, 10L, TimeUnit.SECONDS);
    }

    private static class FailedBuffer {
        final byte[] buffer;
        final int length;
        final int eventCount;

        private FailedBuffer(byte[] buffer, int length, int eventCount) {
            this.buffer = buffer;
            this.length = length;
            this.eventCount = eventCount;
        }
    }

    private class EmittingThread
    extends Thread {
        private final ArrayDeque<FailedBuffer> failedBuffers;
        private boolean shuttingDown;
        private ZeroCopyByteArrayOutputStream gzipBaos;

        EmittingThread() {
            super("HttpPostEmitter-" + instanceCounter.incrementAndGet());
            this.failedBuffers = new ArrayDeque();
            this.shuttingDown = false;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            while (true) {
                boolean needsToShutdown = this.needsToShutdown();
                try {
                    this.emitLargeEvents();
                    this.emitBatches();
                    this.tryEmitOneFailedBuffer();
                    if (needsToShutdown) {
                        this.tryEmitAndDrainAllFailedBuffers();
                        HttpPostEmitter.this.drainBuffersToReuse();
                        return;
                    }
                }
                catch (Throwable t) {
                    log.error(t, "Uncaught exception in EmittingThread.run()", new Object[0]);
                }
                if (!this.failedBuffers.isEmpty()) continue;
                long waitNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(HttpPostEmitter.this.config.getFlushMillis()) / 2L, 1L);
                LockSupport.parkNanos(HttpPostEmitter.this, waitNanos);
            }
        }

        private boolean needsToShutdown() {
            boolean needsToShutdown;
            boolean bl = needsToShutdown = Thread.interrupted() || this.shuttingDown;
            if (needsToShutdown) {
                Batch lastBatch = HttpPostEmitter.this.concurrentBatch.getAndSet(null);
                if (lastBatch != null) {
                    lastBatch.seal();
                }
            } else {
                Batch batch = (Batch)HttpPostEmitter.this.concurrentBatch.get();
                if (batch != null) {
                    batch.sealIfFlushNeeded();
                } else {
                    needsToShutdown = true;
                }
            }
            return needsToShutdown;
        }

        private void emitBatches() {
            Batch batch;
            while ((batch = (Batch)HttpPostEmitter.this.buffersToEmit.poll()) != null) {
                this.emit(batch);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void emit(Batch batch) {
            batch.awaitEmittingAllowed();
            try {
                int bufferWatermark = batch.getSealedBufferWatermark();
                if (bufferWatermark == 0) {
                    return;
                }
                int eventCount = batch.eventCount.get();
                log.debug("Sending batch #%d to url[%s], event count[%d], bytes[%d]", new Object[]{batch.batchNumber, HttpPostEmitter.this.url, eventCount, bufferWatermark});
                int bufferEndOffset = HttpPostEmitter.this.batchingStrategy.writeBatchEnd(batch.buffer, bufferWatermark);
                if (this.sendWithRetries(batch.buffer, bufferEndOffset, eventCount)) {
                    HttpPostEmitter.this.buffersToReuse.add(batch.buffer);
                } else {
                    this.failedBuffers.add(new FailedBuffer(batch.buffer, bufferEndOffset, eventCount));
                }
            }
            finally {
                HttpPostEmitter.this.emittedBatchCounter.batchEmitted(batch.batchNumber);
            }
        }

        private void emitLargeEvents() {
            byte[] largeEvent;
            if (HttpPostEmitter.this.largeEventsToEmit.isEmpty()) {
                return;
            }
            HttpPostEmitter.this.largeEventsToEmit.add(LARGE_EVENTS_STOP);
            while ((largeEvent = (byte[])HttpPostEmitter.this.largeEventsToEmit.poll()) != LARGE_EVENTS_STOP) {
                this.emitLargeEvent(largeEvent);
            }
        }

        private void emitLargeEvent(byte[] eventBytes) {
            byte[] buffer = HttpPostEmitter.this.acquireBuffer();
            int bufferOffset = HttpPostEmitter.this.batchingStrategy.writeBatchStart(buffer);
            System.arraycopy(eventBytes, 0, buffer, bufferOffset, eventBytes.length);
            bufferOffset += eventBytes.length;
            bufferOffset = HttpPostEmitter.this.batchingStrategy.writeBatchEnd(buffer, bufferOffset);
            if (this.sendWithRetries(buffer, bufferOffset, 1)) {
                HttpPostEmitter.this.buffersToReuse.add(buffer);
            } else {
                this.failedBuffers.add(new FailedBuffer(buffer, bufferOffset, 1));
            }
        }

        private void tryEmitOneFailedBuffer() {
            FailedBuffer failedBuffer = this.failedBuffers.peek();
            if (failedBuffer != null && this.sendWithRetries(failedBuffer.buffer, failedBuffer.length, failedBuffer.eventCount)) {
                this.failedBuffers.poll();
            }
        }

        private void tryEmitAndDrainAllFailedBuffers() {
            FailedBuffer failedBuffer;
            while ((failedBuffer = this.failedBuffers.poll()) != null) {
                this.sendWithRetries(failedBuffer.buffer, failedBuffer.length, failedBuffer.eventCount);
            }
        }

        private boolean sendWithRetries(final byte[] buffer, final int length, int eventCount) {
            try {
                RetryUtils.retry((Callable)new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        EmittingThread.this.send(buffer, length);
                        return null;
                    }
                }, (Predicate)new Predicate<Throwable>(){

                    public boolean apply(Throwable input) {
                        return !(input instanceof InterruptedException);
                    }
                }, (int)3);
                HttpPostEmitter.this.totalEmittedEvents.addAndGet(eventCount);
                return true;
            }
            catch (InterruptedException e) {
                return false;
            }
            catch (Exception e) {
                log.error((Throwable)e, "Failed to send events to url[%s]", new Object[]{HttpPostEmitter.this.config.getRecipientBaseUrl()});
                return false;
            }
        }

        private void send(byte[] buffer, int length) throws Exception {
            StatusResponseHolder response;
            int payloadLength;
            byte[] payload;
            Request request;
            block19: {
                block18: {
                    request = new Request(HttpMethod.POST, HttpPostEmitter.this.url);
                    ContentEncoding contentEncoding = HttpPostEmitter.this.config.getContentEncoding();
                    if (contentEncoding == null) break block18;
                    switch (contentEncoding) {
                        case GZIP: {
                            try (GZIPOutputStream gzipOutputStream = this.acquireGzipOutputStream(length);){
                                gzipOutputStream.write(buffer, 0, length);
                            }
                            payload = this.gzipBaos.getBuffer();
                            payloadLength = this.gzipBaos.size();
                            request.setHeader("Content-Encoding", "gzip");
                            break block19;
                        }
                        default: {
                            throw new ISE("Unsupported content encoding [%s]", new Object[]{contentEncoding.name()});
                        }
                    }
                }
                payload = buffer;
                payloadLength = length;
            }
            request.setContent("application/json", payload, 0, payloadLength);
            if (HttpPostEmitter.this.config.getBasicAuthentication() != null) {
                String[] parts = HttpPostEmitter.this.config.getBasicAuthentication().split(":", 2);
                String user = parts[0];
                String password = parts.length > 1 ? parts[1] : "";
                request.setBasicAuthentication(user, password);
            }
            if ((response = (StatusResponseHolder)HttpPostEmitter.this.client.go(request, (HttpResponseHandler)new StatusResponseHandler(Charsets.UTF_8)).get()).getStatus().getCode() == 413) {
                throw new ISE("Received HTTP status 413 from [%s]. Batch size of [%d] may be too large, try adjusting maxBatchSizeBatch property", new Object[]{HttpPostEmitter.this.config.getRecipientBaseUrl(), HttpPostEmitter.this.config.getMaxBatchSize()});
            }
            if (response.getStatus().getCode() / 100 != 2) {
                throw new ISE("Emissions of events not successful[%s], with message[%s].", new Object[]{response.getStatus(), response.getContent().trim()});
            }
        }

        GZIPOutputStream acquireGzipOutputStream(int length) throws IOException {
            if (this.gzipBaos == null) {
                this.gzipBaos = new ZeroCopyByteArrayOutputStream(length);
            } else {
                this.gzipBaos.reset();
            }
            return new GZIPOutputStream((OutputStream)this.gzipBaos, true);
        }
    }
}

