/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.stream.StreamEvent;
import ratpack.stream.TransformablePublisher;

public class WiretapPublisher<T>
implements TransformablePublisher<T> {
    private final Publisher<? extends T> publisher;
    private final Action<? super StreamEvent<? super T>> listener;
    private final AtomicInteger counter = new AtomicInteger();

    public WiretapPublisher(Publisher<? extends T> publisher, Action<? super StreamEvent<? super T>> listener) {
        this.publisher = publisher;
        this.listener = listener;
    }

    public void subscribe(final Subscriber<? super T> outSubscriber) {
        final int subscriptionId = this.counter.getAndIncrement();
        this.publisher.subscribe(new Subscriber<T>(){
            private Subscription subscription;
            private final AtomicBoolean done = new AtomicBoolean();

            public void onSubscribe(final Subscription subscription) {
                this.subscription = subscription;
                outSubscriber.onSubscribe(new Subscription(){

                    public void request(long n) {
                        try {
                            WiretapPublisher.this.listener.execute(new RequestEvent(subscriptionId, n));
                        }
                        catch (Throwable throwable) {
                            subscription.cancel();
                            this.onError(throwable);
                            return;
                        }
                        subscription.request(n);
                    }

                    public void cancel() {
                        try {
                            WiretapPublisher.this.listener.execute(new CancelEvent(subscriptionId));
                        }
                        catch (Throwable throwable) {
                            try {
                                subscription.cancel();
                            }
                            catch (Throwable e) {
                                throwable.addSuppressed(e);
                            }
                            this.onError(throwable);
                            return;
                        }
                        subscription.cancel();
                    }
                });
            }

            public void onNext(T in) {
                try {
                    WiretapPublisher.this.listener.execute(new DataEvent(subscriptionId, in));
                }
                catch (Throwable throwable) {
                    this.subscription.cancel();
                    this.onError(throwable);
                    return;
                }
                if (!this.done.get()) {
                    outSubscriber.onNext(in);
                }
            }

            public void onError(Throwable t) {
                if (this.done.compareAndSet(false, true)) {
                    try {
                        WiretapPublisher.this.listener.execute(new ErrorEvent(subscriptionId, t));
                    }
                    catch (Throwable throwable) {
                        t.addSuppressed(throwable);
                        this.onError(t);
                        return;
                    }
                    outSubscriber.onError(t);
                }
            }

            public void onComplete() {
                if (this.done.compareAndSet(false, true)) {
                    try {
                        WiretapPublisher.this.listener.execute(new CompletionEvent(subscriptionId));
                    }
                    catch (Throwable throwable) {
                        outSubscriber.onError(throwable);
                        return;
                    }
                    outSubscriber.onComplete();
                }
            }
        });
    }

    private static class RequestEvent<T>
    implements StreamEvent<T> {
        private final long requestAmount;
        private final int subscriptionId;

        private RequestEvent(int subscriptionId, long requestAmount) {
            this.requestAmount = requestAmount;
            this.subscriptionId = subscriptionId;
        }

        @Override
        public int getSubscriptionId() {
            return this.subscriptionId;
        }

        @Override
        public boolean isComplete() {
            return false;
        }

        @Override
        public boolean isError() {
            return false;
        }

        @Override
        public boolean isData() {
            return false;
        }

        @Override
        public boolean isCancel() {
            return false;
        }

        @Override
        public boolean isRequest() {
            return true;
        }

        @Override
        public long getRequestAmount() {
            return this.requestAmount;
        }

        @Override
        public Throwable getThrowable() {
            return null;
        }

        @Override
        public T getItem() {
            return null;
        }

        public String toString() {
            return "StreamEvent[RequestEvent{requestAmount=" + this.requestAmount + ", subscriptionId=" + this.subscriptionId + "}]";
        }
    }

    private static class CancelEvent<T>
    implements StreamEvent<T> {
        private final int subscriptionId;

        private CancelEvent(int subscriptionId) {
            this.subscriptionId = subscriptionId;
        }

        @Override
        public int getSubscriptionId() {
            return this.subscriptionId;
        }

        @Override
        public boolean isComplete() {
            return false;
        }

        @Override
        public boolean isError() {
            return false;
        }

        @Override
        public boolean isData() {
            return false;
        }

        @Override
        public boolean isCancel() {
            return true;
        }

        @Override
        public boolean isRequest() {
            return false;
        }

        @Override
        public long getRequestAmount() {
            return 0L;
        }

        @Override
        public Throwable getThrowable() {
            return null;
        }

        @Override
        public T getItem() {
            return null;
        }

        public String toString() {
            return "StreamEvent[CancelEvent{subscriptionId=" + this.subscriptionId + "}]";
        }
    }

    private static class ErrorEvent<T>
    implements StreamEvent<T> {
        private final int subscriptionId;
        private final Throwable error;

        private ErrorEvent(int subscriptionId, Throwable error) {
            this.subscriptionId = subscriptionId;
            this.error = error;
        }

        @Override
        public int getSubscriptionId() {
            return this.subscriptionId;
        }

        @Override
        public boolean isComplete() {
            return false;
        }

        @Override
        public boolean isError() {
            return true;
        }

        @Override
        public boolean isData() {
            return false;
        }

        @Override
        public Throwable getThrowable() {
            return this.error;
        }

        @Override
        public T getItem() {
            return null;
        }

        @Override
        public boolean isCancel() {
            return false;
        }

        @Override
        public boolean isRequest() {
            return false;
        }

        @Override
        public long getRequestAmount() {
            return 0L;
        }

        public String toString() {
            return "StreamEvent[ErrorEvent{subscriptionId=" + this.subscriptionId + ", error=" + this.error + "}]";
        }
    }

    private static class CompletionEvent<T>
    implements StreamEvent<T> {
        private final int subscriptionId;

        private CompletionEvent(int subscriptionId) {
            this.subscriptionId = subscriptionId;
        }

        @Override
        public int getSubscriptionId() {
            return this.subscriptionId;
        }

        @Override
        public boolean isComplete() {
            return true;
        }

        @Override
        public boolean isError() {
            return false;
        }

        @Override
        public boolean isData() {
            return false;
        }

        @Override
        public Throwable getThrowable() {
            return null;
        }

        @Override
        public T getItem() {
            return null;
        }

        @Override
        public boolean isCancel() {
            return false;
        }

        @Override
        public boolean isRequest() {
            return false;
        }

        @Override
        public long getRequestAmount() {
            return 0L;
        }

        public String toString() {
            return "StreamEvent[CompletionEvent{subscriptionId=" + this.subscriptionId + "}]";
        }
    }

    private static class DataEvent<T>
    implements StreamEvent<T> {
        private final int subscriptionId;
        private final T data;

        private DataEvent(int subscriptionId, T data) {
            this.subscriptionId = subscriptionId;
            this.data = data;
        }

        @Override
        public int getSubscriptionId() {
            return this.subscriptionId;
        }

        @Override
        public boolean isComplete() {
            return false;
        }

        @Override
        public boolean isError() {
            return false;
        }

        @Override
        public boolean isData() {
            return true;
        }

        @Override
        public Throwable getThrowable() {
            return null;
        }

        @Override
        public T getItem() {
            return this.data;
        }

        @Override
        public boolean isCancel() {
            return false;
        }

        @Override
        public boolean isRequest() {
            return false;
        }

        @Override
        public long getRequestAmount() {
            return 0L;
        }

        public String toString() {
            return "StreamEvent[DataEvent{subscriptionId=" + this.subscriptionId + ", data=" + this.data + "}]";
        }
    }
}

