/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.service;

import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class FailoverService {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class);
    private final RegistryClient registryClient;
    private final MasterConfig masterConfig;
    private final ProcessService processService;
    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;

    public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool) {
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
        this.processService = processService;
        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
    }

    public void checkMasterFailover() {
        List<String> hosts = this.getNeedFailoverMasterServers();
        if (CollectionUtils.isEmpty(hosts)) {
            return;
        }
        LOGGER.info("need failover hosts:{}", hosts);
        for (String host : hosts) {
            this.failoverMasterWithLock(host);
        }
    }

    public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
        switch (nodeType) {
            case MASTER: {
                this.failoverMasterWithLock(serverHost);
                break;
            }
            case WORKER: {
                this.failoverWorker(serverHost);
                break;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void failoverMasterWithLock(String masterHost) {
        String failoverPath = this.getFailoverLockPath(NodeType.MASTER, masterHost);
        try {
            this.registryClient.getLock(failoverPath);
            this.failoverMaster(masterHost);
        }
        catch (Exception e) {
            LOGGER.error("{} server failover failed, host:{}", new Object[]{NodeType.MASTER, masterHost, e});
        }
        finally {
            this.registryClient.releaseLock(failoverPath);
        }
    }

    private void failoverMaster(String masterHost) {
        if (StringUtils.isEmpty((String)masterHost)) {
            return;
        }
        Date serverStartupTime = this.getServerStartupTime(NodeType.MASTER, masterHost);
        long startTime = System.currentTimeMillis();
        List needFailoverProcessInstanceList = this.processService.queryNeedFailoverProcessInstances(masterHost);
        LOGGER.info("start master[{}] failover, process list size:{}", (Object)masterHost, (Object)needFailoverProcessInstanceList.size());
        List servers = this.registryClient.getServerList(NodeType.WORKER);
        servers.addAll(this.registryClient.getServerList(NodeType.MASTER));
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            if ("NULL".equals(processInstance.getHost())) continue;
            List validTaskInstanceList = this.processService.findValidTaskListByProcessId(Integer.valueOf(processInstance.getId()));
            for (TaskInstance taskInstance : validTaskInstanceList) {
                LOGGER.info("failover task instance id: {}, process instance id: {}", (Object)taskInstance.getId(), (Object)taskInstance.getProcessInstanceId());
                this.failoverTaskInstance(processInstance, taskInstance, servers);
            }
            if (serverStartupTime != null && processInstance.getRestartTime() != null && processInstance.getRestartTime().after(serverStartupTime)) continue;
            LOGGER.info("failover process instance id: {}", (Object)processInstance.getId());
            processInstance.setHost("NULL");
            this.processService.processNeedFailoverProcessInstances(processInstance);
        }
        LOGGER.info("master[{}] failover end, useTime:{}ms", (Object)masterHost, (Object)(System.currentTimeMillis() - startTime));
    }

    private void failoverWorker(String workerHost) {
        if (StringUtils.isEmpty((String)workerHost)) {
            return;
        }
        long startTime = System.currentTimeMillis();
        List needFailoverTaskInstanceList = this.processService.queryNeedFailoverTaskInstances(workerHost);
        HashMap<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<Integer, ProcessInstance>();
        LOGGER.info("start worker[{}] failover, task list size:{}", (Object)workerHost, (Object)needFailoverTaskInstanceList.size());
        List workerServers = this.registryClient.getServerList(NodeType.WORKER);
        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
            ProcessInstance processInstance = (ProcessInstance)processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
            if (processInstance == null) {
                processInstance = this.processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
                if (processInstance == null) {
                    LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", (Object)taskInstance.getProcessInstanceId(), (Object)taskInstance.getId());
                    continue;
                }
                processInstanceCacheMap.put(processInstance.getId(), processInstance);
            }
            if (!processInstance.getHost().equalsIgnoreCase(this.getLocalAddress())) continue;
            LOGGER.info("failover task instance id: {}, process instance id: {}", (Object)taskInstance.getId(), (Object)taskInstance.getProcessInstanceId());
            this.failoverTaskInstance(processInstance, taskInstance, workerServers);
        }
        LOGGER.info("end worker[{}] failover, useTime:{}ms", (Object)workerHost, (Object)(System.currentTimeMillis() - startTime));
    }

    private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
        if (processInstance == null) {
            LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", (Object)taskInstance.getProcessInstanceId(), (Object)taskInstance.getId());
            return;
        }
        if (!this.checkTaskInstanceNeedFailover(servers, taskInstance)) {
            return;
        }
        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
        taskInstance.setProcessInstance(processInstance);
        if (!isMasterTask) {
            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get().buildTaskInstanceRelatedInfo(taskInstance).buildProcessInstanceRelatedInfo(processInstance).create();
            if (this.masterConfig.isKillYarnJobWhenTaskFailover()) {
                ProcessUtils.killYarnJob((TaskExecutionContext)taskExecutionContext);
            }
        }
        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
        this.processService.saveTaskInstance(taskInstance);
        StateEvent stateEvent = new StateEvent();
        stateEvent.setTaskInstanceId(taskInstance.getId());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        stateEvent.setProcessInstanceId(processInstance.getId());
        stateEvent.setExecutionStatus(taskInstance.getState());
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    private List<String> getNeedFailoverMasterServers() {
        List hosts = this.processService.queryNeedFailoverProcessInstanceHost();
        Iterator iterator = hosts.iterator();
        while (iterator.hasNext()) {
            String host = (String)iterator.next();
            if (!this.registryClient.checkNodeExists(host, NodeType.MASTER) || host.equals(this.getLocalAddress())) continue;
            iterator.remove();
        }
        return hosts;
    }

    private boolean checkTaskInstanceNeedFailover(List<Server> servers, TaskInstance taskInstance) {
        boolean taskNeedFailover = true;
        if (taskInstance == null) {
            LOGGER.error("failover task instance error, taskInstance is null");
            return false;
        }
        if ("NULL".equals(taskInstance.getHost())) {
            return false;
        }
        if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
            return false;
        }
        if (taskInstance.getHost() == null) {
            return false;
        }
        if (this.checkTaskAfterServerStart(servers, taskInstance)) {
            taskNeedFailover = false;
        }
        return taskNeedFailover;
    }

    private boolean checkTaskAfterServerStart(List<Server> servers, TaskInstance taskInstance) {
        if (StringUtils.isEmpty((String)taskInstance.getHost())) {
            return false;
        }
        Date serverStartDate = this.getServerStartupTime(servers, taskInstance.getHost());
        if (serverStartDate != null) {
            if (taskInstance.getStartTime() == null) {
                return taskInstance.getSubmitTime().after(serverStartDate);
            }
            return taskInstance.getStartTime().after(serverStartDate);
        }
        return false;
    }

    private String getFailoverLockPath(NodeType nodeType, String host) {
        switch (nodeType) {
            case MASTER: {
                return "/lock/failover/masters/" + host;
            }
            case WORKER: {
                return "/lock/failover/workers/" + host;
            }
        }
        return "";
    }

    private Date getServerStartupTime(NodeType nodeType, String host) {
        if (StringUtils.isEmpty((String)host)) {
            return null;
        }
        List servers = this.registryClient.getServerList(nodeType);
        return this.getServerStartupTime(servers, host);
    }

    private Date getServerStartupTime(List<Server> servers, String host) {
        if (CollectionUtils.isEmpty(servers)) {
            return null;
        }
        Date serverStartupTime = null;
        for (Server server : servers) {
            if (!host.equals(server.getHost() + ":" + server.getPort())) continue;
            serverStartupTime = server.getCreateTime();
            break;
        }
        return serverStartupTime;
    }

    String getLocalAddress() {
        return NetUtils.getAddr((int)this.masterConfig.getListenPort());
    }
}

