/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.tesla.dag.services;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.tesla.dag.common.Tools;
import com.alibaba.tesla.dag.model.domain.dagnode.DagNodeType;
import com.alibaba.tesla.dag.notify.DagInstDispatch;
import com.alibaba.tesla.dag.notify.DagInstNodeTask;
import com.alibaba.tesla.dag.notify.IDagInstNotify;
import com.alibaba.tesla.dag.repository.dao.DagInstDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstEdgeDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeDAO;
import com.alibaba.tesla.dag.repository.domain.DagInstDO;
import com.alibaba.tesla.dag.repository.domain.DagInstEdgeDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeDO;
import com.alibaba.tesla.dag.repository.domain.DagNodeDO;
import com.alibaba.tesla.dag.schedule.status.DagInstEdgeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstNodeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstStatus;
import com.alibaba.tesla.dag.schedule.task.TaskStatus;
import com.alibaba.tesla.dag.services.AbstractActionNewService;
import com.alibaba.tesla.dag.services.DagInstNewService;
import com.alibaba.tesla.dag.util.DateUtil;
import com.alibaba.tesla.dag.util.MonitorUtil;
import com.google.common.base.Throwables;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;

/**
 * Service that drives individual DAG instance nodes: starting them, inspecting their
 * progress, handling task callbacks and evaluating their outgoing edges.
 */
@Service
public class DagInstNodeNewService {
    private static final Logger log = LoggerFactory.getLogger(DagInstNodeNewService.class);
    /** Reads and updates DAG instance node records. */
    @Autowired
    private DagInstNodeDAO dagInstNodeDAO;
    /** Reads and updates DAG instance edge records. */
    @Autowired
    private DagInstEdgeDAO dagInstEdgeDAO;
    /** Reads DAG instance records. */
    @Autowired
    private DagInstDAO dagInstDAO;
    /** Used to submit sub DAG instances and to refresh a DAG instance's global data. */
    @Autowired
    private DagInstNewService dagInstNewService;
    /** Candidate action services; the one whose registerNodeType() equals the node's type is used. */
    @Autowired
    private List<AbstractActionNewService> actionNewServiceList;
    /** Sends DagInstDispatch messages after a node's state has been handled. */
    @Autowired
    private IDagInstNotify dagInstNotify;

    /**
     * Starts the DAG instance node referenced by the given task.
     *
     * <p>The node is looked up by DAG instance id and node id; a missing node is logged and
     * ignored (no monitor cost is recorded in that case). Only a node whose persisted status
     * is {@code INIT} is handed to {@link #start(DagInstNodeDO)}; any other status is logged
     * as a warning and left untouched. If starting throws, the node is marked
     * {@code EXCEPTION} with the stack trace as detail and a dispatch message is sent.
     *
     * @param dagInstNodeTask task carrying the DAG instance id and node id of the node to start
     */
    public void startDagInstNode(DagInstNodeTask dagInstNodeTask) {
        final long startedAt = System.currentTimeMillis();
        final DagInstNodeDO node = this.dagInstNodeDAO.getDagInstNode(dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId());
        if (node == null) {
            log.info(">>>dagInstNodeNewService|startDagInstNode|Not Found|dagInstId={}, nodeId={}", dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId());
            return;
        }

        final DagInstNodeStatus currentStatus = DagInstNodeStatus.valueOf(node.getStatus());
        log.info(">>>dagInstNodeNewService|startDagInstNode|dagInstId={}, nodeId={}, status={}", dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId(), currentStatus);
        try {
            if (currentStatus == DagInstNodeStatus.INIT) {
                this.start(node);
            } else {
                // Any status other than INIT is only reported, never started.
                log.warn(">>>dagInstNodeNewService|startDagInstNode|Status is Wrong|dagInstId={}, nodeId={}, status={}", dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId(), currentStatus);
            }
        } catch (Exception failure) {
            log.error(">>>dagInstNodeNewService|startDagInstNode|dagInstId={}, err={}", dagInstNodeTask.getDagInstId(), failure.getMessage(), failure);
            // Persist the failure on the node, then send a dispatch message for the DAG instance.
            this.dagInstNodeDAO.updateStatusWithDetail(node.getId(), DagInstNodeStatus.EXCEPTION, Throwables.getStackTraceAsString(failure));
            final DagInstDispatch dispatch = DagInstDispatch.builder()
                    .dagInstId(node.getDagInstId())
                    .nodeId(node.getNodeId())
                    .dagInstStatus(DagInstStatus.RUNNING)
                    .build();
            this.dagInstNotify.sendDagInstDispatch(dispatch);
        }
        MonitorUtil.nodeStartMonitor.addCost(System.currentTimeMillis() - startedAt);
    }

