/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Exceptions;
import reactor.core.MultiProducer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.common.ChannelBridge;
import reactor.ipc.netty.common.DuplexSocket;
import reactor.ipc.netty.common.MonoChannelFuture;
import reactor.ipc.netty.common.NettyChannel;
import reactor.ipc.netty.common.NettyChannelHandler;
import reactor.ipc.netty.config.ServerOptions;
import reactor.ipc.netty.tcp.TcpChannel;
import reactor.ipc.netty.util.NettyNativeDetector;
import reactor.util.Logger;
import reactor.util.Loggers;

public class TcpServer
extends DuplexSocket<ByteBuf, ByteBuf, NettyChannel>
implements MultiProducer,
ChannelBridge<TcpChannel> {
    public static final int DEFAULT_TCP_THREAD_COUNT = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount", "" + Schedulers.DEFAULT_POOL_SIZE / 2));
    public static final int DEFAULT_TCP_SELECT_COUNT = Integer.parseInt(System.getProperty("reactor.tcp.selectThreadCount", "" + DEFAULT_TCP_THREAD_COUNT));
    final ServerBootstrap bootstrap;
    final EventLoopGroup selectorGroup;
    final EventLoopGroup ioGroup;
    final ChannelGroup channelGroup;
    final ServerOptions options;
    final SslContext sslContext;
    InetSocketAddress listenAddress;
    ChannelFuture bindFuture;
    static final Logger log = Loggers.getLogger(TcpServer.class);
    static final AtomicLong COUNTER = new AtomicLong();

    public static TcpServer create() {
        return TcpServer.create("127.0.0.1");
    }

    public static TcpServer create(ServerOptions options) {
        return new TcpServer(options);
    }

    public static TcpServer create(int port) {
        return TcpServer.create("127.0.0.1", port);
    }

    public static TcpServer create(String bindAddress) {
        return TcpServer.create(bindAddress, 0);
    }

    public static TcpServer create(String bindAddress, int port) {
        return TcpServer.create(ServerOptions.create().listen(bindAddress, port));
    }

    protected TcpServer(ServerOptions options) {
        NettyNativeDetector channelAdapter;
        this.listenAddress = options.listenAddress();
        this.options = options.toImmutable();
        int selectThreadCount = DEFAULT_TCP_SELECT_COUNT;
        int ioThreadCount = DEFAULT_TCP_THREAD_COUNT;
        if (options.ssl() != null) {
            try {
                this.sslContext = options.ssl().build();
                if (log.isDebugEnabled()) {
                    log.debug("Serving SSL enabled using context {}", new Object[]{this.sslContext.getClass().getSimpleName()});
                }
                channelAdapter = this.sslContext instanceof JdkSslContext ? NettyNativeDetector.force(false) : NettyNativeDetector.instance();
            }
            catch (SSLException e) {
                throw Exceptions.bubble((Throwable)e);
            }
        } else {
            this.sslContext = null;
            channelAdapter = NettyNativeDetector.instance();
        }
        this.selectorGroup = channelAdapter.newEventLoopGroup(selectThreadCount, r -> {
            Thread t = new Thread(r, "reactor-tcp-server-select-" + COUNTER.incrementAndGet());
            t.setDaemon(options.daemon());
            return t;
        });
        this.ioGroup = null != options.eventLoopGroup() ? options.eventLoopGroup() : channelAdapter.newEventLoopGroup(ioThreadCount, r -> {
            Thread t = new Thread(r, "reactor-tcp-server-io-" + COUNTER.incrementAndGet());
            t.setDaemon(options.daemon());
            return t;
        });
        ServerBootstrap _serverBootstrap = (ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().group(this.selectorGroup, this.ioGroup).channel(channelAdapter.getServerChannel(this.ioGroup))).option(ChannelOption.SO_BACKLOG, (Object)options.backlog())).childOption(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.SO_RCVBUF, (Object)options.rcvbuf()).childOption(ChannelOption.SO_SNDBUF, (Object)options.sndbuf()).childOption(ChannelOption.AUTO_READ, (Object)false).childOption(ChannelOption.SO_KEEPALIVE, (Object)options.keepAlive()).childOption(ChannelOption.SO_LINGER, (Object)options.linger()).childOption(ChannelOption.TCP_NODELAY, (Object)options.tcpNoDelay()).option(ChannelOption.SO_REUSEADDR, (Object)options.reuseAddr())).childOption(ChannelOption.SO_REUSEADDR, (Object)options.reuseAddr()).childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)((int)Math.min(Integer.MAX_VALUE, options.timeoutMillis()))).localAddress((SocketAddress)(null == this.listenAddress ? new InetSocketAddress(0) : this.listenAddress));
        if (options.managed()) {
            log.debug("Server is managed.");
            this.channelGroup = new DefaultChannelGroup(null);
        } else {
            log.debug("Server is not managed (Not directly introspectable)");
            this.channelGroup = null;
        }
        this.bootstrap = _serverBootstrap;
    }

    @Override
    public Mono<Void> doShutdown() {
        try {
            this.bindFuture.channel().close().sync();
        }
        catch (InterruptedException ie) {
            return Mono.error((Throwable)ie);
        }
        Mono<Void> shutdown = MonoChannelFuture.from(this.selectorGroup.shutdownGracefully());
        if (null == this.getOptions() || null == this.getOptions().eventLoopGroup()) {
            return shutdown.then(aVoid -> MonoChannelFuture.from(this.ioGroup.shutdownGracefully()));
        }
        return shutdown;
    }

    public long downstreamCount() {
        return this.channelGroup == null ? -1L : (long)this.channelGroup.size();
    }

    public Iterator<?> downstreams() {
        if (this.channelGroup == null) {
            return null;
        }
        return new Iterator<Object>(){
            final Iterator<Channel> channelIterator;
            {
                this.channelIterator = TcpServer.this.channelGroup.iterator();
            }

            @Override
            public boolean hasNext() {
                return this.channelIterator.hasNext();
            }

            @Override
            public Object next() {
                return this.channelIterator.next().pipeline().get(NettyChannelHandler.class);
            }
        };
    }

    public InetSocketAddress getListenAddress() {
        return this.listenAddress;
    }

    protected ServerOptions getOptions() {
        return this.options;
    }

    public String toString() {
        return "TcpServer:" + this.getListenAddress().toString();
    }

    @Override
    protected Mono<Void> doStart(final Function<? super NettyChannel, ? extends Publisher<Void>> handler) {
        this.bootstrap.childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) throws Exception {
                if (log.isDebugEnabled()) {
                    log.debug("CONNECT {}", new Object[]{ch});
                }
                if (TcpServer.this.channelGroup != null) {
                    TcpServer.this.channelGroup.add((Object)ch);
                }
                TcpServer.this.bindChannel(handler, ch);
            }
        });
        this.bindFuture = this.bootstrap.bind();
        return new MonoChannelFuture<ChannelFuture>(this.bindFuture){

            @Override
            protected void doComplete(ChannelFuture future, Subscriber<? super Void> s) {
                if (log.isInfoEnabled()) {
                    log.info("BIND {} {}", new Object[]{future.isSuccess() ? "OK" : "FAILED", future.channel().localAddress()});
                }
                if (TcpServer.this.listenAddress.getPort() == 0) {
                    TcpServer.this.listenAddress = (InetSocketAddress)future.channel().localAddress();
                }
                super.doComplete(future, s);
            }
        };
    }

    protected SslContext getSslContext() {
        return this.sslContext;
    }

    @Override
    public TcpChannel createChannelBridge(Channel ioChannel, Flux<Object> input, Object ... parameters) {
        return new TcpChannel(ioChannel, input);
    }

    protected void bindChannel(Function<? super NettyChannel, ? extends Publisher<Void>> handler, SocketChannel nativeChannel) {
        ChannelPipeline pipeline = nativeChannel.pipeline();
        if (this.sslContext != null) {
            SslHandler sslHandler = this.sslContext.newHandler(nativeChannel.alloc());
            sslHandler.setHandshakeTimeoutMillis(this.options.sslHandshakeTimeoutMillis());
            pipeline.addFirst("sslHandler", (ChannelHandler)sslHandler);
        }
        if (null != this.getOptions() && null != this.getOptions().pipelineConfigurer()) {
            this.getOptions().pipelineConfigurer().accept(pipeline);
        }
        if (log.isDebugEnabled()) {
            pipeline.addLast("loggingHandler", (ChannelHandler)new LoggingHandler(TcpServer.class));
        }
        pipeline.addLast("reactiveBridge", new NettyChannelHandler<TcpChannel>(handler, this, (Channel)nativeChannel));
    }
}

