/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.faulttolerance.core.bulkhead;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.async.CancellationEvent;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadBase;
import io.smallrye.faulttolerance.core.util.NamedFutureTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolBulkhead<V>
extends BulkheadBase<Future<V>> {
    private final ExecutorService executor;
    private final Semaphore capacitySemaphore;
    private final int queueSize;

    public ThreadPoolBulkhead(FaultToleranceStrategy<Future<V>> delegate, String description, ExecutorService executor, int size, int queueSize, BulkheadBase.MetricsRecorder recorder) {
        super(description, delegate, recorder);
        this.capacitySemaphore = new Semaphore(size + queueSize);
        this.queueSize = queueSize;
        this.executor = executor;
    }

    @Override
    public Future<V> apply(InvocationContext<Future<V>> ctx) throws Exception {
        long timeEnqueued = System.nanoTime();
        if (this.capacitySemaphore.tryAcquire()) {
            BulkheadTask task = new BulkheadTask("ThreadPoolBulkhead", () -> {
                long startTime = System.nanoTime();
                this.recorder.bulkheadQueueLeft(startTime - timeEnqueued);
                this.recorder.bulkheadEntered();
                try {
                    Future future = (Future)this.delegate.apply(ctx);
                    return future;
                }
                finally {
                    this.recorder.bulkheadLeft(System.nanoTime() - startTime);
                }
            });
            ctx.registerEventHandler(CancellationEvent.class, ignored -> task.cancel());
            this.executor.execute(task);
            this.recorder.bulkheadQueueEntered();
            try {
                return (Future)task.get();
            }
            catch (InterruptedException e) {
                task.cancel(true);
                throw e;
            }
            catch (ExecutionException e) {
                throw ThreadPoolBulkhead.sneakyThrow(e.getCause());
            }
        }
        this.recorder.bulkheadRejected();
        throw this.bulkheadRejected();
    }

    int getQueueSize() {
        return Math.max(0, this.queueSize - this.capacitySemaphore.availablePermits());
    }

    private static <E extends Throwable> Exception sneakyThrow(Throwable e) throws E {
        throw e;
    }

    private class BulkheadTask
    extends NamedFutureTask<Future<V>> {
        private static final int WAITING = 0;
        private static final int RUNNING = 1;
        private static final int CANCELING = 2;
        private AtomicInteger state;

        public BulkheadTask(String name, Callable<Future<V>> callable) {
            super(name, callable);
            this.state = new AtomicInteger(0);
        }

        @Override
        public void run() {
            if (this.state.compareAndSet(0, 1)) {
                try {
                    super.run();
                }
                finally {
                    ThreadPoolBulkhead.this.capacitySemaphore.release();
                }
            }
        }

        public void cancel() {
            if (this.state.compareAndSet(0, 2)) {
                ThreadPoolBulkhead.this.capacitySemaphore.release();
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            this.cancel();
            return super.cancel(mayInterruptIfRunning);
        }
    }
}

