/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.BufferingPublisher;
import ratpack.stream.internal.SubscriptionSupport;

/**
 * A publisher that sits between an upstream publisher and a single downstream subscriber and
 * absorbs the difference between what upstream emits and what downstream has asked for.
 * <p>
 * Upstream is always asked for {@code Long.MAX_VALUE} items. What happens to those items depends
 * on the first downstream request:
 * <ul>
 * <li>if it is {@code Long.MAX_VALUE}, upstream signals are forwarded directly (no buffering);</li>
 * <li>otherwise items are held in an unbounded queue and released as downstream demand arrives.</li>
 * </ul>
 * Upstream is subscribed to lazily, on the first downstream request.
 *
 * @param <T> the type of item emitted
 */
public class BufferingPublisher<T>
implements TransformablePublisher<T> {
    private final Publisher<? extends T> publisher;

    /**
     * @param publisher the upstream publisher whose items are to be buffered
     */
    public BufferingPublisher(Publisher<? extends T> publisher) {
        this.publisher = publisher;
    }

    /**
     * Creates a new subscription for the given subscriber.
     * The subscription starts itself from its constructor, so the instance does not need to be retained here.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        new Subscription(subscriber);
    }

    /**
     * The downstream-facing subscription.
     * NOTE(review): assumes {@code SubscriptionSupport} routes downstream {@code request}/{@code cancel}
     * calls to {@link #doRequest(long)} / {@link #doCancel()} - confirm against that class.
     */
    private class Subscription
    extends SubscriptionSupport<T> {
        // Only set when the first request was bounded; stays null in pass-through mode.
        private BufferingSubscriber bufferingSubscriber;
        private final AtomicBoolean upstreamFinished = new AtomicBoolean();
        private final AtomicReference<org.reactivestreams.Subscription> upstreamSubscription = new AtomicReference<>();
        private final AtomicBoolean requestedUpstream = new AtomicBoolean();

        public Subscription(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.start();
        }

        /**
         * Handles downstream demand.
         * <p>
         * The first call subscribes to upstream, choosing pass-through or buffering mode based on {@code n}.
         * Later calls only matter in buffering mode, where they add to the outstanding demand and trigger a drain.
         *
         * @param n the number of additional items requested
         */
        @Override
        protected void doRequest(long n) {
            if (this.isStopped()) {
                return;
            }
            if (this.requestedUpstream.compareAndSet(false, true)) {
                if (n == Long.MAX_VALUE) {
                    BufferingPublisher.this.publisher.subscribe(new PassThruSubscriber());
                } else {
                    // Work through a local so a concurrent doCancel() nulling the field cannot cause an NPE here.
                    BufferingSubscriber created = new BufferingSubscriber();
                    this.bufferingSubscriber = created;
                    BufferingPublisher.this.publisher.subscribe(created);
                    created.wanted.addAndGet(n);
                    created.tryDrain();
                }
                return;
            }
            // Single read of the field, for the same reason as above.
            BufferingSubscriber current = this.bufferingSubscriber;
            if (current != null && !current.open.get()) {
                // The counter starts at Long.MIN_VALUE, so reaching zero or above means the accumulated
                // demand no longer fits in a long; from then on the stream is treated as unbounded.
                if (current.wanted.addAndGet(n) >= 0L) {
                    current.open.set(true);
                }
                current.tryDrain();
            }
        }

        /**
         * Cancels the upstream subscription (if one has been received) and resets the buffering
         * subscriber's demand so that it stops releasing items.
         */
        @Override
        protected void doCancel() {
            org.reactivestreams.Subscription upstream = this.upstreamSubscription.getAndSet(null);
            if (upstream != null) {
                upstream.cancel();
            }
            BufferingSubscriber current = this.bufferingSubscriber;
            if (current != null) {
                // Long.MIN_VALUE is the "no demand" value checked by tryDrain().
                current.wanted.set(Long.MIN_VALUE);
                this.bufferingSubscriber = null;
            }
        }

        /**
         * Upstream subscriber used when downstream demand is bounded: queues every item and
         * releases items only while there is outstanding demand.
         */
        class BufferingSubscriber
        implements Subscriber<T> {
            // Outstanding demand, offset so that Long.MIN_VALUE means "nothing wanted".
            private final AtomicLong wanted = new AtomicLong(Long.MIN_VALUE);
            // Set once accumulated demand overflows; items are then released regardless of the counter.
            private final AtomicBoolean open = new AtomicBoolean();
            private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
            // Guard ensuring only one thread drains at a time.
            private final AtomicBoolean draining = new AtomicBoolean();

            BufferingSubscriber() {
            }

            @Override
            public void onSubscribe(org.reactivestreams.Subscription s) {
                if (Subscription.this.isStopped()) {
                    // Already stopped: cancel upstream and do not store or request from it.
                    s.cancel();
                    return;
                }
                Subscription.this.upstreamSubscription.set(s);
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T t) {
                this.buffer.add(t);
                this.tryDrain();
            }

            @Override
            public void onError(Throwable t) {
                // Buffered items are discarded; the error is forwarded immediately.
                this.buffer.clear();
                Subscription.this.upstreamFinished.set(true);
                Subscription.this.onError(t);
            }

            @Override
            public void onComplete() {
                Subscription.this.upstreamFinished.set(true);
                this.tryDrain();
            }

            /**
             * Releases buffered items downstream while there is demand (or the stream is open).
             * <p>
             * Returns immediately if another drain is already in progress. When the buffer is found
             * empty during a drain and upstream has finished, completion is signalled downstream.
             * After each pass the buffer and demand are re-checked, so items added while the
             * draining flag was being released are not left stranded.
             */
            public void tryDrain() {
                do {
                    if (!this.draining.compareAndSet(false, true)) {
                        return;
                    }
                    try {
                        long remaining = this.wanted.get();
                        while (this.open.get() || remaining > Long.MIN_VALUE) {
                            T item = this.buffer.poll();
                            if (item == null) {
                                if (Subscription.this.upstreamFinished.get()) {
                                    Subscription.this.onComplete();
                                    return;
                                }
                                break;
                            }
                            Subscription.this.onNext(item);
                            remaining = this.wanted.decrementAndGet();
                        }
                    } finally {
                        this.draining.set(false);
                    }
                } while (this.buffer.peek() != null && this.wanted.get() > Long.MIN_VALUE);
            }
        }

        /**
         * Upstream subscriber used when the first downstream request is {@code Long.MAX_VALUE}:
         * every signal is forwarded directly, with no buffering.
         */
        class PassThruSubscriber
        implements Subscriber<T> {
            PassThruSubscriber() {
            }

            @Override
            public void onSubscribe(org.reactivestreams.Subscription s) {
                if (Subscription.this.isStopped()) {
                    // Same handling as the buffering path: cancel upstream instead of requesting from it.
                    s.cancel();
                    return;
                }
                Subscription.this.upstreamSubscription.set(s);
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T t) {
                Subscription.this.onNext(t);
            }

            @Override
            public void onError(Throwable t) {
                Subscription.this.onError(t);
            }

            @Override
            public void onComplete() {
                Subscription.this.onComplete();
            }
        }
    }
}

