/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelOperationsHandler;
import reactor.netty.channel.FluxReceive;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

/**
 * Bridge between a {@link Connection} (and its Netty {@link Channel}) and the reactive
 * {@link NettyInbound} / {@link NettyOutbound} contracts.
 *
 * <p>An instance is simultaneously:
 * <ul>
 *   <li>the inbound and outbound view of the connection ({@link #inbound()} and
 *       {@link #outbound()} both return {@code this});</li>
 *   <li>a {@link CoreSubscriber} of the outbound completion signal, see
 *       {@link #disposeSubscriber()};</li>
 *   <li>a {@link ChannelPromise} that is completed when these operations terminate, see
 *       {@link #terminate()} and {@link #onTerminate()}.</li>
 * </ul>
 *
 * @param <INBOUND> the inbound type exposed by concrete operations
 * @param <OUTBOUND> the outbound type exposed by concrete operations
 */
public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
        extends DefaultPromise<Void>
        implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelPromise {
    /** The wrapped connection every channel-level call is delegated to. */
    final Connection connection;
    /** Inbound flux fed by the {@code onInbound*} callbacks of this class. */
    final FluxReceive inbound;
    /** Observer notified of state changes; also provides {@link #currentContext()}. */
    final ConnectionObserver listener;
    /** Outbound subscription slot, only mutated through {@link #OUTBOUND_CLOSE}. */
    volatile Subscription outboundSubscription;

    static final Logger log = Loggers.getLogger(ChannelOperations.class);
    /**
     * Marker message written to the channel, with this instance as promise, when persistent
     * operations terminate (see {@link #terminate()}).
     */
    static final Object TERMINATED_OPS = new Object();
    /** {@link OnSetup} that never creates operations: it returns {@code null} for any input. */
    static final OnSetup EMPTY_SETUP = (c, l, msg) -> null;
    /** Atomic accessor for {@link #outboundSubscription}. */
    static final AtomicReferenceFieldUpdater<ChannelOperations, Subscription> OUTBOUND_CLOSE =
            AtomicReferenceFieldUpdater.newUpdater(ChannelOperations.class, Subscription.class, "outboundSubscription");

    /**
     * Appends a {@link ChannelOperationsHandler} to the end of the channel pipeline under the
     * name {@code "reactor.right.reactiveBridge"}.
     *
     * @param ch the channel whose pipeline receives the handler
     * @param opsFactory the factory handed to the handler
     * @param listener the observer handed to the handler
     */
    public static void addReactiveBridge(Channel ch, OnSetup opsFactory, ConnectionObserver listener) {
        ch.pipeline().addLast("reactor.right.reactiveBridge", new ChannelOperationsHandler(opsFactory, listener));
    }

    /**
     * Resolves the {@link ChannelOperations} view of the connection associated with a channel.
     *
     * @param ch the channel to look up
     * @return the result of {@code Connection.from(ch).as(ChannelOperations.class)}, possibly
     *         {@code null}
     */
    @Nullable
    public static ChannelOperations<?, ?> get(Channel ch) {
        return Connection.from(ch).as(ChannelOperations.class);
    }

    /**
     * Creates new operations that reuse the connection and listener of an existing instance.
     *
     * @param replaced the operations to copy the connection and listener from
     */
    protected ChannelOperations(ChannelOperations<INBOUND, OUTBOUND> replaced) {
        this(replaced.connection, replaced.listener);
    }

    /**
     * Creates operations bound to the event loop of the connection's channel.
     *
     * @param connection the connection to wrap, must not be {@code null}
     * @param listener the observer to notify, must not be {@code null}
     * @throws NullPointerException if {@code connection} or {@code listener} is {@code null}
     */
    public ChannelOperations(Connection connection, ConnectionObserver listener) {
        // Validate before dereferencing so a null connection fails with a named NPE.
        super(Objects.requireNonNull(connection, "connection").channel().eventLoop());
        this.connection = connection;
        this.listener = Objects.requireNonNull(listener, "listener");
        this.inbound = new FluxReceive(this);
    }

    /**
     * Returns this instance when exactly {@code ChannelOperations.class} is requested, otherwise
     * defers to the default {@link Connection#as(Class)} lookup.
     */
    @Override
    @Nullable
    public <T extends Connection> T as(Class<T> clazz) {
        if (clazz == ChannelOperations.class) {
            // clazz is ChannelOperations.class here, so the checked cast cannot fail.
            return clazz.cast(this);
        }
        return Connection.super.as(clazz);
    }

    /** Returns the allocator of the underlying channel. */
    @Override
    public ByteBufAllocator alloc() {
        return this.connection.channel().alloc();
    }

    /** Returns this instance as the inbound side. */
    @Override
    public NettyInbound inbound() {
        return this;
    }

    /** Returns this instance as the outbound side. */
    @Override
    public NettyOutbound outbound() {
        return this;
    }

    /** Returns the channel of the wrapped connection. */
    @Override
    public final Channel channel() {
        return this.connection.channel();
    }

    /**
     * Invokes the callback immediately with this instance.
     *
     * @param withConnection the callback to invoke
     * @return this instance
     */
    @Override
    public ChannelOperations<INBOUND, OUTBOUND> withConnection(Consumer<? super Connection> withConnection) {
        withConnection.accept(this);
        return this;
    }

    /**
     * Cancels the inbound and disposes the wrapped connection, unless the inbound is already
     * disposed, in which case nothing happens.
     */
    @Override
    public void dispose() {
        if (this.inbound.isDisposed()) {
            return;
        }
        this.inbound.cancel();
        this.connection.dispose();
    }

    /** Returns this instance, which subscribes to the outbound completion signal. */
    @Override
    public CoreSubscriber<Void> disposeSubscriber() {
        return this;
    }

    /**
     * Returns {@code true} when the channel is no longer active or when the operations
     * currently resolved from the channel (see {@link #get(Channel)}) are not this instance.
     */
    @Override
    public final boolean isDisposed() {
        return !this.channel().isActive() || ChannelOperations.get(this.channel()) != this;
    }

    /** Delegates to the wrapped connection's dispose signal. */
    @Override
    public final Mono<Void> onDispose() {
        return this.connection.onDispose();
    }

    /**
     * Registers a disposable on the wrapped connection.
     *
     * @param onDispose the resource passed to the wrapped connection's {@code onDispose}
     * @return this instance
     */
    @Override
    public Connection onDispose(Disposable onDispose) {
        this.connection.onDispose(onDispose);
        return this;
    }

    /**
     * Outbound completion signal. Marks the outbound subscription slot as cancelled and, unless
     * it was already cancelled or these operations are disposed, calls
     * {@link #onOutboundComplete()}.
     */
    @Override
    public final void onComplete() {
        Subscription s = OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
        if (s == Operators.cancelledSubscription() || this.isDisposed()) {
            return;
        }
        this.onOutboundComplete();
    }

    /**
     * Outbound error signal. Marks the outbound subscription slot as cancelled and, unless it
     * was already cancelled or these operations are disposed, calls
     * {@link #onOutboundError(Throwable)}; otherwise the error is only logged at debug level.
     *
     * @param t the outbound error
     */
    @Override
    public final void onError(Throwable t) {
        Subscription s = OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
        if (s == Operators.cancelledSubscription() || this.isDisposed()) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(this.channel(), "An outbound error could not be processed"), t);
            }
            return;
        }
        this.onOutboundError(t);
    }

    /** No-op: the outbound completion stream carries no values. */
    @Override
    public final void onNext(Void aVoid) {
    }

    /**
     * Stores the outbound subscription if none was set before and, when it was stored,
     * requests an unbounded amount from it.
     *
     * @param s the outbound subscription
     */
    @Override
    public final void onSubscribe(Subscription s) {
        if (Operators.setOnce(OUTBOUND_CLOSE, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /** Returns the inbound flux of received objects. */
    @Override
    public Flux<?> receiveObject() {
        return this.inbound;
    }

    /** Returns the received objects wrapped as a {@link ByteBufFlux} using the channel allocator. */
    @Override
    public ByteBufFlux receive() {
        return ByteBufFlux.fromInbound(this.receiveObject(), this.connection.channel().alloc());
    }

    /**
     * Chains a write-and-flush of every element of the given stream to the channel.
     *
     * @param dataStream the objects to write
     * @return the outbound returned by {@code then(...)} for that write
     */
    @Override
    public NettyOutbound sendObject(Publisher<?> dataStream) {
        return this.then(FutureMono.disposableWriteAndFlush(this.connection.channel(), dataStream));
    }

    /**
     * Chains a write-and-flush of a single message; the write is created lazily through
     * {@code FutureMono.deferFuture}.
     *
     * @param message the object to write
     * @return the outbound returned by {@code then(...)} for that write
     */
    @Override
    public NettyOutbound sendObject(Object message) {
        return this.then(FutureMono.deferFuture(() -> this.connection.channel().writeAndFlush(message)));
    }

    /**
     * Chains a write of a message derived from a resource, built with
     * {@code Mono.using(sourceInput, ..., sourceCleanup)}.
     *
     * @param sourceInput supplies the resource
     * @param mappedInput maps this connection and the resource to the object written and flushed
     * @param sourceCleanup the cleanup callback handed to {@code Mono.using}
     * @param <S> the resource type
     * @return the outbound returned by {@code then(...)} for that write
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput, BiFunction<? super Connection, ? super S, ?> mappedInput, Consumer<? super S> sourceCleanup) {
        Objects.requireNonNull(sourceInput, "sourceInput");
        Objects.requireNonNull(mappedInput, "mappedInput");
        Objects.requireNonNull(sourceCleanup, "sourceCleanup");
        return this.then(Mono.using(
                sourceInput,
                s -> FutureMono.from(this.connection.channel().writeAndFlush(mappedInput.apply(this, s))),
                sourceCleanup));
    }

    /**
     * Returns the termination signal of these operations: the connection's dispose signal when
     * the connection is not persistent, otherwise this promise combined through {@code or(...)}
     * with the connection's dispose signal.
     */
    @Override
    public final Mono<Void> onTerminate() {
        if (!this.isPersistent()) {
            return this.connection.onDispose();
        }
        return FutureMono.from(this).or(this.connection.onDispose());
    }

    /** Returns the observer given at construction. */
    public final ConnectionObserver listener() {
        return this.listener;
    }

    @Override
    public String toString() {
        return "ChannelOperations{" + this.connection.toString() + "}";
    }

    /** Returns whether the inbound flux reports itself cancelled. */
    public final boolean isInboundCancelled() {
        return this.inbound.isCancelled();
    }

    /** Returns whether the inbound flux reports itself disposed. */
    public final boolean isInboundDisposed() {
        return this.inbound.isDisposed();
    }

    /**
     * Forwards an inbound message to the inbound flux.
     *
     * @param ctx the handler context (unused by this base implementation)
     * @param msg the received message
     */
    protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
        this.inbound.onInboundNext(msg);
    }

    /** Hook for inbound cancellation; the base implementation does nothing. */
    protected void onInboundCancel() {
    }

    /** Completes the inbound flux. */
    protected void onInboundComplete() {
        this.inbound.onInboundComplete();
    }

    /** Hook for inbound close; the base implementation terminates these operations. */
    protected void onInboundClose() {
        this.terminate();
    }

    /**
     * Reacts to outbound completion: marks the connection as non-persistent and terminates.
     */
    protected void onOutboundComplete() {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(this.channel(), "[{}] User Handler requesting close connection"), this.formatName());
        }
        this.markPersistent(false);
        this.terminate();
    }

    /**
     * Reacts to an outbound error: marks the connection as non-persistent and terminates.
     * The base implementation does not use the error itself.
     *
     * @param err the outbound error
     */
    protected void onOutboundError(Throwable err) {
        this.markPersistent(false);
        this.terminate();
    }

    /**
     * Terminates these operations, only if {@code rebind(connection)} succeeds.
     *
     * <p>In order: terminates the outbound subscription slot, notifies the listener of
     * {@link ConnectionObserver.State#DISCONNECTING}, completes the inbound, then completes
     * this promise. For a persistent connection the promise is handed to a write of
     * {@link #TERMINATED_OPS}; otherwise it is completed directly.
     */
    protected final void terminate() {
        if (this.rebind(this.connection)) {
            if (log.isTraceEnabled()) {
                // The exception is only created to capture the calling stack in the trace log.
                log.trace(ReactorNetty.format(this.channel(), "Disposing ChannelOperation from a channel"), new Exception("ChannelOperation terminal stack"));
            }
            Operators.terminate(OUTBOUND_CLOSE, this);
            this.listener.onStateChange(this, ConnectionObserver.State.DISCONNECTING);
            // Goes through the overridable hook rather than the inbound flux directly.
            this.onInboundComplete();
            if (this.isPersistent()) {
                this.channel().writeAndFlush(TERMINATED_OPS, this);
            } else {
                this.setSuccess(null);
            }
        }
    }

    /** Delegates to the inbound flux's {@code discard()}. */
    protected final void discard() {
        this.inbound.discard();
    }

    /**
     * Forwards an inbound error to the inbound flux.
     *
     * @param err the inbound error
     */
    protected final void onInboundError(Throwable err) {
        this.inbound.onInboundError(err);
    }

    /** Returns the wrapped connection. */
    protected final Connection connection() {
        return this.connection;
    }

    /** Returns the simple class name with every {@code "Operations"} occurrence removed. */
    protected final String formatName() {
        return this.getClass().getSimpleName().replace("Operations", "");
    }

    @Override
    public ChannelPromise setSuccess() {
        this.setSuccess(null);
        return this;
    }

    @Override
    public ChannelPromise setSuccess(Void result) {
        super.setSuccess(result);
        return this;
    }

    @Override
    public boolean trySuccess() {
        return this.trySuccess(null);
    }

    /** Returns this instance, since this promise is not a void promise. */
    @Override
    public ChannelPromise unvoid() {
        return this;
    }

    /**
     * Always {@code false}: this promise supports listeners, waiting and completion, and
     * {@link #unvoid()} returns {@code this}, which is only valid for a non-void promise.
     * Reporting {@code true} would let Netty code that checks {@code isVoid()} skip
     * completing this promise.
     */
    @Override
    public boolean isVoid() {
        return false;
    }

    @Override
    public ChannelPromise setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

    @Override
    public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        super.addListener(listener);
        return this;
    }

    @Override
    public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
        super.addListeners(listeners);
        return this;
    }

    @Override
    public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        super.removeListener(listener);
        return this;
    }

    @Override
    public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
        super.removeListeners(listeners);
        return this;
    }

    @Override
    public ChannelPromise sync() throws InterruptedException {
        super.sync();
        return this;
    }

    @Override
    public ChannelPromise await() throws InterruptedException {
        super.await();
        return this;
    }

    @Override
    public ChannelPromise awaitUninterruptibly() {
        super.awaitUninterruptibly();
        return this;
    }

    @Override
    public ChannelPromise syncUninterruptibly() {
        super.syncUninterruptibly();
        return this;
    }

    /** Delegates to the wrapped connection's persistence flag. */
    @Override
    public boolean isPersistent() {
        return this.connection.isPersistent();
    }

    /** Returns the subscriber context exposed by the listener. */
    @Override
    public Context currentContext() {
        return this.listener.currentContext();
    }

    /**
     * Factory of {@link ChannelOperations} for a connection.
     */
    @FunctionalInterface
    public static interface OnSetup {
        /**
         * Returns the shared factory that always yields {@code null}.
         */
        public static OnSetup empty() {
            return EMPTY_SETUP;
        }

        /**
         * Creates operations for the given connection, or returns {@code null}.
         *
         * @param var1 the connection
         * @param var2 the connection observer
         * @param var3 an optional message, may be {@code null}
         * @return the created operations, or {@code null}
         */
        @Nullable
        public ChannelOperations<?, ?> create(Connection var1, ConnectionObserver var2, @Nullable Object var3);
    }
}

