/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskPriorityQueueConsumer
extends BaseDaemonThread {
    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);
    @Autowired
    private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ExecutorDispatcher dispatcher;
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private TaskEventService taskEventService;
    private ThreadPoolExecutor consumerThreadPoolExecutor;

    protected TaskPriorityQueueConsumer() {
        super("TaskPriorityQueueConsumeThread");
    }

    @PostConstruct
    public void init() {
        this.consumerThreadPoolExecutor = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor((String)"TaskUpdateQueueConsumerThread", (int)this.masterConfig.getDispatchTaskNumber());
        logger.info("Task priority queue consume thread staring");
        super.start();
        logger.info("Task priority queue consume thread started");
    }

    public void run() {
        int fetchTaskNum = this.masterConfig.getDispatchTaskNumber();
        while (Stopper.isRunning()) {
            try {
                List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
                if (!CollectionUtils.isNotEmpty(failedDispatchTasks)) continue;
                TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    this.taskPriorityQueue.put((Object)dispatchFailedTask);
                }
                if (fetchTaskNum != failedDispatchTasks.size()) continue;
                TimeUnit.MILLISECONDS.sleep(1000L);
            }
            catch (Exception e) {
                TaskMetrics.incTaskDispatchError();
                logger.error("dispatcher task error", (Throwable)e);
            }
        }
    }

    public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
        List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList());
        CountDownLatch latch = new CountDownLatch(fetchTaskNum);
        for (int i = 0; i < fetchTaskNum; ++i) {
            TaskPriority taskPriority = (TaskPriority)this.taskPriorityQueue.poll(1000L, TimeUnit.MILLISECONDS);
            if (Objects.isNull(taskPriority)) {
                latch.countDown();
                continue;
            }
            this.consumerThreadPoolExecutor.submit(() -> {
                try {
                    boolean dispatchResult = this.dispatchTask(taskPriority);
                    if (!dispatchResult) {
                        failedDispatchTasks.add(taskPriority);
                    }
                }
                finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
        return failedDispatchTasks;
    }

    protected boolean dispatchTask(TaskPriority taskPriority) {
        TaskMetrics.incTaskDispatch();
        boolean result = false;
        try {
            WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
            if (workflowExecuteRunnable == null) {
                logger.error("Cannot find the related processInstance of the task, taskPriority: {}", (Object)taskPriority);
                return true;
            }
            Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
            if (!taskInstanceOptional.isPresent()) {
                logger.error("Cannot find the task instance from related processInstance, taskPriority: {}", (Object)taskPriority);
                return true;
            }
            TaskInstance taskInstance = taskInstanceOptional.get();
            TaskExecutionContext context = taskPriority.getTaskExecutionContext();
            ExecutionContext executionContext = new ExecutionContext(this.toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
            if (this.isTaskNeedToCheck(taskPriority) && this.taskInstanceIsFinalState(taskPriority.getTaskId())) {
                return true;
            }
            result = this.dispatcher.dispatch(executionContext);
            if (result) {
                logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}", (Object)taskPriority.getTaskId(), (Object)executionContext.getHost());
                this.addDispatchEvent(context, executionContext);
            } else {
                logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}", (Object)taskPriority.getTaskId(), (Object)executionContext.getHost());
            }
        }
        catch (RuntimeException | ExecuteException e) {
            logger.error("Master dispatch task to worker error, taskPriority: {}", (Object)taskPriority, (Object)e);
        }
        return result;
    }

    private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
        TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
        this.taskEventService.addEvent(taskEvent);
    }

    private Command toCommand(TaskExecutionContext taskExecutionContext) {
        TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext, this.masterConfig.getMasterAddress(), taskExecutionContext.getHost(), System.currentTimeMillis());
        return requestCommand.convert2Command();
    }

    public boolean taskInstanceIsFinalState(int taskInstanceId) {
        TaskInstance taskInstance = this.processService.findTaskInstanceById(Integer.valueOf(taskInstanceId));
        return taskInstance.getState().typeIsFinished();
    }

    private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
        long now = System.currentTimeMillis();
        if (now - taskPriority.getCheckpoint() > 1000L) {
            taskPriority.setCheckpoint(now);
            return true;
        }
        return false;
    }
}

