/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.AbortingSubscriber;
import com.linecorp.armeria.common.stream.AbstractStreamMessage;
import com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.internal.shaded.guava.collect.ImmutableList;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventLoopStreamMessage<T>
extends AbstractStreamMessageAndWriter<T> {
    private static final ConcurrentHashMap<List<StackTraceElement>, Boolean> UNEXPECTED_EVENT_LOOP_STACK_TRACES = new ConcurrentHashMap();
    private static final AtomicIntegerFieldUpdater<EventLoopStreamMessage> abortedUpdater = AtomicIntegerFieldUpdater.newUpdater(EventLoopStreamMessage.class, "aborted");
    private static final AtomicIntegerFieldUpdater<EventLoopStreamMessage> subscribedUpdater = AtomicIntegerFieldUpdater.newUpdater(EventLoopStreamMessage.class, "subscribed");
    private static final Logger logger = LoggerFactory.getLogger(EventLoopStreamMessage.class);
    private final EventLoop eventLoop;
    private final Queue<Object> queue;
    @Nullable
    private AbstractStreamMessage.SubscriptionImpl subscription;
    private long demand;
    private boolean invokedOnSubscribe;
    private boolean inOnNext;
    private AbstractStreamMessageAndWriter.State state = AbstractStreamMessageAndWriter.State.OPEN;
    private volatile int subscribed;
    private volatile int aborted;
    private volatile boolean isOpen = true;
    private volatile boolean wroteAny;

    public EventLoopStreamMessage() {
        this(RequestContext.mapCurrent(RequestContext::eventLoop, () -> {
            UnexpectedEventLoopException e = new UnexpectedEventLoopException();
            ImmutableList<StackTraceElement> stackTrace = ImmutableList.copyOf(e.getStackTrace());
            UNEXPECTED_EVENT_LOOP_STACK_TRACES.computeIfAbsent(stackTrace, unused -> {
                logger.warn("Creating EventLoopStreamMessage without specifying EventLoop. This will be very slow if writer or subscriber run in a different EventLoop.", (Throwable)e);
                return true;
            });
            return CommonPools.workerGroup().next();
        }));
    }

    public EventLoopStreamMessage(EventLoop eventLoop) {
        this.eventLoop = Objects.requireNonNull(eventLoop, "eventLoop");
        this.queue = new ArrayDeque<Object>();
    }

    @Override
    public boolean isOpen() {
        return this.isOpen;
    }

    @Override
    public boolean isEmpty() {
        return !this.isOpen() && !this.wroteAny;
    }

    @Override
    protected EventExecutor defaultSubscriberExecutor() {
        return this.eventLoop;
    }

    @Override
    void subscribe(AbstractStreamMessage.SubscriptionImpl subscription) {
        Subscriber<Object> subscriber = subscription.subscriber();
        if (!subscribedUpdater.compareAndSet(this, 0, 1)) {
            this.eventLoop.execute(() -> EventLoopStreamMessage.failLateSubscriber(this.subscription, subscriber));
            return;
        }
        if (this.eventLoop.inEventLoop()) {
            this.doSubscribe(subscription);
        } else {
            this.eventLoop.execute(() -> this.doSubscribe(subscription));
        }
    }

    @Override
    public void close() {
        if (this.eventLoop.inEventLoop()) {
            this.doClose(null);
        } else {
            this.eventLoop.execute(() -> this.doClose(null));
        }
    }

    @Override
    public void close(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        if (cause instanceof CancelledSubscriptionException) {
            throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
        }
        if (this.eventLoop.inEventLoop()) {
            this.doClose(cause);
        } else {
            this.eventLoop.execute(() -> this.doClose(cause));
        }
    }

    @Override
    public void abort() {
        if (abortedUpdater.compareAndSet(this, 0, 1)) {
            this.isOpen = false;
            if (subscribedUpdater.compareAndSet(this, 0, 1)) {
                if (this.eventLoop.inEventLoop()) {
                    this.doSetAbortedSubscription();
                    this.doCancelOrAbort(false);
                } else {
                    this.eventLoop.execute(() -> {
                        this.doSetAbortedSubscription();
                        this.doCancelOrAbort(false);
                    });
                }
                return;
            }
            this.cancelOrAbort(false);
        }
    }

    @Override
    long demand() {
        return this.demand;
    }

    @Override
    void request(long n) {
        if (this.eventLoop.inEventLoop()) {
            this.doRequest(n);
        } else {
            this.eventLoop.execute(() -> this.doRequest(n));
        }
    }

    @Override
    void cancel() {
        this.cancelOrAbort(true);
    }

    @Override
    void notifySubscriberOfCloseEvent(AbstractStreamMessage.SubscriptionImpl subscription, AbstractStreamMessage.CloseEvent event) {
        if (subscription.needsDirectInvocation()) {
            try {
                event.notifySubscriber(subscription, this.completionFuture());
            }
            finally {
                subscription.clearSubscriber();
                this.cleanup();
            }
        } else {
            subscription.executor().execute(() -> {
                try {
                    event.notifySubscriber(subscription, this.completionFuture());
                }
                finally {
                    subscription.clearSubscriber();
                    this.eventLoop.execute(this::cleanup);
                }
            });
        }
    }

    @Override
    void addObject(T obj) {
        this.wroteAny = true;
        if (this.eventLoop.inEventLoop()) {
            this.doAddObject(obj);
        } else {
            this.eventLoop.execute(() -> this.doAddObject(obj));
        }
    }

    @Override
    void addObjectOrEvent(Object obj) {
        if (this.eventLoop.inEventLoop()) {
            this.doAddObjectOrEvent(obj);
        } else {
            this.eventLoop.execute(() -> this.doAddObjectOrEvent(obj));
        }
    }

    private void doClose(@Nullable Throwable cause) {
        if (this.state != AbstractStreamMessageAndWriter.State.OPEN) {
            return;
        }
        this.doSetState(AbstractStreamMessageAndWriter.State.CLOSED);
        AbstractStreamMessage.CloseEvent event = cause == null ? SUCCESSFUL_CLOSE : new AbstractStreamMessage.CloseEvent(cause);
        this.doAddObjectOrEvent(event);
    }

    private void doSetState(AbstractStreamMessageAndWriter.State state) {
        this.state = state;
        this.isOpen = false;
    }

    private void doSubscribe(AbstractStreamMessage.SubscriptionImpl subscription) {
        this.subscription = subscription;
        if (subscription.needsDirectInvocation()) {
            this.invokedOnSubscribe = true;
            subscription.subscriber().onSubscribe((Subscription)subscription);
        } else {
            subscription.executor().execute(() -> {
                subscription.subscriber().onSubscribe((Subscription)subscription);
                this.eventLoop.execute(() -> {
                    this.invokedOnSubscribe = true;
                });
            });
        }
    }

    private void doRequest(long n) {
        long oldDemand = this.demand;
        this.demand = oldDemand >= Long.MAX_VALUE - n ? Long.MAX_VALUE : oldDemand + n;
        if (oldDemand == 0L) {
            this.doNotifySubscriberIfNotEmpty();
        }
    }

    private void doAddObject(T obj) {
        if (this.queue.isEmpty() && this.demand > 0L && !this.inOnNext) {
            AbstractStreamMessage.SubscriptionImpl subscription = this.subscription;
            --this.demand;
            this.doNotifySubscriberOfObject(subscription, obj);
            return;
        }
        this.doAddObjectOrEvent(obj);
    }

    private void doAddObjectOrEvent(Object obj) {
        this.queue.add(obj);
        if (this.subscription != null) {
            this.doNotifySubscriber(this.subscription);
        }
    }

    private void doNotifySubscriberIfNotEmpty() {
        AbstractStreamMessage.SubscriptionImpl subscription = this.subscription;
        if (subscription == null) {
            return;
        }
        if (this.queue.isEmpty()) {
            return;
        }
        this.doNotifySubscriber(subscription);
    }

    private void doNotifySubscriber(AbstractStreamMessage.SubscriptionImpl subscription) {
        if (this.inOnNext) {
            return;
        }
        if (!this.invokedOnSubscribe) {
            this.eventLoop.execute(() -> this.doNotifySubscriber(subscription));
            return;
        }
        while (true) {
            if (this.state == AbstractStreamMessageAndWriter.State.CLEANUP) {
                this.cleanup();
                return;
            }
            Object o = this.queue.peek();
            if (o == null) break;
            if (o instanceof AbstractStreamMessage.CloseEvent) {
                this.doHandleCloseEvent(subscription, (AbstractStreamMessage.CloseEvent)this.queue.remove());
                break;
            }
            if (this.demand == 0L) break;
            if (o instanceof AbstractStreamMessageAndWriter.AwaitDemandFuture) {
                AbstractStreamMessageAndWriter.AwaitDemandFuture f = (AbstractStreamMessageAndWriter.AwaitDemandFuture)this.queue.remove();
                f.complete(null);
                continue;
            }
            --this.demand;
            Object obj = this.queue.remove();
            this.doNotifySubscriberOfObject(subscription, obj);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doNotifySubscriberOfObject(AbstractStreamMessage.SubscriptionImpl subscription, T obj) {
        Subscriber<Object> subscriber = subscription.subscriber();
        obj = this.prepareObjectForNotification(subscription, obj);
        if (subscription.needsDirectInvocation()) {
            this.inOnNext = true;
            try {
                subscriber.onNext(obj);
            }
            finally {
                this.inOnNext = false;
            }
        } else {
            Object published = obj;
            subscription.executor().execute(() -> subscriber.onNext(published));
        }
    }

    private void doHandleCloseEvent(AbstractStreamMessage.SubscriptionImpl subscription, AbstractStreamMessage.CloseEvent event) {
        if (!this.invokedOnSubscribe) {
            this.eventLoop.execute(() -> this.doHandleCloseEvent(subscription, event));
            return;
        }
        this.doSetState(AbstractStreamMessageAndWriter.State.CLEANUP);
        this.notifySubscriberOfCloseEvent(subscription, event);
    }

    private void cancelOrAbort(boolean cancel) {
        if (this.eventLoop.inEventLoop()) {
            this.doCancelOrAbort(cancel);
        } else {
            this.eventLoop.execute(() -> this.doCancelOrAbort(cancel));
        }
    }

    private void doCancelOrAbort(boolean cancel) {
        if (this.state == AbstractStreamMessageAndWriter.State.OPEN) {
            this.doSetState(AbstractStreamMessageAndWriter.State.CLEANUP);
            AbstractStreamMessage.CloseEvent closeEvent = cancel ? (Flags.verboseExceptions() ? new AbstractStreamMessage.CloseEvent(CancelledSubscriptionException.get()) : CANCELLED_CLOSE) : (Flags.verboseExceptions() ? new AbstractStreamMessage.CloseEvent(AbortedStreamException.get()) : ABORTED_CLOSE);
            this.doAddObjectOrEvent(closeEvent);
            return;
        }
        switch (this.state) {
            case CLOSED: {
                this.doSetState(AbstractStreamMessageAndWriter.State.CLEANUP);
                this.cleanup();
                break;
            }
            case CLEANUP: {
                break;
            }
            default: {
                throw new Error();
            }
        }
    }

    private void doSetAbortedSubscription() {
        this.subscription = new AbstractStreamMessage.SubscriptionImpl(this, AbortingSubscriber.get(), (EventExecutor)ImmediateEventExecutor.INSTANCE, false);
        this.invokedOnSubscribe = true;
    }

    private void cleanup() {
        this.cleanupQueue(this.subscription, this.queue);
    }

    private static class UnexpectedEventLoopException
    extends RuntimeException {
        private static final long serialVersionUID = 5610415039321743416L;

        private UnexpectedEventLoopException() {
        }
    }
}

