/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.impl.ConcurrencyUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulates items submitted through {@link #add(Object)} and hands them, in batches, to a
 * {@link BatchConsumer} from a dedicated thread started by the constructor.
 *
 * <p>The target batch size adapts to the load: each time the consumer reports a batch as
 * completed, the target doubles (capped at twice the configured size) if the batch was full, or
 * halves (never going below a computed floor) if the batch was handed over because the queue
 * ran dry.
 *
 * @param <T> type of the batched items
 */
final class DynamicBatch<T> implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
    /** Upper bound of the shrink floor when the batch size is below the unconfirmed limit. */
    private static final int MIN_BATCH_SIZE = 16;
    private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
    private final BatchConsumer<T> consumer;
    private final int configuredBatchSize;
    private final int minBatchSize;
    private final int maxBatchSize;
    private final Thread thread;

    /**
     * Creates the batcher and immediately starts its processing thread.
     *
     * @param consumer callback receiving the accumulated items
     * @param batchSize initial target batch size; the target may grow up to twice this value
     * @param maxUnconfirmed limit used, together with {@code batchSize}, to derive the smallest
     *     target batch size the batcher may shrink to
     */
    DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed) {
        this.consumer = consumer;
        int floor = batchSize < maxUnconfirmed
            ? Math.min(MIN_BATCH_SIZE, batchSize / 2)
            : maxUnconfirmed / 2;
        // The floor must be at least 1: a target size of 0 could never grow back (0 * 2 == 0).
        this.minBatchSize = Math.max(1, floor);
        this.configuredBatchSize = batchSize;
        this.maxBatchSize = batchSize * 2;
        this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
        this.thread.start();
    }

    /**
     * Queues an item for the processing thread.
     *
     * @param item the item to batch
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *     is interrupted; the interrupt flag of the calling thread is restored before throwing
     */
    void add(T item) {
        try {
            this.requests.put(item);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to the caller's own interruption handling.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Body of the processing thread: waits up to 100 ms for an item, grows the current batch and
     * offers it to the consumer when it is full or when the wait times out. Runs until the
     * thread is interrupted.
     */
    private void loop() {
        State<T> state = new State<>();
        state.batchSize = this.configuredBatchSize;
        state.items = new ArrayList<>(state.batchSize);
        Thread currentThread = Thread.currentThread();
        while (!currentThread.isInterrupted()) {
            T item;
            try {
                item = this.requests.poll(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                currentThread.interrupt();
                return;
            }
            if (item == null) {
                // Nothing arrived in time: offer what has been gathered so far (possibly an
                // empty list) and let the target size shrink.
                this.maybeCompleteBatch(state, false);
            } else {
                state.items.add(item);
                if (state.items.size() >= state.batchSize) {
                    this.maybeCompleteBatch(state, true);
                } else {
                    // Opportunistically drain a couple more items without blocking.
                    this.pump(state, 2);
                }
            }
        }
    }

    /**
     * Takes up to {@code pumpCount} immediately available items from the queue. The batch is
     * offered to the consumer as soon as the queue is found empty (ending the pump) or each time
     * the batch reaches the target size.
     *
     * @param state current batch and target size
     * @param pumpCount remaining number of non-blocking polls to attempt
     */
    private void pump(State<T> state, int pumpCount) {
        if (pumpCount <= 0) {
            return;
        }
        T item = this.requests.poll();
        if (item == null) {
            this.maybeCompleteBatch(state, false);
        } else {
            state.items.add(item);
            if (state.items.size() >= state.batchSize) {
                this.maybeCompleteBatch(state, true);
            }
            this.pump(state, pumpCount - 1);
        }
    }

    /**
     * Offers the current batch to the consumer. If the consumer reports it as completed, the
     * target size is adjusted and a fresh list is started; otherwise (including when the
     * consumer throws) the same list is kept and offered again later.
     *
     * @param state current batch and target size
     * @param increaseIfCompleted {@code true} to double the target size on completion (capped at
     *     the maximum), {@code false} to halve it (floored at the minimum)
     */
    private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
        try {
            boolean completed = this.consumer.process(state.items);
            if (completed) {
                if (increaseIfCompleted) {
                    state.batchSize = Math.min(state.batchSize * 2, this.maxBatchSize);
                } else {
                    state.batchSize = Math.max(state.batchSize / 2, this.minBatchSize);
                }
                state.items = new ArrayList<>(state.batchSize);
            }
        }
        catch (Exception e) {
            // Message only: a failing consumer is retried on every pass of the loop.
            LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
        }
    }

    /** Interrupts the processing thread, which makes the processing loop exit. */
    @Override
    public void close() {
        this.thread.interrupt();
    }

    /**
     * Receiver of the accumulated items.
     *
     * @param <T> type of the batched items
     */
    @FunctionalInterface
    static interface BatchConsumer<T> {
        /**
         * Handles the items gathered so far.
         *
         * @param var1 the current batch; the same list instance is passed again on the next
         *     call when this method returns {@code false} or throws
         * @return {@code true} if the batch is done and a new one should be started
         */
        public boolean process(List<T> var1);
    }

    /** Mutable state owned by the processing loop: current target size and current batch. */
    private static final class State<T> {
        int batchSize;
        List<T> items;

        private State() {
        }
    }
}

