/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.transport.netty.internal;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
import io.servicetalk.transport.netty.internal.WriteDemandEstimator;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class WriteStreamSubscriber
implements PublisherSource.Subscriber<Object>,
DefaultNettyConnection.ChannelOutboundListener,
Cancellable {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteStreamSubscriber.class);
    // No-op listener used only as an identity marker: AllWritesPromise.writeNext() appends it to the
    // listener queue for every write and the notify* methods compare queue entries against it by reference.
    private static final GenericFutureListener WRITE_BOUNDARY = future -> {};
    // Bit values that are combined in AllWritesPromise.state.
    // Set by AllWritesPromise.sourceTerminated(), i.e. after onError/onComplete or channelOutboundClosed().
    private static final byte SOURCE_TERMINATED = 1;
    // Set by AllWritesPromise.close() once the channel close has been acted upon.
    private static final byte CHANNEL_CLOSED = 2;
    // Set by AllWritesPromise.close() while writes are still in flight; terminateSubscriber() checks it
    // after a successful completion to close the channel outbound side.
    private static final byte CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION = 4;
    // Set right before the CompletableSource.Subscriber is terminated.
    private static final byte SUBSCRIBER_TERMINATED = 8;
    // Sentinel stored in "subscription" by cancel(), channelClosed() and on write failure.
    private static final PublisherSource.Subscription CANCELLED = EmptySubscriptions.newEmptySubscription();
    private static final AtomicLongFieldUpdater<WriteStreamSubscriber> requestedUpdater = AtomicLongFieldUpdater.newUpdater(WriteStreamSubscriber.class, "requested");
    private static final AtomicReferenceFieldUpdater<WriteStreamSubscriber, PublisherSource.Subscription> subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(WriteStreamSubscriber.class, PublisherSource.Subscription.class, "subscription");
    // Receives the single terminal signal describing the outcome of the whole write stream.
    private final CompletableSource.Subscriber subscriber;
    private final Channel channel;
    // channel.eventLoop(), captured in the constructor.
    private final EventExecutor eventLoop;
    private final WriteDemandEstimator demandEstimator;
    // Single promise instance passed to every channel.write() issued by this subscriber.
    private final AllWritesPromise promise;
    // null until onSubscribe(); CANCELLED after cancel/close/failure. Accessed through subscriptionUpdater.
    @Nullable
    private volatile PublisherSource.Subscription subscription;
    // Increased by requestMoreIfRequired() and decreased by onNext(); accessed through requestedUpdater.
    private volatile long requested;
    // Becomes true the first time onNext() is invoked off the event loop; from then on writes and
    // terminal signals are handed to the event loop via execute().
    private boolean enqueueWrites;
    private final CloseHandler closeHandler;

    /**
     * Creates a subscriber that writes every received item to {@code channel}.
     *
     * @param channel the channel to write to; its event loop must be non-null
     * @param demandEstimator decides how many items to request from the source
     * @param subscriber notified once about the outcome of the whole write stream
     * @param closeHandler used to close the outbound side of the channel
     * @param observer receives write progress callbacks
     * @param enrichProtocolError applied to a failure cause before it is reported
     */
    WriteStreamSubscriber(Channel channel, WriteDemandEstimator demandEstimator, CompletableSource.Subscriber subscriber, CloseHandler closeHandler, ConnectionObserver.WriteObserver observer, UnaryOperator<Throwable> enrichProtocolError) {
        // Resolve the event loop first so that a missing event loop fails before anything else is built.
        this.eventLoop = Objects.requireNonNull(channel.eventLoop());
        this.channel = channel;
        this.subscriber = subscriber;
        this.closeHandler = closeHandler;
        this.demandEstimator = demandEstimator;
        this.promise = new AllWritesPromise(channel, observer, enrichProtocolError);
    }

    /**
     * Installs the (concurrency-safe wrapper of the) given subscription, forwards it to the
     * downstream subscriber and then requests initial demand on the event loop.
     *
     * @param s the subscription handed over by the source being written
     */
    @Override
    public void onSubscribe(PublisherSource.Subscription s) {
        final ConcurrentSubscription wrapped = ConcurrentSubscription.wrap(s);
        if (subscriptionUpdater.compareAndSet(this, null, wrapped)) {
            subscriber.onSubscribe(wrapped);
            if (eventLoop.inEventLoop()) {
                requestMoreIfRequired(wrapped);
            } else {
                eventLoop.execute(() -> requestMoreIfRequired(wrapped));
            }
        } else {
            // A subscription (or the CANCELLED sentinel) is already in place: reject this one.
            s.cancel();
        }
    }

    /**
     * Writes the item to the channel and asks for more demand if needed. The work runs inline
     * only as long as every item so far has been delivered on the event loop.
     *
     * @param o the item to write
     */
    @Override
    public void onNext(Object o) {
        requestedUpdater.decrementAndGet(this);
        // The first off-loop delivery switches this subscriber to enqueueing mode for good.
        if (!(enqueueWrites || eventLoop.inEventLoop())) {
            enqueueWrites = true;
        }
        if (!enqueueWrites) {
            doWrite(o);
            requestMoreIfRequired(subscription);
            return;
        }
        eventLoop.execute(() -> {
            doWrite(o);
            requestMoreIfRequired(subscription);
        });
    }

    /**
     * Writes {@code msg} through the shared promise if it is still writable and reports the
     * channel's writable capacity before and after the write to the demand estimator.
     * The message is ignored when the promise is no longer writable.
     *
     * @param msg the message to write
     */
    void doWrite(Object msg) {
        if (!promise.isWritable()) {
            return;
        }
        final long before = channel.bytesBeforeUnwritable();
        promise.writeNext(msg);
        final long after = channel.bytesBeforeUnwritable();
        demandEstimator.onItemWrite(msg, before, after);
    }

    /**
     * Marks the source as failed. Runs inline only when no write was ever enqueued and the
     * caller is on the event loop; otherwise the signal is queued behind pending writes.
     *
     * @param cause the failure reported by the source, must not be {@code null}
     */
    @Override
    public void onError(Throwable cause) {
        Objects.requireNonNull(cause);
        final boolean inline = !enqueueWrites && eventLoop.inEventLoop();
        if (inline) {
            promise.sourceTerminated(cause);
            return;
        }
        eventLoop.execute(() -> promise.sourceTerminated(cause));
    }

    /**
     * Marks the source as completed. Runs inline only when no write was ever enqueued and the
     * caller is on the event loop; otherwise the signal is queued behind pending writes.
     */
    @Override
    public void onComplete() {
        final boolean inline = !enqueueWrites && eventLoop.inEventLoop();
        if (inline) {
            promise.sourceTerminated(null);
            return;
        }
        eventLoop.execute(() -> promise.sourceTerminated(null));
    }

    /**
     * Channel became writable: re-evaluate demand against the current subscription.
     * Must be called on the event loop.
     */
    @Override
    public void channelWritable() {
        assert eventLoop.inEventLoop();
        // May still be null here if this fires before onSubscribe(); the callee handles that.
        final PublisherSource.Subscription current = subscription;
        requestMoreIfRequired(current);
    }

    /**
     * Outbound side of the channel was closed: treat it like a successful end of the source.
     * Must be called on the event loop.
     */
    @Override
    public void channelOutboundClosed() {
        assert eventLoop.inEventLoop();
        final Throwable noFailure = null;
        promise.sourceTerminated(noFailure);
    }

    /**
     * Channel was closed: swap in the {@code CANCELLED} sentinel immediately and finish the
     * close handling on the event loop.
     *
     * @param closedException the reason the channel was closed
     */
    @Override
    public void channelClosed(Throwable closedException) {
        final PublisherSource.Subscription previous = subscriptionUpdater.getAndSet(this, CANCELLED);
        if (!eventLoop.inEventLoop()) {
            eventLoop.execute(() -> close0(previous, closedException));
            return;
        }
        close0(previous, closedException);
    }

    /**
     * Event-loop part of {@link #channelClosed(Throwable)}.
     *
     * @param oldVal the subscription that was installed before the close, or {@code null} if
     *               {@code onSubscribe} had not happened yet
     * @param closedException the reason the channel was closed
     */
    private void close0(@Nullable PublisherSource.Subscription oldVal, Throwable closedException) {
        assert eventLoop.inEventLoop();
        if (oldVal != null) {
            oldVal.cancel();
        } else {
            // No subscription was ever seen, so the downstream subscriber has not received
            // onSubscribe from onSubscribe(); hand it a no-op cancellable before terminating it.
            subscriber.onSubscribe(IGNORE_CANCEL);
        }
        promise.close(closedException);
    }

    /**
     * Cancels the source subscription (at most once) on the event loop.
     * Does nothing when no subscription was installed yet or it was already replaced by the
     * {@code CANCELLED} sentinel.
     */
    @Override
    public void cancel() {
        final PublisherSource.Subscription previous = subscriptionUpdater.getAndSet(this, CANCELLED);
        if (previous != null && previous != CANCELLED) {
            if (eventLoop.inEventLoop()) {
                previous.cancel();
            } else {
                eventLoop.execute(previous::cancel);
            }
        }
    }

    /**
     * Requests as many items as the demand estimator asks for, given the channel's current
     * writable capacity. Nothing is requested without a live subscription or when the promise
     * is no longer writable.
     *
     * @param subscription the subscription to request from; may be {@code null} or the
     *                     {@code CANCELLED} sentinel
     */
    private void requestMoreIfRequired(@Nullable PublisherSource.Subscription subscription) {
        final boolean canRequest = subscription != null && subscription != CANCELLED && promise.isWritable();
        if (!canRequest) {
            return;
        }
        final long demand = demandEstimator.estimateRequestN(channel.bytesBeforeUnwritable());
        if (demand <= 0L) {
            return;
        }
        // Track outstanding demand (saturating on overflow) before signalling the source.
        requestedUpdater.accumulateAndGet(this, demand, FlowControlUtils::addWithOverflowProtection);
        subscription.request(demand);
    }

    /**
     * Wrapper used by {@code AllWritesPromise.close} when the channel is closed before any
     * item was written. It carries only the cause: no message, no stack trace and no
     * suppressed exceptions.
     */
    static final class AbortedFirstWrite
    extends Exception {
        /**
         * @param cause the reason the channel was closed
         */
        AbortedFirstWrite(final Throwable cause) {
            // message = null, enableSuppression = false, writableStackTrace = false
            super(null, cause, false, false);
        }
    }

    private final class AllWritesPromise
    extends DefaultChannelPromise {
        // Number of writes issued via writeNext() whose success has not been processed yet
        // (incremented in writeNext(), decremented in setSuccess0()).
        private int activeWrites;
        // true once writeNext() has been called at least once.
        private boolean written;
        // Bit set of the flag values tested/changed via hasFlag()/hasAnyFlags()/setFlag().
        private byte state;
        // Cause passed to sourceTerminated(); null when the source ended without failure.
        @Nullable
        private Throwable failureCause;
        // Listeners registered on this promise, interleaved with WRITE_BOUNDARY markers (one per write).
        private final Deque<GenericFutureListener<?>> listenersOnWriteBoundaries;
        private final ConnectionObserver.WriteObserver observer;
        // Applied to a failure cause in terminateSubscriber() before it is reported.
        private final UnaryOperator<Throwable> enrichProtocolError;

        /**
         * Creates the single promise that is shared by all writes of the enclosing subscriber.
         *
         * @param channel the channel this promise belongs to
         * @param observer receives write progress callbacks
         * @param enrichProtocolError applied to a failure cause before it is reported
         */
        AllWritesPromise(Channel channel, ConnectionObserver.WriteObserver observer, UnaryOperator<Throwable> enrichProtocolError) {
            super(channel);
            // Diamond instead of the raw type so the assignment is checked by the compiler.
            this.listenersOnWriteBoundaries = new ArrayDeque<>();
            this.observer = observer;
            this.enrichProtocolError = enrichProtocolError;
        }

        /**
         * Queues the listener behind the most recent write boundary instead of registering it
         * with the underlying promise. Once the subscriber has been terminated the listener is
         * not recorded at all.
         *
         * @param listener the listener to queue
         * @return this promise
         */
        @Override
        public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            assert WriteStreamSubscriber.this.channel.eventLoop().inEventLoop();
            if (!hasFlag(SUBSCRIBER_TERMINATED)) {
                listenersOnWriteBoundaries.addLast(listener);
            }
            return this;
        }

        /**
         * Queues all given listeners, in order, behind the most recent write boundary. Once the
         * subscriber has been terminated none of them is recorded.
         *
         * @param listeners the listeners to queue
         * @return this promise
         */
        @Override
        @SafeVarargs
        public final ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
            assert WriteStreamSubscriber.this.channel.eventLoop().inEventLoop();
            if (!hasFlag(SUBSCRIBER_TERMINATED)) {
                for (GenericFutureListener<? extends Future<? super Void>> each : listeners) {
                    listenersOnWriteBoundaries.addLast(each);
                }
            }
            return this;
        }

        /**
         * Removes the first queued occurrence of the listener, if any.
         *
         * @param listener the listener to remove
         * @return this promise
         */
        @Override
        public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            assert WriteStreamSubscriber.this.channel.eventLoop().inEventLoop();
            final Deque<GenericFutureListener<?>> queue = listenersOnWriteBoundaries;
            queue.removeFirstOccurrence(listener);
            return this;
        }

        /**
         * Removes the first queued occurrence of each given listener, if any.
         *
         * @param listeners the listeners to remove
         * @return this promise
         */
        @Override
        @SafeVarargs
        public final ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
            assert WriteStreamSubscriber.this.channel.eventLoop().inEventLoop();
            final Deque<GenericFutureListener<?>> queue = listenersOnWriteBoundaries;
            for (GenericFutureListener<? extends Future<? super Void>> each : listeners) {
                queue.removeFirstOccurrence(each);
            }
            return this;
        }

        /**
         * @return {@code true} while further writes are accepted, i.e. the channel close has not
         * been acted upon, the subscriber is not terminated and the source has not terminated
         */
        boolean isWritable() {
            assert WriteStreamSubscriber.this.channel.eventLoop().inEventLoop();
            return !hasAnyFlags(CHANNEL_CLOSED, SUBSCRIBER_TERMINATED, SOURCE_TERMINATED);
        }

        /**
         * Issues one write on the channel using this promise and records a boundary marker so
         * that listeners added afterwards are associated with this write.
         *
         * @param msg the message to write
         */
        void writeNext(Object msg) {
            assert WriteStreamSubscriber.this.eventLoop.inEventLoop();
            // Remember that at least one write was attempted; close() uses this to pick the failure type.
            written = true;
            activeWrites++;
            listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY);
            WriteStreamSubscriber.this.channel.write(msg, this);
        }

        /**
         * Records that the source being written has terminated. If no writes are in flight the
         * subscriber is terminated right away; otherwise termination happens when the last
         * pending write succeeds (see {@code setSuccess0}).
         *
         * @param cause the failure of the source, or {@code null} if it ended normally
         */
        void sourceTerminated(@Nullable Throwable cause) {
            assert WriteStreamSubscriber.this.eventLoop.inEventLoop();
            if (hasAnyFlags(SUBSCRIBER_TERMINATED, SOURCE_TERMINATED)) {
                // Already terminated (for example by an earlier write failure): nothing left to do.
                return;
            }
            failureCause = cause;
            setFlag(SOURCE_TERMINATED);
            if (activeWrites != 0) {
                return;
            }
            try {
                setFlag(SUBSCRIBER_TERMINATED);
                terminateSubscriber(cause);
            } catch (Throwable t) {
                tryFailureOrLog(t);
                return;
            }
            // Complete the underlying promise itself; super bypasses the per-write accounting
            // done by this class's own trySuccess override.
            super.trySuccess(null);
        }

        /**
         * Reacts to the channel being closed.
         * <ul>
         * <li>Subscriber already terminated: close the channel outbound side now.</li>
         * <li>Writes still in flight: remember to close the outbound side once the subscriber
         * completes successfully.</li>
         * <li>Otherwise: fail this promise with {@code cause}, wrapped in
         * {@link AbortedFirstWrite} when nothing was written yet.</li>
         * </ul>
         *
         * @param cause the reason the channel was closed
         */
        void close(Throwable cause) {
            assert WriteStreamSubscriber.this.eventLoop.inEventLoop();
            if (hasFlag(CHANNEL_CLOSED)) {
                return;
            }
            if (hasFlag(SUBSCRIBER_TERMINATED)) {
                setFlag(CHANNEL_CLOSED);
                WriteStreamSubscriber.this.closeHandler.closeChannelOutbound(WriteStreamSubscriber.this.channel);
                return;
            }
            if (activeWrites > 0) {
                setFlag(CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION);
                return;
            }
            setFlag(CHANNEL_CLOSED);
            tryFailure(written ? cause : new AbortedFirstWrite(cause));
        }

        /**
         * Invoked when one write completed successfully; the result value is ignored.
         *
         * @param result unused
         * @return the value produced by {@code setSuccess0()}
         */
        @Override
        public boolean trySuccess(Void result) {
            final boolean outcome = setSuccess0();
            return outcome;
        }

        /**
         * Invoked when a write failed (or by {@code close}) with the given cause.
         *
         * @param cause the failure
         * @return the value produced by {@code setFailure0(cause)}
         */
        @Override
        public boolean tryFailure(Throwable cause) {
            final boolean outcome = setFailure0(cause);
            return outcome;
        }

        /**
         * Same handling as {@link #trySuccess(Void)}; the outcome is discarded.
         *
         * @param result unused
         * @return this promise
         */
        @Override
        public ChannelPromise setSuccess(Void result) {
            // Return value intentionally ignored: this variant only reports "this".
            setSuccess0();
            return this;
        }

        /**
         * Same handling as {@link #tryFailure(Throwable)}; the outcome is discarded.
         *
         * @param cause the failure
         * @return this promise
         */
        @Override
        public ChannelPromise setFailure(Throwable cause) {
            // Return value intentionally ignored: this variant only reports "this".
            setFailure0(cause);
            return this;
        }

        /**
         * Handles the successful completion of one write: notifies the observer, decrements the
         * in-flight counter and either terminates the subscriber (last write of a terminated
         * source) or notifies the listeners that were queued for this write.
         *
         * @return {@code true} in every path except the final one, where the result of
         * completing the underlying promise is returned
         */
        private boolean setSuccess0() {
            assert WriteStreamSubscriber.this.eventLoop.inEventLoop();
            if (hasFlag(SUBSCRIBER_TERMINATED)) {
                return nettySharedPromiseTryStatus();
            }
            observer.itemWritten();
            if (--activeWrites == 0 && hasFlag(SOURCE_TERMINATED)) {
                setFlag(SUBSCRIBER_TERMINATED);
                try {
                    terminateSubscriber(failureCause);
                } catch (Throwable t) {
                    tryFailureOrLog(t);
                    return true;
                }
                // Complete the underlying promise itself, bypassing this class's override.
                return super.trySuccess(null);
            }
            notifyListenersTillNextWrite(failureCause);
            return nettySharedPromiseTryStatus();
        }

        /**
         * Handles a failure: marks the subscriber terminated, cancels the source subscription
         * unless the source had already terminated, reports the failure downstream and fails the
         * underlying promise. Subsequent calls are no-ops.
         *
         * @param cause the failure
         * @return always {@code true}
         */
        private boolean setFailure0(Throwable cause) {
            assert WriteStreamSubscriber.this.eventLoop.inEventLoop();
            if (hasFlag(SUBSCRIBER_TERMINATED)) {
                return nettySharedPromiseTryStatus();
            }
            setFlag(SUBSCRIBER_TERMINATED);
            final PublisherSource.Subscription previous =
                    subscriptionUpdater.getAndSet(WriteStreamSubscriber.this, CANCELLED);
            // A source that already signalled its terminal event must not be cancelled afterwards.
            if (previous != null && !hasFlag(SOURCE_TERMINATED)) {
                previous.cancel();
            }
            terminateSubscriber(cause);
            tryFailureOrLog(cause);
            return true;
        }

        /**
         * Value returned from {@code setSuccess0}/{@code setFailure0} when this promise is not
         * completed by the call itself.
         * NOTE(review): presumably {@code true} is reported because this one promise instance is
         * passed to every write and callers of try* expect success - confirm against Netty.
         *
         * @return always {@code true}
         */
        private boolean nettySharedPromiseTryStatus() {
            return true;
        }

        /**
         * Delivers the terminal signal to the observer and the downstream subscriber, performs
         * the associated channel action and finally notifies every queued listener.
         * <p>
         * Exceptions thrown by the observer or the subscriber are routed to
         * {@code tryFailureOrLog} and do not prevent the channel action or listener notification.
         *
         * @param cause {@code null} for successful completion, otherwise the failure
         */
        private void terminateSubscriber(@Nullable Throwable cause) {
            if (cause == null) {
                try {
                    observer.writeComplete();
                    WriteStreamSubscriber.this.subscriber.onComplete();
                } catch (Throwable t) {
                    tryFailureOrLog(t);
                }
                // The channel was closed while writes were in flight: close outbound only now,
                // after the subscriber has been completed.
                if (hasFlag(CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION)) {
                    WriteStreamSubscriber.this.closeHandler.closeChannelOutbound(WriteStreamSubscriber.this.channel);
                }
            } else {
                final Throwable enrichedCause = enrichProtocolError.apply(cause);
                try {
                    observer.writeFailed(enrichedCause);
                    ChannelCloseUtils.assignConnectionError(WriteStreamSubscriber.this.channel, enrichedCause);
                    WriteStreamSubscriber.this.subscriber.onError(enrichedCause);
                } catch (Throwable t) {
                    tryFailureOrLog(t);
                }
                if (!hasFlag(CHANNEL_CLOSED)) {
                    ChannelCloseUtils.close(WriteStreamSubscriber.this.channel, enrichedCause);
                }
            }
            // Listeners see the original cause, not the enriched one.
            notifyAllListeners(cause);
        }

        /**
         * Drains the whole listener queue, notifying every real listener with an already
         * completed future and skipping the write boundary markers.
         *
         * @param cause {@code null} to notify with a succeeded future, otherwise the failure
         */
        private void notifyAllListeners(@Nullable Throwable cause) {
            final ChannelFuture future = cause == null
                    ? WriteStreamSubscriber.this.channel.newSucceededFuture()
                    : WriteStreamSubscriber.this.channel.newFailedFuture(cause);
            GenericFutureListener<?> mayBeListener;
            while ((mayBeListener = listenersOnWriteBoundaries.pollFirst()) != null) {
                if (mayBeListener != WRITE_BOUNDARY) {
                    notifyListener(WriteStreamSubscriber.this.eventLoop, future, mayBeListener);
                }
            }
        }

        /**
         * Removes the boundary marker of the write that just completed and notifies the
         * listeners queued after it, stopping at the next write boundary (or the end of the queue).
         *
         * @param cause {@code null} to notify with a succeeded future, otherwise the failure
         */
        private void notifyListenersTillNextWrite(@Nullable Throwable cause) {
            final GenericFutureListener<?> shdBeWriteBoundary = listenersOnWriteBoundaries.pollFirst();
            assert shdBeWriteBoundary == WRITE_BOUNDARY;
            final ChannelFuture future = cause == null
                    ? WriteStreamSubscriber.this.channel.newSucceededFuture()
                    : WriteStreamSubscriber.this.channel.newFailedFuture(cause);
            while (!listenersOnWriteBoundaries.isEmpty() && listenersOnWriteBoundaries.peekFirst() != WRITE_BOUNDARY) {
                notifyListener(WriteStreamSubscriber.this.eventLoop, future, listenersOnWriteBoundaries.pollFirst());
            }
        }

        /**
         * Fails the underlying promise directly (bypassing this class's override) and logs an
         * error if the promise does not accept the failure.
         *
         * @param cause the failure to set
         */
        private void tryFailureOrLog(Throwable cause) {
            final boolean accepted = super.tryFailure(cause);
            if (accepted) {
                return;
            }
            LOGGER.error("Failed to set failure on the write promise {}.", this, cause);
        }

        /**
         * @param flag the flag bit(s) to test
         * @return {@code true} if every bit of {@code flag} is set in {@code state}
         */
        private boolean hasFlag(byte flag) {
            final int masked = state & flag;
            return masked == flag;
        }

        /**
         * @param flag1 first flag to test
         * @param flag2 second flag to test
         * @return {@code true} if at least one of the given flags is set in {@code state}
         */
        private boolean hasAnyFlags(byte flag1, byte flag2) {
            // Compare against zero rather than "> 0": the operands are promoted to int with sign
            // extension, so a flag using the byte's top bit would yield a negative result.
            return (state & (flag1 | flag2)) != 0;
        }

        /**
         * @param flag1 first flag to test
         * @param flag2 second flag to test
         * @param flag3 third flag to test
         * @return {@code true} if at least one of the given flags is set in {@code state}
         */
        private boolean hasAnyFlags(byte flag1, byte flag2, byte flag3) {
            // Compare against zero rather than "> 0": the operands are promoted to int with sign
            // extension, so a flag using the byte's top bit would yield a negative result.
            return (state & (flag1 | flag2 | flag3)) != 0;
        }

        /**
         * Turns on the given flag bit(s) in {@code state}; already set bits are left untouched.
         *
         * @param flag the flag bit(s) to set
         */
        private void setFlag(byte flag) {
            // Compound assignment narrows the int result back to byte implicitly.
            state |= flag;
        }
    }
}

