/*
 * Decompiled with CFR 0.152.
 */
package com.swirlds.common.threading.framework.internal;

import com.swirlds.base.utility.ToStringBuilder;
import com.swirlds.common.threading.framework.QueueThread;
import com.swirlds.common.threading.framework.Stoppable;
import com.swirlds.common.threading.framework.StoppableThread;
import com.swirlds.common.threading.framework.ThreadSeed;
import com.swirlds.common.threading.framework.internal.AbstractBlockingQueue;
import com.swirlds.common.threading.framework.internal.AbstractQueueThreadConfiguration;
import com.swirlds.common.threading.framework.internal.QueueThreadMetrics;
import com.swirlds.common.threading.framework.internal.StoppableThreadImpl;
import com.swirlds.common.threading.framework.internal.ThreadBuildingUtils;
import com.swirlds.common.threading.interrupt.InterruptableConsumer;
import com.swirlds.common.threading.interrupt.InterruptableRunnable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A queue whose contents are consumed by the work loop of an underlying {@link StoppableThread}.
 *
 * <p>Each work cycle drains up to {@code bufferSize} items from this queue into a reusable buffer and
 * passes them, one at a time and in buffer order, to the configured handler. When nothing could be
 * drained, the cycle instead waits for a single item for at most the configured wait duration.
 *
 * <p>All lifecycle operations (start, pause, resume, join, stop, interrupt, status queries) are
 * delegated to the underlying stoppable thread.
 *
 * @param <T> the type of the items held by the queue and passed to the handler
 */
public class QueueThreadImpl<T> extends AbstractBlockingQueue<T> implements QueueThread<T> {
    /** Maximum number of items drained from the queue in a single cycle. */
    private final int bufferSize;

    /** Reusable scratch list that drained items are placed into; emptied after each handled batch. */
    private final List<T> buffer;

    /** Receives every item taken from the queue. */
    private final InterruptableConsumer<T> handler;

    /** The thread object that runs {@link #doWork()} and {@link #doFinalCycleWork()}. */
    private final StoppableThread stoppableThread;

    /** The configuration this instance was built from; also used to build thread seeds. */
    private final AbstractQueueThreadConfiguration<?, T> configuration;

    /** Number of timed waits for an item that ended without an item being available. */
    private final AtomicLong noWorkCount = new AtomicLong();

    /** Notified whenever this class switches between working and waiting for work. */
    private final QueueThreadMetrics metrics;

    /** Optional callback run after a timed wait yields no item; may be {@code null}. */
    private final InterruptableRunnable idleCallback;

    /** Optional callback run after a batch (or a single waited-for item) is handled; may be {@code null}. */
    private final InterruptableRunnable batchHandledCallback;

    /** Upper bound on how long a single cycle waits for an item when the queue was empty. */
    private final Duration waitForWorkDuration;

    /**
     * Builds the queue thread from the given configuration. The underlying stoppable thread is built
     * here via {@code buildStoppableThread(false)}; use {@link #start()} or {@link #buildSeed()}
     * afterwards to run it.
     *
     * @param configuration supplies the queue, capacity, buffer size, handler, callbacks, wait
     *                      duration and stop behavior
     */
    public QueueThreadImpl(AbstractQueueThreadConfiguration<?, T> configuration) {
        super(ThreadBuildingUtils.getOrBuildQueue(configuration));
        this.configuration = configuration;

        // A positive capacity caps the buffer size; a non-positive capacity leaves only the
        // configured maximum buffer size as the limit.
        final int capacity = configuration.getCapacity();
        final int maxBufferSize = configuration.getMaxBufferSize();
        this.bufferSize = capacity > 0 ? Math.min(capacity, maxBufferSize) : maxBufferSize;
        this.buffer = new ArrayList<>(this.bufferSize);

        this.handler = configuration.getHandler();
        this.idleCallback = configuration.getIdleCallback();
        this.batchHandledCallback = configuration.getBatchHandledCallback();
        this.waitForWorkDuration = configuration.getWaitForWorkDuration();
        this.metrics = new QueueThreadMetrics(configuration);

        // NOTE(review): the raw casts are kept exactly as found; whether the fluent setters'
        // generic return types make them unnecessary cannot be determined from this file alone.
        this.stoppableThread = ((AbstractQueueThreadConfiguration)((AbstractQueueThreadConfiguration)((AbstractQueueThreadConfiguration)configuration.setWork(this::doWork)).setStopBehavior(configuration.getStopBehavior())).setFinalCycleWork(this::doFinalCycleWork)).buildStoppableThread(false);
    }

