/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.network.netty;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.shuffle.config.ShuffleConfig;
import com.antgroup.geaflow.shuffle.network.ConnectionId;
import com.antgroup.geaflow.shuffle.network.netty.NettyClient;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClient;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClientHandler;
import com.antgroup.geaflow.shuffle.util.TransportException;
import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SliceRequestClientFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(SliceRequestClientFactory.class);
    private final int retryNumber;
    private final NettyClient nettyClient;
    private final ConcurrentMap<ConnectionId, CompletableFuture<SliceRequestClient>> clients = new ConcurrentHashMap<ConnectionId, CompletableFuture<SliceRequestClient>>();

    public SliceRequestClientFactory(ShuffleConfig nettyConfig, NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.retryNumber = nettyConfig.getConnectMaxRetries();
    }

    public SliceRequestClient createSliceRequestClient(ConnectionId connectionId) throws InterruptedException {
        while (true) {
            SliceRequestClient client;
            CompletableFuture<SliceRequestClient> newClientFuture;
            CompletableFuture clientFuture;
            if ((clientFuture = this.clients.putIfAbsent(connectionId, newClientFuture = new CompletableFuture<SliceRequestClient>())) == null) {
                try {
                    client = this.connectWithRetries(connectionId);
                }
                catch (Throwable e) {
                    newClientFuture.completeExceptionally(new IOException("Could not create client.", e));
                    this.clients.remove(connectionId, newClientFuture);
                    throw e;
                }
                newClientFuture.complete(client);
            } else {
                try {
                    client = (SliceRequestClient)clientFuture.get();
                }
                catch (ExecutionException e) {
                    throw new GeaflowRuntimeException("connect failed", (Throwable)e);
                }
            }
            if (client.incrementReferenceCounter()) {
                return client;
            }
            this.destroyRequestClient(connectionId, client);
        }
    }

    private SliceRequestClient connectWithRetries(ConnectionId connectionId) {
        int tried = 0;
        long startTime = System.nanoTime();
        while (true) {
            try {
                SliceRequestClient client = this.connect(connectionId);
                LOGGER.info("Successfully created connection to {} after {} ms", (Object)connectionId, (Object)((System.nanoTime() - startTime) / 1000000L));
                return client;
            }
            catch (TransportException e) {
                LOGGER.error("failed to connect to {}, retry #{}", new Object[]{connectionId, ++tried, e});
                if (tried <= this.retryNumber) continue;
                throw new GeaflowRuntimeException(String.format("Failed to connect to %s", connectionId.getAddress()), (Throwable)e);
            }
            break;
        }
    }

    private SliceRequestClient connect(ConnectionId connectionId) throws TransportException {
        try {
            Channel channel = this.nettyClient.connect(connectionId.getAddress()).await().channel();
            SliceRequestClientHandler clientHandler = (SliceRequestClientHandler)channel.pipeline().get(SliceRequestClientHandler.class);
            return new SliceRequestClient(channel, clientHandler, connectionId, this);
        }
        catch (Exception e) {
            throw new TransportException("Connecting to remote server '" + connectionId.getAddress() + "' has failed. This might indicate that the remote server has been lost.", connectionId.getAddress(), e);
        }
    }

    public void closeOpenChannelConnections(ConnectionId connectionId) {
        CompletableFuture entry = (CompletableFuture)this.clients.get(connectionId);
        if (entry != null && !entry.isDone()) {
            entry.thenAccept(client -> {
                if (client.disposeIfNotUsed()) {
                    this.clients.remove(connectionId, entry);
                }
            });
        }
    }

    public void destroyRequestClient(ConnectionId connectionId, SliceRequestClient client) {
        CompletableFuture future = (CompletableFuture)this.clients.get(connectionId);
        if (future != null && future.isDone()) {
            future.thenAccept(futureClient -> {
                if (client.equals(futureClient)) {
                    this.clients.remove(connectionId, future);
                }
            });
        }
    }
}