    /**
     * Launches the work behind a node instance and marks the node {@code RUNNING}.
     *
     * <p>For a {@code DAG} node a sub DAG instance is submitted and its id stored on the node.
     * For a {@code NODE} node the action service registered for the node's type is asked to
     * start a task and the returned task id is stored on the node. If the node already carries
     * a positive sub DAG instance id (DAG) or a non-empty task id (NODE), a warning is logged
     * and the method returns without updating anything. For any other node type nothing is
     * launched, but the start time and {@code RUNNING} status are still written.
     *
     * @param dagInstNodeDO the node instance to start
     * @throws Exception if no action service is registered for the node's type, or if a
     *                   called collaborator throws
     */
    public void start(DagInstNodeDO dagInstNodeDO) throws Exception {
        final DagInstDO owningDagInst = this.dagInstDAO.getDagInstById(dagInstNodeDO.getDagInstId());
        final DagInstNodeDO nodeUpdate = DagInstNodeDO.builder().id(dagInstNodeDO.getId()).build();
        nodeUpdate.setGmtStart(DateUtil.currentSeconds());

        switch (dagInstNodeDO.fetchNodeType()) {
            case DAG: {
                final Long existingSubDagInstId = dagInstNodeDO.getSubDagInstId();
                if (existingSubDagInstId != null && existingSubDagInstId > 0L) {
                    // A sub DAG was already submitted for this node; do not submit it twice.
                    log.warn(">>>dagInstNodeNewService|start|\u91cd\u590d\u542f\u52a8\u5b50DAG|subDagInstId={}", existingSubDagInstId);
                    return;
                }
                final Long newSubDagInstId = this.dagInstNewService.submitSub(dagInstNodeDO, owningDagInst);
                log.info(">>>dagInstNodeNewService|SUCCESS|startDag|subDagInstId={}", newSubDagInstId);
                nodeUpdate.setSubDagInstId(newSubDagInstId);
                break;
            }
            case NODE: {
                final long taskStartedAt = System.currentTimeMillis();
                final String existingTaskId = dagInstNodeDO.getTaskId();
                if (StringUtils.isNotEmpty(existingTaskId)) {
                    // A task was already started for this node; do not start it twice.
                    log.warn(">>>dagInstNodeNewService|start|\u91cd\u590d\u542f\u52a8\u4f5c\u4e1a|dagInstId={}, nodeId={}, taskId={}", dagInstNodeDO.getDagInstId(), dagInstNodeDO.getNodeId(), existingTaskId);
                    return;
                }
                final DagNodeType dagNodeType = dagInstNodeDO.fetchDagNode().fetchNodeType();
                final AbstractActionNewService actionService = this.actionNewServiceList.stream()
                        .filter(candidate -> candidate.registerNodeType() == dagNodeType)
                        .findFirst()
                        .orElseThrow(() -> new Exception("no such type: " + dagNodeType));
                final Long taskId = actionService.start(owningDagInst, dagInstNodeDO);
                log.info(">>>dagInstNodeNewService|SUCCESS|startJob|dagInstId={}, nodeId={}, taskId={}", dagInstNodeDO.getDagInstId(), dagInstNodeDO.getNodeId(), taskId);
                MonitorUtil.taskMonitor.addCost(System.currentTimeMillis() - taskStartedAt);
                nodeUpdate.setTaskId(Long.toString(taskId));
                break;
            }
            default: {
                // Other node types launch nothing; they still fall through to the RUNNING update.
                break;
            }
        }

        nodeUpdate.setStatus(DagInstNodeStatus.RUNNING.toString());
        this.dagInstNodeDAO.update(nodeUpdate);
    }

