/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.server.core;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.instance.InstanceMetadataService;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.remote.transporter.TransportService;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.remote.worker.selector.TaskTrackerSelectorService;

@Service
public class DispatchService {
    private static final Logger log = LoggerFactory.getLogger(DispatchService.class);
    private final TransportService transportService;
    private final WorkerClusterQueryService workerClusterQueryService;
    private final InstanceManager instanceManager;
    private final InstanceMetadataService instanceMetadataService;
    private final InstanceInfoRepository instanceInfoRepository;
    private final TaskTrackerSelectorService taskTrackerSelectorService;

    @UseCacheLock(type="processJobInstance", key="#instanceId", concurrencyLevel=1024)
    public void redispatchAsync(Long instanceId, int originStatus) {
        this.instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId.longValue(), originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
    }

    public void redispatchBatchAsyncLockFree(List<Long> instanceIdList, int originStatus) {
        this.instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdListAndOriginStatus(instanceIdList, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
    }

    @UseCacheLock(type="processJobInstance", key="#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel=1024)
    public void dispatch(JobInfoDO jobInfo, Long instanceId, Optional<InstanceInfoDO> instanceInfoOptional, Optional<Holder<Boolean>> overloadOptional) {
        long runningInstanceCount;
        InstanceInfoDO instanceInfo = instanceInfoOptional.orElseGet(() -> this.instanceInfoRepository.findByInstanceId(instanceId.longValue()));
        Long jobId = instanceInfo.getJobId();
        if (InstanceStatus.CANCELED.getV() == instanceInfo.getStatus().intValue()) {
            log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", (Object)jobId, (Object)instanceId);
            return;
        }
        if (instanceInfo.getStatus().intValue() != InstanceStatus.WAITING_DISPATCH.getV()) {
            log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been dispatched", (Object)jobId, (Object)instanceId);
            return;
        }
        if (jobInfo.getId() == null) {
            log.warn("[Dispatcher-{}|{}] cancel dispatch due to job(id={}) has been deleted!", new Object[]{jobId, instanceId, jobId});
            this.instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), InstanceStatus.FAILED, "can't find job by id " + jobId);
            return;
        }
        Date now = new Date();
        String dbInstanceParams = instanceInfo.getInstanceParams() == null ? "" : instanceInfo.getInstanceParams();
        log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", new Object[]{jobId, instanceId, jobInfo, dbInstanceParams});
        long current = System.currentTimeMillis();
        Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
        if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
            maxInstanceNum = 1;
        }
        if (maxInstanceNum > 0 && (runningInstanceCount = this.instanceInfoRepository.countByJobIdAndStatusIn(jobId.longValue(), (List)Lists.newArrayList((Object[])new Integer[]{InstanceStatus.WAITING_WORKER_RECEIVE.getV(), InstanceStatus.RUNNING.getV()}))) >= (long)maxInstanceNum.intValue()) {
            String result = String.format("too many instances(%d>%d)", runningInstanceCount, maxInstanceNum);
            log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", new Object[]{jobId, instanceId, runningInstanceCount, maxInstanceNum});
            this.instanceInfoRepository.update4TriggerFailed(instanceId.longValue(), InstanceStatus.FAILED.getV(), current, current, "N/A", result, now);
            this.instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), InstanceStatus.FAILED, result);
            return;
        }
        List<WorkerInfo> suitableWorkers = this.workerClusterQueryService.geAvailableWorkers(jobInfo);
        if (CollectionUtils.isEmpty((Collection)suitableWorkers)) {
            log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", (Object)jobId, (Object)instanceId);
            this.instanceInfoRepository.update4TriggerFailed(instanceId.longValue(), InstanceStatus.FAILED.getV(), current, current, "N/A", "no worker available", now);
            this.instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), InstanceStatus.FAILED, "no worker available");
            return;
        }
        if ((suitableWorkers = this.filterOverloadWorker(suitableWorkers)).isEmpty()) {
            overloadOptional.ifPresent(booleanHolder -> booleanHolder.set((Object)true));
            log.warn("[Dispatcher-{}|{}] cancel to dispatch job due to all worker is overload", (Object)jobId, (Object)instanceId);
            return;
        }
        List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
        ServerScheduleJobReq req = this.constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
        WorkerInfo taskTracker = this.taskTrackerSelectorService.select(jobInfo, instanceInfo, suitableWorkers);
        String taskTrackerAddress = taskTracker.getAddress();
        URL workerUrl = ServerURLFactory.dispatchJob2Worker((String)taskTrackerAddress);
        this.transportService.tell(taskTracker.getProtocol(), workerUrl, (PowerSerializable)req);
        log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", new Object[]{jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req});
        this.instanceInfoRepository.update4TriggerSucceed(instanceId.longValue(), InstanceStatus.WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now, instanceInfo.getStatus().intValue());
        this.instanceMetadataService.loadJobInfo(instanceId, jobInfo);
    }

    private List<WorkerInfo> filterOverloadWorker(List<WorkerInfo> suitableWorkers) {
        ArrayList<WorkerInfo> res = new ArrayList<WorkerInfo>(suitableWorkers.size());
        for (WorkerInfo suitableWorker : suitableWorkers) {
            if (suitableWorker.overload()) continue;
            res.add(suitableWorker);
        }
        return res;
    }

    private ServerScheduleJobReq constructServerScheduleJobReq(JobInfoDO jobInfo, InstanceInfoDO instanceInfo, List<String> finalWorkersIpList) {
        ServerScheduleJobReq req = new ServerScheduleJobReq();
        BeanUtils.copyProperties((Object)jobInfo, (Object)req);
        req.setJobId(jobInfo.getId());
        if (StringUtils.isEmpty((CharSequence)instanceInfo.getInstanceParams())) {
            req.setInstanceParams(null);
        } else {
            req.setInstanceParams(instanceInfo.getInstanceParams());
        }
        if (!StringUtils.isEmpty((CharSequence)instanceInfo.getJobParams())) {
            req.setJobParams(instanceInfo.getJobParams());
        }
        req.setInstanceId(instanceInfo.getInstanceId());
        req.setAllWorkerAddress(finalWorkersIpList);
        req.setMaxWorkerCount(jobInfo.getMaxWorkerCount());
        req.setWfInstanceId(instanceInfo.getWfInstanceId());
        req.setExecuteType(ExecuteType.of((int)jobInfo.getExecuteType()).name());
        req.setProcessorType(ProcessorType.of((int)jobInfo.getProcessorType()).name());
        req.setTimeExpressionType(TimeExpressionType.of((int)jobInfo.getTimeExpressionType()).name());
        if (jobInfo.getInstanceTimeLimit() != null) {
            req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit().longValue());
        }
        req.setThreadConcurrency(jobInfo.getConcurrency().intValue());
        return req;
    }

    public DispatchService(TransportService transportService, WorkerClusterQueryService workerClusterQueryService, InstanceManager instanceManager, InstanceMetadataService instanceMetadataService, InstanceInfoRepository instanceInfoRepository, TaskTrackerSelectorService taskTrackerSelectorService) {
        this.transportService = transportService;
        this.workerClusterQueryService = workerClusterQueryService;
        this.instanceManager = instanceManager;
        this.instanceMetadataService = instanceMetadataService;
        this.instanceInfoRepository = instanceInfoRepository;
        this.taskTrackerSelectorService = taskTrackerSelectorService;
    }
}

