/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.stream.TransformablePublisher;

public class FanOutPublisher<T>
implements TransformablePublisher<T> {
    private final Publisher<? extends Iterable<? extends T>> upstream;

    public FanOutPublisher(Publisher<? extends Iterable<? extends T>> upstream) {
        this.upstream = upstream;
    }

    public void subscribe(final Subscriber<? super T> downstream) {
        this.upstream.subscribe(new Subscriber<Iterable<? extends T>>(){
            private final AtomicBoolean done = new AtomicBoolean();
            private Subscription upstreamSubscription;

            public void onSubscribe(Subscription subscription) {
                this.upstreamSubscription = subscription;
                downstream.onSubscribe(new Subscription(){

                    public void request(long n) {
                        upstreamSubscription.request(n);
                    }

                    public void cancel() {
                        done.set(true);
                        upstreamSubscription.cancel();
                    }
                });
            }

            public void onNext(Iterable<? extends T> iterable) {
                Iterator iterator = iterable.iterator();
                if (!iterator.hasNext() && !this.done.get()) {
                    this.upstreamSubscription.request(1L);
                } else {
                    while (iterator.hasNext() && !this.done.get()) {
                        downstream.onNext(iterator.next());
                    }
                }
            }

            public void onError(Throwable t) {
                downstream.onError(t);
            }

            public void onComplete() {
                downstream.onComplete();
            }
        });
    }
}

