/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.faulttolerance.core.bulkhead;

import io.smallrye.faulttolerance.core.Completer;
import io.smallrye.faulttolerance.core.FaultToleranceContext;
import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.Future;
import io.smallrye.faulttolerance.core.async.FutureCancellationEvent;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadEvents;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadLogger;
import java.util.Deque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException;

/**
 * Fault tolerance strategy that limits how many invocations of the delegate
 * may be in progress at once.
 *
 * <p>Two fair semaphores are used:
 * <ul>
 * <li>the <em>capacity</em> semaphore ({@code size + queueSize} permits) decides
 * whether an invocation is accepted into the bulkhead at all;</li>
 * <li>the <em>work</em> semaphore ({@code size} permits) decides whether an
 * accepted invocation may run right now.</li>
 * </ul>
 *
 * <p>Depending on the context and configuration, an invocation is handled in one
 * of three ways: synchronous without queueing, synchronous with queueing (the
 * calling thread blocks on the work semaphore), or asynchronous (the invocation
 * is placed in an internal queue and run when a work permit is available).
 * Rejected invocations complete with a {@link BulkheadException}.
 *
 * @param <V> type of the value produced by the guarded invocation
 */
public class Bulkhead<V> implements FaultToleranceStrategy<V> {
    private static final String CAPACITY_NOT_ACQUIRED = "Capacity semaphore not acquired, rejecting task from bulkhead";
    private static final String WORK_NOT_ACQUIRED = "Work semaphore not acquired, rejecting task from bulkhead";

    private final FaultToleranceStrategy<V> delegate;
    private final String description;
    private final Deque<BulkheadTask> queue;
    private final Semaphore capacitySemaphore;
    private final Semaphore workSemaphore;
    private final boolean syncQueueing;

    /**
     * Creates a bulkhead around the given delegate.
     *
     * @param delegate the strategy to invoke once an invocation is allowed to run
     * @param description text used as the prefix of log and rejection messages
     * @param size number of permits of the work semaphore
     * @param queueSize number of additional permits (on top of {@code size}) of the capacity semaphore
     * @param syncQueueing whether synchronous invocations block waiting for a work permit
     *        instead of being rejected immediately
     */
    public Bulkhead(FaultToleranceStrategy<V> delegate, String description, int size, int queueSize,
            boolean syncQueueing) {
        this.delegate = delegate;
        this.description = description;
        this.queue = new ConcurrentLinkedDeque<>();
        this.capacitySemaphore = new Semaphore(size + queueSize, true);
        this.workSemaphore = new Semaphore(size, true);
        this.syncQueueing = syncQueueing;
    }

    /**
     * Routes the invocation to the synchronous, synchronous-with-queueing or
     * asynchronous implementation, based on {@code ctx.isSync()} and the
     * {@code syncQueueing} flag.
     *
     * @param ctx the invocation context
     * @return the future produced by the selected implementation
     */
    @Override
    public Future<V> apply(FaultToleranceContext<V> ctx) {
        BulkheadLogger.LOG.trace("Bulkhead started");
        try {
            if (!ctx.isSync()) {
                return applyAsync(ctx);
            }
            return syncQueueing ? applySyncWithQueueing(ctx) : applySync(ctx);
        } finally {
            BulkheadLogger.LOG.trace("Bulkhead finished");
        }
    }

    /**
     * Synchronous invocation without queueing: the task either obtains both
     * permits immediately and runs on the calling thread, or is rejected.
     */
    private Future<V> applySync(FaultToleranceContext<V> ctx) {
        if (!capacitySemaphore.tryAcquire()) {
            return reject(ctx, CAPACITY_NOT_ACQUIRED);
        }
        BulkheadLogger.LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");

        if (!workSemaphore.tryAcquire()) {
            // no waiting in this mode, so give the capacity permit back before rejecting
            capacitySemaphore.release();
            return reject(ctx, WORK_NOT_ACQUIRED);
        }
        BulkheadLogger.LOG.trace("Work semaphore acquired, running task");

        ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
        ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
        try {
            return delegate.apply(ctx);
        } finally {
            releasePermits();
            ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
        }
    }

    /**
     * Synchronous invocation with queueing: once accepted, the calling thread
     * blocks until a work permit is available. A {@link FutureCancellationEvent}
     * received while the task is inside the bulkhead marks it as cancelled and,
     * if the event is interruptible, interrupts the calling thread.
     */
    private Future<V> applySyncWithQueueing(FaultToleranceContext<V> ctx) {
        if (!capacitySemaphore.tryAcquire()) {
            return reject(ctx, CAPACITY_NOT_ACQUIRED);
        }
        BulkheadLogger.LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");
        ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
        ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);

