/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import ratpack.func.Function;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.YieldRequest;
import ratpack.stream.internal.DefaultYieldRequest;
import ratpack.stream.internal.SubscriptionSupport;

public class YieldingPublisher<T>
implements TransformablePublisher<T> {
    private final Function<? super YieldRequest, T> producer;
    private final AtomicLong subscriptionCounter = new AtomicLong();

    public YieldingPublisher(Function<? super YieldRequest, T> producer) {
        this.producer = producer;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        new Subscription(subscriber);
    }

    private class Subscription
    extends SubscriptionSupport<T> {
        private final long subscriptionNum;
        private final AtomicInteger requestCounter;

        public Subscription(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.subscriptionNum = YieldingPublisher.this.subscriptionCounter.getAndIncrement();
            this.requestCounter = new AtomicInteger();
            this.start();
        }

        @Override
        protected void doRequest(long n) {
            long i = 0L;
            while (i++ < n) {
                Object produced;
                if (this.isStopped()) {
                    return;
                }
                try {
                    produced = YieldingPublisher.this.producer.apply(new DefaultYieldRequest(this.subscriptionNum, this.requestCounter.getAndIncrement()));
                }
                catch (Throwable e) {
                    this.onError(e);
                    return;
                }
                if (produced == null) {
                    this.onComplete();
                    return;
                }
                this.onNext(produced);
            }
        }
    }
}

