/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.tesla.dag.dispatch;

import com.alibaba.tesla.dag.algorithm.DAG;
import com.alibaba.tesla.dag.dispatch.IDagInstDispatcher;
import com.alibaba.tesla.dag.notify.DagInstDispatch;
import com.alibaba.tesla.dag.notify.DagInstNodeTask;
import com.alibaba.tesla.dag.notify.IDagInstNodeTaskNotify;
import com.alibaba.tesla.dag.notify.IDagInstNotify;
import com.alibaba.tesla.dag.notify.NodeTaskType;
import com.alibaba.tesla.dag.repository.dao.DagInstEdgeDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeDAO;
import com.alibaba.tesla.dag.repository.domain.DagInstDO;
import com.alibaba.tesla.dag.repository.domain.DagInstEdgeDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeDO;
import com.alibaba.tesla.dag.schedule.status.DagInstEdgeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstNodeStatus;
import com.alibaba.tesla.dag.schedule.status.DagInstStatus;
import com.alibaba.tesla.dag.services.DagInstNewService;
import com.alibaba.tesla.dag.services.DagInstNodeNewService;
import com.alibaba.tesla.dag.util.DagUtil;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Dispatcher for DAG instances registered under {@link DagInstStatus#RUNNING}.
 *
 * <p>Each {@link #dispatch(DagInstDO)} call loads the nodes and edges of the instance, pushes
 * exception/stopped states down to descendant nodes, starts or skips {@code INIT} nodes
 * depending on the state of their parents, and finally refreshes the instance status once no
 * node is {@code INIT} or {@code RUNNING} any more.
 */
@Service
public class RunningDagInstDispatcher
implements IDagInstDispatcher {
    private static final Logger log = LoggerFactory.getLogger(RunningDagInstDispatcher.class);
    @Autowired
    private IDagInstNotify dagInstNotify;
    @Autowired
    private IDagInstNodeTaskNotify dagInstNodeTaskNotify;
    @Autowired
    private DagInstNodeDAO dagInstNodeDAO;
    @Autowired
    private DagInstEdgeDAO dagInstEdgeDAO;
    @Autowired
    private DagInstNewService dagInstNewService;
    @Autowired
    private DagInstNodeNewService dagInstNodeNewService;

    /**
     * Evaluates the edges that lead from the successful parents into {@code target}.
     *
     * <p>Parents whose status is not {@code SUCCESS} are ignored. Edges are looked up under the
     * key {@code <parentNodeId>-<target>}.
     * NOTE(review): this assumes {@code DagInstEdgeDO.fetchKey()} produces the same format - confirm.
     *
     * @param sourceNodeList parent nodes of the target node
     * @param target         node id of the node whose incoming edges are evaluated
     * @param key2EdgeMap    edges of the DAG instance indexed by their key
     * @return {@code true} when every edge coming from a successful parent has status
     *         {@code SUCCESS} and an {@code isPass} value other than {@code 0};
     *         {@code false} when such an edge is missing, has another non-{@code INIT} status or
     *         has {@code isPass} equal to {@code 0};
     *         {@code null} when an edge is still {@code INIT} - in that case the edge calculation
     *         is requested for the parent and the decision is left to a later dispatch
     */
    private Boolean conditionIsAllSuccess(List<DagInstNodeDO> sourceNodeList, String target, Map<String, DagInstEdgeDO> key2EdgeMap) {
        for (DagInstNodeDO sourceNode : sourceNodeList) {
            if (sourceNode.fetchNodeStatus() != DagInstNodeStatus.SUCCESS) {
                continue;
            }
            String key = sourceNode.getNodeId() + "-" + target;
            DagInstEdgeDO edge = key2EdgeMap.get(key);
            if (Objects.isNull(edge)) {
                return false;
            }
            if (edge.fetchEdgeStatus() == DagInstEdgeStatus.INIT) {
                // Not calculated yet: request the calculation and report "undecided".
                this.dagInstNodeNewService.calcEdge(sourceNode);
                return null;
            }
            boolean passed = edge.fetchEdgeStatus() == DagInstEdgeStatus.SUCCESS
                && !Objects.equals(edge.getIsPass(), 0);
            if (!passed) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return {@link DagInstStatus#RUNNING}, the instance status this dispatcher is registered for
     */
    @Override
    public DagInstStatus registerType() {
        return DagInstStatus.RUNNING;
    }

    /**
     * Advances a running DAG instance by one step.
     *
     * <ol>
     *   <li>Loads the nodes and edges of the instance and builds the DAG.</li>
     *   <li>Marks the descendants of exception/stopped nodes as skipped.</li>
     *   <li>Starts, skips or leaves untouched every node that is still {@code INIT}.</li>
     *   <li>Refreshes the instance status when no node is {@code INIT} or {@code RUNNING}.</li>
     * </ol>
     *
     * @param dagInstDO the DAG instance to dispatch
     */
    @Override
    public void dispatch(DagInstDO dagInstDO) {
        Long dagInstId = dagInstDO.getId();
        log.info(">>>[Running]|dispatch|dagInstId={}", dagInstId);
        List<DagInstNodeDO> nodeList = this.dagInstNodeDAO.getSimpleList(dagInstId);
        List<DagInstEdgeDO> edgeList = this.dagInstEdgeDAO.getSimpleList(dagInstId);
        Map<String, DagInstNodeDO> id2NodeMap = nodeList.stream().collect(Collectors.toMap(DagInstNodeDO::getNodeId, o -> o));
        Map<String, DagInstEdgeDO> key2EdgeMap = edgeList.stream().collect(Collectors.toMap(DagInstEdgeDO::fetchKey, o -> o));
        DAG dag = DagUtil.calcDAG(nodeList, edgeList);

        // First pass: propagate failures so the in-memory statuses are current before scheduling.
        for (DagInstNodeDO node : nodeList) {
            this.doChildrenOnExceptionOrStopped(node, dag, id2NodeMap);
        }

        // Second pass: decide what to do with every node that has not been started yet.
        for (DagInstNodeDO node : nodeList) {
            if (node.fetchNodeStatus() == DagInstNodeStatus.INIT) {
                this.scheduleInitNode(node, nodeList, dag, key2EdgeMap);
            }
        }

        this.refreshInstStatusIfFinished(dagInstDO, nodeList);
    }

    /**
     * Decides the fate of a single {@code INIT} node from the statuses of its parents: start it,
     * mark it {@code SKIP}, or leave it untouched for a later dispatch.
     */
    private void scheduleInitNode(DagInstNodeDO node, List<DagInstNodeDO> nodeList, DAG dag, Map<String, DagInstEdgeDO> key2EdgeMap) {
        Set<?> parent = dag.getParent(node.getNodeId());
        if (CollectionUtils.isEmpty(parent)) {
            log.info(">>>runningDagInstDispatcher|dispatch|start|Header Node={}", node.toSimpleString());
            this.dispatch(node);
            return;
        }
        // Parents are taken in nodeList order; that order decides which edge is evaluated first.
        List<DagInstNodeDO> parentNodeList = nodeList.stream()
            .filter(candidate -> parent.contains(candidate.getNodeId()))
            .collect(Collectors.toList());
        List<DagInstNodeStatus> parentStatusList = parentNodeList.stream()
            .map(DagInstNodeDO::fetchNodeStatus)
            .collect(Collectors.toList());

        if (this.isOnly(parentStatusList, Arrays.asList(DagInstNodeStatus.SKIP))) {
            log.info(">>>runningDagInstDispatcher|dispatch|skip|node={}", node.toSimpleString());
            this.markSkipped(node);
            return;
        }
        if (this.isOnly(parentStatusList, Arrays.asList(DagInstNodeStatus.SKIP, DagInstNodeStatus.SUCCESS))) {
            log.info(">>>runningDagInstDispatcher|dispatch|success|node={}", node.toSimpleString());
            Boolean condition = this.conditionIsAllSuccess(parentNodeList, node.getNodeId(), key2EdgeMap);
            if (Boolean.TRUE.equals(condition)) {
                this.dispatch(node);
            } else if (Boolean.FALSE.equals(condition)) {
                log.info(">>>runningDagInstDispatcher|dispatch|edge condition is false|node={}", node.toSimpleString());
                this.markSkipped(node);
            }
            // condition == null: an edge is still being calculated, nothing to do in this round.
            return;
        }
        log.info(">>>runningDagInstDispatcher|dispatch|ignore|node={}, parentStatusList={}", node.toSimpleString(), parentStatusList);
    }

    /**
     * Sets the node to {@code SKIP}, both on the in-memory object (so nodes handled later in the
     * same dispatch see it) and through the DAO.
     */
    private void markSkipped(DagInstNodeDO node) {
        node.setStatus(DagInstNodeStatus.SKIP.toString());
        this.dagInstNodeDAO.updateStatus(node.getId(), DagInstNodeStatus.SKIP);
    }

    /**
     * Refreshes the instance status once no node is {@code INIT} or {@code RUNNING}:
     * {@code EXCEPTION} when any node is {@code EXCEPTION} or {@code SKIP_CAUSE_BY_EXCEPTION};
     * otherwise {@code POST_RUNNING} (plus a dispatch notification) when a node with id
     * {@code __post_node__} exists, else {@code SUCCESS}.
     */
    private void refreshInstStatusIfFinished(DagInstDO dagInstDO, List<DagInstNodeDO> nodeList) {
        Long dagInstId = dagInstDO.getId();
        List<DagInstNodeStatus> allNodeStatusList = nodeList.stream()
            .map(DagInstNodeDO::fetchNodeStatus)
            .collect(Collectors.toList());
        if (allNodeStatusList.contains(DagInstNodeStatus.INIT) || allNodeStatusList.contains(DagInstNodeStatus.RUNNING)) {
            // Nodes started in this round are still INIT in memory, so the instance stays RUNNING.
            return;
        }
        if (allNodeStatusList.contains(DagInstNodeStatus.EXCEPTION) || allNodeStatusList.contains(DagInstNodeStatus.SKIP_CAUSE_BY_EXCEPTION)) {
            this.dagInstNewService.freshInstStatus(dagInstDO, this.registerType(), DagInstStatus.EXCEPTION);
            return;
        }
        DagInstNodeDO postNode = this.dagInstNodeDAO.getDagInstNode(dagInstId, "__post_node__");
        if (Objects.isNull(postNode)) {
            this.dagInstNewService.freshInstStatus(dagInstDO, this.registerType(), DagInstStatus.SUCCESS);
            return;
        }
        this.dagInstNewService.freshInstStatus(dagInstDO, this.registerType(), DagInstStatus.POST_RUNNING);
        DagInstDispatch dagInstDispatch = DagInstDispatch.builder()
            .dagInstId(dagInstId)
            .nodeId(postNode.getNodeId())
            .dagInstStatus(DagInstStatus.POST_RUNNING)
            .build();
        this.dagInstNotify.sendDagInstDispatch(dagInstDispatch);
    }

    /**
     * Recursively marks the children of an exception/stopped node as
     * {@code SKIP_CAUSE_BY_EXCEPTION} or {@code SKIP_CAUSE_BY_STOPPED}.
     *
     * <p>The status is persisted only when the child is not already in an exception (resp.
     * stopped) state, but the in-memory status is always overwritten. Does nothing when
     * {@code node} itself is neither in an exception nor in a stopped state.
     *
     * @param node       node whose children are inspected
     * @param dag        DAG used to look up the children of a node
     * @param id2NodeMap nodes of the DAG instance indexed by node id
     */
    private void doChildrenOnExceptionOrStopped(DagInstNodeDO node, DAG dag, Map<String, DagInstNodeDO> id2NodeMap) {
        DagInstNodeStatus nodeStatus = node.fetchNodeStatus();
        if (!nodeStatus.isException() && !nodeStatus.isStopped()) {
            return;
        }
        Set<?> children = dag.getChildren(node.getNodeId());
        if (CollectionUtils.isEmpty(children)) {
            return;
        }
        // Past the guard above the node is in an exception state or, failing that, stopped.
        // Exception takes precedence, as in the original if / else-if chain.
        boolean byException = nodeStatus.isException();
        DagInstNodeStatus skipStatus = byException
            ? DagInstNodeStatus.SKIP_CAUSE_BY_EXCEPTION
            : DagInstNodeStatus.SKIP_CAUSE_BY_STOPPED;
        String logFormat = byException
            ? ">>>runningDagInstDispatcher|doChildrenOnException|{}->{}"
            : ">>>runningDagInstDispatcher|doChildrenOnStopped|{}->{}";
        for (Object child : children) {
            DagInstNodeDO childNode = id2NodeMap.get(child);
            if (Objects.isNull(childNode)) {
                // An unknown child id must not prevent the remaining siblings from being handled.
                continue;
            }
            DagInstNodeStatus childNodeStatus = childNode.fetchNodeStatus();
            boolean alreadyMarked = byException ? childNodeStatus.isException() : childNodeStatus.isStopped();
            if (!alreadyMarked) {
                // Log the status that is actually written.
                log.info(logFormat, childNode.getId(), skipStatus);
                this.dagInstNodeDAO.updateStatus(childNode.getId(), skipStatus);
            }
            childNode.setStatus(skipStatus.toString());
            this.doChildrenOnExceptionOrStopped(childNode, dag, id2NodeMap);
        }
    }

    /**
     * @return {@code true} when every element of {@code list1} is contained in {@code list2};
     *         also {@code true} when {@code list1} is empty
     */
    private boolean isOnly(List<DagInstNodeStatus> list1, List<DagInstNodeStatus> list2) {
        return list1.stream().allMatch(list2::contains);
    }

    /**
     * Sends a {@link NodeTaskType#START} task notification for the given node.
     */
    private void dispatch(DagInstNodeDO node) {
        DagInstNodeTask dagInstNodeTask = DagInstNodeTask.builder()
            .dagInstId(node.getDagInstId())
            .nodeId(node.getNodeId())
            .nodeTaskType(NodeTaskType.START)
            .build();
        this.dagInstNodeTaskNotify.sendDagInstNodeTask(dagInstNodeTask);
    }
}

