/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.rpc;

import com.antgroup.geaflow.cluster.protocol.IEvent;
import com.antgroup.geaflow.cluster.resourcemanager.ReleaseResourceRequest;
import com.antgroup.geaflow.cluster.resourcemanager.ReleaseResponse;
import com.antgroup.geaflow.cluster.resourcemanager.RequireResourceRequest;
import com.antgroup.geaflow.cluster.resourcemanager.RequireResponse;
import com.antgroup.geaflow.cluster.rpc.RpcEndpointRef;
import com.antgroup.geaflow.cluster.rpc.RpcEndpointRefFactory;
import com.antgroup.geaflow.cluster.rpc.impl.ContainerEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.DriverEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.MasterEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.PipelineMasterEndpointRef;
import com.antgroup.geaflow.cluster.rpc.impl.ResourceManagerEndpointRef;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.heartbeat.Heartbeat;
import com.antgroup.geaflow.common.utils.RetryCommand;
import com.antgroup.geaflow.ha.service.HAServiceFactory;
import com.antgroup.geaflow.ha.service.IHAService;
import com.antgroup.geaflow.ha.service.ResourceData;
import com.antgroup.geaflow.rpc.proto.Container;
import com.antgroup.geaflow.rpc.proto.Master;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side entry point for issuing RPC calls to master, driver, container,
 * pipeline-manager and resource-manager endpoints.
 *
 * <p>Endpoint addresses are resolved by resource id through the {@link IHAService};
 * endpoint refs are obtained from the {@link RpcEndpointRefFactory}. Calls routed through
 * {@code doRpcWithRetry} are executed via {@link RetryCommand}; on every failed attempt the
 * endpoint cache entry and the resource data of the target are invalidated.
 *
 * <p>A single shared instance is created by {@link #init(Configuration)} and handed out by
 * {@link #getInstance()}. All collaborators and retry settings are held in static fields.
 */
public class RpcClient
implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);

    /** Extra time (ms) added to the heartbeat timeout when deriving the minimum retry count. */
    private static final int RPC_RETRY_EXTRA_MS = 30000;

    private static IHAService haService;
    private static RpcEndpointRefFactory refFactory;
    private static RpcClient INSTANCE;
    private static int RPC_RETRY_TIMES;
    private static int RPC_RETRY_INTERVAL_MS;

    /**
     * Reads the retry settings from {@code configuration} and obtains the endpoint factory and
     * HA service.
     *
     * <p>The effective retry count is the larger of the configured
     * {@code RPC_RETRY_TIMES} and {@code ceil((HEARTBEAT_TIMEOUT_MS + 30000) / RPC_RETRY_INTERVAL_MS)}.
     *
     * @param configuration configuration the retry keys are read from
     */
    private RpcClient(Configuration configuration) {
        RPC_RETRY_INTERVAL_MS = configuration.getInteger(ExecutionConfigKeys.RPC_RETRY_INTERVAL_MS);
        int heartbeatCheckMs = configuration.getInteger(ExecutionConfigKeys.HEARTBEAT_TIMEOUT_MS);
        // NOTE(review): a configured interval of 0 makes this floating-point division infinite,
        // and the int cast then saturates to Integer.MAX_VALUE retries - confirm the config
        // layer rejects non-positive intervals.
        int minTimes = (int)Math.ceil((double)(heartbeatCheckMs + RPC_RETRY_EXTRA_MS) / (double)RPC_RETRY_INTERVAL_MS);
        int retryTimes = configuration.getInteger(ExecutionConfigKeys.RPC_RETRY_TIMES);
        RPC_RETRY_TIMES = Math.max(minTimes, retryTimes);
        refFactory = RpcEndpointRefFactory.getInstance(configuration);
        haService = HAServiceFactory.getService(configuration);
    }

    /**
     * Creates the shared client on first call and returns it.
     *
     * <p>If an instance already exists it is returned unchanged and {@code configuration}
     * is ignored.
     *
     * @param configuration configuration used only when the instance is first created
     * @return the shared client instance
     */
    public static synchronized RpcClient init(Configuration configuration) {
        if (INSTANCE == null) {
            INSTANCE = new RpcClient(configuration);
        }
        return INSTANCE;
    }

    /**
     * Returns the shared client instance.
     *
     * @return the instance created by {@link #init(Configuration)}, or {@code null} if
     *         {@code init} has not been called yet
     */
    public static synchronized RpcClient getInstance() {
        return INSTANCE;
    }

    /**
     * Calls {@code registerContainer} on the master endpoint, with retry.
     *
     * @param masterId resource id used to resolve the master address
     * @param info     registration payload forwarded to the endpoint
     * @param listener callback forwarded to the endpoint
     */
    public <T> void registerContainer(String masterId, T info, RpcEndpointRef.RpcCallback<Master.RegisterResponse> listener) {
        this.doRpcWithRetry(() -> this.connectMaster(masterId).registerContainer(info, listener), masterId, RpcEndpointRefFactory.EndpointType.MASTER);
    }

    /**
     * Sends a heartbeat to the master endpoint, with retry.
     *
     * @param masterId  resource id used to resolve the master address
     * @param heartbeat heartbeat payload
     * @return the future returned by the master endpoint
     */
    public ListenableFuture<Empty> sendHeartBeat(String masterId, Heartbeat heartbeat) {
        return this.doRpcWithRetry(() -> this.connectMaster(masterId).sendHeartBeat(heartbeat), masterId, RpcEndpointRefFactory.EndpointType.MASTER);
    }

    /**
     * Reports an exception to the master endpoint, with retry. Only
     * {@code throwable.getMessage()} is transmitted.
     *
     * @param masterId    resource id used to resolve the master address
     * @param containerId id forwarded to the endpoint
     * @param throwable   failure whose message is reported
     * @return the response returned by the master endpoint
     */
    public Empty sendException(String masterId, Integer containerId, Throwable throwable) {
        return this.doRpcWithRetry(() -> this.connectMaster(masterId).sendException(containerId, throwable.getMessage()), masterId, RpcEndpointRefFactory.EndpointType.MASTER);
    }

    /**
     * Sends an event to a container endpoint, with retry.
     *
     * @param containerId resource id used to resolve the container address
     * @param event       event to process
     * @return the future returned by the container endpoint
     */
    public Future processContainer(String containerId, IEvent event) {
        return this.doRpcWithRetry(() -> this.connectContainer(containerId).process(event), containerId, RpcEndpointRefFactory.EndpointType.CONTAINER);
    }

    /**
     * Sends an event to a container endpoint with a callback, with retry.
     *
     * @param containerId resource id used to resolve the container address
     * @param event       event to process
     * @param callback    callback forwarded to the endpoint
     */
    public void processContainer(String containerId, IEvent event, RpcEndpointRef.RpcCallback<Container.Response> callback) {
        this.doRpcWithRetry(() -> this.connectContainer(containerId).process(event, callback), containerId, RpcEndpointRefFactory.EndpointType.CONTAINER);
    }

    /**
     * Sends an event to a pipeline-manager endpoint, with retry.
     *
     * @param driverId resource id used to resolve the pipeline-manager address
     * @param event    event to process
     */
    public void processPipeline(String driverId, IEvent event) {
        this.doRpcWithRetry(() -> this.connectPipelineManager(driverId).process(event), driverId, RpcEndpointRefFactory.EndpointType.PIPELINE_MANAGER);
    }

    /**
     * Sends an event to a pipeline-manager endpoint with a callback, with retry.
     *
     * @param driverId resource id used to resolve the pipeline-manager address
     * @param event    event to process
     * @param callback callback forwarded to the endpoint
     */
    public void processPipeline(String driverId, IEvent event, RpcEndpointRef.RpcCallback<Container.Response> callback) {
        this.doRpcWithRetry(() -> this.connectPipelineManager(driverId).process(event, callback), driverId, RpcEndpointRefFactory.EndpointType.PIPELINE_MANAGER);
    }

    /**
     * Sends a resource requirement to the resource-manager endpoint, with retry.
     *
     * @param masterId resource id used to resolve the resource-manager address
     * @param request  requirement request
     * @return the response returned by the resource manager
     */
    public RequireResponse requireResource(String masterId, RequireResourceRequest request) {
        return this.doRpcWithRetry(() -> this.connectRM(masterId).requireResource(request), masterId, RpcEndpointRefFactory.EndpointType.RESOURCE_MANAGER);
    }

    /**
     * Sends a resource release to the resource-manager endpoint, with retry.
     *
     * @param masterId resource id used to resolve the resource-manager address
     * @param request  release request
     * @return the response returned by the resource manager
     */
    public ReleaseResponse releaseResource(String masterId, ReleaseResourceRequest request) {
        return this.doRpcWithRetry(() -> this.connectRM(masterId).releaseResource(request), masterId, RpcEndpointRefFactory.EndpointType.RESOURCE_MANAGER);
    }

    /**
     * Closes the master endpoint ref for {@code masterId}. Not retried.
     *
     * @param masterId resource id used to resolve the master address
     */
    public void closeMasterConnection(String masterId) {
        this.connectMaster(masterId).close();
    }

    /**
     * Closes the driver endpoint ref for {@code driverId}. Not retried.
     *
     * @param driverId resource id used to resolve the driver address
     */
    public void closeDriverConnection(String driverId) {
        this.connectDriver(driverId).close();
    }

    /**
     * Closes the container endpoint ref for {@code containerId}. Not retried.
     *
     * @param containerId resource id used to resolve the container address
     */
    public void closeContainerConnection(String containerId) {
        this.connectContainer(containerId).close();
    }

    /** Resolves {@code masterId} to host/port and returns the factory's master endpoint ref. */
    private MasterEndpointRef connectMaster(String masterId) {
        ResourceData resourceData = this.getResourceData(masterId);
        return refFactory.connectMaster(resourceData.getHost(), resourceData.getRpcPort());
    }

    /** Resolves {@code masterId} to host/port and returns the factory's resource-manager endpoint ref. */
    private ResourceManagerEndpointRef connectRM(String masterId) {
        ResourceData resourceData = this.getResourceData(masterId);
        return refFactory.connectResourceManager(resourceData.getHost(), resourceData.getRpcPort());
    }

    /** Resolves {@code driverId} to host/port and returns the factory's driver endpoint ref. */
    private DriverEndpointRef connectDriver(String driverId) {
        ResourceData resourceData = this.getResourceData(driverId);
        return refFactory.connectDriver(resourceData.getHost(), resourceData.getRpcPort());
    }

    /** Resolves {@code containerId} to host/port and returns the factory's container endpoint ref. */
    private ContainerEndpointRef connectContainer(String containerId) {
        ResourceData resourceData = this.getResourceData(containerId);
        return refFactory.connectContainer(resourceData.getHost(), resourceData.getRpcPort());
    }

    /** Resolves {@code id} to host/port and returns the factory's pipeline-manager endpoint ref. */
    private PipelineMasterEndpointRef connectPipelineManager(String id) {
        ResourceData resourceData = this.getResourceData(id);
        return refFactory.connectPipelineManager(resourceData.getHost(), resourceData.getRpcPort());
    }

    /**
     * Runs {@code function} through {@link RetryCommand} using the configured retry count and
     * interval. Any {@link Throwable} from an attempt is converted by
     * {@link #handleRpcException}, which also invalidates the cached endpoint and resource data.
     *
     * @param function     the RPC call to perform
     * @param resourceId   id of the target, used for cache invalidation on failure
     * @param endpointType endpoint type of the target, used for cache invalidation on failure
     * @return the value produced by {@code function}
     */
    private <T> T doRpcWithRetry(Callable<T> function, String resourceId, RpcEndpointRefFactory.EndpointType endpointType) {
        return (T)RetryCommand.run(() -> {
            try {
                return function.call();
            }
            catch (Throwable t) {
                throw this.handleRpcException(resourceId, endpointType, t);
            }
        }, RPC_RETRY_TIMES, (long)RPC_RETRY_INTERVAL_MS);
    }

    /**
     * Void variant of {@link #doRpcWithRetry(Callable, String, RpcEndpointRefFactory.EndpointType)}.
     *
     * @param function     the RPC call to perform
     * @param resourceId   id of the target, used for cache invalidation on failure
     * @param endpointType endpoint type of the target, used for cache invalidation on failure
     */
    private void doRpcWithRetry(Runnable function, String resourceId, RpcEndpointRefFactory.EndpointType endpointType) {
        RetryCommand.run(() -> {
            try {
                function.run();
            }
            catch (Throwable t) {
                throw this.handleRpcException(resourceId, endpointType, t);
            }
            return null;
        }, RPC_RETRY_TIMES, (long)RPC_RETRY_INTERVAL_MS);
    }

    /**
     * Invalidates cached state for a failed target and builds the exception to rethrow.
     *
     * <p>The endpoint cache is invalidated first. If that step itself fails, the resource data
     * is left untouched and the returned exception has the invalidation failure as its cause,
     * with the original RPC failure attached as a suppressed exception so its stack trace is
     * not lost. Otherwise the resource data is invalidated too and the returned exception
     * wraps the original RPC failure.
     *
     * @param resourceId   id of the target whose caches are invalidated
     * @param endpointType endpoint type of the target
     * @param t            the failure raised by the RPC attempt
     * @return the exception the caller should throw
     */
    private Exception handleRpcException(String resourceId, RpcEndpointRefFactory.EndpointType endpointType, Throwable t) {
        try {
            this.invalidateEndpointCache(resourceId, endpointType);
        }
        catch (Throwable e) {
            String errorMsg = String.format("get resource data failed caused by %s while invalidate endpoint cache: #%s", t.getMessage(), resourceId);
            GeaflowRuntimeException failure = new GeaflowRuntimeException(errorMsg, e);
            // Keep the original RPC failure reachable; the message alone drops its stack trace.
            failure.addSuppressed(t);
            return failure;
        }
        this.invalidateResourceData(resourceId);
        return new GeaflowRuntimeException(String.format("do rpc failed. %s", t.getMessage()), t);
    }

    /**
     * Asks the HA service to invalidate the resource entry for {@code resourceId}.
     *
     * @param resourceId id of the resource to invalidate
     */
    protected void invalidateResourceData(String resourceId) {
        LOGGER.info("invalidate rpc resource cache of : #{}", resourceId);
        haService.invalidateResource(resourceId);
    }

    /**
     * Resolves {@code resourceId} to host/port and asks the endpoint factory to invalidate the
     * matching cache entry.
     *
     * @param resourceId   id of the resource whose endpoint is invalidated
     * @param endpointType endpoint type of the cache entry
     */
    protected void invalidateEndpointCache(String resourceId, RpcEndpointRefFactory.EndpointType endpointType) {
        ResourceData resourceData = this.getResourceData(resourceId);
        refFactory.invalidateEndpointCache(resourceData.getHost(), resourceData.getRpcPort(), endpointType);
    }

    /**
     * Resolves {@code resourceId} through the HA service.
     *
     * @param resourceId id of the resource to resolve
     * @return the resource data returned by the HA service
     */
    protected ResourceData getResourceData(String resourceId) {
        return haService.resolveResource(resourceId);
    }
}

