/*
 * Decompiled with CFR 0.152.
 */
package com.github.phantomthief.collection.impl;

import com.github.phantomthief.collection.BufferTrigger;
import com.github.phantomthief.collection.impl.BatchConsumerTriggerBuilder;
import com.github.phantomthief.util.MoreLocks;
import com.github.phantomthief.util.ThrowableConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BufferTrigger} that buffers elements in a bounded {@link LinkedBlockingQueue} and hands
 * them to a consumer in batches of at most {@code batchSize} elements.
 *
 * <p>A drain is started in three ways:
 * <ul>
 *   <li>by {@link #enqueue(Object)} once the queue holds at least {@code batchSize} elements
 *       (the drain itself is submitted to the scheduled executor);</li>
 *   <li>by a self-rescheduling task that runs on the scheduled executor {@code lingerMs}
 *       milliseconds after the previous run finished;</li>
 *   <li>by {@link #manuallyDoTrigger()}, which drains on the calling thread.</li>
 * </ul>
 *
 * <p>All drains go through {@link #doBatchConsumer()}, which runs under {@code lock}.
 *
 * @param <E> type of the buffered elements
 */
public class BatchConsumeBlockingQueueTrigger<E> implements BufferTrigger<E> {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumeBlockingQueueTrigger.class);
    private final BlockingQueue<E> queue;
    private final int batchSize;
    private final long lingerMs;
    private final ThrowableConsumer<List<E>, Exception> consumer;
    private final BiConsumer<Throwable, List<E>> exceptionHandler;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Lock lock = new ReentrantLock();
    /** Set while a drain has been requested or is in progress; reset when a drain finishes. */
    private final AtomicBoolean running = new AtomicBoolean();

    /**
     * Creates the trigger and schedules the first time-based drain.
     *
     * @param lingerMs delay, in milliseconds, between the end of one scheduled drain and the next
     * @param batchSize maximum number of elements passed to the consumer per call; must be positive
     * @param bufferSize requested queue capacity; the effective capacity is
     *        {@code max(bufferSize, batchSize)}
     * @param exceptionHandler invoked with the failure and the failed batch when the consumer
     *        throws; may be {@code null}
     * @param consumer receives each drained batch
     * @param scheduledExecutorService executor used for both the periodic and the size-triggered
     *        drains
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    BatchConsumeBlockingQueueTrigger(long lingerMs, int batchSize, int bufferSize, BiConsumer<Throwable, List<E>> exceptionHandler, ThrowableConsumer<List<E>, Exception> consumer, ScheduledExecutorService scheduledExecutorService) {
        // A non-positive batch size can never make progress: drainTo(..., n <= 0) removes nothing,
        // so the drain loop would spin forever on a non-empty queue. Fail fast instead.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got: " + batchSize);
        }
        this.lingerMs = lingerMs;
        this.batchSize = batchSize;
        this.queue = new LinkedBlockingQueue<>(Integer.max(bufferSize, batchSize));
        this.consumer = consumer;
        this.exceptionHandler = exceptionHandler;
        this.scheduledExecutorService = scheduledExecutorService;
        // Kept as the last statement so the task only ever sees fully assigned fields.
        this.scheduledExecutorService.schedule(new BatchConsumerRunnable(), this.lingerMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns a new, untyped builder.
     *
     * @return a fresh {@link BatchConsumerTriggerBuilder}
     * @deprecated retained for existing callers
     */
    @Deprecated
    public static BatchConsumerTriggerBuilder<Object> newBuilder() {
        return new BatchConsumerTriggerBuilder<Object>();
    }

    /**
     * Adds an element to the buffer, waiting for space if the queue is full, and then requests a
     * batch drain if enough elements are pending.
     *
     * <p>If the calling thread is interrupted while waiting for space, the element is NOT added;
     * the thread's interrupt status is restored and the method returns normally.
     *
     * @param element the element to buffer
     */
    @Override
    public void enqueue(E element) {
        try {
            this.queue.put(element);
            this.tryTrigBatchConsume();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits an asynchronous drain when at least {@code batchSize} elements are pending and no
     * drain is currently flagged as running.
     */
    private void tryTrigBatchConsume() {
        // Cheap unlocked check first; it is repeated under the lock before acting on it.
        if (this.queue.size() >= this.batchSize) {
            // NOTE(review): presumably runWithTryLock skips the action when the lock is not
            // immediately available - confirm against MoreLocks.
            MoreLocks.runWithTryLock(this.lock, () -> {
                if (this.queue.size() >= this.batchSize && !this.running.get()) {
                    this.scheduledExecutorService.execute(this::doBatchConsumer);
                    // Set only after execute() returned, so a rejected submission does not
                    // leave the flag stuck at true.
                    this.running.set(true);
                }
            });
        }
    }

    /** Drains and consumes everything currently buffered, on the calling thread. */
    @Override
    public void manuallyDoTrigger() {
        this.doBatchConsumer();
    }

    /**
     * Drains the queue in chunks of at most {@code batchSize} elements until it is empty, passing
     * each non-empty chunk to {@link #doConsume(List)}. Runs via
     * {@code MoreLocks.runWithLock} on {@code lock}; {@code running} is cleared when the drain ends,
     * whether normally or exceptionally.
     */
    private void doBatchConsumer() {
        MoreLocks.runWithLock(this.lock, () -> {
            try {
                this.running.set(true);
                while (!this.queue.isEmpty()) {
                    List<E> batch = new ArrayList<>(Math.min(this.batchSize, this.queue.size()));
                    this.queue.drainTo(batch, this.batchSize);
                    if (!batch.isEmpty()) {
                        this.doConsume(batch);
                    }
                }
            }
            finally {
                this.running.set(false);
            }
        });
    }

    /**
     * Passes one batch to the consumer. Any {@link Throwable} from the consumer is forwarded to
     * the exception handler (when one is configured) and is always logged; a failure of the
     * handler itself is logged as well. Nothing is rethrown, so draining continues with the next
     * batch.
     *
     * @param toConsumeData the batch to consume
     */
    private void doConsume(List<E> toConsumeData) {
        try {
            this.consumer.accept(toConsumeData);
        }
        catch (Throwable e) {
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.accept(e, toConsumeData);
                }
                catch (Throwable ex) {
                    logger.error("", ex);
                }
            }
            logger.error("Ops.", e);
        }
    }

    /**
     * Returns the number of elements currently waiting in the queue.
     *
     * @return the current queue size
     */
    @Override
    public long getPendingChanges() {
        return this.queue.size();
    }

    /**
     * Time-based drain task: drains the queue, then schedules itself again {@code lingerMs}
     * milliseconds later, even if the drain threw.
     */
    private class BatchConsumerRunnable implements Runnable {
        private BatchConsumerRunnable() {
        }

        @Override
        public void run() {
            try {
                BatchConsumeBlockingQueueTrigger.this.doBatchConsumer();
            }
            finally {
                // Rescheduled from finally so that a failing drain does not stop the periodic flush.
                BatchConsumeBlockingQueueTrigger.this.scheduledExecutorService.schedule(this, BatchConsumeBlockingQueueTrigger.this.lingerMs, TimeUnit.MILLISECONDS);
            }
        }
    }
}

