/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.heartbeat;

import com.antgroup.geaflow.cluster.clustermanager.AbstractClusterManager;
import com.antgroup.geaflow.cluster.clustermanager.IClusterManager;
import com.antgroup.geaflow.cluster.container.ContainerInfo;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.heartbeat.Heartbeat;
import com.antgroup.geaflow.common.heartbeat.HeartbeatInfo;
import com.antgroup.geaflow.common.utils.ExecutorUtil;
import com.antgroup.geaflow.common.utils.ThreadUtil;
import com.antgroup.geaflow.stats.collector.StatsCollectorFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the most recent {@link Heartbeat} received per container id and runs two periodic
 * background tasks on dedicated single-thread schedulers:
 * <ul>
 *   <li>a timeout check ({@link #checkHeartBeat()}) that triggers cluster failover for every
 *       known component whose heartbeat is missing or too old;</li>
 *   <li>a report task ({@link #reportHeartbeat()}) that publishes a summary of all containers
 *       to the heartbeat stats collector, when a collector factory is available.</li>
 * </ul>
 *
 * <p>Both tasks are started by the constructor and stopped by {@link #close()}.
 *
 * <p>The value of {@code HEARTBEAT_TIMEOUT_MS} is used both as the period of the timeout check
 * and as the maximum accepted heartbeat age.
 *
 * <p>NOTE(review): the supplied {@link IClusterManager} is cast to {@link AbstractClusterManager}
 * wherever it is used, so passing any other implementation fails with a
 * {@link ClassCastException} at the first check or report.
 */
public class HeartbeatManager implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);

    /** Period of the timeout check and maximum accepted heartbeat age, in milliseconds. */
    private final long heartbeatCheckMs;
    /** Period of the heartbeat report task, in milliseconds. */
    private final long heartbeatReportMs;
    /** Expiry value attached to every published {@link HeartbeatInfo}, in milliseconds. */
    private final long heartbeatReportExpiredMs;
    /** Latest heartbeat per container id; written by receivers, read by both periodic tasks. */
    private final Map<Integer, Heartbeat> senderMap = new ConcurrentHashMap<>();
    private final IClusterManager clusterManager;
    private final ScheduledFuture<?> timeoutFuture;
    private final ScheduledFuture<?> reportFuture;
    private final ScheduledExecutorService checkTimeoutService;
    private final ScheduledExecutorService heartbeatReportService;

    /**
     * Reads the heartbeat settings from {@code config} and starts both periodic tasks.
     *
     * <p>When {@code HEARTBEAT_REPORT_EXPIRED_MS} is not configured, the report expiry defaults to
     * {@code (timeout + report interval) * 1.2}.
     *
     * @param config         configuration providing the heartbeat timeout, report interval and
     *                       optional report expiry
     * @param clusterManager cluster manager queried for component ids and container infos, and
     *                       asked to fail over when a heartbeat is missing
     */
    public HeartbeatManager(Configuration config, IClusterManager clusterManager) {
        // Assign every field the periodic tasks read BEFORE scheduling them, so a task can never
        // observe a partially constructed manager (e.g. with a very small configured interval).
        this.clusterManager = clusterManager;
        this.heartbeatCheckMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_TIMEOUT_MS);
        this.heartbeatReportMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_REPORT_INTERVAL_MS);
        int defaultReportExpiredMs = (int)((double)(this.heartbeatCheckMs + this.heartbeatReportMs) * 1.2);
        this.heartbeatReportExpiredMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_REPORT_EXPIRED_MS, defaultReportExpiredMs);

        this.checkTimeoutService = new ScheduledThreadPoolExecutor(1, ThreadUtil.namedThreadFactory(true, "heartbeat-timeout-manager"));
        this.timeoutFuture = this.checkTimeoutService.scheduleAtFixedRate(this::runScheduledCheck, this.heartbeatCheckMs, this.heartbeatCheckMs, TimeUnit.MILLISECONDS);
        this.heartbeatReportService = new ScheduledThreadPoolExecutor(1, ThreadUtil.namedThreadFactory(true, "heartbeat-report-manager"));
        this.reportFuture = this.heartbeatReportService.scheduleAtFixedRate(this::runScheduledReport, this.heartbeatReportMs, this.heartbeatReportMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Records {@code heartbeat} as the latest one for its container id, replacing any previous
     * entry for that id.
     *
     * @param heartbeat the heartbeat that was received
     */
    public void receivedHeartbeat(Heartbeat heartbeat) {
        this.senderMap.put(heartbeat.getContainerId(), heartbeat);
    }

    /**
     * Returns the live (not copied) map of the latest heartbeat per container id.
     *
     * @return the internal heartbeat map
     */
    public Map<Integer, Heartbeat> getHeartBeatMap() {
        return this.senderMap;
    }

    /**
     * Checks every known component id and triggers cluster failover for each one that either has
     * no recorded heartbeat or whose latest heartbeat is older than the configured timeout.
     *
     * <p>An exception thrown by the failover propagates to the caller and aborts the remaining
     * checks of this pass. The scheduled invocation logs it and runs again on the next period.
     */
    public void checkHeartBeat() {
        long checkTime = System.currentTimeMillis();
        for (Integer componentId : this.getComponentIds()) {
            // Single lookup: avoids the containsKey/get race on the concurrent map.
            Heartbeat lastHeartbeat = this.senderMap.get(componentId);
            if (lastHeartbeat != null && checkTime <= lastHeartbeat.getTimestamp() + this.heartbeatCheckMs) {
                continue;
            }
            LOGGER.warn("Component#{} heartbeat missing.", componentId);
            this.doClusterFO(componentId);
        }
    }

    /**
     * Builds the current heartbeat summary and hands it to the heartbeat collector. Does nothing
     * beyond building the summary when no {@link StatsCollectorFactory} instance is available.
     */
    public void reportHeartbeat() {
        HeartbeatInfo heartbeatInfo = this.buildHeartbeatInfo();
        StatsCollectorFactory collectorFactory = StatsCollectorFactory.getInstance();
        if (collectorFactory != null) {
            collectorFactory.getHeartbeatCollector().reportHeartbeat(heartbeatInfo);
        }
    }

    /**
     * Assembles a {@link HeartbeatInfo} with one entry per container info known to the cluster
     * manager.
     *
     * <p>The total count is the number of container ids reported by the cluster manager. The
     * active count is the number of listed containers that have any recorded heartbeat; the age
     * of that heartbeat is not considered here.
     *
     * @return the heartbeat summary to publish
     */
    protected HeartbeatInfo buildHeartbeatInfo() {
        AbstractClusterManager manager = (AbstractClusterManager)this.clusterManager;
        Map<Integer, Heartbeat> heartbeatMap = this.getHeartBeatMap();
        Map<Integer, ContainerInfo> containerMap = manager.getContainerInfos();
        int totalContainerNum = manager.getContainerIds().size();

        ArrayList<HeartbeatInfo.ContainerHeartbeatInfo> containerList = new ArrayList<>();
        int activeContainers = 0;
        for (Map.Entry<Integer, ContainerInfo> entry : containerMap.entrySet()) {
            ContainerInfo info = entry.getValue();
            HeartbeatInfo.ContainerHeartbeatInfo containerHeartbeatInfo = new HeartbeatInfo.ContainerHeartbeatInfo();
            containerHeartbeatInfo.setId(entry.getKey());
            containerHeartbeatInfo.setName(info.getName());
            containerHeartbeatInfo.setHost(info.getHost());
            containerHeartbeatInfo.setPid(info.getPid());
            // Timestamp and metrics stay unset for containers that never sent a heartbeat.
            Heartbeat heartbeat = heartbeatMap.get(entry.getKey());
            if (heartbeat != null) {
                containerHeartbeatInfo.setLastTimestamp(Long.valueOf(heartbeat.getTimestamp()));
                containerHeartbeatInfo.setMetrics(heartbeat.getProcessMetrics());
                ++activeContainers;
            }
            containerList.add(containerHeartbeatInfo);
        }

        HeartbeatInfo heartbeatInfo = new HeartbeatInfo();
        heartbeatInfo.setExpiredTimeMs(this.heartbeatReportExpiredMs);
        heartbeatInfo.setTotalNum(totalContainerNum);
        heartbeatInfo.setActiveNum(activeContainers);
        heartbeatInfo.setContainers(containerList);
        return heartbeatInfo;
    }

    /**
     * Collects the ids of all components whose heartbeat is monitored.
     *
     * @return a new set holding the container ids and driver ids reported by the cluster manager
     */
    protected Set<Integer> getComponentIds() {
        AbstractClusterManager manager = (AbstractClusterManager)this.clusterManager;
        Set<Integer> componentIds = new HashSet<>();
        componentIds.addAll(manager.getContainerIds());
        componentIds.addAll(manager.getDriverIds());
        return componentIds;
    }

    /**
     * Asks the cluster manager to fail over for the given component.
     *
     * <p>Any failure is logged and then rethrown unchanged to the caller.
     *
     * @param containerId id of the component whose heartbeat is missing
     */
    protected void doClusterFO(int containerId) {
        try {
            ((AbstractClusterManager)this.clusterManager).clusterFailover(containerId);
        }
        catch (Throwable e) {
            LOGGER.error("Cluster failover failed.", e);
            throw e;
        }
    }

    /**
     * Cancels both periodic tasks (interrupting a run in progress) and shuts down their
     * schedulers.
     */
    public void close() {
        if (this.timeoutFuture != null) {
            this.timeoutFuture.cancel(true);
        }
        if (this.checkTimeoutService != null) {
            ExecutorUtil.shutdown((ExecutorService)this.checkTimeoutService);
        }
        if (this.reportFuture != null) {
            this.reportFuture.cancel(true);
        }
        if (this.heartbeatReportService != null) {
            ExecutorUtil.shutdown((ExecutorService)this.heartbeatReportService);
        }
    }

    /**
     * Scheduler entry point for the timeout check. A task scheduled with
     * {@code scheduleAtFixedRate} is never run again once one execution throws, so failures are
     * logged here instead of escaping; otherwise a single failed failover would silently stop all
     * further heartbeat monitoring.
     */
    private void runScheduledCheck() {
        try {
            this.checkHeartBeat();
        }
        catch (Throwable e) {
            LOGGER.error("Heartbeat timeout check failed.", e);
        }
    }

    /**
     * Scheduler entry point for the heartbeat report. Failures are logged instead of escaping so
     * that one failed report does not cancel all subsequent reports.
     */
    private void runScheduledReport() {
        try {
            this.reportHeartbeat();
        }
        catch (Throwable e) {
            LOGGER.error("Heartbeat report failed.", e);
        }
    }
}