    /**
     * Builds a seed for the underlying stoppable thread.
     *
     * @return the seed produced by the configuration for the underlying thread
     * @throws IllegalStateException if the thread has already been started or a seed has already
     *                               been built
     */
    @Override
    public ThreadSeed buildSeed() {
        if (((StoppableThreadImpl)this.stoppableThread).hasBeenStartedOrInjected()) {
            throw new IllegalStateException("can not build seed for thread if it has already built a seed or if it has already been started");
        }
        return this.configuration.buildStoppableThreadSeed((StoppableThreadImpl)this.stoppableThread);
    }

    /**
     * Marks the metrics as "working" and starts the underlying thread.
     *
     * @throws IllegalStateException if the thread has already been started or a seed has already
     *                               been built
     */
    @Override
    public void start() {
        if (((StoppableThreadImpl)this.stoppableThread).hasBeenStartedOrInjected()) {
            throw new IllegalStateException("can not start thread if it has already built a seed or if it has already been started");
        }
        this.metrics.startingWork();
        this.stoppableThread.start();
    }

    /** Delegates to {@link StoppableThread#pause()} and returns its result. */
    @Override
    public boolean pause() {
        return this.stoppableThread.pause();
    }

    /** Delegates to {@link StoppableThread#resume()} and returns its result. */
    @Override
    public boolean resume() {
        return this.stoppableThread.resume();
    }

    /** Delegates to {@link StoppableThread#join()}. */
    @Override
    public void join() throws InterruptedException {
        this.stoppableThread.join();
    }

    /** Delegates to {@link StoppableThread#join(long)}. */
    @Override
    public void join(long millis) throws InterruptedException {
        this.stoppableThread.join(millis);
    }

    /** Delegates to {@link StoppableThread#join(long, int)}. */
    @Override
    public void join(long millis, int nanos) throws InterruptedException {
        this.stoppableThread.join(millis, nanos);
    }

    /** Delegates to {@link StoppableThread#stop()} and returns its result. */
    @Override
    public boolean stop() {
        return this.stoppableThread.stop();
    }

    /** Delegates to the underlying thread's {@code stop(behavior)} and returns its result. */
    @Override
    public boolean stop(Stoppable.StopBehavior behavior) {
        return this.stoppableThread.stop(behavior);
    }

    /** Delegates to {@link StoppableThread#interrupt()} and returns its result. */
    @Override
    public boolean interrupt() {
        return this.stoppableThread.interrupt();
    }

    /** Delegates to {@link StoppableThread#isAlive()}. */
    @Override
    public boolean isAlive() {
        return this.stoppableThread.isAlive();
    }

    /** Delegates to {@link StoppableThread#getStatus()}. */
    @Override
    public StoppableThread.Status getStatus() {
        return this.stoppableThread.getStatus();
    }

    /** Delegates to {@link StoppableThread#isHanging()}. */
    @Override
    public boolean isHanging() {
        return this.stoppableThread.isHanging();
    }

