/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.futures;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.spotify.futures.FutureJobInvoker;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

public final class ConcurrencyLimiter<T>
implements FutureJobInvoker<T> {
    private final Executor executor;
    private final BlockingQueue<Job<T>> queue;
    private final Semaphore limit;
    private final int maxQueueSize;
    private final int maxConcurrency;

    private ConcurrencyLimiter(Executor executor, int maxConcurrency, int maxQueueSize) {
        this.executor = executor;
        this.maxConcurrency = maxConcurrency;
        this.maxQueueSize = maxQueueSize;
        Preconditions.checkArgument((maxConcurrency > 0 ? 1 : 0) != 0);
        Preconditions.checkArgument((maxQueueSize > 0 ? 1 : 0) != 0);
        this.queue = new ArrayBlockingQueue<Job<T>>(maxQueueSize);
        this.limit = new Semaphore(maxConcurrency);
    }

    public static <T> ConcurrencyLimiter<T> create(int maxConcurrency, int maxQueueSize) {
        return new ConcurrencyLimiter<T>(MoreExecutors.directExecutor(), maxConcurrency, maxQueueSize);
    }

    public static <T> ConcurrencyLimiter<T> create(Executor executor, int maxConcurrency, int maxQueueSize) {
        return new ConcurrencyLimiter<T>(executor, maxConcurrency, maxQueueSize);
    }

    @Override
    public ListenableFuture<T> add(Callable<? extends ListenableFuture<T>> callable) {
        Preconditions.checkNotNull(callable);
        SettableFuture response = SettableFuture.create();
        Job job = new Job(callable, response);
        if (!this.queue.offer(job)) {
            String message = "Queue size has reached capacity: " + this.maxQueueSize;
            return Futures.immediateFailedFuture((Throwable)new CapacityReachedException(message));
        }
        this.executor.execute(this::pump);
        return response;
    }

    public int numQueued() {
        return this.queue.size();
    }

    public int numActive() {
        return this.maxConcurrency - this.limit.availablePermits();
    }

    public int remainingQueueCapacity() {
        return this.queue.remainingCapacity();
    }

    public int remainingActiveCapacity() {
        return this.limit.availablePermits();
    }

    private Job<T> grabJob() {
        if (!this.limit.tryAcquire()) {
            return null;
        }
        Job job = (Job)this.queue.poll();
        if (job != null) {
            return job;
        }
        this.limit.release();
        return null;
    }

    private void pump() {
        Job<T> job;
        while ((job = this.grabJob()) != null) {
            SettableFuture response = ((Job)job).response;
            if (response.isCancelled()) {
                this.limit.release();
                continue;
            }
            this.invoke(response, ((Job)job).callable);
        }
    }

    private void invoke(final SettableFuture<T> response, Callable<? extends ListenableFuture<T>> callable) {
        ListenableFuture<T> future;
        try {
            future = callable.call();
            if (future == null) {
                this.limit.release();
                response.setException((Throwable)new NullPointerException());
                return;
            }
        }
        catch (Throwable e) {
            this.limit.release();
            response.setException(e);
            return;
        }
        Futures.addCallback(future, (FutureCallback)new FutureCallback<T>(){

            public void onSuccess(T result) {
                ConcurrencyLimiter.this.limit.release();
                response.set(result);
                ConcurrencyLimiter.this.pump();
            }

            public void onFailure(Throwable t) {
                ConcurrencyLimiter.this.limit.release();
                response.setException(t);
                ConcurrencyLimiter.this.pump();
            }
        }, (Executor)this.executor);
    }

    public static class CapacityReachedException
    extends RuntimeException {
        public CapacityReachedException(String errorMessage) {
            super(errorMessage);
        }
    }

    private static class Job<T> {
        private final Callable<? extends ListenableFuture<T>> callable;
        private final SettableFuture<T> response;

        Job(Callable<? extends ListenableFuture<T>> callable, SettableFuture<T> response) {
            this.callable = callable;
            this.response = response;
        }
    }
}

