/*
 * Decompiled with CFR 0.152.
 */
package com.pivovarit.collectors;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class Dispatcher<T> {
    private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");
    private final CompletableFuture<Void> completionSignaller = new CompletableFuture();
    private final ExecutorService dispatcher = Dispatcher.newLazySingleThreadExecutor();
    private final Queue<CompletableFuture<T>> pending = new ConcurrentLinkedQueue<CompletableFuture<T>>();
    private final Queue<Future<Void>> cancelables = new ConcurrentLinkedQueue<Future<Void>>();
    private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<Runnable>();
    private final Executor executor;
    private final Semaphore limiter;
    private volatile boolean started = false;
    private volatile boolean shortCircuited = false;

    Dispatcher(Executor executor) {
        this(executor, Dispatcher.getDefaultParallelism());
    }

    Dispatcher(Executor executor, int permits) {
        this.executor = executor;
        this.limiter = new Semaphore(permits);
    }

    CompletableFuture<Void> start() {
        this.started = true;
        this.dispatcher.execute(this.withExceptionHandling(() -> {
            Runnable task;
            while (!Thread.currentThread().isInterrupted() && (task = this.workingQueue.take()) != POISON_PILL) {
                this.limiter.acquire();
                this.executor.execute(this.cancellable(Dispatcher.combined(task, this.limiter::release)));
            }
            this.completionSignaller.complete(null);
        }));
        return this.completionSignaller;
    }

    void stop() {
        this.workingQueue.add(POISON_PILL);
        this.dispatcher.shutdown();
    }

    boolean isRunning() {
        return this.started;
    }

    CompletableFuture<T> enqueue(Supplier<T> supplier) {
        CompletableFuture future = new CompletableFuture();
        this.pending.add(future);
        this.workingQueue.add(this.withExceptionHandling(() -> {
            if (!this.shortCircuited) {
                future.complete(supplier.get());
            }
        }));
        return future;
    }

    private Runnable withExceptionHandling(CheckedRunnable action) {
        return () -> {
            try {
                action.run();
            }
            catch (Exception e) {
                this.handle(e);
            }
            catch (Throwable e) {
                this.handle(e);
                throw e;
            }
        };
    }

    private static Runnable combined(Runnable task, Runnable finisher) {
        return () -> {
            try {
                task.run();
            }
            finally {
                finisher.run();
            }
        };
    }

    private FutureTask<Void> cancellable(Runnable task) {
        FutureTask<Object> futureTask = new FutureTask<Object>(task, null);
        this.cancelables.add(futureTask);
        return futureTask;
    }

    private void handle(Throwable e) {
        this.shortCircuited = true;
        this.completionSignaller.completeExceptionally(e);
        this.pending.forEach(future -> future.completeExceptionally(e));
        this.cancelables.forEach(future -> future.cancel(true));
        this.dispatcher.shutdownNow();
    }

    private static int getDefaultParallelism() {
        return Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
    }

    private static ThreadPoolExecutor newLazySingleThreadExecutor() {
        return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){
            private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();

            @Override
            public Thread newThread(Runnable task) {
                Thread thread = this.defaultThreadFactory.newThread(task);
                thread.setName("parallel-collector-" + thread.getName());
                thread.setDaemon(false);
                return thread;
            }
        });
    }

    @FunctionalInterface
    static interface CheckedRunnable<T extends Exception> {
        public void run() throws T;
    }
}

