/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.event;

import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskRejectByWorkerEventHandler
implements TaskEventHandler {
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private MasterConfig masterConfig;

    @Override
    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
        int taskInstanceId = taskEvent.getTaskInstanceId();
        int processInstanceId = taskEvent.getProcessInstanceId();
        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
        if (workflowExecuteRunnable == null) {
            this.sendAckToWorker(taskEvent);
            throw new TaskEventHandleError("Handle task reject event error, cannot find related workflow instance from cache, will discard this event");
        }
        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
            this.sendAckToWorker(taskEvent);
            return new TaskEventHandleError("Handle task reject event error, cannot find the taskInstance from cache, will discord this event");
        });
        try {
            workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
            this.sendAckToWorker(taskEvent);
        }
        catch (Exception ex) {
            throw new TaskEventHandleError("Handle task reject event error", ex);
        }
    }

    public void sendAckToWorker(TaskEvent taskEvent) {
        TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(true, taskEvent.getTaskInstanceId(), this.masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), System.currentTimeMillis());
        taskEvent.getChannel().writeAndFlush((Object)taskRejectAckMessage.convert2Command());
    }

    @Override
    public TaskEventType getHandleEventType() {
        return TaskEventType.WORKER_REJECT;
    }
}

