/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.dispatch;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component(value="ScanWorkflowTaskActor")
@Scope(value="prototype")
public class ScanWorkflowTaskActor
extends AbstractActor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ScanWorkflowTaskActor.class);
    private final WorkflowMapper workflowMapper;
    private final SystemProperties systemProperties;
    private final GroupConfigMapper groupConfigMapper;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ScanTask.class, config -> {
            try {
                this.doScan((ScanTask)config);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", new Object[]{config, e});
            }
        }).build();
    }

    private void doScan(ScanTask scanTask) {
        PartitionTaskUtils.process(startId -> this.listAvailableWorkflows(startId, scanTask), this::processPartitionTasks, (long)0L);
    }

    private void processPartitionTasks(List<? extends PartitionTask> partitionTasks) {
        ArrayList<Workflow> waitUpdateJobs = new ArrayList<Workflow>();
        ArrayList<WorkflowTaskPrepareDTO> waitExecWorkflows = new ArrayList<WorkflowTaskPrepareDTO>();
        long now = DateUtils.toNowMilli();
        for (PartitionTask partitionTask : partitionTasks) {
            WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO)partitionTask;
            this.processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
        }
        this.workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
        for (WorkflowTaskPrepareDTO workflowTaskPrepareDTO : waitExecWorkflows) {
            ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
            workflowTaskPrepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
            actorRef.tell((Object)workflowTaskPrepareDTO, actorRef);
        }
    }

    private void processWorkflow(WorkflowPartitionTaskDTO partitionTask, List<Workflow> waitUpdateWorkflows, List<WorkflowTaskPrepareDTO> waitExecJobs, long now) {
        CacheConsumerGroup.addOrUpdate((String)partitionTask.getGroupName(), (String)partitionTask.getNamespaceId());
        Long nextTriggerAt = this.calculateNextTriggerTime(partitionTask, now);
        Workflow workflow = new Workflow();
        workflow.setId(partitionTask.getId());
        workflow.setNextTriggerAt(nextTriggerAt);
        waitUpdateWorkflows.add(workflow);
        waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask));
    }

    private Long calculateNextTriggerTime(WorkflowPartitionTaskDTO partitionTask, long now) {
        long nextTriggerAt = partitionTask.getNextTriggerAt();
        if (nextTriggerAt + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD) < now) {
            long randomMs = (long)(RandomUtil.randomDouble((double)0.0, (double)4.0, (int)2, (RoundingMode)RoundingMode.UP) * 1000.0);
            nextTriggerAt = now + randomMs;
            partitionTask.setNextTriggerAt(nextTriggerAt);
        }
        WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy((int)partitionTask.getTriggerType());
        WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
        waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
        waitStrategyContext.setNextTriggerAt(nextTriggerAt);
        return waitStrategy.computeTriggerTime(waitStrategyContext);
    }

    private List<WorkflowPartitionTaskDTO> listAvailableWorkflows(Long startId, ScanTask scanTask) {
        if (CollUtil.isEmpty((Collection)scanTask.getBuckets())) {
            return Collections.emptyList();
        }
        List workflows = ((PageDTO)this.workflowMapper.selectPage((IPage)new PageDTO(0L, (long)this.systemProperties.getJobPullPageSize()), (Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{Workflow::getId, Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType, Workflow::getTriggerInterval, Workflow::getExecutorTimeout, Workflow::getNamespaceId, Workflow::getFlowInfo, Workflow::getBlockStrategy, Workflow::getWfContext}).eq(Workflow::getWorkflowStatus, (Object)StatusEnum.YES.getStatus())).eq(Workflow::getDeleted, (Object)StatusEnum.NO.getStatus())).in(Workflow::getBucketIndex, (Collection)scanTask.getBuckets())).le(Workflow::getNextTriggerAt, (Object)(DateUtils.toNowMilli() + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD)))).ge(Workflow::getId, (Object)startId)).orderByAsc(Workflow::getId))).getRecords();
        if (CollUtil.isNotEmpty((Collection)workflows)) {
            List groupConfigs = StreamUtils.toList((Collection)this.groupConfigMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{GroupConfig::getGroupName}).eq(GroupConfig::getGroupStatus, (Object)StatusEnum.YES.getStatus())).in(GroupConfig::getGroupName, (Collection)StreamUtils.toSet((Collection)workflows, Workflow::getGroupName))), GroupConfig::getGroupName);
            workflows = workflows.stream().filter(workflow -> groupConfigs.contains(workflow.getGroupName())).collect(Collectors.toList());
        }
        return WorkflowTaskConverter.INSTANCE.toWorkflowPartitionTaskList(workflows);
    }

    @Generated
    public ScanWorkflowTaskActor(WorkflowMapper workflowMapper, SystemProperties systemProperties, GroupConfigMapper groupConfigMapper) {
        this.workflowMapper = workflowMapper;
        this.systemProperties = systemProperties;
        this.groupConfigMapper = groupConfigMapper;
    }
}

