/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.internal.adapter;

import com.github.lhotari.reactive.pulsar.resourceadapter.PublisherTransformer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jctools.queues.MpmcArrayQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

/**
 * {@link PublisherTransformer} that caps the number of elements requested from upstream
 * publishers but not yet delivered downstream ("in flight"), across every publisher
 * transformed by the same instance.
 *
 * <p>Downstream demand is recorded per subscription. Subscriptions with unfulfilled demand
 * and nothing in flight are parked in a bounded queue, and a single scheduler worker drains
 * that queue and forwards demand upstream while the shared {@code maxInflight} budget allows.
 */
class InflightLimiter
implements PublisherTransformer {
    private static final Logger LOG = LoggerFactory.getLogger(InflightLimiter.class);
    /** Capacity of the pending-subscription queue used by {@link #InflightLimiter(int)}. */
    public static final int DEFAULT_MAX_PENDING_SUBSCRIPTIONS = 1024;
    /** Subscriptions that have downstream demand and are waiting for in-flight budget. */
    private final MpmcArrayQueue<InflightLimiterSubscriber<?>> pendingSubscriptions;
    /** Elements requested upstream and not yet delivered, summed over all subscriptions. */
    private final AtomicInteger inflight = new AtomicInteger();
    /** Subscriptions that have been subscribed and have not completed, failed or been cancelled. */
    private final AtomicInteger activeSubscriptions = new AtomicInteger();
    private final int maxInflight;
    private final int expectedSubscriptionsInflight;
    /** Worker on which the pending queue is drained. */
    private final Scheduler.Worker triggerNextWorker;

    /**
     * Creates a limiter whose per-subscription limit equals {@code maxInflight}, using
     * {@link Schedulers#single()} and {@link #DEFAULT_MAX_PENDING_SUBSCRIPTIONS}.
     *
     * @param maxInflight upper bound for the shared in-flight counter
     */
    public InflightLimiter(int maxInflight) {
        this(maxInflight, maxInflight, Schedulers.single(), DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
    }

    /**
     * @param maxInflight upper bound for the shared in-flight counter
     * @param expectedSubscriptionsInflight in-flight bound used for a single subscription;
     *        must not exceed {@code maxInflight}
     * @param triggerNextScheduler scheduler from which the draining worker is created
     * @param maxPendingSubscriptions capacity of the pending-subscription queue
     * @throws IllegalArgumentException if {@code expectedSubscriptionsInflight > maxInflight}
     */
    public InflightLimiter(int maxInflight, int expectedSubscriptionsInflight, Scheduler triggerNextScheduler, int maxPendingSubscriptions) {
        // Validate before acquiring the worker so a rejected argument does not leak it.
        if (expectedSubscriptionsInflight > maxInflight) {
            throw new IllegalArgumentException("maxSubscriptionInflight must be equal or less than maxInflight.");
        }
        this.maxInflight = maxInflight;
        this.expectedSubscriptionsInflight = expectedSubscriptionsInflight;
        this.pendingSubscriptions = new MpmcArrayQueue<>(maxPendingSubscriptions);
        this.triggerNextWorker = triggerNextScheduler.createWorker();
    }

    /**
     * Wraps {@code publisher} with the limiting operator. A {@link Mono} stays a {@link Mono};
     * any other publisher is adapted with {@link Flux#from(Publisher)}.
     */
    @Override
    public <T> Publisher<T> transform(Publisher<T> publisher) {
        if (publisher instanceof Mono) {
            return this.createOperator((Mono<T>) publisher);
        }
        return this.createOperator(Flux.from(publisher));
    }

    /** Returns a {@link Flux} that routes every subscription to {@code source} through this limiter. */
    public <I> Flux<I> createOperator(Flux<I> source) {
        return new FluxOperator<I, I>(source) {

            @Override
            public void subscribe(CoreSubscriber<? super I> actual) {
                InflightLimiter.this.handleSubscribe(this.source, actual);
            }
        };
    }

    /** Returns a {@link Mono} that routes every subscription to {@code source} through this limiter. */
    public <I> Mono<I> createOperator(Mono<I> source) {
        return new MonoOperator<I, I>(source) {

            @Override
            public void subscribe(CoreSubscriber<? super I> actual) {
                InflightLimiter.this.handleSubscribe(this.source, actual);
            }
        };
    }

    /**
     * Registers a new active subscription: subscribes the limiting subscriber to
     * {@code source}, then hands the downstream-facing subscription to {@code actual}.
     */
    <I> void handleSubscribe(Publisher<I> source, CoreSubscriber<? super I> actual) {
        this.activeSubscriptions.incrementAndGet();
        InflightLimiterSubscriber<I> subscriber = new InflightLimiterSubscriber<I>(actual);
        // The upstream subscription is established first; it receives no demand until
        // requestMore() forwards some, because hookOnSubscribe() is a no-op.
        source.subscribe(subscriber);
        actual.onSubscribe(subscriber.getSubscription());
    }

    /**
     * Schedules one drain pass over the pending queue on the worker, unless the limiter has
     * been disposed. The pass handles at most the number of entries that were queued when it
     * started and stops as soon as the shared in-flight budget is exhausted.
     */
    void maybeTriggerNext() {
        if (this.triggerNextWorker.isDisposed()) {
            return;
        }
        this.triggerNextWorker.schedule(() -> {
            // Snapshot the size so entries re-queued during this pass are left for the next one.
            int remainingSubscriptions = this.pendingSubscriptions.size();
            while (this.inflight.get() < this.maxInflight && remainingSubscriptions-- > 0) {
                InflightLimiterSubscriber<?> subscriber = this.pendingSubscriptions.poll();
                if (subscriber == null) {
                    break;
                }
                if (!subscriber.isDisposed()) {
                    subscriber.requestMore();
                }
            }
        });
    }

    /** Disposes the draining worker and cancels every subscription still waiting in the queue. */
    public void dispose() {
        this.triggerNextWorker.dispose();
        this.pendingSubscriptions.drain(BaseSubscriber::cancel);
    }

    /** @return whether the draining worker has been disposed */
    public boolean isDisposed() {
        return this.triggerNextWorker.isDisposed();
    }

    /**
     * Adds {@code increment} to {@code current}, saturating at {@link Long#MAX_VALUE} instead
     * of wrapping to a negative value when two positive operands overflow.
     */
    private static long addCapped(long current, long increment) {
        long sum = current + increment;
        return current > 0L && increment > 0L && sum < 0L ? Long.MAX_VALUE : sum;
    }

    /**
     * Sits between one upstream publisher and one downstream subscriber. Downstream demand
     * is accumulated in {@code requestedDemand} and forwarded upstream in portions by
     * {@link #requestMore()}.
     */
    private class InflightLimiterSubscriber<I>
    extends BaseSubscriber<I> {
        private final CoreSubscriber<? super I> actual;
        /** Downstream demand that has not yet been forwarded upstream. */
        private final AtomicLong requestedDemand = new AtomicLong();
        /** Subscription handed to the downstream subscriber. */
        private final Subscription subscription = new Subscription() {

            @Override
            public void request(long n) {
                // Saturating add: accumulated demand must not overflow into a negative value,
                // which would make the subscription look as if it had no demand at all.
                InflightLimiterSubscriber.this.requestedDemand.accumulateAndGet(n, InflightLimiter::addCapped);
                InflightLimiterSubscriber.this.maybeAddToPending();
                InflightLimiter.this.maybeTriggerNext();
            }

            @Override
            public void cancel() {
                InflightLimiterSubscriber.this.cancel();
            }
        };
        /** Elements requested upstream for this subscription and not yet delivered. */
        private final AtomicInteger inflightForSubscription = new AtomicInteger();

        public InflightLimiterSubscriber(CoreSubscriber<? super I> actual) {
            this.actual = actual;
        }

        @Override
        public Context currentContext() {
            return this.actual.currentContext();
        }

        /**
         * Intentionally empty so that nothing is requested on subscribe; demand is forwarded
         * only by {@link #requestMore()}.
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
        }

        /** Delivers the value, releases one unit of in-flight budget and triggers a drain pass. */
        @Override
        protected void hookOnNext(I value) {
            this.actual.onNext(value);
            InflightLimiter.this.inflight.decrementAndGet();
            this.inflightForSubscription.decrementAndGet();
            this.maybeAddToPending();
            InflightLimiter.this.maybeTriggerNext();
        }

        @Override
        protected void hookOnComplete() {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.actual.onComplete();
            this.clearInflight();
            InflightLimiter.this.maybeTriggerNext();
        }

        /** Returns whatever this subscription still holds of the shared in-flight budget. */
        private void clearInflight() {
            InflightLimiter.this.inflight.addAndGet(-this.inflightForSubscription.getAndSet(0));
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.actual.onError(throwable);
            this.clearInflight();
            InflightLimiter.this.maybeTriggerNext();
        }

        @Override
        protected void hookOnCancel() {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.clearInflight();
            this.requestedDemand.set(0L);
            InflightLimiter.this.maybeTriggerNext();
        }

        public Subscription getSubscription() {
            return this.subscription;
        }

        /**
         * Forwards a portion of the recorded demand upstream when this subscription has
         * demand, holds at most half of {@code expectedSubscriptionsInflight} in flight and
         * the shared budget is not exhausted; otherwise tries to park the subscription again.
         *
         * <p>The portion is the smallest of the recorded demand, the remaining shared budget,
         * the remaining per-subscription budget and an equal share of {@code maxInflight}
         * among active subscriptions, but never less than 1.
         */
        void requestMore() {
            boolean eligible = this.requestedDemand.get() > 0L
                    && this.inflightForSubscription.get() <= InflightLimiter.this.expectedSubscriptionsInflight / 2
                    && InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight;
            if (!eligible) {
                this.maybeAddToPending();
                return;
            }
            long demand = this.requestedDemand.get();
            long sharedBudget = InflightLimiter.this.maxInflight - InflightLimiter.this.inflight.get();
            long subscriptionBudget = InflightLimiter.this.expectedSubscriptionsInflight - this.inflightForSubscription.get();
            // The active count can drop to 0 if a subscription terminates concurrently with
            // this drain pass; clamp the divisor to avoid an ArithmeticException.
            long fairShare = InflightLimiter.this.maxInflight / Math.max(InflightLimiter.this.activeSubscriptions.get(), 1);
            long maxRequest = Math.max(Math.min(Math.min(Math.min(demand, sharedBudget), subscriptionBudget), fairShare), 1L);
            // maxRequest is bounded by int-valued budgets, so the narrowing casts are lossless.
            InflightLimiter.this.inflight.addAndGet((int) maxRequest);
            this.requestedDemand.addAndGet(-maxRequest);
            this.inflightForSubscription.addAndGet((int) maxRequest);
            this.request(maxRequest);
        }

        /**
         * Parks this subscription in the pending queue when it has demand, is not disposed
         * and has nothing in flight. {@code add} is used, so a full queue results in an
         * exception rather than a silently dropped entry.
         */
        void maybeAddToPending() {
            if (this.requestedDemand.get() > 0L && !this.isDisposed() && this.inflightForSubscription.get() == 0) {
                InflightLimiter.this.pendingSubscriptions.add(this);
            }
        }
    }
}

