/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Function;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.WriteStream;

/**
 * A publisher that passes every signal of an upstream publisher through a
 * {@link WriteStream} produced by a user supplied mapping function.
 *
 * <p>For each subscription the mapper is given a {@code WriteStream<O>} that
 * forwards to the downstream subscriber, and returns the {@code WriteStream}
 * that upstream items, errors and completion are written to.
 *
 * @param <T> the type of item emitted by the upstream publisher
 * @param <O> the type of item emitted to downstream subscribers
 */
public class StreamMapPublisher<T, O>
implements TransformablePublisher<O> {
    private final Publisher<? extends T> upstream;
    private final Function<? super WriteStream<O>, ? extends WriteStream<? super T>> mapper;

    /**
     * @param upstream the publisher whose signals are fed to the mapped stream
     * @param mapper   invoked once per subscription with the downstream facing
     *                 stream; returns the stream that receives upstream signals
     */
    public StreamMapPublisher(Publisher<? extends T> upstream, Function<? super WriteStream<O>, ? extends WriteStream<? super T>> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    /**
     * Subscribes to the upstream publisher and connects it to the given
     * subscriber through a freshly mapped {@link WriteStream}.
     *
     * <p>If the mapper throws, the upstream subscription is cancelled and the
     * exception is delivered to the subscriber via {@code onError}.
     *
     * @param downstreamSubscriber the subscriber receiving the mapped items
     */
    @Override
    public void subscribe(final Subscriber<? super O> downstreamSubscriber) {
        this.upstream.subscribe(new Subscriber<T>(){
            // Held per subscription rather than on the publisher, so that
            // subscribing more than once cannot redirect one subscription's
            // signals into another subscription's stream.
            private WriteStream<? super T> input;
            // Set when the mapper threw; later upstream signals are then dropped.
            private boolean mapperFailed;

            @Override
            public void onSubscribe(final Subscription upstreamSubscription) {
                WriteStream<O> output = new WriteStream<O>(){

                    @Override
                    public void item(O item) {
                        downstreamSubscriber.onNext(item);
                    }

                    @Override
                    public void error(Throwable throwable) {
                        upstreamSubscription.cancel();
                        downstreamSubscriber.onError(throwable);
                    }

                    @Override
                    public void complete() {
                        upstreamSubscription.cancel();
                        downstreamSubscriber.onComplete();
                    }
                };
                try {
                    this.input = StreamMapPublisher.this.mapper.apply(output);
                }
                catch (Exception e) {
                    this.mapperFailed = true;
                    upstreamSubscription.cancel();
                    // The downstream must see onSubscribe before any other
                    // signal; there is nothing left to request or cancel, so
                    // the subscription handed over is a no-op.
                    downstreamSubscriber.onSubscribe(new Subscription(){

                        @Override
                        public void request(long n) {
                        }

                        @Override
                        public void cancel() {
                        }
                    });
                    downstreamSubscriber.onError(e);
                    return;
                }
                // Demand and cancellation pass straight through to the upstream.
                downstreamSubscriber.onSubscribe(new Subscription(){

                    @Override
                    public void request(long n) {
                        upstreamSubscription.request(n);
                    }

                    @Override
                    public void cancel() {
                        upstreamSubscription.cancel();
                    }
                });
            }

            @Override
            public void onNext(T i) {
                // Signals may still arrive after cancel(); the downstream has
                // already been terminated in that case, so ignore them.
                if (this.mapperFailed) {
                    return;
                }
                this.input.item(i);
            }

            @Override
            public void onError(Throwable t) {
                if (this.mapperFailed) {
                    return;
                }
                this.input.error(t);
            }

            @Override
            public void onComplete() {
                if (this.mapperFailed) {
                    return;
                }
                this.input.complete();
            }
        });
    }
}

