/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.actor;

import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.master.MapTaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import java.util.List;

public class TaskActor
extends UntypedActor {
    private TaskMasterPool masterPool = TaskMasterPool.INSTANCE;
    private static final Logger LOGGER = LogFactory.getLogger(TaskActor.class);

    @Override
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Worker.ContainerReportTaskStatusRequest) {
            this.handleTaskStatus((Worker.ContainerReportTaskStatusRequest)obj);
        } else if (obj instanceof Worker.ContainerBatchReportTaskStatuesRequest) {
            this.handleBatchTaskStatues((Worker.ContainerBatchReportTaskStatuesRequest)obj);
        } else if (obj instanceof Worker.WorkerMapTaskRequest) {
            this.handleMapTask((Worker.WorkerMapTaskRequest)obj);
        } else if (obj instanceof Worker.PullTaskFromMasterRequest) {
            this.handlePullTasks((Worker.PullTaskFromMasterRequest)obj);
        }
    }

    private void handleTaskStatus(Worker.ContainerReportTaskStatusRequest request) {
        try {
            TaskMaster taskMaster = this.masterPool.get(request.getJobInstanceId());
            LOGGER.debug("handleTaskStatus, uniqueId:{}, status:{}, workerAddr:{}", IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId()), request.getStatus(), request.getWorkerAddr());
            if (taskMaster != null) {
                taskMaster.updateTaskStatus(request);
            }
        }
        catch (Throwable e) {
            LOGGER.error("jobInstanceId={}, taskId={}", request.getJobInstanceId(), request.getTaskId());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleBatchTaskStatues(Worker.ContainerBatchReportTaskStatuesRequest request) {
        LOGGER.info("jobInstanceId={}, serialNum={}, batch receive task status reqs, size:{}", request.getJobInstanceId(), request.getSerialNum(), request.getTaskStatuesCount());
        Worker.ContainerBatchReportTaskStatuesResponse response = null;
        try {
            TaskMaster taskMaster = this.masterPool.get(request.getJobInstanceId());
            if (taskMaster != null) {
                taskMaster.batchUpdateTaskStatus(request);
            }
            response = Worker.ContainerBatchReportTaskStatuesResponse.newBuilder().setSuccess(true).setDeliveryId(request.getDeliveryId()).build();
            this.getSender().tell(response, this.getSelf());
        }
        catch (Throwable e) {
            try {
                LOGGER.error("jobInstanceId={}, handleBatchTaskStatues error.", request.getJobInstanceId(), e);
                response = Worker.ContainerBatchReportTaskStatuesResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).setDeliveryId(request.getDeliveryId()).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleMapTask(Worker.WorkerMapTaskRequest request) throws Exception {
        Worker.WorkerMapTaskResponse response = null;
        try {
            long jobInstanceId = request.getJobInstanceId();
            TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
            if (taskMaster != null) {
                if (!(taskMaster instanceof MapTaskMaster)) {
                    response = Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is not MapTaskMaster").build();
                    taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, "TaskMaster is not MapTaskMaster");
                } else {
                    try {
                        long startTime = System.currentTimeMillis();
                        boolean overload = ((MapTaskMaster)taskMaster).map(request.getTaskBodyList(), request.getTaskName());
                        LOGGER.debug("jobInstanceId={} map, cost={}ms", jobInstanceId, System.currentTimeMillis() - startTime);
                        response = Worker.WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
                    }
                    catch (Exception e) {
                        LOGGER.error("jobInstanceId={} map error", e);
                        taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                        throw e;
                    }
                }
            } else {
                response = Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("can't found TaskMaster by jobInstanceId=" + jobInstanceId).build();
            }
            this.getSender().tell(response, this.getSelf());
        }
        catch (Throwable e) {
            try {
                LOGGER.error("jobInstanceId={}, handleMapTask error.", request.getJobInstanceId(), e);
                response = Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handlePullTasks(Worker.PullTaskFromMasterRequest request) {
        long jobInstanceId = request.getJobInstanceId();
        TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
        Worker.PullTaskFromMasterResponse response = null;
        try {
            if (taskMaster == null || !(taskMaster instanceof MapTaskMaster)) {
                response = Worker.PullTaskFromMasterResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is null or not MapTaskMaster, jobInstanceId=" + jobInstanceId).build();
            } else {
                List<Worker.MasterStartContainerRequest> reqs = ((MapTaskMaster)taskMaster).syncPullTasks(request.getSerialNum(), request.getPageSize(), request.getWorkerIdAddr());
                response = Worker.PullTaskFromMasterResponse.newBuilder().setSuccess(true).addAllRequest(reqs).build();
            }
            this.getSender().tell(response, this.getSelf());
        }
        catch (Throwable e) {
            try {
                LOGGER.error("", e);
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                this.getSender().tell(response, this.getSelf());
            }
        }
    }
}

