/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.ManagedSubscription;

public class FanOutPublisher<T>
implements TransformablePublisher<T> {
    private final Publisher<? extends Iterable<? extends T>> publisher;
    private final Action<? super T> disposer;

    public FanOutPublisher(Publisher<? extends Iterable<? extends T>> publisher, Action<? super T> disposer) {
        this.publisher = publisher;
        this.disposer = disposer;
    }

    public void subscribe(Subscriber<? super T> s) {
        s.onSubscribe((Subscription)new ManagedSubscription<T>(s, this.disposer){
            Iterator<? extends T> iterator;
            Subscription subscription;
            AtomicReference<State> state;
            {
                this.state = new AtomicReference<State>(State.UNSUBSCRIBED);
            }

            @Override
            protected void onRequest(long n) {
                if (this.state.compareAndSet(State.UNSUBSCRIBED, State.PENDING_SUBSCRIBE)) {
                    FanOutPublisher.this.publisher.subscribe(new Subscriber<Iterable<? extends T>>(){

                        public void onSubscribe(Subscription s) {
                            subscription = s;
                            state.set(State.REQUESTED);
                            s.request(1L);
                        }

                        public void onNext(Iterable<? extends T> ts) {
                            iterator = ts.iterator();
                            state.set(State.IDLE);
                            this.drain();
                        }

                        public void onError(Throwable t) {
                            this.emitError(t);
                            this.drain();
                        }

                        public void onComplete() {
                            subscription = null;
                            state.compareAndSet(State.REQUESTED, State.IDLE);
                            this.drain();
                        }
                    });
                } else if (this.iterator != null) {
                    this.drain();
                }
            }

            private void drain() {
                if (this.state.compareAndSet(State.IDLE, State.EMITTING)) {
                    if (this.isDone()) {
                        if (this.iterator != null) {
                            while (this.iterator.hasNext()) {
                                this.dispose(this.iterator.next());
                            }
                        }
                        return;
                    }
                    boolean hasNext = false;
                    if (this.iterator != null) {
                        while ((hasNext = this.iterator.hasNext()) && this.shouldEmit()) {
                            this.emitNext(this.iterator.next());
                        }
                    }
                    if (!hasNext) {
                        if (this.subscription == null) {
                            this.emitComplete();
                        } else if (this.hasDemand()) {
                            this.state.set(State.REQUESTED);
                            this.subscription.request(1L);
                            return;
                        }
                    }
                    this.state.set(State.IDLE);
                    if (this.hasDemand() || this.isDone()) {
                        this.drain();
                    }
                }
            }

            @Override
            protected void onCancel() {
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
            }
        });
    }

    static enum State {
        UNSUBSCRIBED,
        PENDING_SUBSCRIBE,
        REQUESTED,
        IDLE,
        EMITTING;

    }
}

