/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.AbortingSubscriber;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.NoopSubscriber;
import com.linecorp.armeria.common.stream.NoopSubscription;
import com.linecorp.armeria.common.stream.StreamMessage;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Adapts a Reactive Streams {@link Publisher} into a {@link StreamMessage}.
 *
 * <p>At most one {@link Subscriber} is ever attached to the wrapped {@link Publisher}. Any later
 * subscriber is rejected: it receives {@code onSubscribe()} with a no-op subscription followed by
 * {@code onError()}, and it is never passed to the publisher.
 *
 * @param <T> the type of the elements signaled by this stream
 */
public class PublisherBasedStreamMessage<T> implements StreamMessage<T> {

    // The class literals passed to newUpdater() are necessarily raw, hence the raw type arguments.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<PublisherBasedStreamMessage, AbortableSubscriber>
            subscriberUpdater = AtomicReferenceFieldUpdater.newUpdater(
                    PublisherBasedStreamMessage.class, AbortableSubscriber.class, "subscriber");

    private final Publisher<? extends T> publisher;
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

    // Set exactly once (null -> non-null) via subscriberUpdater, either by subscribe0() or by abort().
    @Nullable
    private volatile AbortableSubscriber subscriber;

    // Flipped to true by AbortableSubscriber.onNext() as soon as the publisher emits an element.
    private volatile boolean publishedAny;

    /**
     * Creates a new instance that wraps the specified {@link Publisher}.
     *
     * @param publisher the {@link Publisher} to adapt
     * @throws NullPointerException if {@code publisher} is {@code null}
     */
    public PublisherBasedStreamMessage(Publisher<? extends T> publisher) {
        this.publisher = Objects.requireNonNull(publisher, "publisher");
    }

    /**
     * Returns the {@link Publisher} wrapped by this stream.
     */
    protected final Publisher<? extends T> delegate() {
        return publisher;
    }

    /**
     * Returns {@code true} until {@link #completionFuture()} is done.
     */
    @Override
    public boolean isOpen() {
        return !completionFuture.isDone();
    }

    /**
     * Returns {@code true} only if this stream is no longer open and the publisher never emitted
     * an element.
     */
    @Override
    public boolean isEmpty() {
        return !isOpen() && !publishedAny;
    }

    @Override
    public final void subscribe(Subscriber<? super T> subscriber) {
        subscribe(subscriber, defaultSubscriberExecutor(), false);
    }

    @Override
    public final void subscribe(Subscriber<? super T> subscriber, boolean withPooledObjects) {
        subscribe(subscriber, defaultSubscriberExecutor(), withPooledObjects);
    }

