/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.common.ChannelBridge;
import reactor.ipc.netty.common.ColocatedEventLoopGroup;
import reactor.ipc.netty.common.DuplexSocket;
import reactor.ipc.netty.common.MonoChannelFuture;
import reactor.ipc.netty.common.NettyChannel;
import reactor.ipc.netty.common.NettyChannelHandler;
import reactor.ipc.netty.config.ClientOptions;
import reactor.ipc.netty.tcp.NettySslReader;
import reactor.ipc.netty.tcp.TcpChannel;
import reactor.ipc.netty.tcp.TcpServer;
import reactor.ipc.netty.util.NettyNativeDetector;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * TCP client that connects a Netty {@link Bootstrap} to a remote address and bridges the
 * resulting channel to a reactive handler through {@link NettyChannelHandler}.
 *
 * <p>The client is configured from a {@link ClientOptions} instance: remote address, optional
 * SSL context, optional proxy, socket options and an optional externally supplied
 * {@link EventLoopGroup}. When no event loop group is supplied, the client creates its own
 * and shuts it down in {@link #doShutdown()}.
 */
public class TcpClient
        extends DuplexSocket<ByteBuf, ByteBuf, NettyChannel>
        implements ChannelBridge<TcpChannel> {

    /**
     * Fallback handler substituted by {@code doStart} when the caller passes a {@code null}
     * handler; it ignores its argument and returns an empty {@link Flux}.
     */
    public static final Function PING = o -> Flux.empty();

    /** Event loop group used by the bootstrap; either supplied through the options or created here. */
    final EventLoopGroup ioGroup;

    /** Immutable snapshot of the options passed to the constructor. */
    final ClientOptions options;

    /** SSL context built from the options, or {@code null} when SSL is not configured. */
    final SslContext sslContext;

    /** Supplies the channel class and event loop group implementation used by this client. */
    final NettyNativeDetector channelAdapter;

    /** Remote address this client connects to. */
    final InetSocketAddress connectAddress;

    protected static final Logger log = Loggers.getLogger(TcpClient.class);

    /** Sequence used to give each client-owned IO thread a unique name suffix. */
    static final AtomicLong COUNTER = new AtomicLong();

    /**
     * Creates a client targeting {@code 127.0.0.1} on {@code DEFAULT_PORT}.
     *
     * @return a new {@link TcpClient}
     */
    public static TcpClient create() {
        return TcpClient.create("127.0.0.1");
    }

    /**
     * Creates a client targeting the given host on {@code DEFAULT_PORT}.
     *
     * @param bindAddress the remote host to connect to
     * @return a new {@link TcpClient}
     */
    public static TcpClient create(String bindAddress) {
        return TcpClient.create(bindAddress, DEFAULT_PORT);
    }

    /**
     * Creates a client targeting {@code 127.0.0.1} on the given port.
     *
     * @param port the remote port to connect to
     * @return a new {@link TcpClient}
     */
    public static TcpClient create(int port) {
        return TcpClient.create("127.0.0.1", port);
    }

    /**
     * Creates a client targeting the given host and port.
     *
     * @param bindAddress the remote host to connect to
     * @param port the remote port to connect to
     * @return a new {@link TcpClient}
     */
    public static TcpClient create(String bindAddress, int port) {
        return TcpClient.create(ClientOptions.to(bindAddress, port));
    }

    /**
     * Creates a client from fully specified options.
     *
     * @param options the client configuration
     * @return a new {@link TcpClient}
     */
    public static TcpClient create(ClientOptions options) {
        return new TcpClient(options);
    }

    /**
     * Builds the client state from {@code options}.
     *
     * <p>If the options carry no remote address, {@code 127.0.0.1:3000} is used. If an SSL
     * builder is configured, the SSL context is built eagerly; an {@link SSLException} raised
     * while building is rethrown through {@link Exceptions#bubble(Throwable)}.
     *
     * @param options the client configuration
     */
    protected TcpClient(ClientOptions options) {
        this.connectAddress = null == options.remoteAddress()
                ? new InetSocketAddress("127.0.0.1", 3000)
                : options.remoteAddress();
        this.options = options.toImmutable();

        if (options.ssl() != null) {
            try {
                this.sslContext = options.ssl().build();
                if (log.isDebugEnabled()) {
                    log.debug("Connecting with SSL enabled using context {}",
                            this.sslContext.getClass().getSimpleName());
                }
                // A JDK SSL context selects the detector obtained via force(false); any other
                // context uses the default detector instance.
                this.channelAdapter = this.sslContext instanceof JdkSslContext
                        ? NettyNativeDetector.force(false)
                        : NettyNativeDetector.instance();
            }
            catch (SSLException ssle) {
                throw Exceptions.bubble(ssle);
            }
        } else {
            this.sslContext = null;
            this.channelAdapter = NettyNativeDetector.instance();
        }

        if (null != options.eventLoopGroup()) {
            this.ioGroup = options.eventLoopGroup();
        } else {
            int ioThreadCount = TcpServer.DEFAULT_TCP_THREAD_COUNT;
            this.ioGroup = new ColocatedEventLoopGroup(this.channelAdapter.newEventLoopGroup(ioThreadCount, r -> {
                Thread t = new Thread(r, "reactor-tcp-client-io-" + COUNTER.incrementAndGet());
                t.setDaemon(options.daemon());
                return t;
            }));
        }
    }

    /**
     * Returns the remote address this client connects to.
     *
     * @return the connect address, never {@code null}
     */
    public InetSocketAddress getConnectAddress() {
        return this.connectAddress;
    }

    @Override
    public String toString() {
        return "TcpClient:" + this.getConnectAddress().toString();
    }

    /**
     * Returns the immutable options snapshot taken at construction time.
     *
     * @return the client options
     */
    protected ClientOptions getOptions() {
        return this.options;
    }

    /**
     * Connects to {@link #getConnectAddress()} using this instance as the channel bridge;
     * the connection is treated as secure when an SSL context was built.
     *
     * @param handler the handler applied to the connected channel, may be {@code null}
     * @return the {@link Mono} produced by the four-argument {@code doStart}
     */
    @Override
    protected Mono<Void> doStart(Function<? super NettyChannel, ? extends Publisher<Void>> handler) {
        return this.doStart(handler, this.getConnectAddress(), this, this.sslContext != null);
    }

    /**
     * Configures a {@link Bootstrap} from the client options and connects it to {@code address}.
     *
     * <p>When a proxy type is configured, the bootstrap uses {@link NoopAddressResolverGroup}
     * so that the target address is not resolved locally. Auto-read is disabled on the channel.
     *
     * @param handler the handler applied to the connected channel; {@link #PING} is used when
     *        {@code null}
     * @param address the remote address to connect to
     * @param channelBridge factory used by the pipeline bridge to create the channel facade
     * @param secure whether to install the SSL handlers; when {@code true} the returned
     *        {@link Mono} is the connect future followed by the processor handed to
     *        {@link NettySslReader} (presumably signalled once the handshake finishes —
     *        confirm against {@code NettySslReader})
     * @return a {@link Mono} derived from the connect future
     */
    protected Mono<Void> doStart(Function<? super NettyChannel, ? extends Publisher<Void>> handler, InetSocketAddress address, ChannelBridge<? extends TcpChannel> channelBridge, boolean secure) {
        // PING is declared as a raw Function for API compatibility, hence the unchecked conversion.
        @SuppressWarnings("unchecked")
        Function<? super NettyChannel, ? extends Publisher<Void>> targetHandler =
                null == handler ? PING : handler;

        Bootstrap bootstrap = new Bootstrap().group(this.ioGroup);
        if (this.options.proxyType() != null) {
            bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
        }

        bootstrap.channel(this.channelAdapter.getChannel(this.ioGroup))
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.SO_RCVBUF, this.options.rcvbuf())
                .option(ChannelOption.SO_SNDBUF, this.options.sndbuf())
                .option(ChannelOption.AUTO_READ, false)
                .option(ChannelOption.SO_KEEPALIVE, this.options.keepAlive())
                .option(ChannelOption.SO_LINGER, this.options.linger())
                .option(ChannelOption.TCP_NODELAY, this.options.tcpNoDelay())
                // The option is an int; clamp the long timeout instead of letting it overflow.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                        (int) Math.min(Integer.MAX_VALUE, this.options.timeoutMillis()));

        if (!secure) {
            bootstrap.handler(new TcpClientChannelSetup(this, null, channelBridge, targetHandler));
            return MonoChannelFuture.from(bootstrap.connect(address));
        }

        DirectProcessor<Void> secureCallback = DirectProcessor.create();
        bootstrap.handler(new TcpClientChannelSetup(this, secureCallback, channelBridge, targetHandler));
        return MonoChannelFuture.from(bootstrap.connect(address)).flux().then(secureCallback);
    }

    /**
     * Returns the class used to name the Netty {@link LoggingHandler}s installed in the pipeline.
     *
     * @return the logging category class
     */
    protected Class<?> logClass() {
        return TcpClient.class;
    }

    /**
     * Shuts down the event loop group when it is owned by this client.
     *
     * @return an empty {@link Mono} when the group was supplied through the options, otherwise
     *         a {@link Mono} wrapping the graceful-shutdown future of the group
     */
    @Override
    protected Mono<Void> doShutdown() {
        if (this.getOptions() != null && this.getOptions().eventLoopGroup() != null) {
            return Mono.empty();
        }
        return MonoChannelFuture.from(this.ioGroup.shutdownGracefully());
    }

    /**
     * Appends the {@code "reactiveBridge"} handler, which connects the channel to {@code handler},
     * at the end of the channel pipeline.
     *
     * @param handler the handler applied to the connected channel
     * @param ch the channel being initialized
     * @param channelBridge factory used by the bridge handler to create the channel facade
     * @throws Exception declared for subclasses; nothing is thrown explicitly here
     */
    protected void bindChannel(Function<? super NettyChannel, ? extends Publisher<Void>> handler, SocketChannel ch, ChannelBridge<? extends TcpChannel> channelBridge) throws Exception {
        // Diamond lets the type argument be inferred from the wildcard-typed bridge.
        ch.pipeline().addLast("reactiveBridge", new NettyChannelHandler<>(handler, channelBridge, ch));
    }

    @Override
    protected boolean shouldFailOnStarted() {
        return false;
    }

    /**
     * Creates the {@link TcpChannel} facade for a connected channel; {@code parameters} is unused.
     */
    @Override
    public TcpChannel createChannelBridge(Channel ioChannel, Flux<Object> input, Object ... parameters) {
        return new TcpChannel(ioChannel, input);
    }

    /**
     * Channel initializer that assembles the client pipeline: optional SSL handlers, optional
     * logging handlers, optional proxy handler, the user pipeline configurer, and finally the
     * reactive bridge installed by {@link TcpClient#bindChannel}.
     */
    static final class TcpClientChannelSetup
            extends ChannelInitializer<SocketChannel> {
        final TcpClient parent;
        final ChannelBridge<? extends TcpChannel> channelBridge;
        /** Processor handed to {@link NettySslReader}; {@code null} for non-secure connections. */
        final DirectProcessor<Void> secureCallback;
        final Function<? super NettyChannel, ? extends Publisher<Void>> targetHandler;

        TcpClientChannelSetup(TcpClient parent, DirectProcessor<Void> secureCallback, ChannelBridge<? extends TcpChannel> channelBridge, Function<? super NettyChannel, ? extends Publisher<Void>> targetHandler) {
            this.parent = parent;
            this.secureCallback = secureCallback;
            this.channelBridge = channelBridge;
            this.targetHandler = targetHandler;
        }

        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();

            if (this.secureCallback != null && null != this.parent.sslContext) {
                SslHandler sslHandler = this.parent.sslContext.newHandler(ch.alloc());
                sslHandler.setHandshakeTimeoutMillis(this.parent.options.sslHandshakeTimeoutMillis());

                // At trace level an extra logging handler sits in front of the SSL handler.
                if (log.isTraceEnabled()) {
                    pipeline.addFirst("sslLoggingHandler", new LoggingHandler(this.parent.logClass()));
                    pipeline.addAfter("sslLoggingHandler", "sslHandler", sslHandler);
                } else {
                    pipeline.addFirst("sslHandler", sslHandler);
                }

                // At debug level a logging handler sits between the SSL handler and the SSL reader.
                if (log.isDebugEnabled()) {
                    pipeline.addAfter("sslHandler", "loggingHandler", new LoggingHandler(this.parent.logClass()));
                    pipeline.addAfter("loggingHandler", "sslReader", new NettySslReader(this.secureCallback));
                } else {
                    pipeline.addAfter("sslHandler", "sslReader", new NettySslReader(this.secureCallback));
                }
            } else if (log.isDebugEnabled()) {
                pipeline.addFirst("loggingHandler", new LoggingHandler(this.parent.logClass()));
            }

            if (this.parent.options.proxyType() != null) {
                InetSocketAddress proxyAddr = this.parent.options.proxyAddress().get();
                String username = this.parent.options.proxyUsername();
                String password = username != null && this.parent.options.proxyPassword() != null
                        ? this.parent.options.proxyPassword().apply(username)
                        : null;

                // Typed as ChannelHandler because the three proxy handler classes are siblings;
                // none of them is assignable to another.
                ChannelHandler proxy;
                switch (this.parent.options.proxyType()) {
                    case SOCKS4: {
                        proxy = username != null
                                ? new Socks4ProxyHandler(proxyAddr, username)
                                : new Socks4ProxyHandler(proxyAddr);
                        break;
                    }
                    case SOCKS5: {
                        proxy = username != null && password != null
                                ? new Socks5ProxyHandler(proxyAddr, username, password)
                                : new Socks5ProxyHandler(proxyAddr);
                        break;
                    }
                    default: {
                        // Every proxy type other than SOCKS4/SOCKS5 is handled as an HTTP proxy.
                        proxy = username != null && password != null
                                ? new HttpProxyHandler(proxyAddr, username, password)
                                : new HttpProxyHandler(proxyAddr);
                        break;
                    }
                }
                // The proxy handler goes first so it precedes the SSL and logging handlers.
                pipeline.addFirst("proxyHandler", proxy);
            }

            if (null != this.parent.options.pipelineConfigurer()) {
                this.parent.options.pipelineConfigurer().accept(pipeline);
            }
            this.parent.bindChannel(this.targetHandler, ch, this.channelBridge);
        }
    }
}

