/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.transport.netty.internal;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.netty.internal.ByteMaskUtils;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelOutboundListener;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.WriteDemandEstimator;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class WriteStreamSubscriber
implements PublisherSource.Subscriber<Object>,
ChannelOutboundListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteStreamSubscriber.class);
    private static final GenericFutureListener WRITE_BOUNDARY = future -> {};
    private static final byte SOURCE_TERMINATED = 1;
    private static final byte CHANNEL_CLOSED = 2;
    private static final byte CLOSE_OUTBOUND_ON_SUBSCRIBER_TERMINATION = 4;
    private static final byte SUBSCRIBER_TERMINATED = 8;
    private static final byte SUBSCRIBER_OR_SOURCE_TERMINATED = 9;
    private static final PublisherSource.Subscription CANCELLED = EmptySubscriptions.newEmptySubscription();
    private static final AtomicReferenceFieldUpdater<WriteStreamSubscriber, PublisherSource.Subscription> subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(WriteStreamSubscriber.class, PublisherSource.Subscription.class, "subscription");
    private final CompletableSource.Subscriber subscriber;
    private final Channel channel;
    private final EventExecutor eventLoop;
    private final WriteDemandEstimator demandEstimator;
    private final AllWritesPromise promise;
    @Nullable
    private volatile PublisherSource.Subscription subscription;
    private boolean enqueueWrites;
    private final CloseHandler closeHandler;
    private final ConnectionObserver.WriteObserver observer;
    private final boolean isClient;
    private final Predicate<Object> shouldWait;
    private boolean shouldWaitFlag;

    WriteStreamSubscriber(Channel channel, WriteDemandEstimator demandEstimator, CompletableSource.Subscriber subscriber, CloseHandler closeHandler, ConnectionObserver.WriteObserver observer, UnaryOperator<Throwable> enrichProtocolError, boolean isClient, Predicate<Object> shouldWait) {
        this.eventLoop = Objects.requireNonNull(channel.eventLoop());
        this.subscriber = subscriber;
        this.channel = channel;
        this.demandEstimator = demandEstimator;
        this.promise = new AllWritesPromise(channel, enrichProtocolError);
        this.closeHandler = closeHandler;
        this.observer = observer;
        this.isClient = isClient;
        this.shouldWait = Objects.requireNonNull(shouldWait);
    }

    @Override
    public void onSubscribe(PublisherSource.Subscription s) {
        ConcurrentSubscription concurrentSubscription = ConcurrentSubscription.wrap(s);
        if (!subscriptionUpdater.compareAndSet(this, null, concurrentSubscription)) {
            s.cancel();
            return;
        }
        this.subscriber.onSubscribe(concurrentSubscription);
        if (this.eventLoop.inEventLoop()) {
            this.initialRequestN(concurrentSubscription);
        } else {
            this.eventLoop.execute(() -> this.initialRequestN(concurrentSubscription));
        }
    }

    @Override
    public void onNext(Object o) {
        if (!this.enqueueWrites && !this.eventLoop.inEventLoop()) {
            this.enqueueWrites = true;
        }
        if (this.enqueueWrites) {
            this.eventLoop.execute(() -> this.doWrite(o));
        } else {
            this.doWrite(o);
        }
    }

    void doWrite(Object msg) {
        if (this.promise.isWritable()) {
            long capacityBefore = this.channel.bytesBeforeUnwritable();
            this.promise.writeNext(msg);
            long capacityAfter = this.channel.bytesBeforeUnwritable();
            this.observer.itemWritten(msg);
            this.demandEstimator.onItemWrite(msg, capacityBefore, capacityAfter);
            if (!this.isClient || !(this.shouldWaitFlag = this.shouldWait.test(msg))) {
                this.requestMoreIfRequired(this.subscription, capacityAfter);
            }
        }
    }

    @Override
    public void onError(Throwable cause) {
        Objects.requireNonNull(cause);
        if (this.enqueueWrites || !this.eventLoop.inEventLoop()) {
            this.scheduleSourceTerminated(cause);
        } else {
            this.promise.sourceTerminated(cause, true);
        }
    }

    @Override
    public void onComplete() {
        if (this.enqueueWrites || !this.eventLoop.inEventLoop()) {
            this.scheduleSourceTerminated(null);
        } else {
            this.promise.sourceTerminated(null, true);
        }
    }

    private void scheduleSourceTerminated(@Nullable Throwable cause) {
        this.subscription = CANCELLED;
        this.eventLoop.execute(() -> this.promise.sourceTerminated(cause, false));
    }

    @Override
    public void channelWritable() {
        assert (this.eventLoop.inEventLoop());
        PublisherSource.Subscription subscription = this.subscription;
        if (this.isClient && subscription != null && subscription != CANCELLED && !this.promise.written) {
            this.initialRequestN(subscription);
        } else {
            this.requestMoreIfRequired(subscription, -1L);
        }
    }

    @Override
    public void continueWriting() {
        assert (this.eventLoop.inEventLoop());
        if (this.shouldWaitFlag) {
            this.shouldWaitFlag = false;
            this.requestMoreIfRequired(this.subscription, -1L);
        }
    }

    @Override
    public void channelOutboundClosed() {
        assert (this.eventLoop.inEventLoop());
        PublisherSource.Subscription sub = this.subscription;
        if (sub != null) {
            sub.request(Long.MAX_VALUE);
        }
        this.promise.sourceTerminated(null, true);
    }

    @Override
    public void terminateSource() {
        assert (this.eventLoop.inEventLoop());
        if (this.shouldWaitFlag) {
            assert (this.promise.activeWrites == 0);
            this.promise.sourceTerminated(null, true);
        }
    }

    @Override
    public void channelClosed(Throwable closedException) {
        this.discard(closedException, true);
    }

    @Override
    public void listenerDiscard(Throwable cause) {
        this.discard(cause, false);
    }

    private void discard(Throwable cause, boolean closeOutboundIfIdle) {
        PublisherSource.Subscription oldVal = subscriptionUpdater.getAndSet(this, CANCELLED);
        if (this.eventLoop.inEventLoop()) {
            this.close0(oldVal, cause, closeOutboundIfIdle);
        } else {
            this.eventLoop.execute(() -> this.close0(oldVal, cause, closeOutboundIfIdle));
        }
    }

    private void close0(@Nullable PublisherSource.Subscription oldVal, Throwable closedException, boolean closeOutboundIfIdle) {
        assert (this.eventLoop.inEventLoop());
        if (oldVal == null) {
            this.subscriber.onSubscribe(Cancellable.IGNORE_CANCEL);
        } else {
            oldVal.cancel();
        }
        this.promise.close(closedException, closeOutboundIfIdle);
    }

    void cancel() {
        PublisherSource.Subscription oldVal = subscriptionUpdater.getAndSet(this, CANCELLED);
        if (oldVal == null || oldVal == CANCELLED) {
            return;
        }
        if (this.eventLoop.inEventLoop()) {
            oldVal.cancel();
        } else {
            this.eventLoop.execute(oldVal::cancel);
        }
    }

    private void initialRequestN(PublisherSource.Subscription subscription) {
        if (this.isClient) {
            if (this.promise.isWritable()) {
                subscription.request(1L);
            }
        } else {
            this.requestMoreIfRequired(subscription, -1L);
        }
    }

    private void requestMoreIfRequired(@Nullable PublisherSource.Subscription subscription, long bytesBeforeUnwritable) {
        if (subscription == null || subscription == CANCELLED || !this.promise.isWritable()) {
            return;
        }
        long n = this.demandEstimator.estimateRequestN(bytesBeforeUnwritable >= 0L ? bytesBeforeUnwritable : this.channel.bytesBeforeUnwritable());
        if (n > 0L) {
            subscription.request(n);
        }
    }

    static final class AbortedFirstWriteException
    extends IOException
    implements RetryableException {
        private static final long serialVersionUID = -5626706348233302247L;

        AbortedFirstWriteException(Throwable cause) {
            super(cause);
        }

        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    private final class AllWritesPromise
    extends DefaultChannelPromise {
        private int activeWrites;
        private boolean written;
        private byte state;
        @Nullable
        private Throwable failureCause;
        private final Deque<GenericFutureListener<?>> listenersOnWriteBoundaries;
        private final UnaryOperator<Throwable> enrichProtocolError;

        AllWritesPromise(Channel channel, UnaryOperator<Throwable> enrichProtocolError) {
            super(channel);
            this.listenersOnWriteBoundaries = new ArrayDeque(4);
            this.enrichProtocolError = enrichProtocolError;
        }

        @Override
        public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            assert (WriteStreamSubscriber.this.channel.eventLoop().inEventLoop());
            if (ByteMaskUtils.isAllSet(this.state, (byte)8)) {
                return this;
            }
            this.listenersOnWriteBoundaries.addLast(listener);
            return this;
        }

        @Override
        @SafeVarargs
        public final ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
            assert (WriteStreamSubscriber.this.channel.eventLoop().inEventLoop());
            if (ByteMaskUtils.isAllSet(this.state, (byte)8)) {
                return this;
            }
            for (GenericFutureListener<? extends Future<? super Void>> listener : listeners) {
                this.listenersOnWriteBoundaries.addLast(listener);
            }
            return this;
        }

        @Override
        public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            assert (WriteStreamSubscriber.this.channel.eventLoop().inEventLoop());
            this.listenersOnWriteBoundaries.removeFirstOccurrence(listener);
            return this;
        }

        @Override
        @SafeVarargs
        public final ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
            assert (WriteStreamSubscriber.this.channel.eventLoop().inEventLoop());
            for (GenericFutureListener<? extends Future<? super Void>> listener : listeners) {
                this.listenersOnWriteBoundaries.removeFirstOccurrence(listener);
            }
            return this;
        }

        boolean isWritable() {
            assert (WriteStreamSubscriber.this.channel.eventLoop().inEventLoop());
            return this.state == 0;
        }

        void writeNext(Object msg) {
            assert (WriteStreamSubscriber.this.eventLoop.inEventLoop());
            ++this.activeWrites;
            this.listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY);
            WriteStreamSubscriber.this.channel.write(msg, this);
            if (!this.written) {
                this.written = true;
            }
        }

        void sourceTerminated(@Nullable Throwable cause, boolean markCancelled) {
            assert (WriteStreamSubscriber.this.eventLoop.inEventLoop());
            if (ByteMaskUtils.isAnySet(this.state, (byte)9)) {
                return;
            }
            this.failureCause = cause;
            this.state = ByteMaskUtils.set(this.state, (byte)1);
            if (markCancelled) {
                WriteStreamSubscriber.this.subscription = CANCELLED;
            }
            if (this.activeWrites == 0) {
                try {
                    this.state = ByteMaskUtils.set(this.state, (byte)8);
                    this.terminateSubscriber(cause);
                }
                catch (Throwable t) {
                    this.tryFailureOrLog(t);
                    return;
                }
                super.trySuccess(null);
            }
        }

        void close(Throwable cause, boolean closeOutboundIfIdle) {
            assert (WriteStreamSubscriber.this.eventLoop.inEventLoop());
            if (ByteMaskUtils.isAllSet(this.state, (byte)2)) {
                return;
            }
            if (ByteMaskUtils.isAllSet(this.state, (byte)8)) {
                this.state = ByteMaskUtils.set(this.state, (byte)2);
                if (closeOutboundIfIdle) {
                    WriteStreamSubscriber.this.closeHandler.closeChannelOutbound(WriteStreamSubscriber.this.channel);
                }
            } else if (this.activeWrites > 0) {
                this.state = ByteMaskUtils.set(this.state, (byte)4);
            } else {
                this.state = ByteMaskUtils.set(this.state, (byte)2);
                this.tryFailure(cause);
                if (closeOutboundIfIdle) {
                    WriteStreamSubscriber.this.closeHandler.closeChannelOutbound(WriteStreamSubscriber.this.channel);
                }
            }
        }

        @Override
        public boolean trySuccess(Void result) {
            return this.setSuccess0();
        }

        @Override
        public boolean tryFailure(Throwable cause) {
            return this.setFailure0(cause);
        }

        @Override
        public ChannelPromise setSuccess(Void result) {
            this.setSuccess0();
            return this;
        }

        @Override
        public ChannelPromise setFailure(Throwable cause) {
            this.setFailure0(cause);
            return this;
        }

        private boolean setSuccess0() {
            assert (WriteStreamSubscriber.this.eventLoop.inEventLoop());
            if (ByteMaskUtils.isAllSet(this.state, (byte)8)) {
                return this.nettySharedPromiseTryStatus();
            }
            WriteStreamSubscriber.this.observer.itemFlushed();
            if (--this.activeWrites == 0 && ByteMaskUtils.isAllSet(this.state, (byte)1)) {
                this.state = ByteMaskUtils.set(this.state, (byte)8);
                try {
                    this.terminateSubscriber(this.failureCause);
                }
                catch (Throwable t) {
                    this.tryFailureOrLog(t);
                    return true;
                }
                return super.trySuccess(null);
            }
            this.notifyListenersTillNextWrite(this.failureCause);
            return this.nettySharedPromiseTryStatus();
        }

        private boolean setFailure0(Throwable cause) {
            assert (WriteStreamSubscriber.this.eventLoop.inEventLoop());
            if (ByteMaskUtils.isAllSet(this.state, (byte)8)) {
                return this.nettySharedPromiseTryStatus();
            }
            this.state = ByteMaskUtils.set(this.state, (byte)8);
            PublisherSource.Subscription oldVal = subscriptionUpdater.getAndSet(WriteStreamSubscriber.this, CANCELLED);
            if (oldVal != null && !ByteMaskUtils.isAllSet(this.state, (byte)1)) {
                oldVal.cancel();
            }
            this.terminateSubscriber(cause);
            this.tryFailureOrLog(cause);
            return true;
        }

        private boolean nettySharedPromiseTryStatus() {
            return true;
        }

        private void terminateSubscriber(@Nullable Throwable cause) {
            if (cause == null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("{} Terminate subscriber, state: {}", (Object)WriteStreamSubscriber.this.channel, (Object)Integer.toString(this.state, 2));
                }
                try {
                    WriteStreamSubscriber.this.observer.writeComplete();
                    WriteStreamSubscriber.this.subscriber.onComplete();
                }
                catch (Throwable t) {
                    this.tryFailureOrLog(t);
                }
                if (ByteMaskUtils.isAllSet(this.state, (byte)4)) {
                    WriteStreamSubscriber.this.closeHandler.closeChannelOutbound(WriteStreamSubscriber.this.channel);
                }
            } else {
                Throwable enrichedCause = (Throwable)this.enrichProtocolError.apply(cause);
                ChannelCloseUtils.assignConnectionError(WriteStreamSubscriber.this.channel, enrichedCause);
                Throwable throwable = enrichedCause = !this.written ? new AbortedFirstWriteException(enrichedCause) : enrichedCause;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("{} Terminate subscriber with an error, state: {}", WriteStreamSubscriber.this.channel, Integer.toString(this.state, 2), cause);
                }
                try {
                    WriteStreamSubscriber.this.observer.writeFailed(enrichedCause);
                    WriteStreamSubscriber.this.subscriber.onError(enrichedCause);
                }
                catch (Throwable t) {
                    ThrowableUtils.addSuppressed(t, enrichedCause);
                    this.tryFailureOrLog(t);
                }
                if (!ByteMaskUtils.isAllSet(this.state, (byte)2)) {
                    WriteStreamSubscriber.this.channel.close();
                }
            }
            this.notifyAllListeners(cause);
        }

        private void notifyAllListeners(@Nullable Throwable cause) {
            GenericFutureListener<?> mayBeListener;
            ChannelFuture future;
            ChannelFuture channelFuture = future = cause == null ? WriteStreamSubscriber.this.channel.newSucceededFuture() : WriteStreamSubscriber.this.channel.newFailedFuture(cause);
            while ((mayBeListener = this.listenersOnWriteBoundaries.pollFirst()) != null) {
                if (mayBeListener == WRITE_BOUNDARY) continue;
                AllWritesPromise.notifyListener(WriteStreamSubscriber.this.eventLoop, future, mayBeListener);
            }
        }

        private void notifyListenersTillNextWrite(@Nullable Throwable cause) {
            ChannelFuture future;
            GenericFutureListener<?> shdBeWriteBoundary = this.listenersOnWriteBoundaries.pollFirst();
            assert (shdBeWriteBoundary == WRITE_BOUNDARY);
            ChannelFuture channelFuture = future = cause == null ? WriteStreamSubscriber.this.channel.newSucceededFuture() : WriteStreamSubscriber.this.channel.newFailedFuture(cause);
            while (!this.listenersOnWriteBoundaries.isEmpty() && this.listenersOnWriteBoundaries.peekFirst() != WRITE_BOUNDARY) {
                AllWritesPromise.notifyListener(WriteStreamSubscriber.this.eventLoop, future, this.listenersOnWriteBoundaries.pollFirst());
            }
        }

        private void tryFailureOrLog(Throwable cause) {
            if (!super.tryFailure(cause)) {
                LOGGER.error("Failed to set failure on the write promise {}.", (Object)this, (Object)cause);
            }
        }
    }
}

