/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.Cancellation;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.MonoSink;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.channel.ChannelOperations;
import reactor.ipc.netty.channel.ChannelOperationsHandler;
import reactor.ipc.netty.channel.ClientContextHandler;
import reactor.ipc.netty.channel.ContextHandler;
import reactor.ipc.netty.options.ClientOptions;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * A {@link ContextHandler} whose channel is supplied through a {@link Future}
 * registered with {@link #setFuture(Future)} and handed back to a
 * {@link ChannelPool} when the handler is disposed.
 *
 * <p>The handler registers itself as listener of that future. Once the future
 * completes successfully, the channel pipeline is either initialised from
 * scratch (no {@code "reactiveBridge"} handler present yet) or its existing
 * {@code "reactiveBridge"} handler is replaced with one bound to this context.
 *
 * @param <CHANNEL> the concrete channel type handled by this context
 */
final class PooledClientContextHandler<CHANNEL extends Channel>
extends ContextHandler<CHANNEL>
implements GenericFutureListener<Future<CHANNEL>> {
    static final Logger log = Loggers.getLogger(PooledClientContextHandler.class);
    final ClientOptions clientOptions;
    final boolean secure;
    final ChannelPool pool;
    /** Publisher returned by {@link #onCloseOrRelease(Channel)}; signalled from {@link #release(Channel)} and {@link #dispose()}. */
    final DirectProcessor<Void> onReleaseEmitter;
    /** Set once by {@link #dispose()}; checked by {@link #operationComplete(Future)} to skip pipeline setup. */
    volatile boolean cancelled;
    /** The future registered through {@link #setFuture(Future)}; {@code null} until then. */
    Future<CHANNEL> f;

    /**
     * Creates the handler.
     *
     * @param channelOpFactory forwarded to the parent constructor
     * @param options client options; forwarded to the parent and also used by {@link #doPipeline(ChannelPipeline)}
     * @param sink forwarded to the parent constructor; receives the context or the connection error
     * @param loggingHandler forwarded to the parent constructor
     * @param secure passed to the SSL/log pipeline setup in {@link #doPipeline(ChannelPipeline)}
     * @param pool the pool channels are released to
     */
    PooledClientContextHandler(ChannelOperations.OnNew<CHANNEL> channelOpFactory, ClientOptions options, MonoSink<NettyContext> sink, LoggingHandler loggingHandler, boolean secure, ChannelPool pool) {
        super(channelOpFactory, options, sink, loggingHandler);
        this.clientOptions = options;
        this.secure = secure;
        this.pool = pool;
        this.onReleaseEmitter = DirectProcessor.create();
    }

    /**
     * Emits {@code context} to the sink the first time this method is called;
     * subsequent calls are ignored.
     *
     * @param context the context to publish
     */
    @Override
    public void fireContextActive(NettyContext context) {
        if (!this.fired) {
            this.fired = true;
            this.sink.success(context);
        }
    }

    /**
     * Registers the future this handler waits on and subscribes to its completion.
     * If a future has already been registered, the new one is cancelled and ignored.
     *
     * @param future the future producing the channel; must not be {@code null}
     * @throws NullPointerException if {@code future} is {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    public void setFuture(Future<?> future) {
        Objects.requireNonNull(future, "future");
        if (this.f != null) {
            future.cancel(true);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Acquiring existing channel from pool: {}", this.pool.toString());
        }
        // Unchecked: the signature only exposes Future<?>; the value is used as a
        // CHANNEL everywhere below. NOTE(review): assumes callers pass a future of CHANNEL.
        this.f = (Future<CHANNEL>) future;
        this.f.addListener(this);
    }

    /**
     * Listener callback for the registered future.
     *
     * <p>Installs this handler as the sink's cancellation, then:
     * returns silently if the future or this handler was cancelled; reports
     * the failure to the sink if the future failed; otherwise runs
     * {@link #connectOrAcquire(Channel)} on the channel's event loop.
     *
     * @param future the completed future
     * @throws Exception if obtaining the channel from the future fails
     */
    @Override
    public void operationComplete(Future<CHANNEL> future) throws Exception {
        this.sink.setCancellation(this);
        if (future.isCancelled() || this.cancelled) {
            if (log.isDebugEnabled()) {
                log.debug("Cancelled {}", future.toString());
            }
            return;
        }
        if (!future.isSuccess()) {
            if (future.cause() != null) {
                this.sink.error(future.cause());
            } else {
                this.sink.error(new IOException("error while connecting to " + future.toString()));
            }
            return;
        }
        CHANNEL c = future.get();
        if (c.eventLoop().inEventLoop()) {
            this.connectOrAcquire(c);
        } else {
            c.eventLoop().execute(() -> this.connectOrAcquire(c));
        }
    }

    /**
     * @param channel ignored
     * @return the publisher signalled when the channel release finishes
     */
    @Override
    protected Publisher<Void> onCloseOrRelease(Channel channel) {
        return this.onReleaseEmitter;
    }

    /**
     * Prepares the pipeline of {@code c} for this context.
     *
     * <p>If the pipeline has no {@code "reactiveBridge"} handler, the pipeline
     * is configured via {@link #doPipeline(ChannelPipeline)}, a
     * {@code "bridgeSetup"} handler is appended, and the registration/activation
     * events matching the channel's current state are fired. Otherwise the
     * existing {@code "reactiveBridge"} handler is replaced by a fresh one
     * bound to this context.
     *
     * @param c the channel to set up
     */
    final void connectOrAcquire(CHANNEL c) {
        ChannelOperationsHandler handler = (ChannelOperationsHandler) c.pipeline().get("reactiveBridge");
        if (handler == null) {
            if (log.isDebugEnabled()) {
                log.debug("Connected new channel: {}", c.toString());
            }
            this.doPipeline(c.pipeline());
            c.pipeline().addLast("bridgeSetup", new ContextHandler.BridgeSetupHandler(this));
            // Fire the lifecycle events that correspond to the channel's current state.
            if (c.isRegistered()) {
                c.pipeline().fireChannelRegistered();
            }
            if (c.isActive()) {
                c.pipeline().fireChannelActive();
                return;
            }
            c.pipeline().fireChannelInactive();
            if (!c.isRegistered()) {
                c.pipeline().fireChannelUnregistered();
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Acquired existing channel: {}", c.toString());
            }
            c.pipeline().replace("reactiveBridge", "reactiveBridge", new ChannelOperationsHandler(this));
        }
    }

    /**
     * Cancels this handler and releases its channel.
     *
     * <p>Does nothing if no future was registered or if already cancelled.
     * If the future is still pending, a listener is added that releases the
     * channel once (and only if) the future succeeds. If the future is done,
     * the channel is released directly when it is inactive or when the caller
     * is on the channel's event loop, and via the event loop otherwise.
     * Any exception raised on that path (e.g. by {@code get()}) is logged and
     * forwarded to the release publisher as an error.
     */
    @SuppressWarnings("unchecked")
    public void dispose() {
        if (this.f == null || this.cancelled) {
            return;
        }
        this.cancelled = true;
        if (!this.f.isDone()) {
            if (log.isDebugEnabled()) {
                log.debug("Releasing pending channel acquisition: {}", this.f.toString());
            }
            this.f.addListener(ff -> {
                if (ff.isSuccess()) {
                    // Unchecked: the listener's future is the same Future<CHANNEL> held in f.
                    this.release((CHANNEL) ff.get());
                }
            });
            return;
        }
        try {
            CHANNEL c = this.f.get();
            if (!c.isActive()) {
                this.release(c);
                return;
            }
            if (!c.eventLoop().inEventLoop()) {
                c.eventLoop().execute(() -> this.release(c));
            } else {
                this.release(c);
            }
        }
        catch (Exception e) {
            log.error("Failed releasing channel", e);
            this.onReleaseEmitter.onError(e);
        }
    }

    /**
     * Releases {@code c} to the pool and, when the release finishes, either
     * closes the channel (if its {@code CLOSE_CHANNEL} attribute is
     * {@code true}) or completes/fails the release publisher according to the
     * release outcome.
     *
     * @param c the channel to release
     */
    final void release(CHANNEL c) {
        if (log.isDebugEnabled()) {
            log.debug("Releasing channel: {}", c.toString());
        }
        this.pool.release(c).addListener(released -> {
            // NOTE(review): when the channel is no longer open, or is closed below,
            // the release publisher is not signalled from here — confirm that
            // termination is driven elsewhere for those paths.
            if (!c.isOpen()) {
                return;
            }
            if (Boolean.TRUE.equals(c.attr(CLOSE_CHANNEL).get())) {
                c.close();
            } else if (released.isSuccess()) {
                this.onReleaseEmitter.onComplete();
            } else {
                this.onReleaseEmitter.onError(released.cause());
            }
        });
    }

    /**
     * Disposes this handler and then reports {@code ABORTED} as the context error.
     *
     * @param channel ignored
     */
    @Override
    protected void doDropped(Channel channel) {
        this.dispose();
        this.fireContextError(ABORTED);
    }

    /**
     * Adds the SSL/logging handlers and then the proxy handler to {@code pipeline},
     * delegating to the static helpers of {@link ClientContextHandler}.
     *
     * @param pipeline the pipeline to configure
     */
    @Override
    protected void doPipeline(ChannelPipeline pipeline) {
        ClientContextHandler.addSslAndLogHandlers(this.clientOptions, this.sink, this.loggingHandler, this.secure, pipeline);
        ClientContextHandler.addProxyHandler(this.clientOptions, pipeline);
    }
}

