/*
 * Decompiled with CFR 0.152.
 */
package io.honeycomb.libhoney.transport.batch.impl;

import io.honeycomb.libhoney.transport.batch.BatchConsumer;
import io.honeycomb.libhoney.transport.batch.BatchKeyStrategy;
import io.honeycomb.libhoney.transport.batch.Batcher;
import io.honeycomb.libhoney.transport.batch.ClockProvider;
import io.honeycomb.libhoney.utils.Assert;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBatcher<T, K>
implements Batcher<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultBatcher.class);
    private static final long CLEANUP_THRESHOLD = 20L;
    private static final long SHUTDOWN_TIMEOUT = 5000L;
    private final int batchSize;
    private final long batchTimeoutNanos;
    private final BlockingQueue<T> pendingQueue;
    private final Map<K, Batch> batches;
    private final ExecutorService executor;
    private final BatchConsumer<T> batchConsumer;
    private final BatchKeyStrategy<T, K> batchKeyStrategy;
    private final ClockProvider clockProvider;
    private final CountDownLatch closingLatch;
    private volatile boolean running = true;

    public DefaultBatcher(BatchKeyStrategy<T, K> batchKeyStrategy, BatchConsumer<T> batchConsumer, ClockProvider clockProvider, BlockingQueue<T> pendingQueue, int batchSize, long batchTimeoutMillis) {
        Assert.isTrue(batchSize > 0, "batchSize must be > 0");
        Assert.isTrue(batchTimeoutMillis > 0L, "batchTimeoutMillis must be > 0");
        Assert.notNull(batchKeyStrategy, "batchKeyStrategy must not be null");
        Assert.notNull(batchConsumer, "batchConsumer must not be null");
        Assert.notNull(clockProvider, "clockProvider must not be null");
        Assert.notNull(pendingQueue, "pendingQueue must not be null");
        this.batchTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(batchTimeoutMillis);
        this.pendingQueue = pendingQueue;
        this.batchConsumer = batchConsumer;
        this.batchSize = batchSize;
        this.batchKeyStrategy = batchKeyStrategy;
        this.clockProvider = clockProvider;
        this.batches = new HashMap<K, Batch>();
        this.closingLatch = new CountDownLatch(1);
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(new BatchingWorker());
    }

    private void handleNewEvent(T event) throws InterruptedException {
        K key = this.batchKeyStrategy.getKey(event);
        Batch batch = this.batches.get(key);
        if (batch == null) {
            batch = new Batch();
            this.batches.put(key, batch);
        }
        batch.add(event);
        if (batch.isFull()) {
            this.submitBatch(batch);
        }
    }

    int getCurrentlyActiveBatches() {
        return this.batches.size();
    }

    @Override
    public boolean offerEvent(T event) {
        if (!this.running) {
            return false;
        }
        boolean offer = this.pendingQueue.offer(event);
        if (!this.running) {
            try {
                this.closingLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !this.pendingQueue.contains(event);
        }
        return offer;
    }

    @Override
    public void close() {
        try {
            LOG.debug("Shutting down Batcher thread");
            this.running = false;
            this.executor.shutdownNow();
            try {
                this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                LOG.error("Interrupted during wait for batcher to terminate", (Throwable)ex);
                Thread.currentThread().interrupt();
            }
            LOG.debug("Batcher thread shutdown complete");
        }
        finally {
            this.closingLatch.countDown();
        }
    }

    private void flush() {
        try {
            ArrayList remainingEvents = new ArrayList();
            this.pendingQueue.drainTo(remainingEvents);
            for (Object t : remainingEvents) {
                this.handleNewEvent(t);
            }
            for (Batch batch : this.batches.values()) {
                if (batch.isEmpty()) continue;
                this.submitBatch(batch);
            }
            this.batches.clear();
        }
        catch (InterruptedException ex) {
            LOG.error("Interrupt thrown during flush. Exiting flush early.", (Throwable)ex);
            Thread.currentThread().interrupt();
        }
    }

    private long getLowestTimeout() {
        long min = Long.MAX_VALUE;
        for (Batch batch : this.batches.values()) {
            if (batch.getTriggerInstant() >= min) continue;
            min = batch.getTriggerInstant();
        }
        return min - this.clockProvider.getMonotonicTime();
    }

    private void submitBatch(Batch batch) throws InterruptedException {
        List batchContents = batch.drainBatch();
        try {
            this.batchConsumer.consume(batchContents);
        }
        catch (InterruptedException ex) {
            batch.add(batchContents);
            throw ex;
        }
    }

    private void handleTimeoutTriggers() throws InterruptedException {
        Iterator<Map.Entry<K, Batch>> iterator = this.batches.entrySet().iterator();
        while (iterator.hasNext()) {
            Batch batch = iterator.next().getValue();
            if (!batch.hasReachedTriggerInstant()) continue;
            if (batch.isEmpty()) {
                batch.markNotUsed();
                if (!batch.hasReachedCleanupThreshold()) continue;
                iterator.remove();
                continue;
            }
            this.submitBatch(batch);
        }
    }

    private class BatchingWorker
    implements Runnable {
        private BatchingWorker() {
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    Object event = DefaultBatcher.this.pendingQueue.poll(DefaultBatcher.this.getLowestTimeout(), TimeUnit.NANOSECONDS);
                    if (event != null) {
                        DefaultBatcher.this.handleNewEvent(event);
                    }
                    DefaultBatcher.this.handleTimeoutTriggers();
                }
                catch (InterruptedException ignored) {
                    LOG.debug("Batcher thread interrupted. Initiating flush prior to shutdown.");
                    Thread.currentThread().interrupt();
                }
            }
            DefaultBatcher.this.flush();
        }
    }

    private class Batch {
        private final List<T> elements;
        private long triggerInstant;
        private long notUsedCounter;

        private Batch() {
            this.elements = new ArrayList(DefaultBatcher.this.batchSize);
            this.triggerInstant = this.calculateNextTriggerInstant();
        }

        private long calculateNextTriggerInstant() {
            return DefaultBatcher.this.clockProvider.getMonotonicTime() + DefaultBatcher.this.batchTimeoutNanos;
        }

        void add(T event) {
            this.elements.add(event);
        }

        void add(List<T> events) {
            this.elements.addAll(events);
        }

        boolean isFull() {
            return this.elements.size() >= DefaultBatcher.this.batchSize;
        }

        List<T> drainBatch() {
            ArrayList newList = new ArrayList(this.elements);
            this.elements.clear();
            this.triggerInstant = this.calculateNextTriggerInstant();
            return newList;
        }

        long getTriggerInstant() {
            return this.triggerInstant;
        }

        boolean isEmpty() {
            return this.elements.isEmpty();
        }

        void markNotUsed() {
            ++this.notUsedCounter;
            this.triggerInstant = this.calculateNextTriggerInstant();
        }

        boolean hasReachedCleanupThreshold() {
            return this.notUsedCounter >= 20L;
        }

        boolean hasReachedTriggerInstant() {
            return this.triggerInstant - DefaultBatcher.this.clockProvider.getMonotonicTime() <= 0L;
        }
    }
}

