/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.MapTaskXAttrs;
import com.alibaba.schedulerx.common.domain.Metrics;
import com.alibaba.schedulerx.common.domain.TaskDispatchMode;
import com.alibaba.schedulerx.common.monitor.MetricsCollector;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.batch.TMStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.TaskPushReqHandler;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.master.MapTaskMaster;
import com.alibaba.schedulerx.worker.master.persistence.H2MemoryPersistence;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.pull.TaskPullReqHandler;
import java.io.IOException;
import java.util.List;

public class GridTaskMaster
extends MapTaskMaster {
    private static final Logger LOGGER = LogFactory.getLogger(GridTaskMaster.class);

    public GridTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
        this.taskPersistence = H2MemoryPersistence.getInstance();
        this.taskPersistence.initTable();
        long jobInstanceId = jobInstanceInfo.getJobInstanceId();
        this.taskStatusReqQueue = new ReqQueue(jobInstanceId, 100000);
        this.taskStatusReqBatchHandler = new TMStatusReqHandler(jobInstanceId, 1, 1, 3000, this.taskStatusReqQueue);
        if (jobInstanceInfo.getXattrs() != null) {
            this.xAttrs = JsonUtil.fromJson(jobInstanceInfo.getXattrs(), MapTaskXAttrs.class);
        }
        if (this.xAttrs != null && this.xAttrs.getTaskDispatchMode().equals(TaskDispatchMode.PULL.getValue())) {
            this.taskBlockingQueue = new ReqQueue(jobInstanceId, 100000);
            this.taskDispatchReqHandler = new TaskPullReqHandler(jobInstanceId, 1, 1, this.pageSize * jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue);
        } else {
            Long dispatchDelay;
            int batchSize = this.pageSize * jobInstanceInfo.getAllWorkers().size();
            if (this.isWorkerLoadRouter()) {
                batchSize = 2 * jobInstanceInfo.getAllWorkers().size();
            }
            if ((dispatchDelay = this.parseDispatchSpeed()) != null) {
                batchSize = 1;
                this.taskBlockingQueue = new ReqQueue(jobInstanceId, Integer.MAX_VALUE);
            } else {
                this.taskBlockingQueue = new ReqQueue(jobInstanceId, 100000);
            }
            this.taskDispatchReqHandler = new TaskPushReqHandler(jobInstanceId, 1, 1, batchSize, this.taskBlockingQueue, 3000, dispatchDelay);
        }
    }

    @Override
    public boolean map(List<ByteString> taskList, String taskName) throws Exception {
        if (CollectionUtils.isEmpty(taskList)) {
            LOGGER.warn("map taskList is empty, taskName:{}", taskName);
            return false;
        }
        LOGGER.info("map taskList, jobInstanceId={}, taskName:{}, taskList size:{}", this.jobInstanceInfo.getJobInstanceId(), taskName, taskList.size());
        int counter = this.taskCounter.addAndGet(taskList.size());
        if (this.xAttrs.getTaskDispatchMode().equals(TaskDispatchMode.PULL.getValue()) && counter > 10000) {
            LOGGER.error("jobInstanceId={}, pullModel, task counter={}, beyond {} !", this.jobInstanceInfo.getJobInstanceId(), counter, 10000);
            throw new IOException("task size of pullModel can't beyond 10000");
        }
        this.doMetricsCheck();
        return super.map(taskList, taskName);
    }

    protected void doMetricsCheck() throws IOException {
        double usedMemoryPercent;
        Metrics vmDetail = MetricsCollector.getMetrics();
        if (vmDetail != null && (usedMemoryPercent = vmDetail.getHeap5Usage()) > (double)0.9f) {
            throw new IOException("jvm memory usage:" + usedMemoryPercent * 100.0 + ",beyond " + 90.0f + "%!");
        }
    }

    @Override
    public ProcessResult postFinish(long jobInstanceId) {
        ProcessResult postResult = super.postFinish(jobInstanceId);
        try {
            this.taskPersistence.clearTasks(jobInstanceId);
        }
        catch (Throwable e) {
            LOGGER.error("", e);
        }
        return postResult;
    }
}

