/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.func.Action;
import ratpack.func.Function;
import ratpack.stream.BufferedWriteStream;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.SubscriptionSupport;

public class PartialBufferingPublisher<T>
implements TransformablePublisher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartialBufferingPublisher.class);
    private final Action<? super T> disposer;
    private final Function<? super BufferedWriteStream<T>, Subscription> function;

    public PartialBufferingPublisher(Action<? super T> disposer, Function<? super BufferedWriteStream<T>, Subscription> function) {
        this.disposer = disposer;
        this.function = function;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        new Buffer(subscriber);
    }

    private class Buffer
    extends SubscriptionSupport<T> {
        private final AtomicBoolean upstreamFinished;
        private final Subscription upstreamSubscription;
        private final AtomicLong wanted;
        private final AtomicBoolean open;
        private final ConcurrentLinkedQueue<T> buffer;
        private final AtomicBoolean draining;
        private final AtomicBoolean disposing;

        public Buffer(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.upstreamFinished = new AtomicBoolean();
            this.wanted = new AtomicLong(Long.MIN_VALUE);
            this.open = new AtomicBoolean();
            this.buffer = new ConcurrentLinkedQueue();
            this.draining = new AtomicBoolean();
            this.disposing = new AtomicBoolean();
            Subscription upstreamSubscriptionTmp = null;
            try {
                upstreamSubscriptionTmp = (Subscription)PartialBufferingPublisher.this.function.apply(new BufferedWriteStream<T>(){

                    @Override
                    public void item(T item) {
                        Buffer.this.buffer.add(item);
                        Buffer.this.tryDrain();
                    }

                    @Override
                    public void error(Throwable throwable) {
                        Buffer.this.disposing.set(true);
                        Buffer.this.open.set(true);
                        Buffer.this.tryDrain();
                        Buffer.this.onError(throwable);
                    }

                    @Override
                    public void complete() {
                        Buffer.this.upstreamFinished.set(true);
                        Buffer.this.tryDrain();
                    }

                    @Override
                    public long getRequested() {
                        return Buffer.this.wanted.get() + Long.MAX_VALUE;
                    }

                    @Override
                    public long getBuffered() {
                        return Buffer.this.buffer.size();
                    }
                });
                this.start();
            }
            catch (Exception e) {
                subscriber.onError((Throwable)e);
            }
            this.upstreamSubscription = upstreamSubscriptionTmp;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void tryDrain() {
            if (this.draining.compareAndSet(false, true)) {
                try {
                    long i = this.wanted.get();
                    while (!this.isStopped() && (this.open.get() || i > Long.MIN_VALUE)) {
                        Object item = this.buffer.poll();
                        if (item == null) {
                            if (this.upstreamFinished.get()) {
                                this.onComplete();
                                return;
                            }
                            break;
                        }
                        if (this.disposing.get()) {
                            try {
                                PartialBufferingPublisher.this.disposer.execute(item);
                            }
                            catch (Exception e) {
                                LOGGER.warn("exception raised disposing of " + item + " - will be ignored", (Throwable)e);
                            }
                        } else {
                            this.onNext(item);
                        }
                        i = i >= 0L ? 0L : this.wanted.decrementAndGet();
                    }
                }
                finally {
                    this.draining.set(false);
                }
                if (this.buffer.peek() != null && this.wanted.get() > Long.MIN_VALUE) {
                    this.tryDrain();
                }
            }
        }

        @Override
        protected void doRequest(long n) {
            if (this.wanted.get() < 0L) {
                long nowWanted = this.wanted.addAndGet(n);
                if (nowWanted >= 0L) {
                    this.upstreamSubscription.request(Long.MAX_VALUE);
                    this.open.set(true);
                } else {
                    long outstanding = nowWanted + Long.MAX_VALUE + 1L - (long)this.buffer.size();
                    if (outstanding > 0L) {
                        this.upstreamSubscription.request(n);
                    }
                }
            }
            this.tryDrain();
        }

        @Override
        protected void doCancel() {
            this.disposing.set(true);
            this.open.set(true);
            this.upstreamSubscription.cancel();
            this.tryDrain();
        }
    }
}

