/*
 * Decompiled with CFR 0.152.
 */
package conseq4j.execute;

import conseq4j.Terminable;
import conseq4j.execute.SequentialExecutor;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import lombok.NonNull;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;

/**
 * Executor that chains tasks sharing the same sequence key one after another, while tasks
 * submitted under different sequence keys are handed to the worker executor independently.
 *
 * <p>For every sequence key the most recently submitted stage is tracked in a map. A new task for
 * a key that is already tracked is attached behind the tracked stage; a new task for an untracked
 * key is started directly on the worker executor. Completed stages are removed from the map by a
 * dedicated single-threaded admin executor.
 */
@ThreadSafe
public final class ConseqExecutor implements SequentialExecutor, Terminable, AutoCloseable {
    /** Worker parallelism used by {@link #instance()}: the number of available processors. */
    private static final int DEFAULT_CONCURRENCY = Runtime.getRuntime().availableProcessors();

    /** Latest stage per sequence key; an entry is removed once its stage is done. */
    private final Map<Object, CompletableFuture<?>> activeSequentialTasks = new ConcurrentHashMap<>();

    /** Single thread that only sweeps finished stages out of {@link #activeSequentialTasks}. */
    private final ExecutorService adminService = Executors.newSingleThreadExecutor();

    /** Executor on which the submitted tasks themselves run. */
    private final ExecutorService workerExecutorService;

    private ConseqExecutor(ExecutorService workerExecutorService) {
        this.workerExecutorService = workerExecutorService;
    }

    /**
     * Creates an executor whose worker pool parallelism equals the number of available processors.
     *
     * @return a new executor instance
     */
    @Nonnull
    public static ConseqExecutor instance() {
        return ConseqExecutor.instance(DEFAULT_CONCURRENCY);
    }

    /**
     * Creates an executor backed by a new {@link ForkJoinPool} with async mode enabled.
     *
     * @param concurrency parallelism passed to the {@link ForkJoinPool} constructor
     * @return a new executor instance
     */
    @Nonnull
    public static ConseqExecutor instance(int concurrency) {
        return ConseqExecutor.instance(
                new ForkJoinPool(concurrency, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true));
    }

    /**
     * Creates an executor that runs tasks on the supplied worker executor.
     *
     * @param workerExecutorService executor that will run the submitted tasks
     * @return a new executor instance
     */
    @Nonnull
    public static ConseqExecutor instance(ExecutorService workerExecutorService) {
        return new ConseqExecutor(workerExecutorService);
    }

    /**
     * Invokes the task, rethrowing any {@link Exception} wrapped in a {@link CompletionException}
     * so the call can be used inside {@link CompletableFuture} lambdas.
     */
    private static <T> T call(Callable<T> task) {
        try {
            return task.call();
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    /** Awaitility condition with no timeout and a 10 ms poll delay. */
    private static ConditionFactory awaitForever() {
        return Awaitility.await().forever().pollDelay(Duration.ofMillis(10L));
    }

    /**
     * Submits a {@link Runnable} under the given sequence key.
     *
     * @param command the work to run
     * @param sequenceKey key identifying the sequence the command belongs to
     * @return a future that completes with {@code null} once the command has run
     * @throws NullPointerException if either argument is {@code null}
     */
    public CompletableFuture<Void> execute(@NonNull Runnable command, @NonNull Object sequenceKey) {
        if (command == null) {
            throw new NullPointerException("command is marked non-null but is null");
        }
        if (sequenceKey == null) {
            throw new NullPointerException("sequenceKey is marked non-null but is null");
        }
        return this.submit(Executors.<Void>callable(command, null), sequenceKey);
    }

    /**
     * Submits a {@link Callable} under the given sequence key.
     *
     * <p>If a stage is already tracked for the key, the task is attached behind it and runs
     * whether that stage completed normally or exceptionally; otherwise the task is started
     * directly on the worker executor.
     *
     * @param task the work to run
     * @param sequenceKey key identifying the sequence the task belongs to
     * @param <T> result type of the task
     * @return a future, distinct from the internally tracked stage, carrying the task's outcome
     * @throws NullPointerException if either argument is {@code null}
     */
    public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Object sequenceKey) {
        if (task == null) {
            throw new NullPointerException("task is marked non-null but is null");
        }
        if (sequenceKey == null) {
            throw new NullPointerException("sequenceKey is marked non-null but is null");
        }
        // compute() returns the value produced by the remapping function, which here is always
        // the stage chain(...) just built for this task, so its result type is known to be T.
        @SuppressWarnings("unchecked")
        CompletableFuture<T> latestTask = (CompletableFuture<T>) this.activeSequentialTasks.compute(
                sequenceKey, (key, presentTask) -> this.chain(presentTask, task));
        // The caller receives a dependent stage, not the object held in the map.
        CompletableFuture<T> copy = latestTask.thenApply(result -> result);
        // Sweep on the admin thread: drop the entry only if the stage currently mapped to the key
        // is done; a newer, unfinished stage registered in the meantime is kept.
        latestTask.whenCompleteAsync(
                (result, error) -> this.activeSequentialTasks.computeIfPresent(
                        sequenceKey, (key, checkedTask) -> checkedTask.isDone() ? null : checkedTask),
                this.adminService);
        return copy;
    }

    /**
     * Builds the stage for {@code task}: started fresh when no stage is tracked for the key,
     * otherwise attached behind the tracked stage regardless of that stage's outcome.
     */
    private <V> CompletableFuture<V> chain(CompletableFuture<?> presentTask, Callable<V> task) {
        if (presentTask == null) {
            return CompletableFuture.supplyAsync(() -> ConseqExecutor.call(task), this.workerExecutorService);
        }
        return presentTask.handleAsync(
                (previousResult, previousError) -> ConseqExecutor.call(task), this.workerExecutorService);
    }

    /**
     * Blocks until no task is tracked, then initiates termination and blocks until both the worker
     * and admin executors report terminated.
     */
    @Override
    public void close() {
        ConseqExecutor.awaitForever().until(this::noTaskPending);
        this.terminate();
        ConseqExecutor.awaitForever().until(this::isTerminated);
    }

    /** @return {@code true} when no sequence key currently has a tracked stage */
    boolean noTaskPending() {
        return this.activeSequentialTasks.isEmpty();
    }

    /**
     * Initiates an orderly shutdown without blocking the caller: a newly started thread shuts down
     * the worker executor, waits until no stage is tracked, and then shuts down the admin executor.
     */
    @Override
    public void terminate() {
        new Thread(() -> {
            this.workerExecutorService.shutdown();
            // The admin executor must stay alive until it has swept every tracked stage.
            ConseqExecutor.awaitForever().until(this::noTaskPending);
            this.adminService.shutdown();
        }).start();
    }

    /** @return {@code true} once both the worker and the admin executors have terminated */
    @Override
    public boolean isTerminated() {
        return this.workerExecutorService.isTerminated() && this.adminService.isTerminated();
    }

    /**
     * Calls {@code shutdownNow()} on both the worker and the admin executors.
     *
     * @return the list returned by the worker executor's {@code shutdownNow()}; the admin
     *     executor's list is discarded
     */
    @Override
    @Nonnull
    public List<Runnable> terminateNow() {
        List<Runnable> neverStartedTasks = this.workerExecutorService.shutdownNow();
        this.adminService.shutdownNow();
        return neverStartedTasks;
    }

    @Override
    public String toString() {
        return "ConseqExecutor(activeSequentialTasks=" + this.activeSequentialTasks + ", adminService=" + this.adminService + ", workerExecutorService=" + this.workerExecutorService + ")";
    }
}

