/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.network.netty;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.shuffle.config.ShuffleConfig;
import com.antgroup.geaflow.shuffle.network.ITransportContext;
import com.antgroup.geaflow.shuffle.network.NettyUtils;
import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
    private static final String CLIENT_THREAD_GROUP_NAME = "NettyClient";
    private final ShuffleConfig config;
    private final PooledByteBufAllocator allocator;
    private Bootstrap bootstrap;

    public NettyClient(ShuffleConfig config, final ITransportContext context) {
        int receiveBufferSize;
        this.config = config;
        this.bootstrap = new Bootstrap();
        long start = System.nanoTime();
        if (Epoll.isAvailable()) {
            this.initEpollBootstrap();
            LOGGER.info("Transport type 'auto': using EPOLL.");
        } else {
            this.initNioBootstrap();
            LOGGER.info("Transport type 'auto': using NIO.");
        }
        this.bootstrap.option(ChannelOption.TCP_NODELAY, (Object)true);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)config.getConnectTimeoutMs());
        this.allocator = NettyUtils.createPooledByteBufAllocator(config.preferDirectBuffer(), config.isThreadCacheEnabled(), config.getClientNumThreads());
        this.bootstrap.option(ChannelOption.ALLOCATOR, (Object)this.allocator);
        int sendBufferSize = config.getSendBufferSize();
        if (sendBufferSize > 0) {
            this.bootstrap.option(ChannelOption.SO_SNDBUF, (Object)sendBufferSize);
        }
        if ((receiveBufferSize = config.getReceiveBufferSize()) > 0) {
            this.bootstrap.option(ChannelOption.SO_RCVBUF, (Object)receiveBufferSize);
        }
        this.bootstrap.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(context.createClientChannelHandlers((Channel)channel));
            }
        });
        long duration = (System.nanoTime() - start) / 1000000L;
        LOGGER.info("Successful initialization (took {} ms).", (Object)duration);
    }

    private void initNioBootstrap() {
        NioEventLoopGroup nioGroup = new NioEventLoopGroup(this.config.getClientNumThreads(), NettyUtils.getNamedThreadFactory(CLIENT_THREAD_GROUP_NAME));
        ((Bootstrap)this.bootstrap.group((EventLoopGroup)nioGroup)).channel(NioSocketChannel.class);
    }

    private void initEpollBootstrap() {
        EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(this.config.getClientNumThreads(), NettyUtils.getNamedThreadFactory(CLIENT_THREAD_GROUP_NAME));
        ((Bootstrap)this.bootstrap.group((EventLoopGroup)epollGroup)).channel(EpollSocketChannel.class);
    }

    public ChannelFuture connect(InetSocketAddress serverSocketAddress) {
        Preconditions.checkState((null != this.bootstrap ? 1 : 0) != 0, (Object)"Client has not been initialized yet.");
        try {
            return this.bootstrap.connect((SocketAddress)serverSocketAddress);
        }
        catch (ChannelException e) {
            String message = "Too many open files";
            if (e.getCause() instanceof SocketException && "Too many open files".equals(e.getCause().getMessage()) || e.getCause() instanceof ChannelException && e.getCause().getCause() instanceof SocketException && "Too many open files".equals(e.getCause().getCause().getMessage())) {
                throw new GeaflowRuntimeException("The operating system does not offer enough file handles to open the network connection. Please increase the number of available file handles.", e.getCause());
            }
            throw e;
        }
    }

    public ShuffleConfig getConfig() {
        return this.config;
    }

    public PooledByteBufAllocator getAllocator() {
        return this.allocator;
    }

    public void shutdown() {
        long start = System.nanoTime();
        if (this.bootstrap != null) {
            if (this.bootstrap.group() != null) {
                this.bootstrap.group().shutdownGracefully();
            }
            this.bootstrap = null;
        }
        long duration = (System.nanoTime() - start) / 1000000L;
        LOGGER.info("Successful shutdown (took {} ms).", (Object)duration);
    }
}

