/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import ratpack.exec.Promise;
import ratpack.exec.Throttle;

/**
 * {@link Throttle} implementation that limits how many throttled promises are
 * allowed to proceed at the same time.
 *
 * <p>Each throttled promise contributes a {@link Runnable} to an internal FIFO
 * queue. A queued job is only run while the {@code active} counter is below the
 * configured {@code size}; otherwise it stays queued until a later call to
 * {@link #drain()} finds a free slot.
 *
 * <p>All mutable state is held in a {@link ConcurrentLinkedQueue} and
 * {@link AtomicInteger}s; no locks are used.
 */
public class DefaultThrottle implements Throttle {
    /** Upper bound on the number of jobs allowed to be active at once (always {@code >= 1}). */
    private final int size;
    /** Jobs waiting for a free slot, in arrival order. */
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    /**
     * Number of claimed slots. {@link #drain()} increments this optimistically before it
     * knows whether a slot is available, so the value may briefly exceed {@code size}.
     */
    private final AtomicInteger active = new AtomicInteger();
    /** Number of jobs that have been queued but not yet started. */
    private final AtomicInteger waiting = new AtomicInteger();

    /**
     * Creates a throttle that allows at most {@code size} jobs to be active at once.
     *
     * @param size the maximum number of concurrently active jobs; must be at least 1
     * @throws IllegalArgumentException if {@code size} is less than 1
     */
    public DefaultThrottle(int size) {
        if (size < 1) {
            // The check admits 1, so the message must say "greater than 0" (it used to say 1).
            throw new IllegalArgumentException("throttle size must be greater than 0");
        }
        this.size = size;
    }

    /**
     * Wraps the given promise so that it takes part in this throttle.
     *
     * <p>The callback passed to {@code defer} records the supplied {@link Runnable} as
     * waiting, queues it, and attempts to start queued work. The callback passed to
     * {@code wiretap} gives the slot back and attempts to start the next queued job.
     * NOTE(review): presumably {@code defer} hands over a runnable that starts the promise,
     * and {@code wiretap} fires once the promise has produced its result; confirm against
     * the {@code Promise} contract.
     *
     * @param promise the promise to throttle
     * @param <T> the type of the promised value
     * @return the promise returned by chaining {@code defer} and {@code wiretap} on {@code promise}
     */
    @Override
    public <T> Promise<T> throttle(Promise<T> promise) {
        return promise.defer(r -> {
            this.waiting.incrementAndGet();
            this.queue.add((Runnable)r);
            this.drain();
        }).wiretap(r -> {
            // Release the slot first so that drain() can hand it to a waiting job.
            this.active.decrementAndGet();
            this.drain();
        });
    }

    /**
     * Returns the limit this throttle was created with.
     *
     * @return the maximum number of concurrently active jobs
     */
    @Override
    public int getSize() {
        return this.size;
    }

    /**
     * Returns the current value of the active counter.
     *
     * <p>This is a point-in-time read; while another thread is inside {@link #drain()} the
     * value may momentarily include a slot claim that is about to be rolled back.
     *
     * @return the current active count
     */
    @Override
    public int getActive() {
        return this.active.get();
    }

    /**
     * Returns the number of jobs that have been queued but not yet started.
     *
     * @return the current waiting count
     */
    @Override
    public int getWaiting() {
        return this.waiting.get();
    }

    /**
     * Tries to start one queued job if a slot is free.
     *
     * <p>A slot is claimed optimistically by incrementing {@code active} and is rolled back
     * if the claim turns out to be unusable (no slot was free, or the queue was emptied
     * in the meantime).
     */
    private void drain() {
        if (!this.queue.isEmpty()) {
            // Claim a slot first; "i" is the number of slots taken before this claim.
            int i = this.active.getAndIncrement();
            if (i < this.size) {
                Runnable job = this.queue.poll();
                if (job == null) {
                    // Someone else took the last queued job after the isEmpty() check: undo the claim.
                    this.active.decrementAndGet();
                } else {
                    this.waiting.decrementAndGet();
                    job.run();
                }
            } else {
                // No slot was free: undo the claim.
                i = this.active.decrementAndGet();
                if (i < this.size) {
                    // A slot was released between our claim and the rollback; retry so the
                    // queued job is not left stranded.
                    this.drain();
                }
            }
        }
    }
}

