/*
 * Decompiled with CFR 0.152.
 */
package com.github.phantomthief.collection.impl;

import com.github.phantomthief.collection.BufferTrigger;
import com.github.phantomthief.collection.impl.BatchConsumerTriggerBuilder;
import com.github.phantomthief.concurrent.MoreFutures;
import com.github.phantomthief.util.MoreLocks;
import com.github.phantomthief.util.ThrowableConsumer;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BufferTrigger} that buffers elements in a bounded {@link LinkedBlockingQueue} and
 * hands them to a consumer in batches of at most {@code batchSize} elements.
 *
 * <p>A batch consumption pass can be started in three ways:
 * <ul>
 *   <li>{@code LINGER} - by the task scheduled in the constructor with the builder's linger
 *       supplier;</li>
 *   <li>{@code ENQUEUE} - by {@link #enqueue(Object)} once the queue holds at least
 *       {@code batchSize} elements;</li>
 *   <li>{@code MANUALLY} - by {@link #manuallyDoTrigger()} (also used by {@link #close()}).</li>
 * </ul>
 *
 * <p>All consumption passes run while holding a single {@link ReentrantLock}, so the consumer
 * is never invoked by two passes at the same time.
 *
 * @param <E> type of the buffered elements
 */
public class BatchConsumeBlockingQueueTrigger<E> implements BufferTrigger<E> {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumeBlockingQueueTrigger.class);

    /** Bounded buffer; its capacity is {@code max(builder.bufferSize, batchSize)}. */
    private final BlockingQueue<E> queue;
    /** Maximum number of elements passed to the consumer in one call. */
    private final int batchSize;
    /** Receives each drained batch. */
    private final ThrowableConsumer<List<E>, Exception> consumer;
    /** Optional callback invoked with the failure and the batch when the consumer throws; may be null. */
    private final BiConsumer<Throwable, List<E>> exceptionHandler;
    /** Runs the linger task and the enqueue-triggered consumption passes. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Serializes consumption passes and the decision to submit an enqueue-triggered pass. */
    private final ReentrantLock lock = new ReentrantLock();
    /** True while a consumption pass is submitted or in progress; avoids duplicate submissions. */
    private final AtomicBoolean running = new AtomicBoolean();
    /** Cancels the linger task and, when the executor is owned by this trigger, shuts it down. */
    private final Runnable shutdownExecutor;
    /** Set by {@link #close()}; once true, {@link #enqueue(Object)} rejects new elements. */
    private volatile boolean shutdown;

    /**
     * Creates the trigger from the builder's settings and schedules the linger task.
     *
     * @param builder source of batch size, buffer size, consumer, exception handler, linger
     *                supplier and executor
     */
    BatchConsumeBlockingQueueTrigger(BatchConsumerTriggerBuilder<E> builder) {
        Supplier<Duration> linger = builder.linger;
        this.batchSize = builder.batchSize;
        // The queue must be able to hold at least one full batch.
        this.queue = new LinkedBlockingQueue<>(Integer.max(builder.bufferSize, this.batchSize));
        this.consumer = builder.consumer;
        this.exceptionHandler = builder.exceptionHandler;
        this.scheduledExecutorService = builder.scheduledExecutorService;
        // NOTE(review): presumably re-schedules the task with a delay re-read from the supplier
        // after each run - confirm against MoreFutures.
        Future<?> future = MoreFutures.scheduleWithDynamicDelay(this.scheduledExecutorService, linger,
                () -> this.doBatchConsumer(TriggerType.LINGER));
        this.shutdownExecutor = () -> {
            future.cancel(false);
            if (builder.usingInnerExecutor) {
                MoreExecutors.shutdownAndAwaitTermination(builder.scheduledExecutorService, 1L, TimeUnit.DAYS);
            }
        };
    }

    /**
     * Creates a new builder.
     *
     * @return a fresh {@link BatchConsumerTriggerBuilder}
     * @deprecated kept for backward compatibility.
     */
    @Deprecated
    public static BatchConsumerTriggerBuilder<Object> newBuilder() {
        return new BatchConsumerTriggerBuilder<Object>();
    }

    /**
     * Adds an element to the buffer, waiting uninterruptibly for space if the queue is full,
     * and then starts an enqueue-triggered consumption pass if a full batch is available.
     *
     * @param element the element to buffer
     * @throws IllegalStateException if {@link #close()} has already been called
     */
    @Override
    public void enqueue(E element) {
        Preconditions.checkState(!this.shutdown, "buffer trigger was shutdown.");
        Uninterruptibles.putUninterruptibly(this.queue, element);
        this.tryTrigBatchConsume();
    }

    /**
     * Submits an {@code ENQUEUE} consumption pass to the executor when the queue holds at least
     * one full batch and no pass is currently marked as running.
     */
    private void tryTrigBatchConsume() {
        // Cheap pre-check outside the lock; re-checked below once the lock is held.
        if (this.queue.size() < this.batchSize) {
            return;
        }
        // NOTE(review): presumably skips the action when the lock is not immediately
        // available - confirm against MoreLocks.runWithTryLock.
        MoreLocks.runWithTryLock(this.lock, () -> {
            if (this.queue.size() < this.batchSize || this.running.get()) {
                return;
            }
            // Mark as running BEFORE handing the task to the executor. The lock is reentrant,
            // so an executor that runs the task on the calling thread completes the whole pass
            // (including its "running = false" cleanup) inside execute(); setting the flag
            // afterwards would leave it stuck at true and suppress later enqueue triggers.
            this.running.set(true);
            try {
                this.scheduledExecutorService.execute(() -> this.doBatchConsumer(TriggerType.ENQUEUE));
            } catch (RuntimeException | Error e) {
                // Submission failed (e.g. rejected): no pass will run, so undo the flag.
                this.running.set(false);
                throw e;
            }
        });
    }

    /** Runs a consumption pass on the calling thread, draining the queue until it is empty. */
    @Override
    public void manuallyDoTrigger() {
        this.doBatchConsumer(TriggerType.MANUALLY);
    }

    /**
     * Drains the queue in batches while holding the lock and passes each batch to the consumer.
     *
     * <p>When fewer than {@code batchSize} elements remain, the pass stops early for
     * {@code ENQUEUE}, and for {@code LINGER} once it has consumed at least as many elements as
     * were queued when the pass started; {@code MANUALLY} continues until the queue is empty.
     *
     * @param type what started this pass
     */
    private void doBatchConsumer(TriggerType type) {
        MoreLocks.runWithLock(this.lock, () -> {
            try {
                this.running.set(true);
                int queueSizeBeforeConsumer = this.queue.size();
                int consumedSize = 0;
                while (!this.queue.isEmpty()) {
                    if (this.queue.size() < this.batchSize) {
                        // Enqueue-triggered passes only handle full batches.
                        if (type == TriggerType.ENQUEUE) {
                            return;
                        }
                        // A linger pass handles what was pending at its start, then stops so
                        // concurrent producers cannot keep it running indefinitely.
                        if (type == TriggerType.LINGER && consumedSize >= queueSizeBeforeConsumer) {
                            return;
                        }
                    }
                    List<E> toConsumeData = new ArrayList<>(Math.min(this.batchSize, this.queue.size()));
                    this.queue.drainTo(toConsumeData, this.batchSize);
                    if (toConsumeData.isEmpty()) {
                        continue;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("do batch consumer:{}, size:{}", type, toConsumeData.size());
                    }
                    consumedSize += toConsumeData.size();
                    this.doConsume(toConsumeData);
                }
            } finally {
                this.running.set(false);
            }
        });
    }

    /**
     * Passes one batch to the consumer. Any failure is reported to the exception handler (when
     * one is configured) and logged; it is never propagated, so the remaining batches of the
     * pass are still processed.
     *
     * @param toConsumeData the batch to consume
     */
    private void doConsume(List<E> toConsumeData) {
        try {
            this.consumer.accept(toConsumeData);
        } catch (Throwable e) {
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.accept(e, toConsumeData);
                } catch (Throwable ex) {
                    // A failing handler must not hide the original failure logged below.
                    logger.error("", ex);
                }
            }
            logger.error("Ops.", e);
        }
    }

    /**
     * Returns the number of elements currently waiting in the queue.
     *
     * @return current queue size
     */
    @Override
    public long getPendingChanges() {
        return this.queue.size();
    }

    /**
     * Marks the trigger as shut down, consumes whatever is still buffered on the calling thread,
     * then cancels the linger task and shuts down the executor if it is owned by this trigger.
     */
    @Override
    public void close() {
        this.shutdown = true;
        try {
            this.manuallyDoTrigger();
        } finally {
            // Always release the scheduler, even if the final flush fails.
            this.shutdownExecutor.run();
        }
    }

    /** Identifies what started a consumption pass. */
    private static enum TriggerType {
        /** Started by the scheduled linger task. */
        LINGER,
        /** Started by {@link #enqueue(Object)} when a full batch became available. */
        ENQUEUE,
        /** Started by {@link #manuallyDoTrigger()} or {@link #close()}. */
        MANUALLY;

    }
}