    /**
     * Inspects the current state of a DAG instance node and advances it where possible.
     *
     * <p>A missing node is logged and ignored. A {@code DAG} node is advanced according to the
     * status of its sub DAG instance; a {@code NODE} node that is {@code RUNNING} and has a
     * task id has its task status queried and forwarded to
     * {@link #jobCallBack(Long, String, Long, TaskStatus, String)}.
     *
     * @param dagInstNodeTask task carrying the DAG instance id and node id of the node to inspect
     * @throws Exception if no action service is registered for the node's type, or if a
     *                   called collaborator throws
     */
    public void inspectDagInstNode(DagInstNodeTask dagInstNodeTask) throws Exception {
        DagInstNodeDO dagInstNodeDO = this.dagInstNodeDAO.getDagInstNode(dagInstNodeTask.getDagInstId(), dagInstNodeTask.getNodeId());
        if (Objects.isNull(dagInstNodeDO)) {
            log.info(">>>dagInstNodeNewService|inspectDagInstNode|Not Found|dagInstNodeTask={}", dagInstNodeTask);
            return;
        }
        switch (dagInstNodeDO.fetchNodeType()) {
            case DAG: {
                this.inspectSubDagNode(dagInstNodeDO);
                break;
            }
            case NODE: {
                this.inspectTaskNode(dagInstNodeDO);
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Advances a DAG-type node from the status of its sub DAG instance, then sends a dispatch
     * message for the owning DAG instance regardless of the outcome.
     */
    private void inspectSubDagNode(DagInstNodeDO dagInstNodeDO) throws Exception {
        Long subDagInstId = dagInstNodeDO.getSubDagInstId();
        // NOTE(review): no null check on the loaded sub DAG instance; presumably the sub DAG
        // always exists once the node is inspected - confirm against callers.
        DagInstDO subDagInst = this.dagInstDAO.getDagInstById(subDagInstId);
        DagInstStatus subDagInstStatus = subDagInst.fetchStatus();
        if (subDagInstStatus == DagInstStatus.EXCEPTION) {
            this.dagInstNodeDAO.updateStatus(dagInstNodeDO.getId(), DagInstNodeStatus.EXCEPTION);
        } else if (subDagInstStatus == DagInstStatus.SUCCESS) {
            // Copy the sub DAG's results onto the node and mark it MERGE.
            DagInstNodeDO updateDagNodeInst = DagInstNodeDO.builder().id(dagInstNodeDO.getId()).build();
            updateDagNodeInst.setGlobalResult(subDagInst.getGlobalResult());
            updateDagNodeInst.setGlobalParams(subDagInst.getGlobalParams());
            if (StringUtils.isNotEmpty(subDagInst.getGlobalParams())) {
                log.info(">>>dagInstNodeNewService|setGlobalParams|copy globalParams from sub to parent|dagInstId={}, subDagInst={}, md5={}", dagInstNodeDO.getDagInstId(), subDagInstId, DigestUtils.md5DigestAsHex(subDagInst.getGlobalParams().getBytes()));
            }
            updateDagNodeInst.setGlobalObject(subDagInst.getGlobalObject());
            updateDagNodeInst.setStatus(DagInstNodeStatus.MERGE.toString());
            updateDagNodeInst.setLockId(DateUtil.currentTimeMillis());
            this.dagInstNodeDAO.update(updateDagNodeInst);
            log.info(">>>dagInstNodeNewService|inspectDagInstNode|MERGE|dagInstId={}", dagInstNodeDO.getDagInstId());

            // NOTE(review): edges are evaluated before the global data refresh here, whereas
            // jobCallBack refreshes first - confirm which order is intended.
            this.calcEdge(dagInstNodeDO);
            this.dagInstNewService.freshGlobalData(dagInstNodeDO.getDagInstId());

            // Second, separate update that moves the node from MERGE to SUCCESS.
            updateDagNodeInst = DagInstNodeDO.builder().id(dagInstNodeDO.getId()).build();
            updateDagNodeInst.setStatus(DagInstNodeStatus.SUCCESS.toString());
            this.dagInstNodeDAO.update(updateDagNodeInst);
            log.info(">>>dagInstNodeNewService|inspectDagInstNode|SUCCESS|dagInstId={}", dagInstNodeDO.getDagInstId());
        } else {
            log.info(">>>dagInstNodeNewService|inspectDagInstNode|\u515c\u5e95\u6761\u4ef6|subDagInstId={}, subDagInstStatus={}", subDagInstId, subDagInstStatus);
        }
        DagInstDispatch dagInstDispatch = DagInstDispatch.builder().dagInstId(dagInstNodeDO.getDagInstId()).nodeId(dagInstNodeDO.getNodeId()).dagInstStatus(DagInstStatus.RUNNING).build();
        this.dagInstNotify.sendDagInstDispatch(dagInstDispatch);
    }

    /**
     * Queries the task status of a RUNNING NODE-type node that has a task id and forwards it
     * to jobCallBack; does nothing for any other node state.
     */
    private void inspectTaskNode(DagInstNodeDO dagInstNodeDO) throws Exception {
        long beginTime = System.currentTimeMillis();
        if (dagInstNodeDO.fetchNodeStatus() != DagInstNodeStatus.RUNNING) {
            return;
        }
        String taskId = dagInstNodeDO.getTaskId();
        if (StringUtils.isEmpty(taskId)) {
            return;
        }
        DagNodeDO dagNode = dagInstNodeDO.fetchDagNode();
        DagNodeType dagNodeType = dagNode.fetchNodeType();
        AbstractActionNewService abstractActionNewService = this.actionNewServiceList.stream().filter(actionNewService -> actionNewService.registerNodeType() == dagNodeType).findFirst().orElse(null);
        if (Objects.isNull(abstractActionNewService)) {
            // Fail with the same explicit error start() and jobCallBack() use, not an NPE.
            throw new Exception("no such type: " + dagNodeType);
        }
        Long taskIdLong = Long.parseLong(taskId);
        TaskStatus taskStatus = abstractActionNewService.status(taskIdLong);
        MonitorUtil.statusMonitor.addCost(System.currentTimeMillis() - beginTime);
        this.jobCallBack(dagInstNodeDO.getDagInstId(), dagInstNodeDO.getNodeId(), taskIdLong, taskStatus, "");
    }

    /**
     * Handles the reported status of a task that belongs to a DAG instance node.
     *
     * <p>On {@code SUCCESS} the task's stdout is fetched from the action service registered
     * for the node's type. If stdout is available, its global object, global params and result
     * are stored on the node, which is marked {@code MERGE}; the DAG instance's global data is
     * refreshed, outgoing edges are evaluated and the node is then marked {@code SUCCESS}. If
     * fetching stdout throws, the node is marked {@code EXCEPTION} with the exception message
     * as detail. On {@code EXCEPTION} the node is marked {@code EXCEPTION} with the given
     * detail (or a default text when the detail is empty). For any task status, a dispatch
     * message carrying the DAG instance's current status is sent afterwards.
     *
     * <p>If the node cannot be found, a warning is logged and nothing else happens.
     *
     * @param dagInstId  id of the DAG instance that owns the node
     * @param nodeId     id of the node inside the DAG instance
     * @param taskId     id of the task whose status is reported
     * @param taskStatus reported status of the task
     * @param detail     optional failure detail, used when {@code taskStatus} is EXCEPTION
     * @throws Exception if no action service is registered for the node's type, or if a
     *                   called collaborator throws
     */
    public void jobCallBack(Long dagInstId, String nodeId, Long taskId, TaskStatus taskStatus, String detail) throws Exception {
        log.info(">>>dagInstNodeNewService|jobCallBack|enter|dagInstId={}, nodeId={}, taskId={}, taskStatus={}, detail={}", dagInstId, nodeId, taskId, taskStatus, detail);
        DagInstNodeDO dagInstNodeDO = this.dagInstNodeDAO.getDagInstNode(dagInstId, nodeId);
        if (Objects.isNull(dagInstNodeDO)) {
            // Same not-found handling as startDagInstNode/inspectDagInstNode, instead of an NPE.
            log.warn(">>>dagInstNodeNewService|jobCallBack|Not Found|dagInstId={}, nodeId={}", dagInstId, nodeId);
            return;
        }
        DagInstDO dagInstDO = this.dagInstDAO.getDagInstById(dagInstId);
        long beginTime = System.currentTimeMillis();
        if (taskStatus == TaskStatus.SUCCESS) {
            DagNodeDO dagNode = dagInstNodeDO.fetchDagNode();
            DagNodeType dagNodeType = dagNode.fetchNodeType();
            AbstractActionNewService abstractActionNewService = this.actionNewServiceList.stream().filter(actionNewService -> actionNewService.registerNodeType() == dagNodeType).findFirst().orElse(null);
            if (Objects.isNull(abstractActionNewService)) {
                throw new Exception("no such type: " + dagNodeType);
            }
            DagInstNodeDO updateDagNodeInst = DagInstNodeDO.builder().id(dagInstNodeDO.getId()).build();
            JSONObject outJson = null;
            try {
                outJson = abstractActionNewService.stdout(taskId);
            }
            catch (Exception e) {
                // Keep the stack trace in the log; only the message is persisted on the node.
                log.error(">>>dagInstNodeNewService|jobCallBack|stdout failed|dagInstId={}, nodeId={}, taskId={}", dagInstId, nodeId, taskId, e);
                this.dagInstNodeDAO.updateStatusWithDetail(dagInstNodeDO.getId(), DagInstNodeStatus.EXCEPTION, e.getMessage());
            }
            if (Objects.nonNull(outJson)) {
                MonitorUtil.stdOutMonitor.addCost(System.currentTimeMillis() - beginTime);
                updateDagNodeInst.setGlobalObject(outJson.getString("_global_object"));
                updateDagNodeInst.setGlobalParams(outJson.getString("_global_params"));
                if (StringUtils.isNotEmpty(updateDagNodeInst.getGlobalParams())) {
                    log.info(">>>dagInstNodeNewService|setGlobalParams|set globalParams to node|dagInstId={}, nodeId={}, md5={}", dagInstId, nodeId, DigestUtils.md5DigestAsHex(updateDagNodeInst.getGlobalParams().getBytes()));
                }
                updateDagNodeInst.setGlobalResult(AbstractActionNewService.getDataResultAndOutput(outJson).toJSONString());
                updateDagNodeInst.setStatus(DagInstNodeStatus.MERGE.toString());
                updateDagNodeInst.setLockId(DateUtil.currentTimeMillis());
                this.dagInstNodeDAO.update(updateDagNodeInst);
                log.info(">>>dagInstNodeNewService|jobCallBack|MERGE|dagInstId={}", dagInstNodeDO.getDagInstId());

                this.dagInstNewService.freshGlobalData(dagInstId);
                this.calcEdge(dagInstNodeDO);

                // Second, separate update that moves the node from MERGE to SUCCESS.
                updateDagNodeInst = DagInstNodeDO.builder().id(dagInstNodeDO.getId()).build();
                updateDagNodeInst.setStatus(DagInstNodeStatus.SUCCESS.toString());
                this.dagInstNodeDAO.update(updateDagNodeInst);
                log.info(">>>dagInstNodeNewService|jobCallBack|SUCCESS|dagInstId={}", dagInstNodeDO.getDagInstId());
            }
        } else if (taskStatus == TaskStatus.EXCEPTION) {
            this.dagInstNodeDAO.updateStatusWithDetail(dagInstNodeDO.getId(), DagInstNodeStatus.EXCEPTION, StringUtils.isEmpty(detail) ? "exception in running task" : detail);
        }
        // Sent for every task status, including ones that changed nothing above.
        DagInstDispatch dagInstDispatch = DagInstDispatch.builder().dagInstId(dagInstNodeDO.getDagInstId()).nodeId(dagInstNodeDO.getNodeId()).dagInstStatus(dagInstDO.fetchStatus()).build();
        this.dagInstNotify.sendDagInstDispatch(dagInstDispatch);
    }

    /**
     * Evaluates and persists every edge of the DAG instance whose source is the given node.
     *
     * <p>An edge without an expression passes. Otherwise the expression is executed against
     * the DAG instance's expression params and the edge passes only if the result is
     * {@code true} (a {@code null} result counts as not passed). A successfully evaluated edge
     * is marked {@code SUCCESS} with its pass flag set to 1 or 0; if evaluation throws, the
     * edge is marked {@code EXCEPTION} and the stack trace is stored on it. Each edge is
     * updated individually, so one failing edge does not stop the others.
     *
     * @param dagInstNodeDO the node whose outgoing edges are evaluated
     */
    public void calcEdge(DagInstNodeDO dagInstNodeDO) {
        List<DagInstEdgeDO> edgeList = this.dagInstEdgeDAO.getList(dagInstNodeDO.getDagInstId()).stream()
                .filter(dagInstEdgeDO -> StringUtils.equals(dagInstEdgeDO.getSource(), dagInstNodeDO.getNodeId()))
                .collect(Collectors.toList());
        // Loaded lazily, only once, and only if some edge actually has an expression.
        DagInstDO dagInstDO = null;
        for (DagInstEdgeDO edge : edgeList) {
            try {
                String expression = edge.fetchExpressionString();
                boolean isPass = true;
                if (StringUtils.isNotEmpty(expression)) {
                    if (Objects.isNull(dagInstDO)) {
                        dagInstDO = this.dagInstDAO.getDagInstById(dagInstNodeDO.getDagInstId());
                    }
                    Boolean result = Tools.execExpression(edge.fetchKey(), (Map<String, Object>)dagInstDO.fetchExpressionParamsJson(), expression, Boolean.class);
                    // A null result is treated as "not passed".
                    isPass = Boolean.TRUE.equals(result);
                }
                edge.setIsPass(isPass ? 1 : 0);
                edge.setStatus(DagInstEdgeStatus.SUCCESS.toString());
            }
            catch (Exception e) {
                edge.setStatus(DagInstEdgeStatus.EXCEPTION.toString());
                edge.setException(Throwables.getStackTraceAsString(e));
            }
            edge.setGmtModified(DateUtil.currentSeconds());
            this.dagInstEdgeDAO.update(edge);
        }
    }
}

