/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.tesla.dag.services;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.tesla.dag.common.Tools;
import com.alibaba.tesla.dag.local.AbstractLocalNodeBase;
import com.alibaba.tesla.dag.local.ClassService;
import com.alibaba.tesla.dag.local.LocalTaskDO;
import com.alibaba.tesla.dag.model.domain.dagnode.DagInstNodeRunRet;
import com.alibaba.tesla.dag.model.domain.dagnode.DagNodeDetailLocal;
import com.alibaba.tesla.dag.repository.dao.DagInstDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeDAO;
import com.alibaba.tesla.dag.repository.dao.DagInstNodeStdDAO;
import com.alibaba.tesla.dag.repository.domain.DagInstDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeDO;
import com.alibaba.tesla.dag.repository.domain.DagInstNodeStdDO;
import com.alibaba.tesla.dag.repository.domain.DagNodeDO;
import com.alibaba.tesla.dag.schedule.task.TaskStatus;
import com.alibaba.tesla.dag.services.AbstractActionNewService;
import com.alibaba.tesla.dag.services.DagInstNodeNewService;
import com.alibaba.tesla.dag.util.DateUtil;
import com.google.common.base.Throwables;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Runs DAG "local" node tasks on an in-process thread pool.
 *
 * <p>Each submitted task is tracked by its local task id in two maps: one holding the
 * {@link Future} of the running task and one holding the {@link AbstractLocalNodeBase}
 * instance created for it. Two periodic jobs are scheduled from {@link #initScheduled()}:
 * <ul>
 *   <li>{@code TimeoutHandler} cancels tracked tasks whose run time exceeds the node's
 *       {@code realRunTimeout};</li>
 *   <li>{@code ReBuilder} resubmits tasks returned by
 *       {@code DagInstNodeStdDAO.listTimeOut} that are not tracked in this process.</li>
 * </ul>
 *
 * <p>The tracking maps are read and written from caller threads, pool worker threads and
 * scheduler threads, so they are {@link ConcurrentHashMap}s and all check-then-act
 * sequences on them use atomic map operations.
 */
@Service
public class LocalTaskService {
    private static final Logger log = LoggerFactory.getLogger(LocalTaskService.class);
    @Autowired
    private DagInstNodeStdDAO dagInstNodeStdDAO;
    @Autowired
    private DagInstNodeDAO dagInstNodeDAO;
    @Autowired
    private DagInstDAO dagInstDAO;
    @Autowired
    private ClassService classService;
    @Autowired
    private DagInstNodeNewService dagInstNodeNewService;
    @Value(value="${dag.local.thread-pool-size:100}")
    private int localThreadPoolSize;
    private ThreadPoolExecutor localTaskThreadPoolExecutor;
    private final ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(5);
    /** Local task id -> future of the task currently queued or running in this process. */
    private final Map<Long, Future<?>> localTaskId2FutureMap = new ConcurrentHashMap<>(1000);
    /** Local task id -> node instance created for that task (see {@link #newInstance}). */
    private final Map<Long, AbstractLocalNodeBase> localTaskId2NodeMap = new ConcurrentHashMap<>(1000);

    /**
     * Creates the fixed-size task pool (unbounded queue) and schedules the periodic
     * timeout check (first run after 60s, then every 5s) and rebuild check (every 10s).
     */
    @PostConstruct
    public void initScheduled() {
        log.info(">>>localTaskService|initScheduled|enter|localThreadPoolSize={}", this.localThreadPoolSize);
        this.localTaskThreadPoolExecutor = new ThreadPoolExecutor(
                this.localThreadPoolSize,
                this.localThreadPoolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        this.scheduledService.scheduleAtFixedRate(new TimeoutHandler(), 60L, 5L, TimeUnit.SECONDS);
        this.scheduledService.scheduleAtFixedRate(new ReBuilder(), 10L, 10L, TimeUnit.SECONDS);
    }

    /**
     * @return the pool that executes local tasks; {@code null} until
     *         {@link #initScheduled()} has run
     */
    public ThreadPoolExecutor getLocalTaskThreadPoolExecutor() {
        return this.localTaskThreadPoolExecutor;
    }

    /**
     * Loads the DAG instance node behind a local task, but only while the task's stored
     * status is still contained in {@code DagInstNodeStdDAO.UN_END_STATUS_LIST}.
     *
     * @param localTaskId id of the {@link DagInstNodeStdDO} row
     * @return the owning node, or {@code null} when the row does not exist or its status
     *         is not in {@code UN_END_STATUS_LIST}
     */
    private DagInstNodeDO queryByLocalTaskId(Long localTaskId) {
        DagInstNodeStdDO dagInstNodeStdDO = this.dagInstNodeStdDAO.getDagInstNodeStdById(localTaskId);
        if (Objects.isNull(dagInstNodeStdDO)) {
            log.warn(">>>localTaskService|run|local task does not exist|localTaskId={}", localTaskId);
            return null;
        }
        String currentStatus = dagInstNodeStdDO.getStatus();
        if (DagInstNodeStdDAO.UN_END_STATUS_LIST.contains(currentStatus)) {
            return this.dagInstNodeDAO.getDagInstNode(dagInstNodeStdDO.getDagInstNodeId());
        }
        log.warn(">>>localTaskService|queryByLocalTaskId|double check|dagInstId={}, nodeId={}, localTaskId={}, currentStatus={}",
                dagInstNodeStdDO.getDagInstId(), dagInstNodeStdDO.getDagInstNodeId(), localTaskId, currentStatus);
        return null;
    }

    /**
     * Tells whether a tracked task has been running longer than its node's timeout.
     *
     * @param localTaskId id of a tracked task
     * @return {@code false} when no node instance is registered for the id or the node's
     *         {@code realRunTimeout} is 0; otherwise whether the seconds elapsed since
     *         {@code actRunTimestamp} exceed {@code realRunTimeout}
     */
    private boolean isTimeout(Long localTaskId) {
        AbstractLocalNodeBase localNode = this.localTaskId2NodeMap.get(localTaskId);
        if (Objects.isNull(localNode) || localNode.realRunTimeout == 0L) {
            return false;
        }
        long actTimestamp = DateUtil.currentSeconds() - localNode.actRunTimestamp;
        return actTimestamp > localNode.realRunTimeout;
    }

    /**
     * Submits a local task to the pool unless a task with the same id is already tracked.
     *
     * <p>The future is registered with {@code putIfAbsent} <em>before</em> the task is
     * handed to the pool. This makes the duplicate check atomic and guarantees that the
     * task's own cleanup (which removes the map entry) can never run before the entry
     * exists, which would otherwise leave a stale future behind.
     *
     * <p>Any failure while submitting is logged, stored on the task row as
     * {@code EXCEPTION} and reported through {@code jobCallBack}; it is not rethrown.
     *
     * @param localTaskDO descriptor carrying the DAG instance id, node id and task id
     */
    public void doLocalTask(LocalTaskDO localTaskDO) {
        Long dagInstId = localTaskDO.getDagInstId();
        String nodeId = localTaskDO.getNodeId();
        Long localTaskId = localTaskDO.getTaskId();
        try {
            // ConcurrentHashMap rejects null keys; fail with a clear message instead.
            Objects.requireNonNull(localTaskId, "localTaskId");
            FutureTask<Void> future = new FutureTask<>(new LocalTask(localTaskDO), null);
            if (this.localTaskId2FutureMap.putIfAbsent(localTaskId, future) != null) {
                log.warn(">>>localTaskService|doLocalTask|task is exist|dagInstId={}, nodeId={}, localTaskId={}",
                        dagInstId, nodeId, localTaskId);
                return;
            }
            try {
                this.localTaskThreadPoolExecutor.execute(future);
            }
            catch (RuntimeException | Error rejected) {
                // The task will never run, so its finally block cannot clean up for us.
                this.localTaskId2FutureMap.remove(localTaskId, future);
                throw rejected;
            }
        }
        catch (Throwable e) {
            log.error(">>>localTaskService|doLocalTask|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                    dagInstId, nodeId, localTaskId, e.toString(), e);
            this.reportFailure("localTaskService", dagInstId, nodeId, localTaskId, e);
        }
    }

    /**
     * Returns the node instance registered for a local task, creating and registering it
     * on first use.
     *
     * <p>A new instance is built by looking up the node class by the node detail's name in
     * {@code classService.nodeMap}, invoking its no-arg constructor and populating its
     * public fields from the DAG instance and the DAG instance node.
     *
     * @param localTaskId id under which the instance is cached; must not be {@code null}
     * @param dagInstNode DAG instance node the task belongs to; only read when no instance
     *                    is cached yet
     * @return the cached or newly created node instance
     * @throws NullPointerException  if {@code localTaskId} is {@code null}
     * @throws IllegalStateException if the DAG instance node, the DAG instance or the node
     *                               class cannot be resolved
     * @throws Exception             if the node class cannot be instantiated reflectively
     */
    public AbstractLocalNodeBase newInstance(Long localTaskId, DagInstNodeDO dagInstNode) throws Exception {
        Objects.requireNonNull(localTaskId, "localTaskId");
        AbstractLocalNodeBase cached = this.localTaskId2NodeMap.get(localTaskId);
        if (cached != null) {
            return cached;
        }
        if (dagInstNode == null) {
            throw new IllegalStateException("dag inst node does not exist, localTaskId=" + localTaskId);
        }
        DagInstDO dagInstDO = this.dagInstDAO.getDagInstById(dagInstNode.getDagInstId());
        if (dagInstDO == null) {
            throw new IllegalStateException("dag inst does not exist, dagInstId=" + dagInstNode.getDagInstId()
                    + ", localTaskId=" + localTaskId);
        }
        DagNodeDO dagNodeDO = dagInstNode.fetchDagNode();
        DagNodeDetailLocal dagNodeDetailLocal = (DagNodeDetailLocal)dagNodeDO.fetchDetailInterface();
        String dagNodeName = dagNodeDetailLocal.getName();
        JSONObject params = AbstractActionNewService.inputParams(dagInstDO, dagInstNode);
        Class<?> nodeClass = this.classService.nodeMap.get(dagNodeName);
        if (nodeClass == null) {
            throw new IllegalStateException("no local node class registered, dagNodeName=" + dagNodeName
                    + ", localTaskId=" + localTaskId);
        }
        AbstractLocalNodeBase localNode = (AbstractLocalNodeBase)nodeClass.getConstructor().newInstance();
        log.info(">>>localTaskService|newInstance|dagInstId={}, taskId={}, dagNodeName={}, nodeClass={}",
                dagInstDO.getId(), localTaskId, dagNodeName, nodeClass);
        localNode.dagInstId = dagInstDO.getId();
        localNode.dagInstNodeId = dagInstNode.getId();
        localNode.params = params;
        localNode.globalParams = dagInstDO.fetchGlobalParamsJson();
        localNode.lastGlobalVariableTimestamp = DateUtil.currentSeconds();
        localNode.globalVariable = dagInstDO.fetchGlobalVariableJson();
        localNode.globalResult = dagInstDO.fetchGlobalResultJson();
        localNode.nodeId = dagInstNode.getNodeId();
        localNode.fatherNodeId = dagInstDO.fetchRelationNodeId();
        localNode.retryTimes = Objects.isNull(dagInstNode.getRetryTimes()) ? 0L : dagInstNode.getRetryTimes();
        localNode.instanceCreateTimestamp = DateUtil.currentSeconds();
        localNode.realRunTimeout = Objects.isNull(dagNodeDO.getRunTimeout()) ? 0L : dagNodeDO.getRunTimeout();
        // Set the run timestamp before the node becomes visible to the timeout check, so
        // isTimeout() never computes an elapsed time from an unset value.
        localNode.actRunTimestamp = DateUtil.currentSeconds();
        AbstractLocalNodeBase raced = this.localTaskId2NodeMap.putIfAbsent(localTaskId, localNode);
        return raced != null ? raced : localNode;
    }

    /**
     * Records a task failure: stores {@code EXCEPTION} plus the stack trace on the task
     * row, then notifies {@code jobCallBack}. Each step is guarded separately so that a
     * failing status update does not prevent the callback.
     *
     * @param source      log prefix identifying the reporting component
     * @param dagInstId   DAG instance id of the failed task
     * @param nodeId      node id of the failed task
     * @param localTaskId local task id of the failed task
     * @param cause       the failure being reported
     */
    private void reportFailure(String source, Long dagInstId, String nodeId, Long localTaskId, Throwable cause) {
        String detail = Throwables.getStackTraceAsString(cause);
        try {
            this.dagInstNodeStdDAO.updateStatusWithDetail(localTaskId, TaskStatus.EXCEPTION, detail);
        }
        catch (Exception e0) {
            log.error(">>>{}|updateStatusWithDetail|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                    source, dagInstId, nodeId, localTaskId, e0.toString(), e0);
        }
        try {
            this.dagInstNodeNewService.jobCallBack(dagInstId, nodeId, localTaskId, TaskStatus.EXCEPTION, detail);
        }
        catch (Exception e1) {
            log.error(">>>{}|jobCallBack|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                    source, dagInstId, nodeId, localTaskId, e1.toString(), e1);
        }
    }

    /**
     * Executes one local task: marks the task row RUNNING, runs the node, stores the
     * result as SUCCESS and fires the success callback. Any throwable is reported as
     * EXCEPTION instead. The tracking-map entries for the task are always removed.
     */
    private class LocalTask
    implements Runnable {
        private final LocalTaskDO localTaskDO;

        public LocalTask(LocalTaskDO localTaskDO) {
            this.localTaskDO = localTaskDO;
        }

        @Override
        public void run() {
            log.info(">>>localTaskService|localTask run enter|localTaskDO={}", this.localTaskDO);
            Long dagInstId = this.localTaskDO.getDagInstId();
            String nodeId = this.localTaskDO.getNodeId();
            Long localTaskId = this.localTaskDO.getTaskId();
            try {
                DagInstNodeStdDO dagInstNodeStdDO = DagInstNodeStdDO.builder().id(localTaskId).build();
                DagInstNodeDO dagInstNode = LocalTaskService.this.dagInstNodeDAO.getDagInstNode(dagInstId, nodeId);
                AbstractLocalNodeBase localNode = LocalTaskService.this.newInstance(localTaskId, dagInstNode);
                localNode.actRunTimestamp = DateUtil.currentSeconds();
                dagInstNodeStdDO.setStatus(TaskStatus.RUNNING.toString());
                dagInstNodeStdDO.setIp(Tools.localIp);
                dagInstNodeStdDO.setInitGlobalParams(JSONObject.toJSONString((Object)localNode.globalParams));
                LocalTaskService.this.dagInstNodeStdDAO.update(dagInstNodeStdDO);
                log.info(">>>localTask|run|enter|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, localTaskId);
                DagInstNodeRunRet ret = localNode.run();
                dagInstNodeStdDO.setStatus(TaskStatus.SUCCESS.name());
                dagInstNodeStdDO.setStdout(JSONObject.toJSONString((Object)ret));
                dagInstNodeStdDO.setGlobalParams(localNode.isDeleteParams
                        ? "__DEL__"
                        : JSONObject.toJSONString((Object)localNode.globalParams));
                LocalTaskService.this.dagInstNodeStdDAO.update(dagInstNodeStdDO);
                log.info(">>>localTask|run|exit|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, localTaskId);
                LocalTaskService.this.dagInstNodeNewService.jobCallBack(dagInstId, nodeId, localTaskId, TaskStatus.SUCCESS, "");
            }
            catch (Throwable e) {
                // Throwable, not Exception: an Error thrown by node code would otherwise
                // vanish into the Future (nobody calls get()) without being reported.
                log.error(">>>localTask|run|Err|dagInstId={}, nodeId={}, localTaskId={}, Err={}",
                        dagInstId, nodeId, localTaskId, e.toString(), e);
                LocalTaskService.this.reportFailure("localTask", dagInstId, nodeId, localTaskId, e);
            }
            finally {
                LocalTaskService.this.localTaskId2FutureMap.remove(localTaskId);
                LocalTaskService.this.localTaskId2NodeMap.remove(localTaskId);
            }
        }
    }

    /**
     * Periodic job that resubmits tasks returned by {@code listTimeOut} for the window
     * [now - 30s - 60 * DateUtil.MINUTE, now - 30s] when they are not tracked in this
     * process and {@link #queryByLocalTaskId} still resolves their node.
     */
    private class ReBuilder
    implements Runnable {
        private ReBuilder() {
        }

        @Override
        public void run() {
            // scheduleAtFixedRate suppresses all later runs once one run throws, so
            // nothing may escape from here.
            try {
                Long end = DateUtil.currentSeconds() - 30L;
                Long begin = end - (long)(60 * DateUtil.MINUTE);
                List<DagInstNodeStdDO> timeOutList = LocalTaskService.this.dagInstNodeStdDAO.listTimeOut(begin, end);
                if (CollectionUtils.isEmpty(timeOutList)) {
                    return;
                }
                for (DagInstNodeStdDO dagInstNodeStdDO : timeOutList) {
                    this.rebuild(dagInstNodeStdDO.getId());
                }
            }
            catch (Throwable e) {
                log.error(">>>reBuilder|scheduleAtFixedRate|Err={}", e.toString(), e);
            }
        }

        /** Resubmits one task; a failure is logged and does not stop the remaining ones. */
        private void rebuild(Long localTaskId) {
            try {
                if (localTaskId == null || LocalTaskService.this.localTaskId2FutureMap.containsKey(localTaskId)) {
                    return;
                }
                DagInstNodeDO dagInstNodeDO = LocalTaskService.this.queryByLocalTaskId(localTaskId);
                if (Objects.isNull(dagInstNodeDO)) {
                    return;
                }
                Long dagInstId = dagInstNodeDO.getDagInstId();
                String nodeId = dagInstNodeDO.getNodeId();
                log.info(">>>reBuilder|task is rebuild|dagInstId={}, nodeId={}, localTaskId={}", dagInstId, nodeId, localTaskId);
                LocalTaskService.this.doLocalTask(LocalTaskDO.builder().dagInstId(dagInstId).nodeId(nodeId).taskId(localTaskId).build());
            }
            catch (Exception e) {
                log.error(">>>reBuilder|rebuild|Err|localTaskId={}, Err={}", localTaskId, e.toString(), e);
            }
        }
    }

    /**
     * Periodic job that, for every tracked task reported by {@link #isTimeout}, stores
     * {@code EXCEPTION} with a timeout message on the task row, cancels the task's future
     * with interruption and drops both tracking-map entries.
     */
    private class TimeoutHandler
    implements Runnable {
        private TimeoutHandler() {
        }

        @Override
        public void run() {
            try {
                // ConcurrentHashMap key views are weakly consistent, so entries may be
                // removed while iterating without a defensive copy.
                for (Long localTaskId : LocalTaskService.this.localTaskId2FutureMap.keySet()) {
                    this.killIfTimedOut(localTaskId);
                }
            }
            catch (Throwable e) {
                log.error(">>>timeoutHandler|scheduleAtFixedRate|Err={}", e.toString(), e);
            }
        }

        /** Handles one task; a failure is logged and does not stop the remaining ones. */
        private void killIfTimedOut(Long localTaskId) {
            try {
                if (!LocalTaskService.this.isTimeout(localTaskId)) {
                    return;
                }
                log.info(">>>timeoutHandler|task is timeout|localTaskId={}", localTaskId);
                LocalTaskService.this.dagInstNodeStdDAO.updateStatusWithDetail(localTaskId, TaskStatus.EXCEPTION, "timeout, killed by system");
                Future<?> future = LocalTaskService.this.localTaskId2FutureMap.remove(localTaskId);
                if (Objects.nonNull(future)) {
                    future.cancel(true);
                }
                LocalTaskService.this.localTaskId2NodeMap.remove(localTaskId);
            }
            catch (Exception e) {
                log.error(">>>timeoutHandler|scheduleAtFixedRate|Err={}", e.toString(), e);
            }
        }
    }
}