    /**
     * One work cycle. Drains up to {@code bufferSize} items and handles them as a batch; if nothing
     * was drained, waits for a single item (bounded by the wait duration) and handles it if one
     * arrived. The batch-handled callback runs after a batch or a single waited-for item has been
     * handled, but not when the wait produced no item.
     *
     * @throws InterruptedException if the handler, a callback, or the timed wait is interrupted
     */
    private void doWork() throws InterruptedException {
        this.drainTo(this.buffer, this.bufferSize);
        if (this.buffer.isEmpty()) {
            // Nothing was immediately available: report the wait to the metrics, then block
            // (bounded) for one item.
            this.metrics.finishedWork();
            final T item = this.waitForItem();
            this.metrics.startingWork();
            if (item != null) {
                this.handler.accept(item);
                this.batchHandled();
            }
            return;
        }
        this.handleBufferedItems();
        this.batchHandled();
    }

    /**
     * Passes every buffered item to the handler in order, then empties the buffer. If the handler
     * throws, the buffer is left as it is.
     *
     * @throws InterruptedException if the handler is interrupted
     */
    private void handleBufferedItems() throws InterruptedException {
        for (final T item : this.buffer) {
            this.handler.accept(item);
        }
        this.buffer.clear();
    }

    /**
     * Runs the batch-handled callback, if one was configured.
     *
     * @throws InterruptedException if the callback is interrupted
     */
    private void batchHandled() throws InterruptedException {
        if (this.batchHandledCallback != null) {
            this.batchHandledCallback.run();
        }
    }

    /**
     * Waits up to the configured wait duration for one item. If none arrives, the idle counter is
     * incremented and the idle callback (if any) is run, bracketed by metrics start/finish calls.
     *
     * @return the item that was polled, or {@code null} if none became available in time
     * @throws InterruptedException if the wait or the idle callback is interrupted
     */
    private T waitForItem() throws InterruptedException {
        // Typed as T (not Object) so that the value can be returned without a cast.
        final T item = this.poll(this.waitForWorkDuration.toNanos(), TimeUnit.NANOSECONDS);
        if (item == null) {
            this.noWorkCount.incrementAndGet();
            if (this.idleCallback != null) {
                this.metrics.startingWork();
                this.idleCallback.run();
                this.metrics.finishedWork();
            }
        }
        return item;
    }

    /**
     * Blocks until the idle counter has advanced by at least two from its value at the time of the
     * call, or until the underlying thread's status is {@code DEAD}. Between checks the caller
     * sleeps for the configured wait duration.
     *
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    @Override
    public void waitUntilNotBusy() throws InterruptedException {
        final long initialCount = this.noWorkCount.get();
        // NOTE(review): two increments are required rather than one, presumably so that an empty
        // poll already in flight when this method was entered is not counted on its own - confirm.
        while (this.noWorkCount.get() <= initialCount + 1L && this.getStatus() != StoppableThread.Status.DEAD) {
            TimeUnit.NANOSECONDS.sleep(this.waitForWorkDuration.toNanos());
        }
    }

    /**
     * Final-cycle work registered with the underlying thread: repeatedly drains and handles items
     * until the queue reports empty. The batch-handled callback is not invoked from here.
     *
     * @throws InterruptedException if the handler is interrupted
     */
    private void doFinalCycleWork() throws InterruptedException {
        while (!this.isEmpty()) {
            this.drainTo(this.buffer, this.bufferSize);
            this.handleBufferedItems();
        }
    }

    /**
     * Removes all items from the queue. If the underlying thread's status is {@code ALIVE}, the
     * thread is paused around the removal and resumed afterwards; the resume happens even if the
     * removal throws, so a failed clear cannot leave the thread paused.
     */
    @Override
    public void clear() {
        if (this.stoppableThread.getStatus() != StoppableThread.Status.ALIVE) {
            super.clear();
            return;
        }
        this.pause();
        try {
            super.clear();
        } finally {
            this.resume();
        }
    }

    /** Delegates to the underlying thread's {@code getName()}. */
    @Override
    public String getName() {
        return this.stoppableThread.getName();
    }

    /**
     * Builds a string from this object and its name via {@link ToStringBuilder}.
     */
    @Override
    public String toString() {
        // The explicit Object casts are kept so that the same ToStringBuilder overloads are chosen.
        return new ToStringBuilder((Object)this).append((Object)this.getName()).toString();
    }
}

