/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.registry;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryDataListener;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MasterRegistryClient {
    private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
    @Autowired
    private FailoverService failoverService;
    @Autowired
    private RegistryClient registryClient;
    @Autowired
    private MasterConfig masterConfig;
    private ScheduledExecutorService heartBeatExecutor;
    private long startupTime;

    public void init() {
        this.startupTime = System.currentTimeMillis();
        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("HeartBeatExecutor"));
    }

    public void start() {
        try {
            this.registry();
            this.registryClient.subscribe("/nodes", (SubscribeListener)new MasterRegistryDataListener());
        }
        catch (Exception e) {
            logger.error("master start up exception", (Throwable)e);
            throw new RuntimeException("master start up error", e);
        }
    }

    public void setRegistryStoppable(IStoppable stoppable) {
        this.registryClient.setStoppable(stoppable);
    }

    public void closeRegistry() {
        this.deregister();
    }

    public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
        logger.info("{} node deleted : {}", (Object)nodeType, (Object)path);
        if (StringUtils.isEmpty((String)path)) {
            logger.error("server down error: empty path: {}, nodeType:{}", (Object)path, (Object)nodeType);
            return;
        }
        String serverHost = this.registryClient.getHostByEventDataPath(path);
        if (StringUtils.isEmpty((String)serverHost)) {
            logger.error("server down error: unknown path: {}, nodeType:{}", (Object)path, (Object)nodeType);
            return;
        }
        try {
            if (!this.registryClient.exists(path)) {
                logger.info("path: {} not exists", (Object)path);
                this.registryClient.handleDeadServer(Collections.singleton(path), nodeType, "add");
            }
            if (failover) {
                this.failoverService.failoverServerWhenDown(serverHost, nodeType);
            }
        }
        catch (Exception e) {
            logger.error("{} server failover failed, host:{}", new Object[]{nodeType, serverHost, e});
        }
    }

    public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
        logger.info("{} node deleted : {}", (Object)nodeType, (Object)path);
        try {
            String serverHost = null;
            if (!StringUtils.isEmpty((String)path)) {
                serverHost = this.registryClient.getHostByEventDataPath(path);
                if (StringUtils.isEmpty((String)serverHost)) {
                    logger.error("server down error: unknown path: {}", (Object)path);
                    return;
                }
                if (!this.registryClient.exists(path)) {
                    logger.info("path: {} not exists", (Object)path);
                    this.registryClient.handleDeadServer(Collections.singleton(path), nodeType, "add");
                }
            }
            if (failover) {
                this.failoverService.failoverServerWhenDown(serverHost, nodeType);
            }
        }
        catch (Exception e) {
            logger.error("{} server failover failed", (Object)nodeType, (Object)e);
        }
    }

    public void registry() {
        String address = NetUtils.getAddr((int)this.masterConfig.getListenPort());
        String localNodePath = this.getMasterPath();
        int masterHeartbeatInterval = this.masterConfig.getHeartbeatInterval();
        HeartBeatTask heartBeatTask = new HeartBeatTask(this.startupTime, this.masterConfig.getMaxCpuLoadAvg(), this.masterConfig.getReservedMemory(), (Set)Sets.newHashSet((Object[])new String[]{this.getMasterPath()}), "master", this.registryClient);
        this.registryClient.remove(localNodePath);
        this.registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
        while (!this.registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
            ThreadUtils.sleep((long)1000L);
        }
        ThreadUtils.sleep((long)1000L);
        this.registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, "delete");
        this.registryClient.addConnectionStateListener(this::handleConnectionState);
        this.heartBeatExecutor.scheduleAtFixedRate((Runnable)heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", (Object)address, (Object)masterHeartbeatInterval);
    }

    public void handleConnectionState(ConnectionState state) {
        String localNodePath = this.getMasterPath();
        switch (state) {
            case CONNECTED: {
                logger.debug("registry connection state is {}", (Object)state);
                break;
            }
            case SUSPENDED: {
                logger.warn("registry connection state is {}, ready to retry connection", (Object)state);
                break;
            }
            case RECONNECTED: {
                logger.debug("registry connection state is {}, clean the node info", (Object)state);
                this.registryClient.persistEphemeral(localNodePath, "");
                break;
            }
            case DISCONNECTED: {
                logger.warn("registry connection state is {}, ready to stop myself", (Object)state);
                this.registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
                break;
            }
        }
    }

    public void deregister() {
        try {
            String address = this.getLocalAddress();
            String localNodePath = this.getMasterPath();
            this.registryClient.remove(localNodePath);
            logger.info("master node : {} unRegistry to register center.", (Object)address);
            this.heartBeatExecutor.shutdown();
            logger.info("heartbeat executor shutdown");
            this.registryClient.close();
        }
        catch (Exception e) {
            logger.error("remove registry path exception ", (Throwable)e);
        }
    }

    private String getMasterPath() {
        String address = this.getLocalAddress();
        return "/nodes/master/" + address;
    }

    private String getLocalAddress() {
        return NetUtils.getAddr((int)this.masterConfig.getListenPort());
    }
}

