/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.processor;

import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.HessianUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.discovery.GroupManager;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.MapTaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import com.alibaba.schedulerx.worker.processor.BizSubTask;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * Base class for Java processors that split their work into sub-tasks and hand
 * those sub-tasks to the task master of the current job instance.
 *
 * <p>Subclasses call {@link #map(List, String)} to serialize a list of task
 * objects, group them into batches and dispatch every batch either to a
 * {@link MapTaskMaster} found in the local {@link TaskMasterPool} or, otherwise,
 * to the actor selected by the instance master actor path of the context.
 */
public abstract class MapJobProcessor
extends JavaProcessor {
    private static final Logger LOGGER = LogFactory.getLogger(MapJobProcessor.class);
    /** Maximum number of re-dispatch attempts after the first attempt timed out. */
    private static final int MAX_RETRY_COUNT = 3;
    /** Fallback for "worker.map.page.size" when it is absent or not positive. */
    private static final int DEFAULT_MAP_PAGE_SIZE = 1000;
    /** Pause before a re-dispatch and after an overload response, in milliseconds. */
    private static final long BACKOFF_MILLIS = 10000L;
    /** Timeout argument passed to {@code FutureUtils.awaitResult}. */
    private static final long DISPATCH_TIMEOUT = 30L;

    private final LogCollector logCollector = LogCollectorFactory.get();

    /**
     * Serializes {@code taskList} and dispatches it in batches to the task master.
     *
     * <p>The batch size is read from "worker.map.page.size" (default 1000) and the
     * maximum serialized size of one task from "task.body.size.max" (default 65536
     * bytes). Dispatching stops at the first batch whose response is not successful.
     * Any exception raised while serializing or dispatching is logged, collected by
     * the log collector and reported through the returned result rather than thrown.
     *
     * @param taskList the task objects to distribute; a null or empty list yields a
     *                 failed result
     * @param taskName the task name set on every dispatched request
     * @return a result whose status is {@code true} only if every batch was accepted;
     *         otherwise its result text carries the failure reason
     */
    public ProcessResult map(List<? extends Object> taskList, String taskName) {
        ProcessResult result = new ProcessResult(false);
        JobContext context = ContainerFactory.getContainerPool().getContext();
        ActorSelection masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (masterAkkaSelection == null) {
            String errMsg = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
            LOGGER.error(errMsg);
            result.setResult(errMsg);
            return result;
        }
        if (CollectionUtils.isEmpty(taskList)) {
            result.setResult("task list is empty");
            return result;
        }
        int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", DEFAULT_MAP_PAGE_SIZE);
        if (batchSize <= 0) {
            // A non-positive page size would divide by zero (or produce no batches) below.
            LOGGER.warn("invalid worker.map.page.size={}, fall back to {}", batchSize, DEFAULT_MAP_PAGE_SIZE);
            batchSize = DEFAULT_MAP_PAGE_SIZE;
        }
        LOGGER.info("map task list, jobInstanceId={}, taskName={}, size={}, batchSize={}", context.getJobInstanceId(), taskName, taskList.size(), batchSize);
        int maxTaskBodySize = ConfigUtil.getWorkerConfig().getInt("task.body.size.max", 65536);
        try {
            List<Worker.WorkerMapTaskRequest.Builder> builders = this.buildBatches(taskList, batchSize, maxTaskBodySize);
            for (int i = 0; i < builders.size(); ++i) {
                Worker.WorkerMapTaskRequest.Builder builder = builders.get(i);
                builder.setJobId(context.getJobId());
                builder.setJobInstanceId(context.getJobInstanceId());
                builder.setTaskId(context.getTaskId());
                builder.setTaskName(taskName);
                Worker.WorkerMapTaskResponse response = this.dispatchBatch(context, masterAkkaSelection, builder.build());
                if (!response.getSuccess()) {
                    LOGGER.error(response.getMessage());
                    this.logCollector.collect(context.getAppGroupId(), context.getUniqueId(), response.getMessage());
                    result.setResult(response.getMessage());
                    return result;
                }
                // Drop the reference to the dispatched batch so it can be garbage collected.
                builders.set(i, null);
                if (response.hasOverload() && response.getOverload()) {
                    LOGGER.warn("Task Master is busy, sleeping a while {}s...", 10);
                    Thread.sleep(BACKOFF_MILLIS);
                }
            }
            result.setStatus(true);
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller; it was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("JobInstanceId={} WorkerMapTaskRequest dispatch error.", context.getJobInstanceId(), e);
            this.logCollector.collect(context.getAppGroupId(), context.getUniqueId(), ExceptionUtil.getTrace(e));
            result.setResult(ExceptionUtil.getMessage(e));
        }
        return result;
    }

    /**
     * Validates and serializes every task object and groups the serialized bodies
     * into request builders holding at most {@code batchSize} bodies each.
     *
     * @throws Exception if a task object fails validation or serialization, or if a
     *                   serialized body is larger than {@code maxTaskBodySize} bytes
     */
    private List<Worker.WorkerMapTaskRequest.Builder> buildBatches(List<? extends Object> taskList, int batchSize, int maxTaskBodySize) throws Exception {
        int batchNumber = (taskList.size() + batchSize - 1) / batchSize;
        List<Worker.WorkerMapTaskRequest.Builder> builders = new ArrayList<Worker.WorkerMapTaskRequest.Builder>(batchNumber);
        Worker.WorkerMapTaskRequest.Builder current = null;
        int position = 0;
        for (Object object : taskList) {
            this.checkTaskObject(object);
            byte[] taskBody = HessianUtil.toBytes(object);
            if (taskBody.length > maxTaskBodySize) {
                throw new IOException("taskBody size more than " + maxTaskBodySize + "B!");
            }
            if (position++ % batchSize == 0) {
                // Start a new batch every batchSize elements.
                current = Worker.WorkerMapTaskRequest.newBuilder();
                builders.add(current);
            }
            current.addTaskBody(ByteString.copyFrom(taskBody));
        }
        return builders;
    }

    /**
     * Sends one request to the task master, re-dispatching on timeout.
     *
     * <p>A {@link MapTaskMaster} registered in the local {@link TaskMasterPool} is
     * called directly; otherwise the request is sent to the actor selection. After
     * a {@link TimeoutException} the method waits, re-resolves the actor selection
     * and tries again, at most {@link #MAX_RETRY_COUNT} times.
     *
     * @return the response of the first attempt that did not time out
     * @throws TimeoutException if the last permitted attempt also timed out
     * @throws Exception any other failure raised while dispatching or waiting
     */
    private Worker.WorkerMapTaskResponse dispatchBatch(JobContext context, ActorSelection masterAkkaSelection, Worker.WorkerMapTaskRequest request) throws Exception {
        ActorSelection selection = masterAkkaSelection;
        int retryCount = 0;
        while (true) {
            try {
                TaskMaster taskMaster = TaskMasterPool.INSTANCE.get(context.getJobInstanceId());
                if (taskMaster instanceof MapTaskMaster) {
                    return this.handleMapTask(taskMaster, request);
                }
                return (Worker.WorkerMapTaskResponse)FutureUtils.awaitResult(selection, (Object)request, DISPATCH_TIMEOUT);
            }
            catch (TimeoutException exception) {
                LOGGER.warn("JobInstanceId={} WorkerMapTaskRequest dispatch timeout.", context.getJobInstanceId(), exception);
                if (retryCount >= MAX_RETRY_COUNT) {
                    throw exception;
                }
                ++retryCount;
                Thread.sleep(BACKOFF_MILLIS);
                selection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
            }
        }
    }

    /**
     * Validates the labels of a {@link BizSubTask} when the group of the current
     * context is an advanced-version group; other task objects are not checked.
     *
     * @throws RuntimeException if the label map has more than 3 entries, or a key is
     *                          longer than 60 or a value longer than 180 characters
     */
    private void checkTaskObject(Object taskObject) {
        JobContext context = ContainerFactory.getContainerPool().getContext();
        boolean isAdvancedVersion = GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId());
        if (!isAdvancedVersion || !(taskObject instanceof BizSubTask)) {
            return;
        }
        Map<String, String> labelMap = ((BizSubTask)taskObject).labelMap();
        if (labelMap.size() > 3) {
            throw new RuntimeException("label map size can't beyond 3.");
        }
        for (Map.Entry<String, String> entry : labelMap.entrySet()) {
            int keyLength = entry.getKey().length();
            int valueLength = entry.getValue().length();
            if (keyLength <= 60 && valueLength <= 180) {
                continue;
            }
            // Log both lengths so the message lines up with the (60,180) limits.
            LOGGER.error("Job instance={} label map<{}, {}> content can't beyond max size(60,180).", context.getJobInstanceId(), keyLength, valueLength);
            throw new RuntimeException("label map content can't beyond max size(60,180).");
        }
    }

    /**
     * Hands a map request to a task master living in this process.
     *
     * <p>Every failure is converted into an unsuccessful response carrying the
     * failure message. When the task master is not a {@link MapTaskMaster}, or when
     * its {@code map} call throws, the instance status is additionally updated to
     * {@link InstanceStatus#FAILED}.
     *
     * @param taskMaster the task master to call; may be null
     * @param request    the request whose task bodies and task name are forwarded
     * @return a successful response carrying the overload flag returned by the
     *         master, or an unsuccessful response with a message
     */
    private Worker.WorkerMapTaskResponse handleMapTask(TaskMaster taskMaster, Worker.WorkerMapTaskRequest request) throws Exception {
        try {
            long jobInstanceId = request.getJobInstanceId();
            if (taskMaster == null) {
                return Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("can't found TaskMaster by jobInstanceId=" + jobInstanceId).build();
            }
            if (!(taskMaster instanceof MapTaskMaster)) {
                Worker.WorkerMapTaskResponse response = Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is not MapTaskMaster").build();
                taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, "TaskMaster is not MapTaskMaster");
                return response;
            }
            try {
                long startTime = System.currentTimeMillis();
                boolean overload = ((MapTaskMaster)taskMaster).map(request.getTaskBodyList(), request.getTaskName());
                LOGGER.debug("jobInstanceId={} map, cost={}ms", jobInstanceId, System.currentTimeMillis() - startTime);
                return Worker.WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
            }
            catch (Exception e) {
                LOGGER.error("jobInstanceId={} map error", jobInstanceId, e);
                taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                // Rethrown so the outer handler turns it into a failure response.
                throw e;
            }
        }
        catch (Throwable e) {
            LOGGER.error("jobInstanceId={}, handleMapTask error.", request.getJobInstanceId(), e);
            return Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
        }
    }

    /**
     * Tells whether the task of the given context is the root map task.
     *
     * @param context the context whose task name is inspected
     * @return {@code true} if the task name equals "MAP_TASK_ROOT"; {@code false}
     *         otherwise, including when the task name is null
     */
    protected boolean isRootTask(JobContext context) {
        return "MAP_TASK_ROOT".equals(context.getTaskName());
    }
}

