/*
 * Decompiled with CFR 0.152.
 */
package com.swirlds.common.wiring.schedulers.internal;

import com.swirlds.base.state.Startable;
import com.swirlds.base.state.Stoppable;
import com.swirlds.common.metrics.extensions.FractionalTimer;
import com.swirlds.common.wiring.counters.ObjectCounter;
import com.swirlds.common.wiring.model.internal.StandardWiringModel;
import com.swirlds.common.wiring.schedulers.TaskScheduler;
import com.swirlds.common.wiring.schedulers.builders.TaskSchedulerType;
import com.swirlds.common.wiring.schedulers.internal.SequentialThreadTask;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * A {@link TaskScheduler} that handles all of its tasks one at a time on a single dedicated
 * thread.
 *
 * <p>Inserted tasks are placed on an unbounded queue. The worker thread drains the queue in
 * batches of at most {@link #BUFFER_SIZE} tasks and handles each task in order. When the queue
 * is empty the worker waits up to the configured sleep duration for a new task to arrive.
 *
 * @param <OUT> the output type of the scheduler, passed through to {@link TaskScheduler}
 */
public class SequentialThreadTaskScheduler<OUT> extends TaskScheduler<OUT> implements Startable, Stoppable {
    /** Maximum number of tasks drained from the queue (and handled) per batch. */
    private static final int BUFFER_SIZE = 1024;

    /** Receives any {@link Throwable} thrown while a task is being handled. */
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    /** Counter notified whenever a task is inserted; also the source of the unprocessed task count. */
    private final ObjectCounter onRamp;

    /** Counter notified once for every task that has been handled (successfully or not). */
    private final ObjectCounter offRamp;

    /** Timer that is active while a batch of tasks is being handled. */
    private final FractionalTimer busyTimer;

    /**
     * Maximum time the worker waits for a task when the queue is empty. A zero or negative
     * duration makes the worker re-check the queue immediately, without waiting.
     */
    private final Duration sleepDuration;

    /** Tasks waiting to be handled by the worker thread. */
    private final BlockingQueue<SequentialThreadTask> tasks = new LinkedBlockingQueue<>();

    /** Cleared by {@link #stop()} to make the worker loop exit. */
    private final AtomicBoolean alive = new AtomicBoolean(true);

    /** The worker thread; created in the constructor and started by {@link #start()}. */
    private final Thread thread;

    /**
     * Creates the scheduler and its (not yet started) worker thread.
     *
     * @param model                    the wiring model, passed to the base scheduler
     * @param name                     the scheduler name, passed to the base scheduler and used
     *                                 in the worker thread's name
     * @param uncaughtExceptionHandler receives anything thrown while handling a task
     * @param onRamp                   counter notified when tasks are inserted
     * @param offRamp                  counter notified when tasks have been handled
     * @param busyTimer                timer activated while a batch of tasks is being handled
     * @param sleepDuration            how long the worker waits for a task when the queue is empty
     * @param flushEnabled             passed to the base scheduler
     * @param insertionIsBlocking      passed to the base scheduler
     * @throws NullPointerException if any of the handler, counters, timer or duration is null
     */
    public SequentialThreadTaskScheduler(@NonNull StandardWiringModel model, @NonNull String name, @NonNull Thread.UncaughtExceptionHandler uncaughtExceptionHandler, @NonNull ObjectCounter onRamp, @NonNull ObjectCounter offRamp, @NonNull FractionalTimer busyTimer, @NonNull Duration sleepDuration, boolean flushEnabled, boolean insertionIsBlocking) {
        super(model, name, TaskSchedulerType.SEQUENTIAL_THREAD, flushEnabled, insertionIsBlocking);
        this.uncaughtExceptionHandler = Objects.requireNonNull(uncaughtExceptionHandler);
        this.onRamp = Objects.requireNonNull(onRamp);
        this.offRamp = Objects.requireNonNull(offRamp);
        this.busyTimer = Objects.requireNonNull(busyTimer);
        this.sleepDuration = Objects.requireNonNull(sleepDuration);
        this.thread = new Thread(this::run, "<scheduler " + name + ">");
    }

    /**
     * Returns the current count reported by the on-ramp counter.
     *
     * @return the value of {@code onRamp.getCount()}
     */
    @Override
    public long getUnprocessedTaskCount() {
        return this.onRamp.getCount();
    }

    /**
     * Blocks until every task inserted before this call has been handled.
     *
     * <p>A marker task is enqueued behind the existing tasks; the caller waits (uninterruptibly)
     * until the worker thread handles that marker. Note that if the worker thread is not running
     * (never started, or already stopped), nothing in this class will handle the marker and this
     * call will not return.
     */
    @Override
    public void flush() {
        this.throwIfFlushDisabled();
        this.onRamp.forceOnRamp();
        final Semaphore semaphore = new Semaphore(0);
        // The semaphore doubles as the (ignored) task payload so that the data is non-null.
        this.tasks.add(new SequentialThreadTask(x -> semaphore.release(), semaphore));
        semaphore.acquireUninterruptibly();
    }

    /**
     * Registers the task with the on-ramp counter via {@code onRamp()} and enqueues it.
     *
     * @param handler the handler that will be given {@code data} on the worker thread
     * @param data    the payload for the handler
     */
    @Override
    protected void put(@NonNull Consumer<Object> handler, @NonNull Object data) {
        // NOTE(review): onRamp() presumably applies back pressure when at capacity - confirm
        // against the ObjectCounter implementation in use.
        this.onRamp.onRamp();
        this.tasks.add(new SequentialThreadTask(handler, data));
    }

    /**
     * Enqueues the task only if the on-ramp counter accepts it via {@code attemptOnRamp()}.
     *
     * @param handler the handler that will be given {@code data} on the worker thread
     * @param data    the payload for the handler
     * @return {@code true} if the task was enqueued, {@code false} if the counter rejected it
     */
    @Override
    protected boolean offer(@NonNull Consumer<Object> handler, @NonNull Object data) {
        if (!this.onRamp.attemptOnRamp()) {
            return false;
        }
        this.tasks.add(new SequentialThreadTask(handler, data));
        return true;
    }

    /**
     * Registers the task with the on-ramp counter via {@code forceOnRamp()} and enqueues it.
     *
     * @param handler the handler that will be given {@code data} on the worker thread
     * @param data    the payload for the handler
     */
    @Override
    protected void inject(@NonNull Consumer<Object> handler, @NonNull Object data) {
        this.onRamp.forceOnRamp();
        this.tasks.add(new SequentialThreadTask(handler, data));
    }

    /**
     * Starts the worker thread.
     */
    public void start() {
        this.thread.start();
    }

    /**
     * Signals the worker loop to exit. The thread is neither interrupted nor joined: the loop
     * notices the flag the next time it evaluates its loop condition, so the batch currently
     * being handled is completed and tasks still in the queue may be left unhandled.
     */
    public void stop() {
        this.alive.set(false);
    }

    /**
     * Body of the worker thread: repeatedly drains up to {@link #BUFFER_SIZE} queued tasks and
     * handles them in order until {@link #stop()} is called or the thread is interrupted while
     * waiting for work.
     */
    private void run() {
        // The duration never changes, so convert it once instead of on every idle iteration.
        final long sleepNanos = this.sleepDuration.toNanos();
        final ArrayList<SequentialThreadTask> buffer = new ArrayList<>(BUFFER_SIZE);

        while (this.alive.get()) {
            this.tasks.drainTo(buffer, BUFFER_SIZE);

            if (buffer.isEmpty()) {
                if (sleepNanos <= 0L) {
                    // No waiting configured: go straight back to checking the queue.
                    continue;
                }
                try {
                    // Wait for work; waking as soon as a task arrives rather than sleeping blindly.
                    final SequentialThreadTask task = this.tasks.poll(sleepNanos, TimeUnit.NANOSECONDS);
                    if (task == null) {
                        continue;
                    }
                    buffer.add(task);
                } catch (final InterruptedException e) {
                    // Preserve the interrupt status and shut the worker down.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            this.busyTimer.activate();
            for (final SequentialThreadTask task : buffer) {
                try {
                    task.handle();
                } catch (final Throwable t) {
                    // A failing task must not kill the worker; report it and move on.
                    this.uncaughtExceptionHandler.uncaughtException(this.thread, t);
                } finally {
                    // Every task is off-ramped exactly once, whether or not it succeeded.
                    this.offRamp.offRamp();
                }
            }
            this.busyTimer.deactivate();
            buffer.clear();
        }
    }
}

