/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.rpc;

import com.antgroup.geaflow.cluster.rpc.RpcEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.ContainerEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.DriverEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.MasterEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.PipelineMasterEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.ResourceManagerEndpointRef;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.utils.ThreadUtil;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcEndpointRefFactory
implements Serializable {
    private final Map<EndpointRefID, RpcEndpointRef> endpointRefMap = new ConcurrentHashMap<EndpointRefID, RpcEndpointRef>();
    private final ExecutorService executorService;
    private static RpcEndpointRefFactory INSTANCE;

    private RpcEndpointRefFactory(Configuration config) {
        int threads = config.getInteger(ExecutionConfigKeys.RPC_ASYNC_THREADS);
        this.executorService = new ThreadPoolExecutor(threads, threads, Long.MAX_VALUE, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), ThreadUtil.namedThreadFactory((boolean)true, (String)"rpc-executor"));
    }

    public static synchronized RpcEndpointRefFactory getInstance(Configuration config) {
        if (INSTANCE == null) {
            INSTANCE = new RpcEndpointRefFactory(config);
        }
        return INSTANCE;
    }

    public static synchronized RpcEndpointRefFactory getInstance() {
        return INSTANCE;
    }

    public MasterEndpointRef connectMaster(String host, int port) {
        EndpointRefID refID = new EndpointRefID(host, port, EndpointType.MASTER);
        try {
            return (MasterEndpointRef)this.endpointRefMap.computeIfAbsent(refID, key -> new MasterEndpointRef(host, port, this.executorService));
        }
        catch (Throwable t) {
            this.endpointRefMap.remove(refID);
            throw new RuntimeException("connect master error, host " + host + " port " + port, t);
        }
    }

    public ResourceManagerEndpointRef connectResourceManager(String host, int port) {
        EndpointRefID refID = new EndpointRefID(host, port, EndpointType.RESOURCE_MANAGER);
        try {
            return (ResourceManagerEndpointRef)this.endpointRefMap.computeIfAbsent(refID, key -> new ResourceManagerEndpointRef(host, port, this.executorService));
        }
        catch (Throwable t) {
            this.endpointRefMap.remove(refID);
            throw new RuntimeException("connect rm error, host " + host + " port " + port, t);
        }
    }

    public DriverEndpointRef connectDriver(String host, int port) {
        EndpointRefID refID = new EndpointRefID(host, port, EndpointType.DRIVER);
        try {
            return (DriverEndpointRef)this.endpointRefMap.computeIfAbsent(refID, key -> new DriverEndpointRef(host, port, this.executorService));
        }
        catch (Throwable t) {
            this.endpointRefMap.remove(refID);
            throw new RuntimeException("connect driver error, host " + host + " port " + port, t);
        }
    }

    public PipelineMasterEndpointRef connectPipelineManager(String host, int port) {
        EndpointRefID refID = new EndpointRefID(host, port, EndpointType.PIPELINE_MANAGER);
        try {
            return (PipelineMasterEndpointRef)this.endpointRefMap.computeIfAbsent(refID, key -> new PipelineMasterEndpointRef(host, port, this.executorService));
        }
        catch (Throwable t) {
            this.endpointRefMap.remove(refID);
            throw new RuntimeException("connect pipeline master error, host " + host + " port " + port, t);
        }
    }

    public ContainerEndpointRef connectContainer(String host, int port) {
        EndpointRefID refID = new EndpointRefID(host, port, EndpointType.CONTAINER);
        try {
            return (ContainerEndpointRef)this.endpointRefMap.computeIfAbsent(refID, key -> new ContainerEndpointRef(host, port, this.executorService));
        }
        catch (Throwable t) {
            this.endpointRefMap.remove(refID);
            throw new RuntimeException("connect container error, host " + host + " port " + port, t);
        }
    }

    public void invalidateEndpointCache(String host, int port, EndpointType endpointType) {
        EndpointRefID refID = new EndpointRefID(host, port, endpointType);
        this.endpointRefMap.remove(refID);
    }

    public static class EndpointRefID {
        private String host;
        private int port;
        private EndpointType endpointType;

        public EndpointRefID(String host, int port, EndpointType endpointType) {
            this.host = host;
            this.port = port;
            this.endpointType = endpointType;
        }

        public String getHost() {
            return this.host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public int getPort() {
            return this.port;
        }

        public void setPort(int port) {
            this.port = port;
        }

        public EndpointType getEndpointType() {
            return this.endpointType;
        }

        public void setEndpointType(EndpointType endpointType) {
            this.endpointType = endpointType;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            EndpointRefID that = (EndpointRefID)o;
            return this.port == that.port && this.host.equals(that.host) && this.endpointType == that.endpointType;
        }

        public int hashCode() {
            return Objects.hash(new Object[]{this.host, this.port, this.endpointType});
        }

        public String toString() {
            return "EndpointRefID{host='" + this.host + '\'' + ", port=" + this.port + ", endpointType=" + (Object)((Object)this.endpointType) + '}';
        }
    }

    static enum EndpointType {
        MASTER,
        RESOURCE_MANAGER,
        DRIVER,
        PIPELINE_MANAGER,
        CONTAINER;

    }
}