    @Override
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor) {
        subscribe(subscriber, executor, false);
    }

    /**
     * Subscribes the specified {@link Subscriber}, delivering its signals on {@code executor}.
     *
     * @param subscriber the subscriber to attach
     * @param executor the executor the subscriber's callbacks are invoked from
     * @param withPooledObjects accepted for interface compatibility; this implementation does not
     *                          use the value
     * @throws NullPointerException if {@code subscriber} or {@code executor} is {@code null}
     */
    @Override
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor,
                                boolean withPooledObjects) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        subscribe0(subscriber, executor);
    }

    /**
     * Returns the executor used by the {@code subscribe()} overloads that do not take one: the event
     * loop of the current {@link RequestContext} if there is one, otherwise an executor picked from
     * {@link CommonPools#workerGroup()}.
     */
    protected EventExecutor defaultSubscriberExecutor() {
        return (EventExecutor) RequestContext.mapCurrent(
                RequestContext::eventLoop, () -> CommonPools.workerGroup().next());
    }

    /**
     * Installs {@code subscriber} as the one and only subscriber and attaches it to the publisher.
     * If another subscriber (or an abort) got there first, the new subscriber is failed instead.
     */
    private void subscribe0(Subscriber<? super T> subscriber, EventExecutor executor) {
        final AbortableSubscriber s = new AbortableSubscriber(this, subscriber, executor);
        if (!subscriberUpdater.compareAndSet(this, null, s)) {
            // The field is never reset to null once set, so it is non-null after a failed CAS.
            failLateSubscriber(executor, subscriber, this.subscriber.subscriber);
            // The late subscriber has been rejected; it must NOT also be attached to the publisher.
            return;
        }

        publisher.subscribe(s);
    }

    /**
     * Rejects {@code lateSubscriber} on {@code executor} by signaling {@code onSubscribe()} with a
     * no-op subscription followed by {@code onError()}. The error is an
     * {@link AbortedStreamException} when the stream was taken by an {@link AbortingSubscriber},
     * otherwise an {@link IllegalStateException}.
     */
    private static void failLateSubscriber(EventExecutor executor, Subscriber<?> lateSubscriber,
                                           Subscriber<?> oldSubscriber) {
        final RuntimeException cause;
        if (oldSubscriber instanceof AbortingSubscriber) {
            cause = AbortedStreamException.get();
        } else {
            cause = new IllegalStateException("subscribed by other subscriber already");
        }

        executor.execute(() -> {
            lateSubscriber.onSubscribe(NoopSubscription.INSTANCE);
            lateSubscriber.onError(cause);
        });
    }

    /**
     * Aborts this stream. If a subscriber is already installed, the abort is forwarded to it.
     * Otherwise an {@link AbortingSubscriber} is installed so that later subscribers are rejected.
     */
    @Override
    public void abort() {
        final AbortableSubscriber current = this.subscriber;
        if (current != null) {
            current.abort();
            return;
        }

        final AbortableSubscriber abortable =
                new AbortableSubscriber(this, AbortingSubscriber.get(), ImmediateEventExecutor.INSTANCE);
        if (!subscriberUpdater.compareAndSet(this, null, abortable)) {
            // A real subscriber won the race between the null check above and the CAS; abort that one.
            this.subscriber.abort();
            return;
        }

        // Mark the abort as pending first, then hand over a no-op subscription so that
        // onSubscribe0() performs the actual abort and completes the completion future.
        abortable.abort();
        abortable.onSubscribe(NoopSubscription.INSTANCE);
    }

    @Override
    public CompletableFuture<Void> completionFuture() {
        return completionFuture;
    }

    /**
     * Sits between the wrapped {@link Publisher} and the real {@link Subscriber}: it relays the
     * publisher's signals to the subscriber on {@code executor}, acts as the {@link Subscription}
     * handed to the subscriber, and completes the parent's completion future when the stream ends,
     * is cancelled or is aborted.
     */
    static final class AbortableSubscriber implements Subscriber<Object>, Subscription {
        private final PublisherBasedStreamMessage<?> parent;
        private final EventExecutor executor;
        // Replaced by a NoopSubscriber placeholder once the stream is cancelled or aborted.
        private Subscriber<Object> subscriber;
        // Set by abort(); makes onSubscribe0() and cancel() abort instead of proceeding normally.
        private volatile boolean abortPending;
        @Nullable
        private volatile Subscription subscription;

        // The subscriber only ever receives elements produced for it, so widening its element
        // type to Object for internal relaying is safe.
        @SuppressWarnings("unchecked")
        AbortableSubscriber(PublisherBasedStreamMessage<?> parent, Subscriber<?> subscriber,
                            EventExecutor executor) {
            this.parent = parent;
            this.subscriber = (Subscriber<Object>) subscriber;
            this.executor = executor;
        }

        @Override
        public void request(long n) {
            final Subscription subscription = this.subscription;
            assert subscription != null;
            subscription.request(n);
        }

        @Override
        public void cancel() {
            // The subscriber obtains this Subscription only via onSubscribe0(), which stores
            // 'subscription' before invoking subscriber.onSubscribe().
            assert subscription != null;

            // If an abort is pending, abort rather than cancel.
            cancelOrAbort(!abortPending);
        }

        /**
         * Requests an abort. If the publisher has not provided a subscription yet, the abort stays
         * pending and is carried out by {@link #onSubscribe0(Subscription)}.
         */
        void abort() {
            abortPending = true;
            if (subscription != null) {
                cancelOrAbort(false);
            }
        }

        private void cancelOrAbort(boolean cancel) {
            if (executor.inEventLoop()) {
                cancelOrAbort0(cancel);
            } else {
                executor.execute(() -> cancelOrAbort0(cancel));
            }
        }

        /**
         * Terminates the stream unless it is already complete. On abort ({@code cancel == false})
         * the subscriber is notified with an {@link AbortedStreamException}. In every case the
         * upstream subscription is cancelled and the completion future is completed exceptionally,
         * even if the subscriber callback or the upstream {@code cancel()} throws.
         */
        private void cancelOrAbort0(boolean cancel) {
            final CompletableFuture<Void> completionFuture = parent.completionFuture();
            if (completionFuture.isDone()) {
                return;
            }

            final Subscriber<Object> subscriber = this.subscriber;
            // Swap in a placeholder so that later signals from the publisher are no longer
            // delivered to the original subscriber. An AbortingSubscriber is left in place.
            if (!(subscriber instanceof AbortingSubscriber)) {
                this.subscriber = NoopSubscriber.get();
            }

            try {
                if (!cancel) {
                    subscriber.onError(AbortedStreamException.get());
                }
            } finally {
                try {
                    subscription.cancel();
                } finally {
                    completionFuture.completeExceptionally(
                            cancel ? CancelledSubscriptionException.get() : AbortedStreamException.get());
                }
            }
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            if (executor.inEventLoop()) {
                onSubscribe0(subscription);
            } else {
                executor.execute(() -> onSubscribe0(subscription));
            }
        }

        private void onSubscribe0(Subscription subscription) {
            try {
                // Must be stored before the subscriber is notified; request()/cancel() rely on it.
                this.subscription = subscription;
                subscriber.onSubscribe(this);
            } finally {
                // Carry out an abort that was requested before the subscription arrived.
                if (abortPending) {
                    cancelOrAbort0(false);
                }
            }
        }

        @Override
        public void onNext(Object obj) {
            parent.publishedAny = true;
            if (executor.inEventLoop()) {
                subscriber.onNext(obj);
            } else {
                // 'subscriber' is read when the task runs, so a placeholder installed by a
                // cancel/abort in the meantime is the one that receives the element.
                executor.execute(() -> subscriber.onNext(obj));
            }
        }

        @Override
        public void onError(Throwable cause) {
            if (executor.inEventLoop()) {
                onError0(cause);
            } else {
                executor.execute(() -> onError0(cause));
            }
        }

        private void onError0(Throwable cause) {
            try {
                subscriber.onError(cause);
            } finally {
                parent.completionFuture().completeExceptionally(cause);
            }
        }

        @Override
        public void onComplete() {
            if (executor.inEventLoop()) {
                onComplete0();
            } else {
                executor.execute(this::onComplete0);
            }
        }

        private void onComplete0() {
            try {
                subscriber.onComplete();
            } finally {
                parent.completionFuture().complete(null);
            }
        }
    }
}

