/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.processor;

import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
import java.util.Date;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.storage.StorageOperate;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.service.utils.LogUtils;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Worker-side Netty processor for {@link CommandType#TASK_DISPATCH_REQUEST} commands.
 *
 * <p>For every dispatch request it deserializes the {@link TaskDispatchCommand}, prepares the
 * contained {@link TaskExecutionContext} (cache entry, host, log path), optionally reports a
 * delayed execution back to the sender, and finally offers a task runnable to the
 * {@link WorkerManagerThread}. When the offer is refused, a {@code TASK_REJECT} message is sent
 * back to the address the request came from.
 */
@Component
public class TaskDispatchProcessor implements NettyRequestProcessor {

    private static final Logger logger = LoggerFactory.getLogger(TaskDispatchProcessor.class);

    @Autowired
    private WorkerConfig workerConfig;

    @Autowired
    private WorkerMessageSender workerMessageSender;

    @Autowired
    private AlertClientService alertClientService;

    @Autowired
    private TaskPluginManager taskPluginManager;

    @Autowired
    private WorkerManagerThread workerManager;

    // Optional dependency: stays null when no StorageOperate bean is available.
    @Autowired(required = false)
    private StorageOperate storageOperate;

    /**
     * Handles a single task dispatch request.
     *
     * <p>The request is silently dropped (after an error log) when the command body cannot be
     * parsed into a {@link TaskDispatchCommand} or when it carries no task execution context.
     *
     * @param channel the channel the command arrived on (not used by this processor)
     * @param command the incoming command; its type must be {@code TASK_DISPATCH_REQUEST}
     * @throws IllegalArgumentException if the command type is not {@code TASK_DISPATCH_REQUEST}
     */
    @Counted(value = "ds.task.execution.count", description = "task execute total count")
    @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
    public void process(Channel channel, Command command) {
        // Template overload: the message is only formatted when the check actually fails.
        Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
                "invalid command type : %s", command.getType());

        TaskDispatchCommand taskDispatchCommand =
                JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
        if (taskDispatchCommand == null) {
            logger.error("task execute request command content is null");
            return;
        }

        // All replies for this task go back to the address that sent the dispatch request.
        final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
        logger.info("Receive task dispatch request, command: {}", taskDispatchCommand);

        TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
        if (taskExecutionContext == null) {
            logger.error("task execution context is null");
            return;
        }

        try {
            // Tag log output of this thread with the workflow/task instance ids; cleared in finally.
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId());
            TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

            // NOTE(review): the context is cached before the queue offer and is not removed in this
            // method when the task is rejected below - confirm it is cleaned up elsewhere.
            TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
            taskExecutionContext.setHost(workerConfig.getWorkerAddress());
            taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

            // Presumably delayTime is in minutes and remainTime in seconds (see the "{}s" log below)
            // - verify against DateUtils.getRemainTime.
            long remainTime = DateUtils.getRemainTime(
                    taskExecutionContext.getFirstSubmitTime(),
                    taskExecutionContext.getDelayTime() * 60L);
            if (remainTime > 0L) {
                logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
                taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
                workerMessageSender.sendMessage(
                        taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
            }

            // The task is offered to the worker manager regardless of whether it is delayed.
            WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable =
                    WorkerTaskExecuteRunnableFactoryBuilder
                            .createWorkerDelayTaskExecuteRunnableFactory(
                                    taskExecutionContext,
                                    workerConfig,
                                    workflowMasterAddress,
                                    workerMessageSender,
                                    alertClientService,
                                    taskPluginManager,
                                    storageOperate)
                            .createWorkerTaskExecuteRunnable();

            boolean offer = workerManager.offer(workerTaskExecuteRunnable);
            if (!offer) {
                logger.warn(
                        "submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master",
                        workerManager.getWaitSubmitQueueSize());
                workerMessageSender.sendMessageWithRetry(
                        taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
            } else {
                logger.info("Submit task to wait queue success, current queue size is {}",
                        workerManager.getWaitSubmitQueueSize());
            }
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}

