/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.cluster;

import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.zeppelin.cluster.ClusterManager;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterMonitor {
    private static Logger LOGGER = LoggerFactory.getLogger(ClusterMonitor.class);
    private static AtomicBoolean running = new AtomicBoolean(true);
    private ClusterManager clusterManager = null;
    private Queue<UsageUtil> monitorUsageQueues = new LinkedList<UsageUtil>();
    private final int USAGE_QUEUE_LIMIT = 100;
    private int heartbeatInterval = 3000;
    private int heartbeatTimeout = 9000;
    private ClusterMetaType clusterMetaType;
    private String metaKey;

    public ClusterMonitor(ClusterManager clusterManagerServer) {
        this.clusterManager = clusterManagerServer;
        ZeppelinConfiguration zconf = new ZeppelinConfiguration();
        this.heartbeatInterval = zconf.getClusterHeartbeatInterval();
        this.heartbeatTimeout = zconf.getClusterHeartbeatTimeout();
        if (this.heartbeatTimeout < this.heartbeatInterval) {
            LOGGER.error("Heartbeat timeout must be greater than heartbeat period.");
            this.heartbeatTimeout = this.heartbeatInterval * 3;
            LOGGER.info("Heartbeat timeout is modified to 3 times the heartbeat period.");
        }
        if (this.heartbeatTimeout < this.heartbeatInterval * 3) {
            LOGGER.warn("Heartbeat timeout recommended than 3 times the heartbeat period.");
        }
    }

    public void start(final ClusterMetaType clusterMetaType, String metaKey) {
        this.clusterMetaType = clusterMetaType;
        this.metaKey = metaKey;
        new Thread(new Runnable(){

            @Override
            public void run() {
                while (running.get()) {
                    switch (clusterMetaType) {
                        case SERVER_META: {
                            ClusterMonitor.this.sendMachineUsage();
                            ClusterMonitor.this.checkHealthy();
                            break;
                        }
                        case INTP_PROCESS_META: {
                            ClusterMonitor.this.sendHeartbeat();
                            break;
                        }
                        default: {
                            LOGGER.error("unknown cluster meta type:{}", (Object)clusterMetaType);
                        }
                    }
                    try {
                        Thread.sleep(ClusterMonitor.this.heartbeatInterval);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    public void shutdown() {
        running.set(false);
    }

    private void checkHealthy() {
        if (!this.clusterManager.isClusterLeader()) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("checkHealthy()");
        }
        LocalDateTime now = LocalDateTime.now();
        for (ClusterMetaType metaType : ClusterMetaType.values()) {
            HashMap<String, HashMap<String, Object>> clusterMeta = this.clusterManager.getClusterMeta(metaType, "");
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("clusterMeta : {}", clusterMeta);
            }
            for (Map.Entry entry : clusterMeta.entrySet()) {
                String key = (String)entry.getKey();
                Map meta = (Map)entry.getValue();
                String status = (String)meta.get(ClusterMeta.STATUS);
                if (status.equals(ClusterMeta.OFFLINE_STATUS)) continue;
                Object heartbeat = meta.get(ClusterMeta.LATEST_HEARTBEAT);
                if (heartbeat instanceof LocalDateTime) {
                    LocalDateTime dHeartbeat = (LocalDateTime)heartbeat;
                    Duration duration = Duration.between(dHeartbeat, now);
                    long timeInterval = duration.getSeconds() * 1000L;
                    if (timeInterval <= (long)this.heartbeatTimeout) continue;
                    HashMap<String, Object> mapValues = new HashMap<String, Object>();
                    mapValues.put(ClusterMeta.STATUS, ClusterMeta.OFFLINE_STATUS);
                    this.clusterManager.putClusterMeta(metaType, key, mapValues);
                    LOGGER.warn("offline heartbeat timeout[{}] meta[{}]", (Object)dHeartbeat, (Object)key);
                    continue;
                }
                LOGGER.error("wrong data type");
            }
        }
    }

    private void sendHeartbeat() {
        HashMap<String, Object> mapMonitorUtil = new HashMap<String, Object>();
        mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
        mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
        this.clusterManager.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, this.metaKey, mapMonitorUtil);
    }

    private void sendMachineUsage() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("sendMachineUsage >>>");
        }
        while (this.monitorUsageQueues.size() > 100) {
            this.monitorUsageQueues.poll();
        }
        UsageUtil monitorUtil = this.getMachineUsage();
        this.monitorUsageQueues.add(monitorUtil);
        UsageUtil avgMonitorUtil = new UsageUtil();
        for (UsageUtil monitor : this.monitorUsageQueues) {
            UsageUtil usageUtil = avgMonitorUtil;
            usageUtil.memoryUsed = usageUtil.memoryUsed + monitor.memoryUsed;
            usageUtil = avgMonitorUtil;
            usageUtil.memoryCapacity = usageUtil.memoryCapacity + monitor.memoryCapacity;
            usageUtil = avgMonitorUtil;
            usageUtil.cpuUsed = usageUtil.cpuUsed + monitor.cpuUsed;
            usageUtil = avgMonitorUtil;
            usageUtil.cpuCapacity = usageUtil.cpuCapacity + monitor.cpuCapacity;
        }
        int queueSize = this.monitorUsageQueues.size();
        avgMonitorUtil.memoryUsed = avgMonitorUtil.memoryUsed / (long)queueSize;
        avgMonitorUtil.memoryCapacity = avgMonitorUtil.memoryCapacity / (long)queueSize;
        avgMonitorUtil.cpuUsed = avgMonitorUtil.cpuUsed / (long)queueSize;
        avgMonitorUtil.cpuCapacity = avgMonitorUtil.cpuCapacity / (long)queueSize;
        HashMap<String, Object> mapMonitorUtil = new HashMap<String, Object>();
        mapMonitorUtil.put(ClusterMeta.MEMORY_USED, avgMonitorUtil.memoryUsed);
        mapMonitorUtil.put(ClusterMeta.MEMORY_CAPACITY, avgMonitorUtil.memoryCapacity);
        mapMonitorUtil.put(ClusterMeta.CPU_USED, avgMonitorUtil.cpuUsed);
        mapMonitorUtil.put(ClusterMeta.CPU_CAPACITY, avgMonitorUtil.cpuCapacity);
        mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
        mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
        String clusterName = this.clusterManager.getClusterNodeName();
        this.clusterManager.putClusterMeta(ClusterMetaType.SERVER_META, clusterName, mapMonitorUtil);
    }

    private UsageUtil getMachineUsage() {
        OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        long freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
        long totalPhysicalMemorySize = operatingSystemMXBean.getTotalPhysicalMemorySize();
        double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
        int process = Runtime.getRuntime().availableProcessors();
        UsageUtil monitorUtil = new UsageUtil();
        monitorUtil.memoryUsed = totalPhysicalMemorySize - freePhysicalMemorySize;
        monitorUtil.memoryCapacity = totalPhysicalMemorySize;
        monitorUtil.cpuUsed = (long)((double)process * systemCpuLoad * 100.0);
        monitorUtil.cpuCapacity = process * 100;
        return monitorUtil;
    }

    private class UsageUtil {
        private long memoryUsed = 0L;
        private long memoryCapacity = 0L;
        private long cpuUsed = 0L;
        private long cpuCapacity = 0L;

        private UsageUtil() {
        }
    }
}

