/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import ch.qos.logback.classic.ClassicConstants;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.log.SensitiveDataConverter;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.extract.alert.IAlertOperator;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendRequest;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskCallbackImpl;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class WorkerTaskExecutor
implements Runnable {
    protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecutor.class);
    protected final TaskExecutionContext taskExecutionContext;
    protected final WorkerConfig workerConfig;
    protected final WorkerMessageSender workerMessageSender;
    protected final TaskPluginManager taskPluginManager;
    @Nullable
    protected final StorageOperate storageOperate;
    protected final WorkerRegistryClient workerRegistryClient;
    @Nullable
    protected AbstractTask task;

    protected WorkerTaskExecutor(@NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull WorkerMessageSender workerMessageSender, @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate, @NonNull WorkerRegistryClient workerRegistryClient) {
        if (taskExecutionContext == null) {
            throw new NullPointerException("taskExecutionContext is marked non-null but is null");
        }
        if (workerConfig == null) {
            throw new NullPointerException("workerConfig is marked non-null but is null");
        }
        if (workerMessageSender == null) {
            throw new NullPointerException("workerMessageSender is marked non-null but is null");
        }
        if (taskPluginManager == null) {
            throw new NullPointerException("taskPluginManager is marked non-null but is null");
        }
        if (workerRegistryClient == null) {
            throw new NullPointerException("workerRegistryClient is marked non-null but is null");
        }
        this.taskExecutionContext = taskExecutionContext;
        this.workerConfig = workerConfig;
        this.workerMessageSender = workerMessageSender;
        this.taskPluginManager = taskPluginManager;
        this.storageOperate = storageOperate;
        this.workerRegistryClient = workerRegistryClient;
        SensitiveDataConverter.addMaskPattern((String)"(?<=((?i)configYaml(\" : \"))).*?(?=(\",\\n))");
    }

    protected abstract void executeTask(TaskCallBack var1);

    protected void afterExecute() throws TaskException {
        if (this.task == null) {
            throw new TaskException("The current task instance is null");
        }
        this.sendAlertIfNeeded();
        this.sendTaskResult();
        WorkerTaskExecutorHolder.remove(this.taskExecutionContext.getTaskInstanceId());
        log.info("Remove the current task execute context from worker cache");
        this.clearTaskExecPathIfNeeded();
    }

    protected void afterThrowing(Throwable throwable) throws TaskException {
        if (this.cancelTask()) {
            log.info("Cancel the task successfully");
        }
        WorkerTaskExecutorHolder.remove(this.taskExecutionContext.getTaskInstanceId());
        this.taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
        this.taskExecutionContext.setEndTime(System.currentTimeMillis());
        this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
        log.info("Get a exception when execute the task, will send the task status: {} to master: {}", (Object)TaskExecutionStatus.FAILURE.name(), (Object)this.taskExecutionContext.getHost());
    }

    protected boolean cancelTask() {
        if (this.task == null) {
            return true;
        }
        try {
            this.task.cancel();
            ProcessUtils.cancelApplication((TaskExecutionContext)this.taskExecutionContext);
            return true;
        }
        catch (Exception e) {
            log.error("Cancel task failed, this will not affect the taskInstance status, but you need to check manual", (Throwable)e);
            return false;
        }
    }

    @Override
    public void run() {
        try {
            LogUtils.setWorkflowAndTaskInstanceIDMDC((Integer)this.taskExecutionContext.getProcessInstanceId(), (Integer)this.taskExecutionContext.getTaskInstanceId());
            LogUtils.setTaskInstanceLogFullPathMDC((String)this.taskExecutionContext.getLogPath());
            TaskInstanceLogHeader.printInitializeTaskContextHeader();
            this.initializeTask();
            if (1 == this.taskExecutionContext.getDryRun()) {
                this.taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
                this.taskExecutionContext.setEndTime(System.currentTimeMillis());
                WorkerTaskExecutorHolder.remove(this.taskExecutionContext.getTaskInstanceId());
                this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
                log.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
                return;
            }
            TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
            this.beforeExecute();
            TaskCallbackImpl taskCallBack = TaskCallbackImpl.builder().workerMessageSender(this.workerMessageSender).taskExecutionContext(this.taskExecutionContext).build();
            TaskInstanceLogHeader.printExecuteTaskHeader();
            this.executeTask(taskCallBack);
            TaskInstanceLogHeader.printFinalizeTaskHeader();
            this.afterExecute();
            this.closeLogAppender();
        }
        catch (Throwable ex) {
            log.error("Task execute failed, due to meet an exception", ex);
            this.afterThrowing(ex);
            this.closeLogAppender();
        }
        finally {
            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
            LogUtils.removeTaskInstanceLogFullPathMDC();
        }
    }

    protected void initializeTask() {
        log.info("Begin to initialize task");
        long taskStartTime = System.currentTimeMillis();
        this.taskExecutionContext.setStartTime(taskStartTime);
        log.info("Set task startTime: {}", (Object)taskStartTime);
        String taskAppId = String.format("%s_%s", this.taskExecutionContext.getProcessInstanceId(), this.taskExecutionContext.getTaskInstanceId());
        this.taskExecutionContext.setTaskAppId(taskAppId);
        log.info("Set task appId: {}", (Object)taskAppId);
        log.info("End initialize task {}", (Object)JSONUtils.toPrettyJsonString((Object)this.taskExecutionContext));
    }

    protected void beforeExecute() {
        this.taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
        this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING);
        log.info("Send task status {} master: {}", (Object)TaskExecutionStatus.RUNNING_EXECUTION.name(), (Object)this.taskExecutionContext.getHost());
        String originTenant = this.taskExecutionContext.getTenantCode();
        String tenant = TaskExecutionContextUtils.getOrCreateTenant(this.workerConfig, this.taskExecutionContext);
        this.taskExecutionContext.setTenantCode(tenant);
        log.info("TenantCode: {} check successfully", (Object)this.taskExecutionContext.getTenantCode());
        TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(this.taskExecutionContext);
        log.info("WorkflowInstanceExecDir: {} check successfully", (Object)this.taskExecutionContext.getExecutePath());
        TaskChannel taskChannel = (TaskChannel)Optional.ofNullable(this.taskPluginManager.getTaskChannelMap().get(this.taskExecutionContext.getTaskType())).orElseThrow(() -> new TaskPluginException(this.taskExecutionContext.getTaskType() + " task plugin not found, please check the task type is correct."));
        log.info("Create TaskChannel: {} successfully", (Object)taskChannel.getClass().getName());
        ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(originTenant, taskChannel, this.storageOperate, this.taskExecutionContext);
        this.taskExecutionContext.setResourceContext(resourceContext);
        log.info("Download resources successfully: \n{}", (Object)this.taskExecutionContext.getResourceContext());
        TaskFilesTransferUtils.downloadUpstreamFiles(this.taskExecutionContext, this.storageOperate);
        log.info("Download upstream files: {} successfully", TaskFilesTransferUtils.getFileLocalParams(this.taskExecutionContext, Direct.IN));
        this.task = taskChannel.createTask(this.taskExecutionContext);
        log.info("Task plugin instance: {} create successfully", (Object)this.taskExecutionContext.getTaskType());
        this.task.init();
        log.info("Success initialized task plugin instance successfully");
        this.task.getParameters().setVarPool(this.taskExecutionContext.getVarPool());
        log.info("Set taskVarPool: {} successfully", (Object)this.taskExecutionContext.getVarPool());
    }

    protected void sendAlertIfNeeded() {
        if (!this.task.getNeedAlert()) {
            return;
        }
        Optional<Host> alertServerAddressOptional = this.workerRegistryClient.getAlertServerAddress();
        if (!alertServerAddressOptional.isPresent()) {
            log.error("Cannot get alert server address, please check the alert server is running");
            return;
        }
        Host alertServerAddress = alertServerAddressOptional.get();
        TaskAlertInfo taskAlertInfo = this.task.getTaskAlertInfo();
        AlertSendRequest alertSendRequest = new AlertSendRequest(taskAlertInfo.getAlertGroupId().intValue(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), this.task.getExitStatus() == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode());
        try {
            IAlertOperator alertOperator = (IAlertOperator)SingletonJdkDynamicRpcClientProxyFactory.getProxyClient((String)alertServerAddress.getAddress(), IAlertOperator.class);
            AlertSendResponse alertSendResponse = alertOperator.sendAlert(alertSendRequest);
            log.info("Send alert to: {} successfully, response: {}", (Object)alertServerAddress, (Object)alertSendResponse);
        }
        catch (Exception e) {
            log.error("Send alert: {} to: {} failed", new Object[]{alertSendRequest, alertServerAddress, e});
        }
    }

    protected void sendTaskResult() {
        this.taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus());
        this.taskExecutionContext.setProcessId(this.task.getProcessId());
        this.taskExecutionContext.setAppIds(this.task.getAppIds());
        this.taskExecutionContext.setVarPool(JSONUtils.toJsonString((Object)this.task.getParameters().getVarPool()));
        this.taskExecutionContext.setEndTime(System.currentTimeMillis());
        TaskFilesTransferUtils.uploadOutputFiles(this.taskExecutionContext, this.storageOperate);
        log.info("Upload output files: {} successfully", TaskFilesTransferUtils.getFileLocalParams(this.taskExecutionContext, Direct.OUT));
        this.workerMessageSender.sendMessageWithRetry(this.taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
        log.info("Send task execute status: {} to master : {}", (Object)this.taskExecutionContext.getCurrentExecutionStatus().name(), (Object)this.taskExecutionContext.getHost());
    }

    protected void clearTaskExecPathIfNeeded() {
        String execLocalPath = this.taskExecutionContext.getExecutePath();
        if (!CommonUtils.isDevelopMode()) {
            log.info("The current execute mode isn't develop mode, will clear the task execute file: {}", (Object)execLocalPath);
            if (Strings.isNullOrEmpty((String)execLocalPath)) {
                log.warn("The task execute file is {} no need to clear", (Object)this.taskExecutionContext.getTaskName());
                return;
            }
            if ("/".equals(execLocalPath)) {
                log.warn("The task execute file is '/', direct deletion is not allowed");
                return;
            }
            try {
                FileUtils.deleteDirectory((File)new File(execLocalPath));
                log.info("Success clear the task execute file: {}", (Object)execLocalPath);
            }
            catch (IOException e) {
                if (!(e instanceof NoSuchFileException)) {
                    log.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", (Object)execLocalPath, (Object)e);
                }
            }
        } else {
            log.info("The current execute mode is develop mode, will not clear the task execute file: {}", (Object)execLocalPath);
        }
    }

    protected void closeLogAppender() {
        try {
            if (RemoteLogUtils.isRemoteLoggingEnable()) {
                RemoteLogUtils.sendRemoteLog((String)this.taskExecutionContext.getLogPath());
                log.info("Log handler sends task log {} to remote storage asynchronously.", (Object)this.taskExecutionContext.getLogPath());
            }
        }
        catch (Exception ex) {
            log.error("Send remote log failed", (Throwable)ex);
        }
        finally {
            log.info(ClassicConstants.FINALIZE_SESSION_MARKER, ClassicConstants.FINALIZE_SESSION_MARKER.toString());
        }
    }

    @NonNull
    public TaskExecutionContext getTaskExecutionContext() {
        return this.taskExecutionContext;
    }

    @Nullable
    public AbstractTask getTask() {
        return this.task;
    }
}

