/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.MapTaskProgress;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.domain.WorkerProgressCounter;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.JobUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.util.ActorPathUtil;
import com.alibaba.schedulerx.worker.util.JavaProcessorProfileUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * Task master for broadcast jobs.
 *
 * <p>{@link #submitInstance(JobInstanceInfo)} creates one task for every entry returned by
 * {@code JobInstanceInfo.getAllWorkers()} and asks each of those workers to start a container.
 * The first submission also starts background threads that ping the workers, report progress
 * (only for jobs that are not second-type jobs) and periodically recompute the instance status.
 *
 * <p>Worker entries are split on {@code "@"}: index 0 is used as the worker id and index 1 as
 * the worker address.
 */
public class BroadcastTaskMaster extends TaskMaster {
    private static final Logger LOGGER = LogFactory.getLogger(BroadcastTaskMaster.class);

    /** Timeout handed to {@code FutureUtils.awaitResult}; presumably seconds — TODO confirm the unit. */
    private static final long AWAIT_TIMEOUT = 10L;

    /** Pause, in milliseconds, between two iterations of every background loop. */
    private static final long LOOP_INTERVAL_MILLIS = 5000L;

    /** Worker entry ({@code id@addr}) -> unique id of the task successfully started on it. */
    private final Map<String, String> worker2uniqueIdMap = Maps.newConcurrentMap();

    /** Worker address -> progress counter for that worker. */
    private final Map<String, WorkerProgressCounter> workerProgressMap = Maps.newConcurrentMap();

    private final LogCollector logCollector = LogCollectorFactory.get();

    /** Set once the background threads have been started, so they are started only once. */
    private volatile boolean running = false;

    /** While {@code false}, the alive-check and status-check loops skip their work. */
    private volatile boolean monitor = false;

    /** Task id -> result string of the last accepted status report; handed to {@code postProcess}. */
    private final Map<Long, String> taskIdResultMap = Maps.newHashMap();

    /** Task id -> status of the last accepted status report; handed to {@code postProcess}. */
    private final Map<Long, TaskStatus> taskIdStatusMap = Maps.newHashMap();

    /** Worker entries of the most recently submitted instance. */
    private List<String> allWorkers = Lists.newArrayList();

    /**
     * Creates the master by delegating to the {@link TaskMaster} constructor.
     *
     * @param jobInstanceInfo the instance this master is responsible for
     * @param actorContext    actor context used to reach the workers
     * @throws Exception propagated from the super constructor
     */
    public BroadcastTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
    }

    /**
     * Submits the instance to every worker listed in {@code info.getAllWorkers()}.
     *
     * <p>For {@code java} jobs the processor's {@code preProcess} hook runs first; if it throws,
     * a FAILED report for task id 0 is fed into {@link #updateTaskStatus} and nothing is
     * dispatched. Otherwise every worker gets a task id, is registered with status INIT, and is
     * then asked to start a container. Finally the monitor threads are started or resumed.
     *
     * @param info the job instance to broadcast
     */
    @Override
    public synchronized void submitInstance(JobInstanceInfo info) {
        if ("java".equalsIgnoreCase(info.getJobType())) {
            try {
                this.preProcess(info);
            }
            catch (Exception e) {
                LOGGER.error("BroadcastTaskMaster.preProcess failed, jobInstanceId={}", info.getJobInstanceId(), e);
                String uniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), 0L);
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init fail worker addr is ", SchedulerxWorker.WORKER_ADDR, ExceptionUtil.getMessage(e)));
                // The report carries the current serial number like every other synthetic failure
                // report; without it updateTaskStatus discards the report whenever the current
                // serial number is not the request's default.
                // NOTE(review): updateTaskStatus only acts on ids present in taskStatusMap and
                // nothing visible in this class registers task 0 before this point — confirm the
                // failure actually reaches the instance status.
                this.updateTaskStatus(this.newFailedStatusRequest(info.getJobId(), info.getJobInstanceId(), 0L, WorkerIdGenerator.get(), SchedulerxWorker.WORKER_ADDR));
                return;
            }
        }
        super.init();
        this.allWorkers = info.getAllWorkers();

        // First pass: register every task before any container is started, so that status
        // reports arriving during dispatch find their task in taskStatusMap.
        Map<String, Long> taskIdMap = new HashMap<String, Long>();
        for (String workerIdAddr : this.allWorkers) {
            String workerAddr = workerIdAddr.split("@")[1];
            long taskId = this.aquireTaskId();
            String uniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), taskId);
            this.taskStatusMap.put(uniqueId, TaskStatus.INIT);
            this.progressCounterOf(workerAddr).incrementTotal();
            taskIdMap.put(workerIdAddr, taskId);
        }

        // Second pass: ask each worker to start its container.
        for (String workerIdAddr : this.allWorkers) {
            long taskId = taskIdMap.get(workerIdAddr);
            this.startBroadcastContainer(info, workerIdAddr, taskId);
        }
        this.startMonitorThreads();
    }

    /**
     * Sends the start-container request for one task to one worker and waits for the answer.
     * A negative answer or any exception while waiting marks the task FAILED.
     */
    private void startBroadcastContainer(JobInstanceInfo info, String workerIdAddr, long taskId) {
        String[] workerInfo = workerIdAddr.split("@");
        String workerId = workerInfo[0];
        String workerAddr = workerInfo[1];
        ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
        String uniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), taskId);
        Worker.MasterStartContainerRequest.Builder builder = this.convert2StartContainerRequestBuilder(info, taskId);
        builder.setShardingNum(this.allWorkers.size());
        Worker.MasterStartContainerRequest request = builder.build();
        try {
            Worker.MasterStartContainerResponse response = (Worker.MasterStartContainerResponse)FutureUtils.awaitResult(selection, (Object)request, AWAIT_TIMEOUT);
            if (response.getSuccess()) {
                this.worker2uniqueIdMap.put(workerIdAddr, uniqueId);
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init success worker addr is ", workerAddr));
                return;
            }
            // Three placeholders for three arguments; the response message used to be dropped.
            LOGGER.error("submitTask[{}] to worker {} error, {}", uniqueId, workerAddr, response.getMessage());
            this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init fail worker addr is ", workerAddr, response.getMessage()));
            this.existInvalidWorker = true;
            this.updateTaskStatus(this.newFailedStatusRequest(info.getJobId(), info.getJobInstanceId(), taskId, workerId, workerAddr));
        }
        catch (Throwable e) {
            LOGGER.error("start container failed, worker:{}, uniqueId:{}", workerAddr, uniqueId, e);
            this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init fail worker addr is ", workerAddr), e);
            this.existInvalidWorker = true;
            this.updateTaskStatus(this.newFailedStatusRequest(info.getJobId(), info.getJobInstanceId(), taskId, workerId, workerAddr));
        }
    }

    /** Builds a FAILED task status report stamped with the current serial number. */
    private Worker.ContainerReportTaskStatusRequest newFailedStatusRequest(long jobId, long jobInstanceId, long taskId, String workerId, String workerAddr) {
        return Worker.ContainerReportTaskStatusRequest.newBuilder()
                .setJobId(jobId)
                .setJobInstanceId(jobInstanceId)
                .setTaskId(taskId)
                .setStatus(TaskStatus.FAILED.getValue())
                .setWorkerId(workerId)
                .setWorkerAddr(workerAddr)
                .setSerialNum(this.getSerialNum())
                .build();
    }

    /** Returns the progress counter of {@code workerAddr}, creating and registering it if absent. */
    private WorkerProgressCounter progressCounterOf(String workerAddr) {
        WorkerProgressCounter counter = this.workerProgressMap.get(workerAddr);
        if (counter == null) {
            counter = new WorkerProgressCounter(workerAddr);
            this.workerProgressMap.put(workerAddr, counter);
        }
        return counter;
    }

    /**
     * Kills the instance: delegates to the super class, reports the instance as FAILED with the
     * given reason, sends a kill-container request to every worker and clears the task statuses.
     * A failure to notify one worker is logged and does not stop the remaining notifications.
     *
     * @param reason reason forwarded to the super class and to the instance status update
     */
    @Override
    public void killInstance(String reason) {
        super.killInstance(reason);
        String uniqueId = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
        this.updateNewInstanceStatus(this.getSerialNum(), this.jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, reason);
        for (String workerIdAddr : this.allWorkers) {
            try {
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                Worker.MasterKillContainerRequest request = Worker.MasterKillContainerRequest.newBuilder()
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .setJobInstanceId(this.jobInstanceInfo.getJobInstanceId())
                        .build();
                selection.tell(request, null);
            }
            catch (Throwable e) {
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[Master-killInstance]kill instance tell worker fail worker addr is ", workerIdAddr), e);
                // Pass the exception along so the cause shows up in the worker log too.
                LOGGER.error("send kill instance request exception, worker:{}, uninqueId:{}", workerIdAddr, uniqueId, e);
            }
        }
        this.taskStatusMap.clear();
    }

    /**
     * Sends one destroy-container-pool request per known worker through
     * {@code SchedulerxWorker.AtLeastDeliveryRoutingActor}.
     */
    @Override
    public void destroyContainerPool() {
        for (String workerIdAddr : this.allWorkers) {
            Worker.MasterDestroyContainerPoolRequest request = Worker.MasterDestroyContainerPoolRequest.newBuilder()
                    .setJobInstanceId(this.jobInstanceInfo.getJobInstanceId())
                    .setJobId(this.jobInstanceInfo.getJobId())
                    .setWorkerIdAddr(workerIdAddr)
                    .setSerialNum(this.getSerialNum())
                    .build();
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(request, null);
        }
    }

    /**
     * Applies a task status report.
     *
     * <p>The report is ignored when its serial number differs from the current one, when its
     * task is not in {@code taskStatusMap}, or when it repeats the status already recorded.
     * Otherwise the task entry is updated (removed once the status is finished), the worker's
     * progress counter is incremented, the result and status are remembered per task id, and the
     * instance status is recomputed.
     *
     * @param request2 the status report to apply
     */
    @Override
    public synchronized void updateTaskStatus(Worker.ContainerReportTaskStatusRequest request2) {
        if (request2.getSerialNum() != this.getSerialNum()) {
            LOGGER.warn("ignore ContainerReportTaskStatusRequest, current serialNum={}, but request serialNum={}.", this.getSerialNum(), request2.getSerialNum());
            return;
        }
        long jobId = request2.getJobId();
        long jobInstanceId = request2.getJobInstanceId();
        long taskId = request2.getTaskId();
        String workerAddr = request2.getWorkerAddr();
        TaskStatus taskStatus = TaskStatus.parseValue(request2.getStatus());
        String uniqueId = IdUtil.getUniqueId(jobId, jobInstanceId, taskId);
        LOGGER.info("update task status serialNum={}, uniqueId={}, status={}, workerAddr={}", request2.getSerialNum(), uniqueId, taskStatus.getDescription(), workerAddr);

        if (!this.taskStatusMap.containsKey(uniqueId)) {
            return;
        }
        if (this.taskStatusMap.get(uniqueId).equals(taskStatus)) {
            LOGGER.warn("duplicated ContainerReportTaskStatusRequest, uniqueId={}, taskStatus={}", new Object[]{uniqueId, taskStatus});
            return;
        }

        if (taskStatus.isFinish()) {
            this.taskStatusMap.remove(uniqueId);
        } else {
            this.taskStatusMap.put(uniqueId, taskStatus);
        }

        // The counter is created even when the status matches none of the branches below.
        WorkerProgressCounter counter = this.progressCounterOf(workerAddr);
        if (taskStatus.equals(TaskStatus.RUNNING)) {
            counter.incrementRunning();
        } else if (taskStatus.equals(TaskStatus.SUCCESS)) {
            counter.incrementSuccess();
        } else if (taskStatus.equals(TaskStatus.FAILED)) {
            counter.incrementFailed();
        }

        this.taskIdResultMap.put(taskId, request2.getResult());
        this.taskIdStatusMap.put(taskId, taskStatus);
        this.updateNewInstanceStatus(request2.getSerialNum(), jobInstanceId, request2.getResult());
    }

    /**
     * Derives the instance status from the remaining task statuses and forwards it to the
     * four-argument overload inherited from the super class.
     *
     * <p>With no remaining tasks the status is FAILED if the instance was killed, else SUCCESS.
     * With remaining tasks it is RUNNING until {@code isJobInstanceFinished()} holds, then FAILED
     * if any remaining task is FAILED, else SUCCESS.
     */
    private synchronized void updateNewInstanceStatus(long serialNum, long jobInstanceId, String result2) {
        InstanceStatus newStatus = this.killed ? InstanceStatus.FAILED : InstanceStatus.SUCCESS;
        if (this.taskStatusMap.size() > 0) {
            if (!this.isJobInstanceFinished()) {
                newStatus = InstanceStatus.RUNNING;
            } else {
                newStatus = InstanceStatus.SUCCESS;
                // NOTE(review): updateTaskStatus removes an entry as soon as its status
                // isFinish(); if FAILED counts as finished this scan can never match — confirm
                // where task failures are meant to turn the instance FAILED.
                for (TaskStatus status : this.taskStatusMap.values()) {
                    if (status.equals(TaskStatus.FAILED)) {
                        newStatus = InstanceStatus.FAILED;
                        break;
                    }
                }
            }
        }
        LOGGER.info("update serialNum={}, jobInstanceId={} status={}", serialNum, jobInstanceId, newStatus.getDescription());
        this.updateNewInstanceStatus(serialNum, jobInstanceId, newStatus, result2);
    }

    /**
     * Returns the progress of this instance as JSON.
     *
     * @return JSON form of a {@link MapTaskProgress} holding the current per-worker counters
     */
    @Override
    public String getJobInstanceProgress() {
        MapTaskProgress detail = new MapTaskProgress();
        detail.setWorkerProgress(this.workerProgressMap.values());
        return JsonUtil.toJson(detail);
    }

    /**
     * Enables monitoring and, on the first call only, starts the background threads: worker
     * alive check, progress report (skipped for second-type jobs) and instance status check.
     * Each loop runs until {@code instanceStatus.isFinish()} or until its thread is interrupted.
     */
    private synchronized void startMonitorThreads() {
        this.monitor = true;
        if (this.running) {
            return;
        }
        final String jobIdAndInstanceId = this.jobInstanceInfo.getJobId() + "_" + this.jobInstanceInfo.getJobInstanceId();

        new Thread(new Runnable(){

            @Override
            public void run() {
                while (!BroadcastTaskMaster.this.instanceStatus.isFinish()) {
                    // Skip the work, not the sleep: jumping straight back to the loop head while
                    // monitoring is off would spin this thread at full speed.
                    if (BroadcastTaskMaster.this.monitor) {
                        BroadcastTaskMaster.this.checkBroadcastWorkersAlive();
                    }
                    try {
                        Thread.sleep(LOOP_INTERVAL_MILLIS);
                    }
                    catch (InterruptedException e) {
                        LOGGER.error("", e);
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }, "Schedulerx-BroadcastTaskMaster-check-worker-alive-thread-" + jobIdAndInstanceId).start();

        if (!JobUtil.isSecondTypeJob(TimeType.parseValue(this.jobInstanceInfo.getTimeType()))) {
            new Thread(new Runnable(){

                @Override
                public void run() {
                    while (!BroadcastTaskMaster.this.instanceStatus.isFinish()) {
                        Worker.WorkerReportJobInstanceProgressRequest request = Worker.WorkerReportJobInstanceProgressRequest.newBuilder()
                                .setJobId(BroadcastTaskMaster.this.jobInstanceInfo.getJobId())
                                .setJobInstanceId(BroadcastTaskMaster.this.jobInstanceInfo.getJobInstanceId())
                                .setProgress(BroadcastTaskMaster.this.getJobInstanceProgress())
                                .build();
                        BroadcastTaskMaster.this.SERVER_DISCOVERY.getMapMasterRouter().tell(request, null);
                        try {
                            Thread.sleep(LOOP_INTERVAL_MILLIS);
                        }
                        catch (InterruptedException e) {
                            LOGGER.error("report status error, uniqueId={}", jobIdAndInstanceId, e);
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }, "Schedulerx-BroadcastTaskMaster-report-progress-thread-" + jobIdAndInstanceId).start();
        }

        new Thread(new Runnable(){

            @Override
            public void run() {
                while (!BroadcastTaskMaster.this.instanceStatus.isFinish()) {
                    try {
                        Thread.sleep(LOOP_INTERVAL_MILLIS);
                        synchronized (BroadcastTaskMaster.this) {
                            if (!BroadcastTaskMaster.this.monitor) {
                                continue;
                            }
                            if (BroadcastTaskMaster.this.taskStatusMap.size() < 10) {
                                LOGGER.info("taskStatusMap=" + BroadcastTaskMaster.this.taskStatusMap);
                            }
                            BroadcastTaskMaster.this.updateNewInstanceStatus(BroadcastTaskMaster.this.getSerialNum(), BroadcastTaskMaster.this.jobInstanceInfo.getJobInstanceId(), "");
                        }
                    }
                    catch (InterruptedException e) {
                        // Stop on interrupt like the other two loops instead of swallowing it.
                        LOGGER.error("status check error, uniqueId:{}", jobIdAndInstanceId, e);
                        Thread.currentThread().interrupt();
                        break;
                    }
                    catch (Throwable e) {
                        LOGGER.error("status check error, uniqueId:{}", jobIdAndInstanceId, e);
                    }
                }
            }
        }, "Schedulerx-BroadcastTaskMaster-status-check-thread-" + jobIdAndInstanceId).start();
        this.running = true;
    }

    /**
     * Pings every worker in {@code aliveCheckWorkerSet} (after adding all current workers to it).
     * A timeout flags an invalid worker and fails the task started on it; any other error is
     * only logged.
     */
    private void checkBroadcastWorkersAlive() {
        this.aliveCheckWorkerSet.addAll(this.allWorkers);
        for (String workerIdAddr : this.aliveCheckWorkerSet) {
            try {
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getWorkerHeartbeatRouterPath(workerIdAddr));
                Worker.MasterCheckWorkerAliveRequest request = Worker.MasterCheckWorkerAliveRequest.newBuilder()
                        .setJobInstanceId(this.jobInstanceInfo.getJobInstanceId())
                        .build();
                FutureUtils.awaitResult(selection, (Object)request, AWAIT_TIMEOUT);
            }
            catch (TimeoutException e) {
                this.existInvalidWorker = true;
                this.failTaskOnDeadWorker(workerIdAddr);
            }
            catch (Throwable e) {
                LOGGER.error("", e);
            }
        }
    }

    /**
     * Reports the task that was started on {@code workerIdAddr} as FAILED. The job, instance and
     * task ids are recovered by splitting the recorded unique id on {@code "_"}.
     */
    private void failTaskOnDeadWorker(String workerIdAddr) {
        String uniqueId = this.worker2uniqueIdMap.get(workerIdAddr);
        if (uniqueId == null) {
            // Log the worker entry: the unique id is exactly what is missing on this branch.
            LOGGER.error("can't found uniqueId of workerIdAddr={}", workerIdAddr);
            return;
        }
        String[] workerInfo = workerIdAddr.split("@");
        String workerId = workerInfo[0];
        String workerAddr = workerInfo[1];
        String[] tokens = uniqueId.split("_");
        Worker.ContainerReportTaskStatusRequest request = this.newFailedStatusRequest(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]), Long.parseLong(tokens[2]), workerId, workerAddr);
        this.updateTaskStatus(request);
        LOGGER.warn("worker[{}] is down, set {} to failed", workerAddr, uniqueId);
    }

    /**
     * Exposes the live per-worker progress counters.
     *
     * @return the internal map itself (not a copy), keyed by worker address
     */
    public Map<String, WorkerProgressCounter> getWorkerProgressMap() {
        return this.workerProgressMap;
    }

    /**
     * Rejects {@code java} jobs whose processor is a {@link MapJobProcessor}.
     *
     * @throws IOException if the configured processor is a {@code MapJobProcessor}
     * @throws Exception   anything thrown while resolving the processor
     */
    @Override
    protected void checkProcessor() throws Exception {
        if (!"java".equalsIgnoreCase(this.jobInstanceInfo.getJobType())) {
            return;
        }
        JobProcessor processor = JavaProcessorProfileUtil.getJavaProcessor(this.jobInstanceInfo.getContent());
        if (processor instanceof MapJobProcessor) {
            throw new IOException(processor.getClass().getName() + " shouldn't extends MapJobProcessor or MapReduceJobProcessor");
        }
    }

    /**
     * Runs the processor's {@code postProcess} hook for {@code java} jobs, passing it the
     * collected per-task results and statuses.
     *
     * @param jobInstanceId id of the finished instance
     * @return the hook's result, or {@code null} for non-java jobs or when the hook (or building
     *         its context) throws; the exception is logged
     */
    @Override
    public ProcessResult postFinish(long jobInstanceId) {
        ProcessResult postResult = null;
        if ("java".equalsIgnoreCase(this.jobInstanceInfo.getJobType())) {
            try {
                JobContext context = JobContext.newBuilder()
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .setJobInstanceId(jobInstanceId)
                        .setJobType(this.jobInstanceInfo.getJobType())
                        .setContent(this.jobInstanceInfo.getContent())
                        .setScheduleTime(this.jobInstanceInfo.getScheduleTime())
                        .setDataTime(this.jobInstanceInfo.getDataTime())
                        .setJobParameters(this.jobInstanceInfo.getParameters())
                        .setInstanceParameters(this.jobInstanceInfo.getInstanceParameters())
                        .setUser(this.jobInstanceInfo.getUser())
                        .setTaskResults(this.taskIdResultMap)
                        .setTaskStatuses(this.taskIdStatusMap)
                        .build();
                JobProcessor jobProcessor = JavaProcessorProfileUtil.getJavaProcessor(context.getContent());
                postResult = jobProcessor.postProcess(context);
            }
            catch (Throwable e) {
                LOGGER.error("", e);
            }
        }
        return postResult;
    }

    /**
     * Runs the processor's {@code preProcess} hook with a context built from the instance info.
     *
     * @throws Exception anything thrown while resolving the processor or by the hook itself
     */
    private void preProcess(JobInstanceInfo jobInstanceInfo) throws Exception {
        JobContext context = JobContext.newBuilder()
                .setJobId(jobInstanceInfo.getJobId())
                .setJobInstanceId(jobInstanceInfo.getJobInstanceId())
                .setJobType(jobInstanceInfo.getJobType())
                .setContent(jobInstanceInfo.getContent())
                .setScheduleTime(jobInstanceInfo.getScheduleTime())
                .setDataTime(jobInstanceInfo.getDataTime())
                .setJobParameters(jobInstanceInfo.getParameters())
                .setInstanceParameters(jobInstanceInfo.getInstanceParameters())
                .setUser(jobInstanceInfo.getUser())
                .build();
        JobProcessor jobProcessor = JavaProcessorProfileUtil.getJavaProcessor(context.getContent());
        jobProcessor.preProcess(context);
    }

    /**
     * Resets the per-instance bookkeeping of this class after delegating to the super class, and
     * turns monitoring off so the alive-check and status-check loops idle until the next
     * submission turns it back on.
     */
    @Override
    public void clear() {
        super.clear();
        this.worker2uniqueIdMap.clear();
        this.workerProgressMap.clear();
        this.monitor = false;
        this.taskIdResultMap.clear();
        this.taskIdStatusMap.clear();
    }
}

