/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.processor.queue;

import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persists the task events that belong to one process instance.
 *
 * <p>Events are queued with {@link #addEvent(TaskEvent)} and drained by {@link #run()}.
 * For each event the matching task instance is updated and saved through
 * {@code ProcessService}; for running/delay/result events an ack command is written to
 * the event's channel and a {@code TASK_STATE_CHANGE} state event is submitted to the
 * workflow execute thread pool.
 *
 * <p>NOTE(review): the queue itself is a {@link ConcurrentLinkedQueue}, but {@link #run()}
 * presumably expects a single consumer at a time per process instance - confirm against
 * the caller that schedules this object.
 */
public class TaskExecuteThread {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);

    /** Id of the process instance whose task events this object accepts. */
    private final int processInstanceId;

    /** Pending events, processed in insertion order by {@link #run()}. */
    private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();

    private final ProcessService processService;
    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
    private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    private final DataQualityResultOperator dataQualityResultOperator;

    /**
     * Creates a handler bound to a single process instance.
     *
     * @param processInstanceId               id of the process instance this handler serves
     * @param processService                  used to load and save task instances
     * @param workflowExecuteThreadPool       receives the state events produced after persisting
     * @param processInstanceExecCacheManager used to look up the workflow thread of a process instance
     * @param dataQualityResultOperator       invoked for result events before the task instance is saved
     */
    public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool, ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
        this.processInstanceId = processInstanceId;
        this.processService = processService;
        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
        this.dataQualityResultOperator = dataQualityResultOperator;
    }

    /**
     * Processes queued events until the queue is empty.
     *
     * <p>A failure while persisting one event is logged and does not stop the loop; the
     * event is removed from the queue whether or not persisting succeeded.
     */
    public void run() {
        while (!this.events.isEmpty()) {
            // peek (not poll): the event stays queued while it is being persisted, so
            // eventSize()/isEmpty() keep counting it until the finally block removes it.
            TaskEvent event = this.events.peek();
            try {
                this.persist(event);
            }
            catch (Exception e) {
                // One placeholder for the event; the trailing throwable is logged with its stack trace.
                logger.error("persist error, event:{}", event, e);
            }
            finally {
                this.events.remove(event);
            }
        }
    }

    /**
     * @return the process instance id rendered as a string
     */
    public String getKey() {
        return String.valueOf(this.processInstanceId);
    }

    /**
     * @return the number of events currently in the queue
     */
    public int eventSize() {
        return this.events.size();
    }

    /**
     * @return {@code true} when no events are queued
     */
    public boolean isEmpty() {
        return this.events.isEmpty();
    }

    /**
     * @return the id of the process instance this handler serves
     */
    public Integer getProcessInstanceId() {
        return this.processInstanceId;
    }

    /**
     * Queues an event for later processing by {@link #run()}.
     *
     * @param event the event to queue
     * @return {@code false} (after logging a warning) when the event belongs to a different
     *         process instance; otherwise the result of adding it to the queue
     */
    public boolean addEvent(TaskEvent event) {
        if (event.getProcessInstanceId() != this.processInstanceId) {
            logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
            return false;
        }
        return this.events.add(event);
    }

    /**
     * Applies one event to its task instance and, except for dispatch events, submits a
     * {@code TASK_STATE_CHANGE} state event afterwards.
     *
     * @param taskEvent the event to apply
     * @throws IllegalArgumentException when the event type is not one of
     *                                  DISPATCH, DELAY, RUNNING or RESULT
     */
    private void persist(TaskEvent taskEvent) {
        Event event = taskEvent.getEvent();
        int taskInstanceId = taskEvent.getTaskInstanceId();
        int processInstanceId = taskEvent.getProcessInstanceId();

        // Use the task instance held by the workflow thread when it reports having one for
        // this id; otherwise load it through the process service.
        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
        TaskInstance taskInstance;
        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
        } else {
            taskInstance = this.processService.findTaskInstanceById(Integer.valueOf(taskInstanceId));
        }

        switch (event) {
            case DISPATCH: {
                this.handleDispatchEvent(taskEvent, taskInstance);
                // Dispatch events return here: no state event is submitted for them.
                return;
            }
            case DELAY:
            case RUNNING: {
                this.handleRunningEvent(taskEvent, taskInstance);
                break;
            }
            case RESULT: {
                this.handleResultEvent(taskEvent, taskInstance);
                break;
            }
            default: {
                throw new IllegalArgumentException("invalid event type : " + event);
            }
        }

        StateEvent stateEvent = new StateEvent();
        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
        stateEvent.setExecutionStatus(taskEvent.getState());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        this.workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }

    /**
     * Marks a task instance as dispatched and records the worker address as its host.
     *
     * <p>Nothing is saved when the task instance is {@code null} or its current state is
     * not {@code SUBMITTED_SUCCESS}.
     *
     * @param taskEvent    the dispatch event
     * @param taskInstance the task instance to update, may be {@code null}
     */
    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        if (taskInstance == null) {
            logger.error("taskInstance is null");
            return;
        }
        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
            return;
        }
        taskInstance.setState(ExecutionStatus.DISPATCH);
        taskInstance.setHost(taskEvent.getWorkerAddress());
        this.processService.saveTaskInstance(taskInstance);
    }

    /**
     * Copies the runtime details of a running/delay event onto the task instance, saves
     * it, and writes a running-ack command to the event's channel.
     *
     * <p>A task instance whose state is already finished is left untouched (a warning is
     * logged) but the success ack is still sent. A {@code null} task instance is skipped
     * the same way. Any exception is logged and answered with a failure ack whose task
     * instance id is {@code -1}.
     *
     * @param taskEvent    the running or delay event
     * @param taskInstance the task instance to update, may be {@code null}
     */
    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        Channel channel = taskEvent.getChannel();
        try {
            if (taskInstance != null) {
                if (taskInstance.getState().typeIsFinished()) {
                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
                } else {
                    taskInstance.setState(taskEvent.getState());
                    taskInstance.setStartTime(taskEvent.getStartTime());
                    taskInstance.setHost(taskEvent.getWorkerAddress());
                    taskInstance.setLogPath(taskEvent.getLogPath());
                    taskInstance.setExecutePath(taskEvent.getExecutePath());
                    taskInstance.setPid(taskEvent.getProcessId());
                    taskInstance.setAppLink(taskEvent.getAppIds());
                    this.processService.saveTaskInstance(taskInstance);
                }
            }
            TaskExecuteRunningAckCommand successAck = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
            channel.writeAndFlush(successAck.convert2Command());
        }
        catch (Exception e) {
            logger.error("worker ack master error", e);
            TaskExecuteRunningAckCommand failureAck = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
            channel.writeAndFlush(failureAck.convert2Command());
        }
    }

    /**
     * Copies the final details of a result event onto the task instance, saves it, and
     * writes a response-ack command to the event's channel.
     *
     * <p>The data quality result operator runs first, before any field of the task
     * instance is changed. A {@code null} task instance is skipped but the success ack is
     * still sent. Any exception is logged and answered with a failure ack whose task
     * instance id is {@code -1}.
     *
     * @param taskEvent    the result event
     * @param taskInstance the task instance to update, may be {@code null}
     */
    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        Channel channel = taskEvent.getChannel();
        try {
            if (taskInstance != null) {
                this.dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
                taskInstance.setStartTime(taskEvent.getStartTime());
                taskInstance.setHost(taskEvent.getWorkerAddress());
                taskInstance.setLogPath(taskEvent.getLogPath());
                taskInstance.setExecutePath(taskEvent.getExecutePath());
                taskInstance.setPid(taskEvent.getProcessId());
                taskInstance.setAppLink(taskEvent.getAppIds());
                taskInstance.setState(taskEvent.getState());
                taskInstance.setEndTime(taskEvent.getEndTime());
                taskInstance.setVarPool(taskEvent.getVarPool());
                this.processService.changeOutParam(taskInstance);
                this.processService.saveTaskInstance(taskInstance);
            }
            TaskExecuteResponseAckCommand successAck = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
            channel.writeAndFlush(successAck.convert2Command());
        }
        catch (Exception e) {
            logger.error("worker response master error", e);
            TaskExecuteResponseAckCommand failureAck = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
            channel.writeAndFlush(failureAck.convert2Command());
        }
    }
}