        // set once this invocation has left the bulkhead; later cancellation events are ignored
        AtomicBoolean cancellationInvalid = new AtomicBoolean(false);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread executingThread = Thread.currentThread();
        ctx.registerEventHandler(FutureCancellationEvent.class, event -> {
            if (cancellationInvalid.get()) {
                return;
            }
            if (BulkheadLogger.LOG.isTraceEnabled()) {
                BulkheadLogger.LOG.tracef("Cancelling bulkhead task,%s interrupting executing thread",
                        event.interruptible ? "" : " NOT");
            }
            cancelled.set(true);
            if (event.interruptible) {
                executingThread.interrupt();
            }
        });

        try {
            workSemaphore.acquire();
            BulkheadLogger.LOG.trace("Work semaphore acquired, running task");
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status is not restored here; presumably the
            // interrupt is the cancellation signal itself -- confirm before changing.
            cancellationInvalid.set(true);
            capacitySemaphore.release();
            BulkheadLogger.LOG.trace("Capacity semaphore released, task leaving bulkhead");
            ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
            return Future.ofError(new CancellationException());
        }

        ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
        ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
        try {
            if (cancelled.get()) {
                // cancelled while waiting (without interruption): do not invoke the delegate
                return Future.ofError(new CancellationException());
            }
            return delegate.apply(ctx);
        } finally {
            cancellationInvalid.set(true);
            releasePermits();
            ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
        }
    }

    /**
     * Asynchronous invocation: once accepted, the task is appended to the queue
     * and an attempt is made to run one queued task. The returned future is
     * completed when the task's delegate invocation completes.
     */
    private Future<V> applyAsync(FaultToleranceContext<V> ctx) {
        if (!capacitySemaphore.tryAcquire()) {
            return reject(ctx, CAPACITY_NOT_ACQUIRED);
        }
        BulkheadLogger.LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");
        ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
        ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);

        BulkheadTask task = new BulkheadTask(ctx);
        queue.addLast(task);
        runQueuedTask();
        return task.result.future();
    }

    /**
     * Takes at most one task from the head of the queue and runs it if a work
     * permit can be acquired without blocking; otherwise the task is put back
     * at the head of the queue.
     */
    private void runQueuedTask() {
        BulkheadTask queuedTask = queue.pollFirst();
        if (queuedTask == null) {
            return;
        }
        if (workSemaphore.tryAcquire()) {
            BulkheadLogger.LOG.trace("Work semaphore acquired, running task");
            queuedTask.run();
        } else {
            BulkheadLogger.LOG.trace("Work semaphore not acquired, putting task back to queue");
            queue.addFirst(queuedTask);
        }
    }

    /**
     * Logs the rejection, fires the {@code REJECTED} decision event and builds
     * the failed future carrying a {@link BulkheadException}.
     *
     * @param ctx the invocation context
     * @param traceMessage the message logged when trace logging is used
     * @return a future completed with a {@link BulkheadException}
     */
    private Future<V> reject(FaultToleranceContext<V> ctx, String traceMessage) {
        BulkheadLogger.LOG.debugOrTrace(description + " invocation prevented by bulkhead", traceMessage);
        ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
        return Future.ofError(new BulkheadException(description + " rejected from bulkhead"));
    }

    /** Releases the work permit and then the capacity permit held by a finished task. */
    private void releasePermits() {
        workSemaphore.release();
        BulkheadLogger.LOG.trace("Work semaphore released, task finished");
        capacitySemaphore.release();
        BulkheadLogger.LOG.trace("Capacity semaphore released, task leaving bulkhead");
    }

    /** Returns the number of tasks currently held in the internal queue. */
    int getQueueSize() {
        return queue.size();
    }

    /** Returns the number of currently available permits of the capacity semaphore. */
    int getAvailableCapacityPermits() {
        return capacitySemaphore.availablePermits();
    }

    /**
     * An accepted asynchronous invocation waiting in, or running from, the queue.
     * The {@code result} completer is completed with the outcome of the delegate.
     */
    private class BulkheadTask {
        private final Completer<V> result = Completer.create();
        private final FaultToleranceContext<V> ctx;
        // ensures the permits are released and FinishedRunning is fired at most once per task
        private final AtomicBoolean finished = new AtomicBoolean(false);

        private BulkheadTask(FaultToleranceContext<V> ctx) {
            this.ctx = ctx;
        }

        /**
         * Invokes the delegate and, when its future completes, releases the
         * permits, propagates the outcome to {@code result} and tries to run
         * the next queued task. Must only be called while holding a work permit.
         */
        public void run() {
            ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
            ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
            try {
                Future<V> rawResult = delegate.apply(ctx);
                rawResult.then((value, error) -> {
                    finish();
                    if (error == null) {
                        result.complete(value);
                    } else {
                        result.completeWithError(error);
                    }
                    runQueuedTask();
                });
            } catch (Exception e) {
                // This handler also covers exceptions escaping from the callback above,
                // should `then` invoke it on this thread; finish() is therefore guarded
                // so that the permits cannot be released twice.
                finish();
                result.completeWithError(e);
                runQueuedTask();
            }
        }

        /** Releases both permits and fires {@code FinishedRunning}, at most once. */
        private void finish() {
            if (finished.compareAndSet(false, true)) {
                releasePermits();
                ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
            }
        }
    }
}

