/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.AbortingSubscriber;
import com.linecorp.armeria.common.stream.AbstractStreamMessage;
import com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.internal.shaded.jctools.queues.MpscChunkedArrayQueue;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A stream message and writer whose written objects and close events are buffered in a queue and
 * delivered to a single subscriber.
 *
 * <p>Everything written goes through {@link #addObjectOrEvent(Object)}, which appends to the queue and
 * then asks {@link #notifySubscriber()} to drain it. The queue can hold three kinds of entries, as
 * distinguished by the drain loop in {@code notifySubscriber0()}: stream elements, {@code CloseEvent}s
 * and {@code AwaitDemandFuture}s.
 *
 * <p>Only one subscription is accepted: {@code subscribe()} installs it with a compare-and-set from
 * {@code null}, and any later subscriber is handed to {@code failLateSubscriber()}.
 *
 * <p>The lifecycle is tracked by {@code state}, which starts as {@code OPEN} and is only ever moved
 * forward via {@code setState()} (to {@code CLOSED} by the {@code close} methods, to {@code CLEANUP}
 * by cancel/abort or when a close event is delivered).
 *
 * @param <T> the type of the stream elements
 */
public class DefaultStreamMessage<T>
extends AbstractStreamMessageAndWriter<T> {
    // Atomic updaters for the volatile fields 'subscription' and 'state' declared below.
    private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, AbstractStreamMessage.SubscriptionImpl> subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultStreamMessage.class, AbstractStreamMessage.SubscriptionImpl.class, "subscription");
    private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, AbstractStreamMessageAndWriter.State> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultStreamMessage.class, AbstractStreamMessageAndWriter.State.class, "state");
    // Pending entries: stream elements, CloseEvents and AwaitDemandFutures (see notifySubscriber0()).
    private final Queue<Object> queue;
    // The one and only subscription; null until subscribe() or abort() installs it via subscriptionUpdater.
    @Nullable
    private volatile AbstractStreamMessage.SubscriptionImpl subscription;
    // Number of elements the subscriber has requested but not yet received.
    // Long.MAX_VALUE is treated as unbounded: it is never decremented.
    // NOTE(review): not volatile - presumably only touched from the subscription's executor; confirm.
    private long demand;
    private volatile AbstractStreamMessageAndWriter.State state = AbstractStreamMessageAndWriter.State.OPEN;
    // Becomes true on the first addObject() call; consulted by isEmpty().
    private volatile boolean wroteAny;
    // True only while Subscriber.onNext() is running; used to stop the drain loop from being reentered.
    private boolean inOnNext;
    // Set right before Subscriber.onSubscribe() is called (or by abort() for its placeholder subscription).
    private boolean invokedOnSubscribe;

    /**
     * Creates a new instance with an empty queue.
     */
    public DefaultStreamMessage() {
        // 0x40000000 == 2^30.
        // NOTE(review): presumably (initialCapacity, maxCapacity) per the JCTools constructor - confirm.
        this.queue = new MpscChunkedArrayQueue<Object>(32, 0x40000000);
    }

    /**
     * Returns {@code true} while the state is still {@code OPEN}, i.e. before any close, cancel or abort
     * has changed it.
     */
    @Override
    public boolean isOpen() {
        return this.state == AbstractStreamMessageAndWriter.State.OPEN;
    }

    /**
     * Returns {@code true} only if the stream is no longer open and {@link #addObject(Object)} was never
     * called. An open stream always reports {@code false}.
     */
    @Override
    public boolean isEmpty() {
        return !this.isOpen() && !this.wroteAny;
    }

    /**
     * Installs the given subscription if none is installed yet and invokes
     * {@code Subscriber.onSubscribe()}, either directly or on the subscription's executor.
     *
     * <p>If a subscription is already installed, the new subscriber is passed to
     * {@code failLateSubscriber()} together with the existing subscription and nothing else happens.
     *
     * @param subscription the subscription to install
     */
    @Override
    void subscribe(AbstractStreamMessage.SubscriptionImpl subscription) {
        Subscriber<Object> subscriber = subscription.subscriber();
        EventExecutor executor = subscription.executor();
        if (!subscriptionUpdater.compareAndSet(this, null, subscription)) {
            // Lost the race (or subscribed twice): someone else already owns this stream.
            DefaultStreamMessage.failLateSubscriber(this.subscription, subscriber);
            return;
        }
        // The flag is set before onSubscribe() so that notifySubscriber0() stops rescheduling itself.
        if (subscription.needsDirectInvocation()) {
            this.invokedOnSubscribe = true;
            subscriber.onSubscribe((Subscription)subscription);
        } else {
            executor.execute(() -> {
                this.invokedOnSubscribe = true;
                subscriber.onSubscribe((Subscription)subscription);
            });
        }
    }

    /**
     * Aborts this stream.
     *
     * <p>If there is a subscription already, this simply delegates to {@code cancelOrAbort(false)}.
     * Otherwise a placeholder subscription backed by {@code AbortingSubscriber.get()} and
     * {@code ImmediateEventExecutor.INSTANCE} is installed first, so that the abort event queued by
     * {@code cancelOrAbort(false)} has a subscription to be delivered to.
     */
    @Override
    public void abort() {
        AbstractStreamMessage.SubscriptionImpl currentSubscription = this.subscription;
        if (currentSubscription != null) {
            this.cancelOrAbort(false);
            return;
        }
        AbstractStreamMessage.SubscriptionImpl newSubscription = new AbstractStreamMessage.SubscriptionImpl(this, AbortingSubscriber.get(), (EventExecutor)ImmediateEventExecutor.INSTANCE, false);
        if (subscriptionUpdater.compareAndSet(this, null, newSubscription)) {
            // onSubscribe() is never called on the placeholder, so mark it as done here;
            // otherwise notifySubscriber0() would keep rescheduling itself waiting for it.
            this.invokedOnSubscribe = true;
        }
        // Runs whether or not the CAS won: if it lost, a real subscription was installed concurrently.
        this.cancelOrAbort(false);
    }

    /**
     * Records that at least one object has been written and queues the object for delivery.
     *
     * @param obj the stream element to queue
     */
    @Override
    void addObject(T obj) {
        this.wroteAny = true;
        this.addObjectOrEvent(obj);
    }

    /**
     * Returns the current outstanding demand.
     */
    @Override
    long demand() {
        return this.demand;
    }

    /**
     * Adds {@code n} to the demand, directly if the subscription needs direct invocation or otherwise as
     * a task on the subscription's executor.
     *
     * @param n the number of additional elements requested
     */
    @Override
    void request(long n) {
        AbstractStreamMessage.SubscriptionImpl subscription = this.subscription;
        assert (subscription != null);
        if (subscription.needsDirectInvocation()) {
            this.doRequest(n);
        } else {
            subscription.executor().execute(() -> this.doRequest(n));
        }
    }

    /**
     * Increases the demand by {@code n}, saturating at {@code Long.MAX_VALUE}, and starts draining the
     * queue if the demand was previously zero and there is something queued.
     *
     * <p>NOTE(review): the saturation check assumes {@code n} is positive; validation of {@code n}
     * presumably happens in the caller - confirm.
     */
    private void doRequest(long n) {
        long oldDemand = this.demand;
        // Written as a comparison against MAX_VALUE - n so the addition itself can never overflow.
        this.demand = oldDemand >= Long.MAX_VALUE - n ? Long.MAX_VALUE : oldDemand + n;
        // Only a 0 -> positive transition can unblock a drain that stopped for lack of demand.
        if (oldDemand == 0L && !this.queue.isEmpty()) {
            this.notifySubscriber0();
        }
    }

    /**
     * Cancels the subscription by delegating to {@code cancelOrAbort(true)}.
     */
    @Override
    void cancel() {
        this.cancelOrAbort(true);
    }

    /**
     * Lets the close event notify the subscriber, then clears the subscriber reference and cleans up
     * the queue. The clean-up steps run in a {@code finally} block, so they happen even if the
     * notification throws.
     *
     * @param subscription the subscription whose subscriber is notified
     * @param event the close event to deliver
     */
    @Override
    void notifySubscriberOfCloseEvent(AbstractStreamMessage.SubscriptionImpl subscription, AbstractStreamMessage.CloseEvent event) {
        try {
            event.notifySubscriber(subscription, this.completionFuture());
        }
        finally {
            subscription.clearSubscriber();
            this.cleanup();
        }
    }

    /**
     * Shared implementation of {@link #cancel()} and {@link #abort()}.
     *
     * <p>If the stream is still {@code OPEN}, it is moved to {@code CLEANUP} and a close event is
     * queued. If it was already {@code CLOSED}, it is moved to {@code CLEANUP} and the queue clean-up
     * is scheduled on the subscription's executor. If it is already in {@code CLEANUP}, nothing is done.
     *
     * @param cancel {@code true} for a cancellation, {@code false} for an abort; this only selects
     *               which close event is queued
     */
    private void cancelOrAbort(boolean cancel) {
        if (this.setState(AbstractStreamMessageAndWriter.State.OPEN, AbstractStreamMessageAndWriter.State.CLEANUP)) {
            // With verbose exceptions enabled a fresh CloseEvent wraps the exception returned by get();
            // otherwise the shared CANCELLED_CLOSE / ABORTED_CLOSE constants are used.
            AbstractStreamMessage.CloseEvent closeEvent = cancel ? (Flags.verboseExceptions() ? new AbstractStreamMessage.CloseEvent(CancelledSubscriptionException.get()) : CANCELLED_CLOSE) : (Flags.verboseExceptions() ? new AbstractStreamMessage.CloseEvent(AbortedStreamException.get()) : ABORTED_CLOSE);
            this.addObjectOrEvent(closeEvent);
            return;
        }
        // The state was not OPEN; re-read it to decide what is left to do.
        switch (this.state) {
            case CLOSED: {
                // If another thread already moved CLOSED -> CLEANUP, it owns the clean-up.
                if (!this.setState(AbstractStreamMessageAndWriter.State.CLOSED, AbstractStreamMessageAndWriter.State.CLEANUP)) break;
                this.subscription.executor().execute(this::cleanup);
                break;
            }
            case CLEANUP: {
                // Already cancelled, aborted or fully delivered; nothing to do.
                break;
            }
            default: {
                // Not expected to be reached: OPEN was ruled out by the failed CAS above and state
                // never returns to OPEN (see the assertion in setState()).
                throw new Error();
            }
        }
    }

    /**
     * Appends the given element or event to the queue and triggers a notification attempt.
     *
     * @param obj the element or event to queue
     */
    @Override
    void addObjectOrEvent(Object obj) {
        this.queue.add(obj);
        this.notifySubscriber();
    }

    /**
     * Starts draining the queue if there is a subscription and the queue is not empty. The drain runs
     * directly when the subscription needs direct invocation, otherwise on the subscription's executor.
     */
    final void notifySubscriber() {
        AbstractStreamMessage.SubscriptionImpl subscription = this.subscription;
        if (subscription == null) {
            // Nobody to notify yet; the entry stays queued.
            return;
        }
        if (this.queue.isEmpty()) {
            return;
        }
        if (subscription.needsDirectInvocation()) {
            this.notifySubscriber0();
        } else {
            subscription.executor().execute(this::notifySubscriber0);
        }
    }

    /**
     * Drains the queue towards the subscriber until it is empty, demand runs out, a close event is
     * delivered or the state becomes {@code CLEANUP}.
     *
     * <p>The method returns immediately when called from within {@code Subscriber.onNext()}, and
     * reschedules itself on the subscription's executor while {@code onSubscribe()} has not been
     * invoked yet.
     *
     * <p>Decompiler (CFR) notes retained from the original listing: "Enabled force condition
     * propagation" and "Lifted jumps to return sites". NOTE(review): the loop shape below presumably
     * differs from the hand-written source as a result.
     */
    private void notifySubscriber0() {
        if (this.inOnNext) {
            // Reentrant call from inside onNext(); the drain loop already running further up the
            // stack will pick up the remaining entries.
            return;
        }
        AbstractStreamMessage.SubscriptionImpl subscription = this.subscription;
        if (!this.invokedOnSubscribe) {
            // onSubscribe() has not run yet; try again later so it is delivered before anything else.
            EventExecutor executor = subscription.executor();
            executor.execute(this::notifySubscriber0);
            return;
        }
        while (true) {
            // Re-checked on every iteration because the state can change while entries are delivered.
            if (this.state == AbstractStreamMessageAndWriter.State.CLEANUP) {
                this.cleanup();
                return;
            }
            Object o = this.queue.peek();
            if (o == null) return;
            if (o instanceof AbstractStreamMessage.CloseEvent) {
                // A close event is terminal: deliver it and stop draining.
                this.handleCloseEvent(subscription, (AbstractStreamMessage.CloseEvent)this.queue.remove());
                return;
            }
            if (o instanceof AbstractStreamMessageAndWriter.AwaitDemandFuture) {
                // Stop if there is no demand; otherwise the future was completed, so move on.
                if (!this.notifyAwaitDemandFuture()) return;
                continue;
            }
            // Plain element: stop if there is no demand for it.
            if (!this.notifySubscriberWithElements(subscription)) return;
        }
    }

    /**
     * Delivers the element at the head of the queue to the subscriber, consuming one unit of demand.
     *
     * <p>CFR warning retained from the original listing: "Removed try catching itself - possible
     * behaviour change." NOTE(review): compare with the original source before relying on the exact
     * exception behaviour of this method.
     *
     * @param subscription the subscription whose subscriber receives the element
     * @return {@code false} if there was no demand and nothing was delivered, {@code true} otherwise
     */
    private boolean notifySubscriberWithElements(AbstractStreamMessage.SubscriptionImpl subscription) {
        Subscriber<Object> subscriber = subscription.subscriber();
        if (this.demand == 0L) {
            return false;
        }
        // Long.MAX_VALUE means unbounded demand, so it is left untouched.
        if (this.demand != Long.MAX_VALUE) {
            --this.demand;
        }
        Object o = this.queue.remove();
        // Flag the onNext() call so that a nested notifySubscriber0() returns immediately.
        this.inOnNext = true;
        try {
            o = this.prepareObjectForNotification(subscription, o);
            subscriber.onNext(o);
        }
        finally {
            // Always reset, even if preparing the object or onNext() throws.
            this.inOnNext = false;
        }
        return true;
    }

    /**
     * Completes the await-demand future at the head of the queue, provided there is demand. The demand
     * itself is not consumed.
     *
     * @return {@code false} if there was no demand and the future was left in the queue,
     *         {@code true} if it was removed and completed
     */
    private boolean notifyAwaitDemandFuture() {
        if (this.demand == 0L) {
            return false;
        }
        CompletableFuture f = (CompletableFuture)this.queue.remove();
        f.complete(null);
        return true;
    }

    /**
     * Moves the state from {@code OPEN} to {@code CLEANUP} if it is still open and then delivers the
     * close event to the subscriber.
     */
    private void handleCloseEvent(AbstractStreamMessage.SubscriptionImpl subscription, AbstractStreamMessage.CloseEvent o) {
        // The result is deliberately ignored: the event is delivered whichever state we were in.
        this.setState(AbstractStreamMessageAndWriter.State.OPEN, AbstractStreamMessageAndWriter.State.CLEANUP);
        this.notifySubscriberOfCloseEvent(subscription, o);
    }

    /**
     * Closes the stream successfully. Does nothing if the stream is no longer open.
     */
    @Override
    public void close() {
        if (this.setState(AbstractStreamMessageAndWriter.State.OPEN, AbstractStreamMessageAndWriter.State.CLOSED)) {
            this.addObjectOrEvent(SUCCESSFUL_CLOSE);
        }
    }

    /**
     * Closes the stream with the given cause. Does nothing if the stream is no longer open.
     *
     * @param cause the reason the stream is being closed
     * @throws NullPointerException if {@code cause} is {@code null}
     * @throws IllegalArgumentException if {@code cause} is a {@code CancelledSubscriptionException}
     */
    @Override
    public void close(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        if (cause instanceof CancelledSubscriptionException) {
            throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
        }
        // The boolean result is dropped here: closing an already closed stream is a silent no-op.
        this.tryClose(cause);
    }

    /**
     * Tries to close the stream with the given cause.
     *
     * @param cause the reason the stream is being closed
     * @return {@code true} if the stream was open and a close event carrying {@code cause} was queued,
     *         {@code false} if the stream was not open
     */
    protected final boolean tryClose(Throwable cause) {
        if (this.setState(AbstractStreamMessageAndWriter.State.OPEN, AbstractStreamMessageAndWriter.State.CLOSED)) {
            this.addObjectOrEvent(new AbstractStreamMessage.CloseEvent(cause));
            return true;
        }
        return false;
    }

    /**
     * Atomically replaces {@code oldState} with {@code newState}.
     *
     * @return {@code true} if the state was {@code oldState} and has been updated
     */
    private boolean setState(AbstractStreamMessageAndWriter.State oldState, AbstractStreamMessageAndWriter.State newState) {
        // The stream never goes back to OPEN (checked only when assertions are enabled).
        assert (newState != AbstractStreamMessageAndWriter.State.OPEN) : "oldState: " + (Object)((Object)oldState) + ", newState: " + (Object)((Object)newState);
        return stateUpdater.compareAndSet(this, oldState, newState);
    }

    /**
     * Hands the current subscription and the queue to {@code cleanupQueue()}.
     */
    private void cleanup() {
        this.cleanupQueue(this.subscription, this.queue);
    }
}

