/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.AbortingSubscriber;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.ClosedPublisherException;
import com.linecorp.armeria.common.stream.NeverInvokedSubscriber;
import com.linecorp.armeria.common.stream.NoopSubscription;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.PooledObjects;
import com.linecorp.armeria.internal.shaded.guava.base.MoreObjects;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

abstract class AbstractStreamMessage<T>
implements StreamMessage<T> {
    /** Close event carrying no cause. */
    static final CloseEvent SUCCESSFUL_CLOSE = new CloseEvent(null);

    /**
     * Close event whose cause is the shared {@link CancelledSubscriptionException} instance,
     * passed through {@link Exceptions#clearTrace}.
     */
    static final CloseEvent CANCELLED_CLOSE =
            new CloseEvent(Exceptions.clearTrace(CancelledSubscriptionException.get()));

    /**
     * Close event whose cause is the shared {@link AbortedStreamException} instance,
     * passed through {@link Exceptions#clearTrace}.
     */
    static final CloseEvent ABORTED_CLOSE =
            new CloseEvent(Exceptions.clearTrace(AbortedStreamException.get()));

    /** Future exposed through {@link #completionFuture()}; completed by {@link CloseEvent#notifySubscriber}. */
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

    /** Package-private constructor; performs no initialization beyond the field initializers. */
    AbstractStreamMessage() {
    }

    /**
     * Subscribes {@code subscriber} on the executor returned by
     * {@link #defaultSubscriberExecutor()}, passing {@code false} for {@code withPooledObjects}.
     *
     * @param subscriber the subscriber to register
     */
    @Override
    public final void subscribe(Subscriber<? super T> subscriber) {
        final EventExecutor fallbackExecutor = defaultSubscriberExecutor();
        subscribe(subscriber, fallbackExecutor, false);
    }

    /**
     * Subscribes {@code subscriber} on the executor returned by
     * {@link #defaultSubscriberExecutor()}.
     *
     * @param subscriber the subscriber to register
     * @param withPooledObjects forwarded unchanged to the four-argument overload
     */
    @Override
    public final void subscribe(Subscriber<? super T> subscriber, boolean withPooledObjects) {
        final EventExecutor fallbackExecutor = defaultSubscriberExecutor();
        subscribe(subscriber, fallbackExecutor, withPooledObjects);
    }

    /**
     * Subscribes {@code subscriber} on the given {@code executor}, passing {@code false}
     * for {@code withPooledObjects}.
     *
     * @param subscriber the subscriber to register
     * @param executor the executor handed to the created subscription
     */
    @Override
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor) {
        final boolean withPooledObjects = false;
        subscribe(subscriber, executor, withPooledObjects);
    }

    /**
     * Validates the arguments, wraps them in a new {@link SubscriptionImpl} and hands that
     * subscription to {@link #subscribe(SubscriptionImpl)}.
     *
     * @param subscriber the subscriber to register; must not be {@code null}
     * @param executor the executor stored in the subscription; must not be {@code null}
     * @param withPooledObjects flag stored in the subscription
     * @throws NullPointerException if {@code subscriber} or {@code executor} is {@code null}
     */
    @Override
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor,
                                boolean withPooledObjects) {
        // Null checks run in the same order as before: subscriber first, then executor.
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");

        final SubscriptionImpl subscription =
                new SubscriptionImpl(this, subscriber, executor, withPooledObjects);
        subscribe(subscription);
    }

    /**
     * Implementation-specific part of subscribing. Called by the public
     * {@code subscribe(...)} overloads with a freshly created {@link SubscriptionImpl}
     * once the subscriber and executor have been null-checked.
     *
     * @param var1 the subscription wrapping the subscriber, executor and pooled-object flag
     */
    abstract void subscribe(SubscriptionImpl var1);

    /**
     * Returns the executor used by the {@code subscribe(...)} overloads that take no executor.
     *
     * <p>The value is whatever {@link RequestContext#mapCurrent} yields when given
     * {@code RequestContext::eventLoop} as the mapper and a supplier of
     * {@code CommonPools.workerGroup().next()} as the default.
     * NOTE(review): presumably this is the current request context's event loop when one
     * exists and a worker-group event loop otherwise - confirm against {@code mapCurrent}.
     *
     * @return the executor to run subscriber callbacks on
     */
    protected EventExecutor defaultSubscriberExecutor() {
        final EventExecutor chosen = (EventExecutor) RequestContext.mapCurrent(
                RequestContext::eventLoop,
                () -> CommonPools.workerGroup().next());
        return chosen;
    }

    /**
     * Returns the future held by this stream; the same instance is returned on every call.
     *
     * @return the completion future of this stream
     */
    @Override
    public final CompletableFuture<Void> completionFuture() {
        return completionFuture;
    }

    /**
     * Returns the demand value of this stream; it is reported in
     * {@link SubscriptionImpl#toString()}.
     * NOTE(review): presumably the number of elements requested but not yet delivered -
     * confirm against the implementations.
     */
    abstract long demand();

    /**
     * Called by {@link SubscriptionImpl#request(long)} with the requested amount, which
     * that caller has already checked to be greater than zero.
     *
     * @param var1 the requested amount
     */
    abstract void request(long var1);

    /**
     * Called by {@link SubscriptionImpl#cancel()} after that subscription has set its
     * {@code cancelRequested} flag.
     */
    abstract void cancel();

    /**
     * Delivers a close event to the subscriber of the given subscription. Called by
     * {@link #cleanupQueue} for every {@link CloseEvent} it drains from the queue.
     *
     * @param var1 the subscription whose subscriber is to be notified
     * @param var2 the close event to deliver
     */
    abstract void notifySubscriberOfCloseEvent(SubscriptionImpl var1, CloseEvent var2);

    /**
     * Hook called with an element before it is handed to a subscriber
     * (see {@link #prepareObjectForNotification}) and with each non-{@link CloseEvent}
     * element drained by {@link #cleanupQueue}. The default implementation does nothing.
     *
     * @param obj the element being removed
     */
    protected void onRemoval(T obj) {
    }

    /**
     * Rejects a subscriber that arrives after {@code subscription} already exists.
     *
     * <p>The late subscriber receives {@code onSubscribe} with {@link NoopSubscription#INSTANCE}
     * followed by {@code onError}. The error is the shared {@link AbortedStreamException}
     * when the existing subscriber is an {@link AbortingSubscriber}; otherwise it is a new
     * {@link IllegalStateException}. The two callbacks run inline when
     * {@link SubscriptionImpl#needsDirectInvocation()} is {@code true}, and are submitted
     * to the subscription's executor otherwise.
     *
     * @param subscription the subscription that already exists
     * @param lateSubscriber the subscriber to reject
     */
    static void failLateSubscriber(SubscriptionImpl subscription, Subscriber<?> lateSubscriber) {
        final Subscriber<Object> existing = subscription.subscriber();

        final RuntimeException failure;
        if (existing instanceof AbortingSubscriber) {
            failure = AbortedStreamException.get();
        } else {
            failure = new IllegalStateException("subscribed by other subscriber already");
        }

        // One task for both paths, so the callback order is the same either way.
        final Runnable rejection = () -> {
            lateSubscriber.onSubscribe(NoopSubscription.INSTANCE);
            lateSubscriber.onError(failure);
        };

        if (!subscription.needsDirectInvocation()) {
            subscription.executor().execute(rejection);
            return;
        }
        rejection.run();
    }

    /**
     * Prepares an element for delivery to the subscriber: passes it to
     * {@link ReferenceCountUtil#touch(Object)}, then to {@link #onRemoval}, and finally,
     * unless the subscription was created with {@code withPooledObjects}, through
     * {@link PooledObjects#toUnpooled}.
     *
     * @param subscription the subscription the element is about to be delivered to
     * @param o the element to prepare
     * @return {@code o} itself when pooled objects are accepted, otherwise the result of
     *         {@code PooledObjects.toUnpooled(o)}
     */
    T prepareObjectForNotification(SubscriptionImpl subscription, T o) {
        ReferenceCountUtil.touch(o);
        onRemoval(o);
        return subscription.withPooledObjects() ? o : PooledObjects.toUnpooled(o);
    }

    /**
     * Drains {@code queue} until {@link Queue#poll()} returns {@code null}, disposing of
     * every element.
     *
     * <ul>
     *   <li>A {@link CloseEvent} is forwarded to {@link #notifySubscriberOfCloseEvent}.</li>
     *   <li>A {@link CompletableFuture} is completed exceptionally with the shared
     *       {@link ClosedPublisherException}.</li>
     *   <li>Every element that is not a {@link CloseEvent} is passed to {@link #onRemoval}.</li>
     *   <li>Every polled element is finally handed to
     *       {@link ReferenceCountUtil#safeRelease(Object)}, even if handling it threw.</li>
     * </ul>
     *
     * @param subscription the subscription to notify about close events
     * @param queue the queue to drain
     */
    void cleanupQueue(SubscriptionImpl subscription, Queue<Object> queue) {
        final ClosedPublisherException cause = ClosedPublisherException.get();
        for (;;) {
            final Object e = queue.poll();
            if (e == null) {
                break;
            }

            try {
                if (e instanceof CloseEvent) {
                    notifySubscriberOfCloseEvent(subscription, (CloseEvent) e);
                    continue;
                }

                if (e instanceof CompletableFuture) {
                    ((CompletableFuture<?>) e).completeExceptionally(cause);
                }

                // onRemoval takes T, so the cast is needed to compile. It is unchecked
                // because the queue mixes elements with control objects.
                @SuppressWarnings("unchecked")
                final T obj = (T) e;
                onRemoval(obj);
            } finally {
                ReferenceCountUtil.safeRelease(e);
            }
        }
    }

    /**
     * Terminal signal of a stream: either a successful completion ({@code cause == null})
     * or a failure carrying the given cause.
     */
    static final class CloseEvent {
        /** Failure cause, or {@code null} for a successful close. */
        @Nullable
        private final Throwable cause;

        /**
         * @param cause the failure cause, or {@code null} for a successful close
         */
        CloseEvent(@Nullable Throwable cause) {
            this.cause = cause;
        }

        /**
         * Delivers this event to the subscriber of {@code subscription} and completes
         * {@code completionFuture} accordingly. Does nothing if the future is already done.
         *
         * <p>If this event has no cause but the subscription reports
         * {@link SubscriptionImpl#cancelRequested()}, the shared
         * {@link CancelledSubscriptionException} is used as the cause. With no cause,
         * {@code onComplete()} is called and the future is completed with {@code null}.
         * With a cause, {@code onError(cause)} is called unless the cause is a
         * {@link CancelledSubscriptionException}, and the future is completed exceptionally
         * with it. The future is completed in a {@code finally} block, so it is settled
         * even if the subscriber callback throws.
         *
         * @param subscription the subscription whose subscriber is notified
         * @param completionFuture the future to complete alongside the notification
         */
        void notifySubscriber(SubscriptionImpl subscription, CompletableFuture<?> completionFuture) {
            if (completionFuture.isDone()) {
                // Already notified.
                return;
            }

            final Subscriber<Object> subscriber = subscription.subscriber();
            Throwable cause = this.cause;
            if (cause == null && subscription.cancelRequested()) {
                cause = CancelledSubscriptionException.get();
            }

            // The branches must be mutually exclusive. Falling through after a successful
            // completion would call onError(null) and completeExceptionally(null), and the
            // latter throws NullPointerException.
            if (cause == null) {
                try {
                    subscriber.onComplete();
                } finally {
                    completionFuture.complete(null);
                }
            } else {
                try {
                    if (!(cause instanceof CancelledSubscriptionException)) {
                        subscriber.onError(cause);
                    }
                } finally {
                    completionFuture.completeExceptionally(cause);
                }
            }
        }

        /**
         * @return {@code "CloseEvent"} when there is no cause, otherwise
         *         {@code "CloseEvent(<cause>)"}
         */
        @Override
        public String toString() {
            if (this.cause == null) {
                return "CloseEvent";
            }
            return "CloseEvent(" + this.cause + ')';
        }
    }

    static final class SubscriptionImpl
    implements Subscription {
        private final AbstractStreamMessage<?> publisher;
        private Subscriber<Object> subscriber;
        private final EventExecutor executor;
        private final boolean withPooledObjects;
        private volatile boolean cancelRequested;

        SubscriptionImpl(AbstractStreamMessage<?> publisher, Subscriber<?> subscriber, EventExecutor executor, boolean withPooledObjects) {
            this.publisher = publisher;
            this.subscriber = subscriber;
            this.executor = executor;
            this.withPooledObjects = withPooledObjects;
        }

        Subscriber<Object> subscriber() {
            return this.subscriber;
        }

        void clearSubscriber() {
            if (!(this.subscriber instanceof AbortingSubscriber)) {
                this.subscriber = NeverInvokedSubscriber.get();
            }
        }

        EventExecutor executor() {
            return this.executor;
        }

        boolean withPooledObjects() {
            return this.withPooledObjects;
        }

        boolean cancelRequested() {
            return this.cancelRequested;
        }

        public void request(long n) {
            if (n <= 0L) {
                this.invokeOnError(new IllegalArgumentException("n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)"));
                return;
            }
            this.publisher.request(n);
        }

        public void cancel() {
            this.cancelRequested = true;
            this.publisher.cancel();
        }

        private void invokeOnError(Throwable cause) {
            if (this.needsDirectInvocation()) {
                this.subscriber.onError(cause);
            } else {
                this.executor.execute(() -> this.subscriber.onError(cause));
            }
        }

        boolean needsDirectInvocation() {
            return this.executor.inEventLoop();
        }

        public String toString() {
            return MoreObjects.toStringHelper(Subscription.class).add("publisher", this.publisher).add("demand", this.publisher.demand()).add("executor", this.executor).toString();
        }
    }
}

