/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor.queue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class TaskExecuteThreadPool
extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
    private final ConcurrentHashMap<String, TaskExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private List<TaskEventHandler> taskEventHandlerList;
    @Autowired
    private StreamTaskInstanceExecCacheManager streamTaskInstanceExecCacheManager;
    private Map<TaskEventType, TaskEventHandler> taskEventHandlerMap = new HashMap<TaskEventType, TaskEventHandler>();
    private final ConcurrentHashMap<Integer, TaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap();

    @PostConstruct
    private void init() {
        this.setDaemon(true);
        this.setThreadNamePrefix("Task-Execute-Thread-");
        this.setMaxPoolSize(this.masterConfig.getExecThreads());
        this.setCorePoolSize(this.masterConfig.getExecThreads());
        this.taskEventHandlerList.forEach(taskEventHandler -> this.taskEventHandlerMap.put(taskEventHandler.getHandleEventType(), (TaskEventHandler)taskEventHandler));
    }

    public void submitTaskEvent(TaskEvent taskEvent) {
        if (taskEvent.getProcessInstanceId() == 0 && this.streamTaskInstanceExecCacheManager.contains(taskEvent.getTaskInstanceId())) {
            this.streamTaskInstanceExecCacheManager.getByTaskInstanceId(taskEvent.getTaskInstanceId()).addTaskEvent(taskEvent);
            return;
        }
        if (!this.processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
            logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", (Object)taskEvent);
            return;
        }
        TaskExecuteRunnable taskExecuteRunnable = this.taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(), processInstanceId -> new TaskExecuteRunnable((int)processInstanceId, this.taskEventHandlerMap));
        taskExecuteRunnable.addEvent(taskEvent);
    }

    public void eventHandler() {
        for (TaskExecuteRunnable taskExecuteThread : this.taskExecuteThreadMap.values()) {
            this.executeEvent(taskExecuteThread);
        }
    }

    public void executeEvent(final TaskExecuteRunnable taskExecuteThread) {
        if (taskExecuteThread.isEmpty()) {
            return;
        }
        if (this.multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
            return;
        }
        this.multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
        ListenableFuture future = this.submitListenable(taskExecuteThread::run);
        future.addCallback(new ListenableFutureCallback(){

            public void onFailure(Throwable ex) {
                Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
                logger.error("[WorkflowInstance-{}] persist event failed", (Object)processInstanceId, (Object)ex);
                if (!TaskExecuteThreadPool.this.processInstanceExecCacheManager.contains(processInstanceId)) {
                    TaskExecuteThreadPool.this.taskExecuteThreadMap.remove(processInstanceId);
                    logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap", (Object)processInstanceId);
                }
                TaskExecuteThreadPool.this.multiThreadFilterMap.remove(taskExecuteThread.getKey());
            }

            public void onSuccess(Object result) {
                Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
                logger.info("[WorkflowInstance-{}] persist events succeeded", (Object)processInstanceId);
                if (!TaskExecuteThreadPool.this.processInstanceExecCacheManager.contains(processInstanceId)) {
                    TaskExecuteThreadPool.this.taskExecuteThreadMap.remove(processInstanceId);
                    logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap", (Object)processInstanceId);
                }
                TaskExecuteThreadPool.this.multiThreadFilterMap.remove(taskExecuteThread.getKey());
            }
        });
    }
}

