/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.network.netty;

import com.antgroup.geaflow.common.shuffle.ShuffleAddress;
import com.antgroup.geaflow.common.utils.ThreadUtil;
import com.antgroup.geaflow.shuffle.config.ShuffleConfig;
import com.antgroup.geaflow.shuffle.network.ConnectionId;
import com.antgroup.geaflow.shuffle.network.IConnectionManager;
import com.antgroup.geaflow.shuffle.network.netty.NettyClient;
import com.antgroup.geaflow.shuffle.network.netty.NettyContext;
import com.antgroup.geaflow.shuffle.network.netty.NettyServer;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClient;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClientFactory;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-backed {@link IConnectionManager}.
 *
 * <p>Construction creates a {@link NettyClient} and a {@link NettyServer} that share one
 * {@link NettyContext}, starts the server, and records the address returned by
 * {@link NettyServer#start()} as this manager's {@link ShuffleAddress}. It also owns a
 * single-threaded executor whose threads are produced by a factory named "connect".
 *
 * <p>{@link #close()} sets the server and client references to {@code null}; calling
 * {@link #getServerBufAllocator()} or {@link #getClientBufAllocator()} afterwards will fail
 * with a {@link NullPointerException}.
 */
public class ConnectionManager
implements IConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
    private final ShuffleConfig nettyConfig;
    private final ShuffleAddress shuffleAddress;
    private final SliceRequestClientFactory clientFactory;
    private final ExecutorService executor;
    private NettyServer server;
    private NettyClient client;

    /**
     * Creates the client and server, starts the server and prepares the client factory and
     * the executor.
     *
     * @param config shuffle configuration handed to the netty context, client, server and
     *               client factory; also returned by {@link #getShuffleConfig()}
     */
    public ConnectionManager(ShuffleConfig config) {
        NettyContext context = new NettyContext(config);
        this.client = new NettyClient(config, context);
        this.server = new NettyServer(config, context);
        InetSocketAddress address = this.server.start();
        this.shuffleAddress = new ShuffleAddress(address.getAddress().getHostAddress(), address.getPort());
        this.clientFactory = new SliceRequestClientFactory(config, this.client);
        this.nettyConfig = config;
        // One worker thread (core == max == 1) fed by an unbounded queue, so submitted tasks
        // run one at a time in submission order.
        // NOTE(review): the `true` flag presumably marks the threads as daemon - confirm
        // against ThreadUtil.namedThreadFactory.
        this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), ThreadUtil.namedThreadFactory(true, "connect"));
    }

    /**
     * @return the host address and port taken from the address the server returned on start
     */
    @Override
    public ShuffleAddress getShuffleAddress() {
        return this.shuffleAddress;
    }

    /**
     * @return the configuration this manager was constructed with
     */
    @Override
    public ShuffleConfig getShuffleConfig() {
        return this.nettyConfig;
    }

    /**
     * @return the pooled allocator exposed by the server
     */
    @Override
    public PooledByteBufAllocator getServerBufAllocator() {
        return this.server.getPooledAllocator();
    }

    /**
     * @return the allocator exposed by the client
     */
    @Override
    public PooledByteBufAllocator getClientBufAllocator() {
        return this.client.getAllocator();
    }

    /**
     * @return the netty client, or {@code null} once {@link #close()} has shut it down
     */
    public NettyClient getClient() {
        return this.client;
    }

    /**
     * Delegates to the client factory to create a slice request client for a connection.
     *
     * @param connectionId identifies the connection the client is created for
     * @return the client produced by the factory
     * @throws IOException          propagated from the factory
     * @throws InterruptedException propagated from the factory
     */
    public SliceRequestClient createSliceRequestClient(ConnectionId connectionId) throws IOException, InterruptedException {
        return this.clientFactory.createSliceRequestClient(connectionId);
    }

    /**
     * Delegates to the client factory to close the open channel connections of a connection.
     *
     * @param connectionId identifies the connection whose channels should be closed
     */
    public void closeOpenChannelConnections(ConnectionId connectionId) {
        this.clientFactory.closeOpenChannelConnections(connectionId);
    }

    /**
     * Closes the server, shuts down the client and shuts down the executor, in that order.
     *
     * <p>Each step is attempted even if an earlier one throws, so a failing server close
     * cannot leave the client or the executor running. A component whose release fails keeps
     * its reference, so a later call can retry it; components already released are skipped.
     *
     * @throws IOException if releasing a component fails with an I/O error
     */
    @Override
    public void close() throws IOException {
        LOGGER.info("closing connection manager");
        try {
            if (this.server != null) {
                this.server.close();
                this.server = null;
            }
        } finally {
            try {
                if (this.client != null) {
                    this.client.shutdown();
                    this.client = null;
                }
            } finally {
                // The executor is final and always assigned, so no null check is needed.
                this.executor.shutdown();
            }
        }
    }

    /**
     * @return the single-threaded executor owned by this manager
     */
    @Override
    public ExecutorService getExecutor() {
        return this.executor;
    }
}

