/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.transport.netty.internal;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.ChannelOutputShutdownEvent;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Executors;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.DefaultExecutionContext;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.ChannelOutboundListener;
import io.servicetalk.transport.netty.internal.ChannelSet;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.Flush;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.FlushStrategyHolder;
import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable;
import io.servicetalk.transport.netty.internal.NettyChannelPublisher;
import io.servicetalk.transport.netty.internal.NettyConnection;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.NettyIoExecutors;
import io.servicetalk.transport.netty.internal.NettyPipelineSslUtils;
import io.servicetalk.transport.netty.internal.NoopTransportObserver;
import io.servicetalk.transport.netty.internal.RetryableClosedChannelException;
import io.servicetalk.transport.netty.internal.SocketOptionUtils;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;
import io.servicetalk.transport.netty.internal.WriteDemandEstimator;
import io.servicetalk.transport.netty.internal.WriteDemandEstimators;
import io.servicetalk.transport.netty.internal.WriteStreamSubscriber;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link NettyConnection} implementation backed by a Netty {@link Channel}.
 * <p>
 * Inbound data is exposed through {@link #read()} and outbound data is written through
 * {@link #write(Publisher)}. At most one write may be active at a time; the active write is tracked through
 * the {@code channelOutboundListener} field.
 *
 * @param <Read> type of the items emitted by {@link #read()}.
 * @param <Write> type of the items accepted by {@link #write(Publisher)}.
 */
public final class DefaultNettyConnection<Read, Write>
extends NettyChannelListenableAsyncCloseable
implements NettyConnection<Read, Write>,
ChannelOutboundListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNettyConnection.class);
    // The raw type is unavoidable here: a class literal cannot carry type arguments.
    private static final AtomicReferenceFieldUpdater<DefaultNettyConnection, ChannelOutboundListener> writableListenerUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultNettyConnection.class, ChannelOutboundListener.class, "channelOutboundListener");
    private final CloseHandler closeHandler;
    private final NettyChannelPublisher<Read> nettyChannelPublisher;
    private final Publisher<Read> readPublisher;
    private final ExecutionContext<?> executionContext;
    // null when the close handler is CloseHandler.UNSUPPORTED_PROTOCOL_CLOSE_HANDLER (see the constructor).
    @Nullable
    private final CompletableSource.Processor onClosing;
    // Completed with the close event error or with the first error mapped by enrichError(...).
    private final SingleSource.Processor<Throwable, Throwable> transportError = Processors.newSingleProcessor();
    private final FlushStrategyHolder flushStrategyHolder;
    @Nullable
    private final Long idleTimeoutMs;
    private final ConnectionInfo.Protocol protocol;
    // One of: the no-op placeholder (no write active), this connection (a write was cancelled), or the
    // listener of the currently active write. Swapped with writableListenerUpdater.
    private volatile ChannelOutboundListener channelOutboundListener = NoopChannelOutboundListener.INSTANCE;
    // Set once, by the event handler registered on the close handler in the constructor.
    @Nullable
    private volatile CloseHandler.CloseEvent closeReason;
    // Non-final: NettyToStChannelInboundHandler assigns it when an SslHandshakeCompletionEvent arrives.
    @Nullable
    private SSLSession sslSession;
    // When present it is preferred over the channel's own config by socketOption(...).
    @Nullable
    private final ChannelConfig parentChannelConfig;
    // Replaced by NettyToStChannelInboundHandler.completeSubscriber() once the connection is established.
    private volatile ConnectionObserver.DataObserver dataObserver;
    private final boolean isClient;
    private final Predicate<Object> shouldWait;
    private final UnaryOperator<Throwable> enrichProtocolError;
    /**
     * Resets the outbound listener slot when a write terminates.
     * <ul>
     *     <li>completion or failure: the slot goes back to the no-op placeholder, so a new write may start;</li>
     *     <li>cancellation: the connection installs itself as the listener, so a later
     *     {@link #channelClosed(Throwable)} notification is handled by the connection.</li>
     * </ul>
     */
    private final TerminalSignalConsumer cleanupStateConsumer = new TerminalSignalConsumer() {

        @Override
        public void onComplete() {
            DefaultNettyConnection.this.channelOutboundListener = NoopChannelOutboundListener.INSTANCE;
        }

        @Override
        public void onError(Throwable throwable) {
            DefaultNettyConnection.this.channelOutboundListener = NoopChannelOutboundListener.INSTANCE;
        }

        @Override
        public void cancel() {
            DefaultNettyConnection.this.channelOutboundListener = DefaultNettyConnection.this;
        }
    };

    /**
     * Creates a connection on top of {@code channel}. Instances are only created through the static
     * {@code initChannel(...)} / {@code initChildChannel(...)} factories.
     *
     * @param channel the Netty channel backing this connection.
     * @param executionContext the context exposed through {@link #executionContext()}; its executor is used for
     * close offloading only if the strategy reports {@code isCloseOffloaded()}.
     * @param closeHandler handler that coordinates closure; must not be {@code null}.
     * @param flushStrategy the initial flush strategy.
     * @param idleTimeoutMs idle timeout forwarded to {@link SocketOptionUtils} by {@link #socketOption(SocketOption)}.
     * @param protocol the protocol reported by {@link #protocol()}; must not be {@code null}.
     * @param sslSession the initial SSL session, or {@code null}.
     * @param parentChannelConfig config that takes precedence over the channel's own config for socket options.
     * @param dataObserver observer notified about new reads and writes.
     * @param isClient {@code true} if this is a client-side connection.
     * @param shouldWait predicate handed to each {@link WriteStreamSubscriber}; must not be {@code null}.
     * @param enrichProtocolError mapper applied to errors when no close reason is known; must not be {@code null}.
     */
    private DefaultNettyConnection(Channel channel, ExecutionContext<?> executionContext, CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, ConnectionInfo.Protocol protocol, @Nullable SSLSession sslSession, @Nullable ChannelConfig parentChannelConfig, ConnectionObserver.DataObserver dataObserver, boolean isClient, Predicate<Object> shouldWait, UnaryOperator<Throwable> enrichProtocolError) {
        super(channel, executionContext.executionStrategy().isCloseOffloaded() ? executionContext.executor() : Executors.immediate());
        this.nettyChannelPublisher = new NettyChannelPublisher<Read>(channel, closeHandler);
        this.readPublisher = this.registerReadObserver(this.nettyChannelPublisher.onErrorMap(this::enrichError));
        this.executionContext = executionContext;
        this.closeHandler = Objects.requireNonNull(closeHandler);
        this.flushStrategyHolder = new FlushStrategyHolder(flushStrategy);
        this.idleTimeoutMs = idleTimeoutMs;
        if (closeHandler != CloseHandler.UNSUPPORTED_PROTOCOL_CLOSE_HANDLER) {
            this.onClosing = Processors.newCompletableProcessor();
            closeHandler.registerEventHandler(channel, evt -> {
                assert (channel.eventLoop().inEventLoop());
                // Only the first close event is recorded and propagated.
                if (this.closeReason == null) {
                    this.closeReason = evt;
                    this.onClosing.onComplete();
                    this.transportError.onSuccess(evt.wrapError(null, channel));
                    LOGGER.debug("{} Emitted CloseEvent: {}", channel, evt);
                }
            });
            // Also terminate onClosing when the channel closes without a preceding close event.
            SourceAdapters.toSource(this.onCloseNoOffload()).subscribe(this.onClosing);
        } else {
            this.onClosing = null;
        }
        this.sslSession = sslSession;
        this.parentChannelConfig = parentChannelConfig;
        this.protocol = Objects.requireNonNull(protocol);
        this.dataObserver = dataObserver;
        this.isClient = isClient;
        this.shouldWait = Objects.requireNonNull(shouldWait);
        this.enrichProtocolError = Objects.requireNonNull(enrichProtocolError);
    }

    /**
     * Variant of {@code initChildChannel(...)} without a {@code shouldWait} predicate; it delegates with a
     * predicate that always returns {@code false}.
     *
     * @deprecated use the overload that accepts a {@code shouldWait} {@link Predicate}.
     */
    @Deprecated
    public static <Read, Write> DefaultNettyConnection<Read, Write> initChildChannel(Channel channel, ExecutionContext<?> executionContext, CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, ConnectionInfo.Protocol protocol, @Nullable SSLSession sslSession, @Nullable ChannelConfig parentChannelConfig, ConnectionObserver.StreamObserver streamObserver, boolean isClient, UnaryOperator<Throwable> enrichProtocolError) {
        final Predicate<Object> neverWait = ignored -> false;
        return DefaultNettyConnection.initChildChannel(channel, executionContext, closeHandler, flushStrategy,
                idleTimeoutMs, protocol, sslSession, parentChannelConfig, streamObserver, isClient, neverWait,
                enrichProtocolError);
    }

    /**
     * Creates a connection for a child {@code channel} and appends the inbound handler that bridges Netty
     * events to it at the end of the channel's pipeline.
     * <p>
     * The child gets its own execution context, which reuses the allocator, executor and strategy of
     * {@code executionContext} but takes its I/O executor from the channel's event loop.
     *
     * @param channel the child channel.
     * @param executionContext the context the child context is derived from.
     * @param closeHandler handler that coordinates closure.
     * @param flushStrategy the initial flush strategy.
     * @param idleTimeoutMs idle timeout value reported through socket options.
     * @param protocol the protocol of the connection.
     * @param sslSession the SSL session, or {@code null}.
     * @param parentChannelConfig config of the parent channel, or {@code null}.
     * @param streamObserver observer whose {@code streamEstablished()} result becomes the data observer.
     * @param isClient {@code true} for client-side connections.
     * @param shouldWait predicate handed to each write subscriber.
     * @param enrichProtocolError mapper applied to errors when no close reason is known.
     * @param <Read> type of the items read from the connection.
     * @param <Write> type of the items written to the connection.
     * @return the new connection.
     */
    public static <Read, Write> DefaultNettyConnection<Read, Write> initChildChannel(Channel channel, ExecutionContext<?> executionContext, CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, ConnectionInfo.Protocol protocol, @Nullable SSLSession sslSession, @Nullable ChannelConfig parentChannelConfig, ConnectionObserver.StreamObserver streamObserver, boolean isClient, Predicate<Object> shouldWait, UnaryOperator<Throwable> enrichProtocolError) {
        final ExecutionContext<?> childExecutionContext = new DefaultExecutionContext<>(executionContext.bufferAllocator(), NettyIoExecutors.fromNettyEventLoop(channel.eventLoop(), executionContext.ioExecutor().isIoThreadSupported()), executionContext.executor(), executionContext.executionStrategy());
        DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<Read, Write>(channel, childExecutionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, sslSession, parentChannelConfig, streamObserver.streamEstablished(), isClient, shouldWait, enrichProtocolError);
        // No subscriber or cancellable: the connection is returned synchronously, so nothing waits on the handler.
        channel.pipeline().addLast(new NettyToStChannelInboundHandler<Read, Write>(connection, null, null, false, NoopTransportObserver.NoopConnectionObserver.INSTANCE));
        return connection;
    }

    /**
     * Variant of {@code initChannel(...)} without a {@code shouldWait} predicate; it delegates with a
     * predicate that always returns {@code false}.
     *
     * @deprecated use the overload that accepts a {@code shouldWait} {@link Predicate}.
     */
    @Deprecated
    public static <Read, Write> Single<DefaultNettyConnection<Read, Write>> initChannel(Channel channel, BufferAllocator allocator, Executor executor, @Nullable IoExecutor ioExecutor, CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, ChannelInitializer initializer, ExecutionStrategy executionStrategy, ConnectionInfo.Protocol protocol, ConnectionObserver observer, boolean isClient) {
        final Predicate<Object> neverWait = ignored -> false;
        return DefaultNettyConnection.initChannel(channel, allocator, executor, ioExecutor, closeHandler,
                flushStrategy, idleTimeoutMs, initializer, executionStrategy, protocol, observer, isClient,
                neverWait);
    }

    /**
     * Returns a {@link Single} that, on each subscribe, creates a connection for {@code channel}, runs
     * {@code initializer} on the channel and appends the inbound handler that bridges Netty events to the
     * connection. That handler is what eventually completes the subscriber.
     * <p>
     * If any of the setup steps throws, the channel is closed with that cause and the subscriber is failed.
     *
     * @param channel the channel to initialize.
     * @param allocator allocator for the connection's execution context.
     * @param executor executor for the connection's execution context.
     * @param ioExecutor consulted only for {@code isIoThreadSupported()}; {@code null} means unsupported.
     * @param closeHandler handler that coordinates closure.
     * @param flushStrategy the initial flush strategy.
     * @param idleTimeoutMs idle timeout value reported through socket options.
     * @param initializer initializer applied to {@code channel} before the bridging handler is added.
     * @param executionStrategy strategy for the connection's execution context.
     * @param protocol the protocol of the connection.
     * @param observer observer handed to the bridging handler.
     * @param isClient {@code true} for client-side connections.
     * @param shouldWait predicate handed to each write subscriber.
     * @param <Read> type of the items read from the connection.
     * @param <Write> type of the items written to the connection.
     * @return a {@link Single} that emits the initialized connection.
     */
    public static <Read, Write> Single<DefaultNettyConnection<Read, Write>> initChannel(final Channel channel, final BufferAllocator allocator, final Executor executor, final @Nullable IoExecutor ioExecutor, final CloseHandler closeHandler, final FlushStrategy flushStrategy, final @Nullable Long idleTimeoutMs, final ChannelInitializer initializer, final ExecutionStrategy executionStrategy, final ConnectionInfo.Protocol protocol, final ConnectionObserver observer, final boolean isClient, final Predicate<Object> shouldWait) {
        return new SubscribableSingle<DefaultNettyConnection<Read, Write>>(){

            @Override
            protected void handleSubscribe(SingleSource.Subscriber<? super DefaultNettyConnection<Read, Write>> subscriber) {
                NettyToStChannelInboundHandler<Read, Write> nettyInboundHandler;
                DelayedCancellable delayedCancellable;
                try {
                    delayedCancellable = new DelayedCancellable();
                    boolean supportsIoThread = null != ioExecutor && ioExecutor.isIoThreadSupported();
                    DefaultExecutionContext<ExecutionStrategy> executionContext = new DefaultExecutionContext<ExecutionStrategy>(allocator, NettyIoExecutors.fromNettyEventLoop(channel.eventLoop(), supportsIoThread), executor, executionStrategy);
                    DefaultNettyConnection<Read, Write> connection = new DefaultNettyConnection<Read, Write>(channel, executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, null, null, NoopTransportObserver.NoopDataObserver.INSTANCE, isClient, shouldWait, UnaryOperator.identity());
                    channel.attr(ChannelSet.CHANNEL_CLOSEABLE_KEY).set(connection);
                    initializer.init(channel);
                    ChannelPipeline pipeline = connection.channel().pipeline();
                    // SSL detection happens after the initializer ran, i.e. once the pipeline is populated.
                    nettyInboundHandler = new NettyToStChannelInboundHandler<Read, Write>(connection, subscriber, delayedCancellable, NettyPipelineSslUtils.isSslEnabled(pipeline), observer);
                }
                catch (Throwable cause) {
                    ChannelCloseUtils.close(channel, cause);
                    SubscriberUtils.deliverErrorFromSource(subscriber, cause);
                    return;
                }
                // onSubscribe must precede addLast: adding the handler may complete or fail the subscriber
                // immediately (see handlerAdded / doChannelActive).
                subscriber.onSubscribe(delayedCancellable);
                channel.pipeline().addLast(nettyInboundHandler);
            }
        };
    }

    /**
     * Decorates {@code readPublisher} so that every subscription reports its signals to a
     * {@link ConnectionObserver.ReadObserver}.
     * <p>
     * The data observer is looked up on each subscribe; while it is the no-op instance the subscriber is
     * passed through unwrapped.
     *
     * @param readPublisher the publisher to decorate.
     * @return the decorated publisher.
     */
    private Publisher<Read> registerReadObserver(Publisher<Read> readPublisher) {
        return readPublisher.liftSync(target -> {
            final ConnectionObserver.DataObserver currentDataObserver = this.dataObserver;
            if (currentDataObserver != NoopTransportObserver.NoopDataObserver.INSTANCE) {
                final ConnectionObserver.ReadObserver readObserver = currentDataObserver.onNewRead();
                return new PublisherSource.Subscriber<Read>() {

                    @Override
                    public void onSubscribe(final PublisherSource.Subscription upstream) {
                        // Wrap the subscription so demand and cancellation are reported before being forwarded.
                        target.onSubscribe(new PublisherSource.Subscription() {

                            @Override
                            public void request(long n) {
                                readObserver.requestedToRead(n);
                                upstream.request(n);
                            }

                            @Override
                            public void cancel() {
                                readObserver.readCancelled();
                                upstream.cancel();
                            }
                        });
                    }

                    @Override
                    public void onNext(@Nullable Read item) {
                        readObserver.itemRead(item);
                        target.onNext(item);
                    }

                    @Override
                    public void onError(Throwable cause) {
                        readObserver.readFailed(cause);
                        target.onError(cause);
                    }

                    @Override
                    public void onComplete() {
                        readObserver.readComplete();
                        target.onComplete();
                    }
                };
            }
            return target;
        });
    }

    /**
     * Maps an error from the read or write path to the error surfaced to users and completes
     * {@code transportError} with the mapped value.
     * <p>
     * Mapping rules, in order:
     * <ul>
     *     <li>{@code AbortedFirstWriteException}: with a known close reason its cause is wrapped in that reason
     *     and then in a {@link RetryableClosedChannelException}; otherwise a {@link RetryableException} cause is
     *     returned as is, a {@link ClosedChannelException} cause is wrapped as retryable, and anything else
     *     leaves {@code t} untouched;</li>
     *     <li>{@link RetryableClosedChannelException}: returned untouched;</li>
     *     <li>anything else: wrapped in the close reason if one is known, otherwise passed through
     *     {@code enrichProtocolError}.</li>
     * </ul>
     *
     * @param t the original error.
     * @return the mapped error.
     */
    private Throwable enrichError(Throwable t) {
        final Throwable enriched;
        if (t instanceof WriteStreamSubscriber.AbortedFirstWriteException) {
            final CloseHandler.CloseEvent observedClose = this.closeReason;
            if (observedClose != null) {
                enriched = new RetryableClosedChannelException(this.wrapWithCloseReason(observedClose, t.getCause()));
            } else if (t.getCause() instanceof RetryableException) {
                enriched = t.getCause();
            } else if (t.getCause() instanceof ClosedChannelException) {
                enriched = new RetryableClosedChannelException((ClosedChannelException) t.getCause());
            } else {
                enriched = t;
            }
        } else if (t instanceof RetryableClosedChannelException) {
            enriched = t;
        } else {
            final CloseHandler.CloseEvent observedClose = this.closeReason;
            if (observedClose != null) {
                enriched = this.wrapWithCloseReason(observedClose, t);
            } else {
                enriched = this.enrichProtocolError.apply(t);
            }
        }
        this.transportError.onSuccess(enriched);
        return enriched;
    }

    /**
     * Wraps {@code t} in the exception for {@code closeReason}, unless {@code t} already is a
     * {@code CloseEventObservedException} for that very event, in which case it is returned unchanged.
     *
     * @param closeReason the observed close event.
     * @param t the error to wrap.
     * @return an exception that carries the close reason.
     */
    private ClosedChannelException wrapWithCloseReason(CloseHandler.CloseEvent closeReason, Throwable t) {
        final boolean alreadyWrapped = t instanceof CloseHandler.CloseEventObservedException
                && ((CloseHandler.CloseEventObservedException) t).event() == closeReason;
        if (!alreadyWrapped) {
            return closeReason.wrapError(t, this.channel());
        }
        return (ClosedChannelException) t;
    }

    /**
     * Returns the read publisher built in the constructor: the channel publisher with errors mapped through
     * {@code enrichError} and read signals reported to the data observer.
     */
    @Override
    public Publisher<Read> read() {
        return this.readPublisher;
    }

    /**
     * Writes {@code write} using the currently configured flush strategy and a new default
     * {@link WriteDemandEstimator}; both are obtained lazily through the suppliers passed on.
     */
    @Override
    public Completable write(Publisher<Write> write) {
        return this.write(write, this.flushStrategyHolder::currentStrategy, WriteDemandEstimators::newDefaultEstimator);
    }

    /**
     * Returns a {@link Completable} that, on subscribe, writes the items of {@code write} to the channel.
     * <p>
     * Only one write may be active at a time; see {@code failIfWriteActive}. Errors of the returned
     * {@link Completable} are mapped through {@code enrichError}, and the listener slot is cleaned up before
     * the terminal signal is delivered.
     *
     * @param write the items to write.
     * @param flushStrategySupplier supplies the flush strategy; invoked once the write was accepted.
     * @param demandEstimatorSupplier supplies the demand estimator; invoked on each subscribe.
     * @return a {@link Completable} tracking the write.
     */
    @Override
    public Completable write(final Publisher<Write> write, final Supplier<FlushStrategy> flushStrategySupplier, final Supplier<WriteDemandEstimator> demandEstimatorSupplier) {
        final Completable rawWrite = new SubscribableCompletable() {

            @Override
            protected void handleSubscribe(CompletableSource.Subscriber completableSubscriber) {
                final DefaultNettyConnection<Read, Write> self = DefaultNettyConnection.this;
                final ConnectionObserver.WriteObserver writeObserver = self.dataObserver.onNewWrite();
                final WriteStreamSubscriber writeSubscriber = new WriteStreamSubscriber(self.channel(), demandEstimatorSupplier.get(), completableSubscriber, self.closeHandler, writeObserver, self.enrichProtocolError, self.isClient, self.shouldWait);
                if (!self.failIfWriteActive(writeSubscriber, completableSubscriber)) {
                    // The write was rejected; failIfWriteActive has already signalled the failure.
                    return;
                }
                SourceAdapters.toSource(Flush.composeFlushes(self.channel(), write, flushStrategySupplier.get(), writeObserver)).subscribe(writeSubscriber);
            }
        };
        return this.cleanupStateWhenDone(rawWrite).onErrorMap(this::enrichError);
    }

    /**
     * Tells whether a write currently owns the outbound listener slot.
     *
     * @return {@code false} if the slot holds the no-op placeholder or this connection itself,
     * {@code true} otherwise.
     */
    boolean isWriteActive() {
        final ChannelOutboundListener current = this.channelOutboundListener;
        final boolean slotFree = current == NoopChannelOutboundListener.INSTANCE || current == this;
        return !slotFree;
    }

    /**
     * Starts a graceful close by invoking the user close handler on the channel's event loop; the call is
     * made directly when already on that event loop and scheduled otherwise.
     */
    @Override
    protected void doCloseAsyncGracefully() {
        final EventLoop loop = this.channel().eventLoop();
        if (!loop.inEventLoop()) {
            loop.execute(this::invokeUserCloseHandler);
            return;
        }
        this.invokeUserCloseHandler();
    }

    /** Returns the local address reported by the underlying channel. */
    @Override
    public SocketAddress localAddress() {
        return this.channel().localAddress();
    }

    /** Returns the remote address reported by the underlying channel. */
    @Override
    public SocketAddress remoteAddress() {
        return this.channel().remoteAddress();
    }

    /**
     * Returns the SSL session of this connection.
     *
     * @return the session passed at construction or extracted from the SSL handshake completion event, or
     * {@code null} if neither is available.
     */
    @Override
    @Nullable
    public SSLSession sslSession() {
        return this.sslSession;
    }

    /** Returns the execution context this connection was created with. */
    @Override
    public ExecutionContext<?> executionContext() {
        return this.executionContext;
    }

    /**
     * Looks up a socket option through {@link SocketOptionUtils}.
     * <p>
     * The parent channel's config is consulted when one was supplied at construction, otherwise the config
     * of this connection's channel; the configured idle timeout is passed along as well.
     *
     * @param option the option to look up.
     * @param <T> the type of the option value.
     * @return whatever {@link SocketOptionUtils} resolves for the option, possibly {@code null}.
     */
    @Override
    @Nullable
    public <T> T socketOption(SocketOption<T> option) {
        final ChannelConfig effectiveConfig;
        if (this.parentChannelConfig == null) {
            effectiveConfig = this.channel().config();
        } else {
            effectiveConfig = this.parentChannelConfig;
        }
        return SocketOptionUtils.getOption(option, effectiveConfig, this.idleTimeoutMs);
    }

    /** Returns the protocol this connection was created with. */
    @Override
    public ConnectionInfo.Protocol protocol() {
        return this.protocol;
    }

    /**
     * Notifies the close handler that the user requested a graceful close. Invoked by
     * {@link #doCloseAsyncGracefully()}, which makes sure the call runs on the channel's event loop.
     */
    private void invokeUserCloseHandler() {
        this.closeHandler.gracefulUserClosing(this.channel());
    }

    /**
     * Returns a {@link Completable} that signals the start of connection closure.
     *
     * @return the dedicated closing processor if one was created at construction, otherwise the same
     * {@link Completable} as {@code onClose()}.
     */
    @Override
    public Completable onClosing() {
        if (this.onClosing != null) {
            return SourceAdapters.fromSource(this.onClosing);
        }
        return this.onClose();
    }

    /** Returns the Netty channel backing this connection. */
    @Override
    public Channel nettyChannel() {
        return this.channel();
    }

    /** Returns the string representation of the underlying channel. */
    @Override
    public String toString() {
        return this.channel().toString();
    }

    /**
     * Attaches {@code cleanupStateConsumer} to {@code completable} so that the outbound listener slot is
     * reset before the terminal signal (complete, error or cancel) is propagated.
     */
    private Completable cleanupStateWhenDone(Completable completable) {
        return completable.beforeFinally(this.cleanupStateConsumer);
    }

    /**
     * No-op. The connection acts as the outbound listener only after a write was cancelled (see
     * {@code cleanupStateConsumer}), so there is no write to resume.
     */
    @Override
    public void channelWritable() {
    }

    /** No-op: when the connection itself is the outbound listener there is no write to continue. */
    @Override
    public void continueWriting() {
    }

    /** No-op: the connection takes no action of its own when the outbound side is reported as closed. */
    @Override
    public void channelOutboundClosed() {
    }

    /** No-op: when the connection itself is the outbound listener there is no write source to terminate. */
    @Override
    public void terminateSource() {
    }

    /**
     * Asks the close handler to close the channel's outbound side. This is the only listener callback the
     * connection acts on; {@code closedException} is not used.
     */
    @Override
    public void channelClosed(Throwable closedException) {
        this.closeHandler.closeChannelOutbound(this.channel());
    }

    /** No-op: the connection holds no per-write state that would need to be discarded. */
    @Override
    public void listenerDiscard(Throwable cause) {
    }

    /**
     * Tries to make {@code newListener} the active outbound listener and verifies that the channel can
     * still be written to.
     * <p>
     * Outcomes:
     * <ul>
     *     <li>another write owns the listener slot: {@code subscriber} is failed with an
     *     {@link IllegalStateException} and the slot is left untouched;</li>
     *     <li>the slot was claimed, but this is a client connection with a known close reason:
     *     {@code newListener.listenerDiscard(...)} is invoked;</li>
     *     <li>the slot was claimed, but the channel is inactive: {@code newListener.channelClosed(...)} is
     *     invoked;</li>
     *     <li>otherwise the write may proceed.</li>
     * </ul>
     * In the two middle cases {@code newListener} stays in the slot; it is reset by
     * {@code cleanupStateConsumer} when the write terminates.
     *
     * @param newListener the listener of the write being started.
     * @param subscriber the subscriber of the write, failed directly only in the first case above.
     * @return {@code true} if the caller should subscribe {@code newListener} to the write source.
     */
    private boolean failIfWriteActive(ChannelOutboundListener newListener, CompletableSource.Subscriber subscriber) {
        for (;;) {
            final ChannelOutboundListener current = this.channelOutboundListener;
            if (current != NoopChannelOutboundListener.INSTANCE && current != this) {
                SubscriberUtils.deliverErrorFromSource(subscriber, new IllegalStateException("A write is already active on this connection."));
                return false;
            }
            // Retry if the slot changed between the read above and the swap.
            if (writableListenerUpdater.compareAndSet(this, current, newListener)) {
                break;
            }
        }
        final CloseHandler.CloseEvent closeReason = this.closeReason;
        final boolean clientObservedClose = this.isClient && closeReason != null;
        // The channel state is queried only when the close reason alone does not reject the write.
        final boolean channelActive = clientObservedClose || this.channel().isActive();
        if (!clientObservedClose && channelActive) {
            return true;
        }
        final StacklessClosedChannelException e = StacklessClosedChannelException.newInstance(DefaultNettyConnection.class, "failIfWriteActive(...)");
        // Typed as the common supertype: the wrapped form need not be a StacklessClosedChannelException.
        final ClosedChannelException cause = closeReason == null ? e : closeReason.wrapError(e, this.channel());
        if (channelActive) {
            newListener.listenerDiscard(cause);
        } else {
            newListener.channelClosed(cause);
        }
        return false;
    }

    /** Delegates to the flush strategy holder; the returned {@link Cancellable} comes from the holder. */
    @Override
    public Cancellable updateFlushStrategy(NettyConnectionContext.FlushStrategyProvider strategyProvider) {
        return this.flushStrategyHolder.updateFlushStrategy(strategyProvider);
    }

    /** Returns the strategy currently held by the flush strategy holder. */
    @Override
    public FlushStrategy defaultFlushStrategy() {
        return this.flushStrategyHolder.currentStrategy();
    }

    /**
     * Returns a {@link Single} view of the transport error processor, which is completed from the close
     * event handler registered in the constructor and from {@code enrichError}.
     */
    @Override
    public Single<Throwable> transportError() {
        return SourceAdapters.fromSource(this.transportError);
    }

    /**
     * Singleton Netty user event. When the inbound handler of a connection receives it, the active outbound
     * listener is asked to {@code terminateSource()}.
     */
    public static final class CancelWriteUserEvent {
        /** The only instance of this event. */
        public static final CancelWriteUserEvent INSTANCE = new CancelWriteUserEvent();

        private CancelWriteUserEvent() {
            // Singleton
        }

        @Override
        public String toString() {
            // The class is final, so the class literal is equivalent to getClass().
            return CancelWriteUserEvent.class.getName();
        }
    }

    /**
     * Singleton Netty user event. When the inbound handler of a connection receives it, the active outbound
     * listener is asked to {@code continueWriting()}.
     */
    public static final class ContinueUserEvent {
        /** The only instance of this event. */
        public static final ContinueUserEvent INSTANCE = new ContinueUserEvent();

        private ContinueUserEvent() {
            // Singleton
        }

        @Override
        public String toString() {
            // The class is final, so the class literal is equivalent to getClass().
            return ContinueUserEvent.class.getName();
        }
    }

    /**
     * Inbound handler that bridges Netty pipeline events to a {@link DefaultNettyConnection}: reads and
     * errors go to the connection's channel publisher, write-related events go to the active outbound
     * listener, and the optional {@code subscriber} is completed or failed once the connection is usable or
     * lost.
     * <p>
     * Apart from user events, inbound events are consumed here and not propagated further down the pipeline.
     *
     * @param <Read> type of the items read from the connection.
     * @param <Write> type of the items written to the connection.
     */
    private static final class NettyToStChannelInboundHandler<Read, Write>
    implements ChannelInboundHandler {
        private final DefaultNettyConnection<Read, Write> connection;
        private final boolean waitForSslHandshake;
        @Nullable
        private final DelayedCancellable delayedCancellable;
        // Set to null once it has been terminated, so it is signalled at most once.
        @Nullable
        private SingleSource.Subscriber<? super DefaultNettyConnection<Read, Write>> subscriber;
        private final ConnectionObserver observer;

        /**
         * @param connection the connection events are bridged to.
         * @param subscriber subscriber to complete with {@code connection}, or {@code null} if nobody waits.
         * @param delayedCancellable cancellable that is wired to close the channel, or {@code null}.
         * @param waitForSslHandshake if {@code true}, the subscriber is completed on the SSL handshake
         * completion event and not when the channel becomes active.
         * @param observer observer notified when the connection is established.
         */
        NettyToStChannelInboundHandler(DefaultNettyConnection<Read, Write> connection, @Nullable SingleSource.Subscriber<? super DefaultNettyConnection<Read, Write>> subscriber, @Nullable DelayedCancellable delayedCancellable, boolean waitForSslHandshake, ConnectionObserver observer) {
            this.connection = connection;
            this.subscriber = subscriber;
            this.delayedCancellable = delayedCancellable;
            this.waitForSslHandshake = waitForSslHandshake;
            this.observer = observer;
        }

        /** Wakes the active write when the channel is writable again; otherwise flushes if the strategy asks for it. */
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            if (ctx.channel().isWritable()) {
                this.connection.channelOutboundListener.channelWritable();
            } else if (this.connection.flushStrategyHolder.currentStrategy().shouldFlushOnUnwritable()) {
                ctx.flush();
            }
        }

        /** Wires cancellation to closing the channel and handles a channel that is already active. */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            if (this.delayedCancellable != null) {
                this.delayedCancellable.delayedCancellable(ctx.channel()::close);
            }
            // channelActive(...) will not be delivered to a handler added after activation.
            if (ctx.channel().isActive()) {
                this.doChannelActive(ctx);
            }
        }

        /** Fails a subscriber that is still pending when the handler leaves the pipeline. */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            // Checked here as well so the exception is only created when there is somebody to fail.
            if (this.subscriber != null) {
                this.tryFailSubscriber(StacklessClosedChannelException.newInstance(DefaultNettyConnection.class, "handlerRemoved(...)"));
            }
        }

        /** Forwards pipeline errors to the read publisher. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            this.connection.nettyChannelPublisher.channelOnError(NettyToStChannelInboundHandler.unwrapThrowable(cause));
        }

        /** Returns the {@link SSLException} cause of a {@link DecoderException} if there is one, otherwise {@code t}. */
        private static Throwable unwrapThrowable(Throwable t) {
            if (t instanceof DecoderException) {
                final Throwable cause = t.getCause();
                if (cause instanceof SSLException) {
                    return cause;
                }
            }
            return t;
        }

        /** Hands a message from the pipeline to the read publisher. */
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // Unchecked and erased at runtime; assumes the preceding handlers emit Read instances.
            this.connection.nettyChannelPublisher.channelRead((Read) msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            this.connection.nettyChannelPublisher.onReadComplete();
        }

        /**
         * Dispatches close, SSL and write-control user events to the connection, then always propagates the
         * event to the next handler.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            LOGGER.debug("{} Received a user event: {}", ctx.channel(), evt);
            if (evt == CloseHandler.InboundDataEndEvent.INSTANCE) {
                this.connection.nettyChannelPublisher.channelOnComplete();
            } else if (evt == CloseHandler.OutboundDataEndEvent.INSTANCE) {
                this.connection.channelOutboundListener.channelOutboundClosed();
            } else if (evt == CloseHandler.AbortWritesEvent.INSTANCE) {
                this.connection.channelOutboundListener.channelClosed(StacklessClosedChannelException.newInstance(DefaultNettyConnection.class, "userEventTriggered(AbortWritesEvent)"));
            } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
                this.connection.closeHandler.channelClosedOutbound(ctx);
                this.connection.channelOutboundListener.channelClosed(StacklessClosedChannelException.newInstance(DefaultNettyConnection.class, "userEventTriggered(ChannelOutputShutdownEvent)"));
            } else if (evt == SslCloseCompletionEvent.SUCCESS) {
                this.connection.closeHandler.channelCloseNotify(ctx);
            } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
                this.connection.closeHandler.channelClosedInbound(ctx);
                this.connection.nettyChannelPublisher.channelOnError(StacklessClosedChannelException.newInstance(DefaultNettyConnection.class, "userEventTriggered(ChannelInputShutdownReadComplete)"));
            } else if (evt instanceof SslHandshakeCompletionEvent) {
                // The failure callback clears the subscriber, so the completion below is skipped in that case.
                this.connection.sslSession = NettyPipelineSslUtils.extractSslSessionAndReport(ctx.pipeline(), (SslHandshakeCompletionEvent) evt, this::tryFailSubscriber, this.observer != NoopTransportObserver.NoopConnectionObserver.INSTANCE);
                if (this.subscriber != null) {
                    assert (this.waitForSslHandshake);
                    this.completeSubscriber();
                }
            } else if (evt == ContinueUserEvent.INSTANCE) {
                assert (this.connection.isClient);
                this.connection.channelOutboundListener.continueWriting();
            } else if (evt == CancelWriteUserEvent.INSTANCE) {
                assert (this.connection.isClient);
                this.connection.channelOutboundListener.terminateSource();
            }
            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            this.doChannelActive(ctx);
        }

        /** Fails a pending subscriber, the active write and the read publisher with the same exception. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            final StacklessClosedChannelException closedChannelException = StacklessClosedChannelException.newInstance(DefaultNettyConnection.class, "channelInactive(...)");
            this.tryFailSubscriber(closedChannelException);
            this.connection.channelOutboundListener.channelClosed(closedChannelException);
            this.connection.nettyChannelPublisher.channelOnError(closedChannelException);
        }

        /** Triggers a read while an SSL handshake is awaited; otherwise completes a pending subscriber. */
        private void doChannelActive(ChannelHandlerContext ctx) {
            if (this.waitForSslHandshake) {
                ctx.read();
            } else if (this.subscriber != null) {
                this.completeSubscriber();
            }
        }

        /** Reports the connection as established and hands it to the subscriber, which must be present. */
        private void completeSubscriber() {
            assert (this.subscriber != null);
            final SingleSource.Subscriber<? super DefaultNettyConnection<Read, Write>> subscriberCopy = this.subscriber;
            // Cleared before signalling, so later events cannot terminate the subscriber a second time.
            this.subscriber = null;
            this.connection.dataObserver = this.observer.connectionEstablished(this.connection);
            subscriberCopy.onSuccess(this.connection);
        }

        /** Closes the channel and fails the subscriber with {@code cause}; does nothing if none is pending. */
        private void tryFailSubscriber(Throwable cause) {
            if (this.subscriber != null) {
                ChannelCloseUtils.close(this.connection.channel(), cause);
                final SingleSource.Subscriber<? super DefaultNettyConnection<Read, Write>> subscriberCopy = this.subscriber;
                this.subscriber = null;
                subscriberCopy.onError(cause);
            }
        }
    }

    /**
     * Listener that ignores every callback. Its singleton is the placeholder stored in the connection's
     * {@code channelOutboundListener} field while no write is active.
     */
    private static final class NoopChannelOutboundListener
    implements ChannelOutboundListener {
        private static final ChannelOutboundListener INSTANCE = new NoopChannelOutboundListener();

        private NoopChannelOutboundListener() {
        }

        @Override
        public void channelWritable() {
        }

        @Override
        public void continueWriting() {
        }

        @Override
        public void channelOutboundClosed() {
        }

        @Override
        public void terminateSource() {
        }

        @Override
        public void channelClosed(Throwable closedException) {
        }

        @Override
        public void listenerDiscard(Throwable cause) {
        }
    }
}

