/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.kyligence.kap.guava20.shaded.common.annotations.VisibleForTesting;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import io.kyligence.kap.secondstorage.enums.LockTypeEnum;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.exception.ErrorCodeSupplier;
import org.apache.kylin.common.exception.JobErrorCode;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.job.JobStepType;
import org.apache.kylin.engine.spark.job.NResourceDetectStep;
import org.apache.kylin.engine.spark.job.NSparkCubingStep;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.engine.spark.job.SparkCleanupTransactionalTableStep;
import org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultExecutable;
import org.apache.kylin.job.execution.DefaultExecutableOnModel;
import org.apache.kylin.job.execution.ExecutableParams;
import org.apache.kylin.job.execution.JobSchedulerModeEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.factory.JobFactory;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
import org.apache.kylin.metadata.job.JobBucket;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executable for Spark cubing jobs on a single model.
 *
 * <p>A job is assembled from resource-detect, cubing and update-metadata steps, plus optional
 * second-storage (index clean / export / refresh) and transactional-table cleanup steps. When
 * second storage is enabled for the model and the scheduler is not in CHAIN mode, the steps are
 * wired into an explicit DAG by {@link #setDAGRelations}.
 */
public class NSparkCubingJob extends DefaultExecutableOnModel {
    private static final Logger logger = LoggerFactory.getLogger(NSparkCubingJob.class);

    /** Largest number of included dimensions accepted when the cost based planner is enabled. */
    private static final int MAX_PLANNER_DIMENSIONS = 63;

    public NSparkCubingJob() {
    }

    public NSparkCubingJob(Object notSetId) {
        super(notSetId);
    }

    /** Test helper: creates an INDEX_BUILD job with a random id. */
    @VisibleForTesting
    public static NSparkCubingJob create(Set<NDataSegment> segments, Set<LayoutEntity> layouts, String submitter, Set<JobBucket> buckets) {
        return create(segments, layouts, submitter, JobTypeEnum.INDEX_BUILD, RandomUtil.randomUUIDStr(), null, null, buckets);
    }

    /** Test helper: creates an INC_BUILD job with a random id. */
    @VisibleForTesting
    public static NSparkCubingJob createIncBuildJob(Set<NDataSegment> segments, Set<LayoutEntity> layouts, String submitter, Set<JobBucket> buckets) {
        return create(segments, layouts, submitter, JobTypeEnum.INC_BUILD, RandomUtil.randomUUIDStr(), null, null, buckets);
    }

    /**
     * Test helper: creates a job of the given type without layouts to delete and with empty
     * extra parameters.
     */
    @VisibleForTesting
    public static NSparkCubingJob create(Set<NDataSegment> segments, Set<LayoutEntity> layouts, String submitter, JobTypeEnum jobType, String jobId, Set<String> ignoredSnapshotTables, Set<Long> partitions, Set<JobBucket> buckets) {
        JobFactory.JobBuildParams params = new JobFactory.JobBuildParams(segments, layouts, submitter, jobType, jobId, null, ignoredSnapshotTables, partitions, buckets, Maps.newHashMap());
        return innerCreate(params);
    }

    /**
     * Creates a cubing job from {@code jobBuildParams}. Additionally records the ids of
     * layouts scheduled for deletion in the {@code toBeDeletedLayoutIds} parameter.
     */
    public static NSparkCubingJob create(JobFactory.JobBuildParams jobBuildParams) {
        NSparkCubingJob sparkCubingJob = innerCreate(jobBuildParams);
        Set<LayoutEntity> toBeDeletedLayouts = jobBuildParams.getToBeDeletedLayouts();
        if (CollectionUtils.isNotEmpty(toBeDeletedLayouts)) {
            sparkCubingJob.setParam("toBeDeletedLayoutIds", NSparkCubingUtil.ids2Str(NSparkCubingUtil.toLayoutIds(toBeDeletedLayouts)));
        }
        return sparkCubingJob;
    }

    /**
     * Builds the job: fills identity/target fields and job params, then creates its steps.
     *
     * @throws IllegalArgumentException if there are no segments, the submitter is null, or
     *         (outside UT env) there are no layouts
     */
    private static NSparkCubingJob innerCreate(JobFactory.JobBuildParams params) {
        Set<NDataSegment> segments = params.getSegments();
        Set<LayoutEntity> layouts = params.getLayouts();
        String submitter = params.getSubmitter();
        JobTypeEnum jobType = params.getJobType();
        String jobId = params.getJobId();
        Set<String> ignoredSnapshotTables = params.getIgnoredSnapshotTables();
        Set<Long> partitions = params.getPartitions();
        Set<JobBucket> buckets = params.getBuckets();
        Map<String, String> extParams = params.getExtParams();

        Preconditions.checkArgument(!segments.isEmpty(), "segments must not be empty");
        Preconditions.checkArgument(submitter != null, "submitter must not be null");
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        if (!kylinConfig.isUTEnv()) {
            // Layouts are only allowed to be empty in the UT environment.
            Preconditions.checkArgument(!layouts.isEmpty(), "layouts must not be empty");
        }

        NDataSegment firstSegment = segments.iterator().next();
        NDataflow df = firstSegment.getDataflow();
        NSparkCubingJob job = new NSparkCubingJob();

        // Overall data range spanned by all target segments; the start begins at a
        // "nothing seen yet" sentinel.
        long startTime = Long.MAX_VALUE - 1;
        long endTime = 0L;
        for (NDataSegment segment : segments) {
            startTime = Math.min(startTime, Long.parseLong(segment.getSegRange().getStart().toString()));
            // Track the maximum segment end so overlapping segments still yield the full range.
            endTime = Math.max(endTime, Long.parseLong(segment.getSegRange().getEnd().toString()));
        }

        job.setParams(extParams);
        job.setId(jobId);
        job.setName(jobType.toString());
        job.setJobType(jobType);
        job.setTargetSubject(firstSegment.getModel().getUuid());
        job.setTargetSegments(segments.stream().map(x -> String.valueOf(x.getId())).collect(Collectors.toList()));
        job.setProject(df.getProject());
        job.setSubmitter(submitter);
        if (CollectionUtils.isNotEmpty(partitions)) {
            job.setTargetPartitions(partitions);
            job.setParam("partitionIds", job.getTargetPartitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
            checkIfNeedBuildSnapshots(job);
        }
        if (CollectionUtils.isNotEmpty(buckets)) {
            job.setParam("buckets", ExecutableParams.toBucketParam(buckets));
        }
        enableCostBasedPlannerIfNeed(df, segments, job);
        job.setParam("jobId", jobId);
        job.setParam("project", df.getProject());
        job.setParam("targetModel", job.getTargetSubject());
        job.setParam("dataflowId", df.getId());
        job.setParam("layoutIds", NSparkCubingUtil.ids2Str(NSparkCubingUtil.toLayoutIds(layouts)));
        job.setParam("segmentIds", String.join(",", job.getTargetSegments()));
        job.setParam("dataRangeStart", String.valueOf(startTime));
        job.setParam("dataRangeEnd", String.valueOf(endTime));
        if (CollectionUtils.isNotEmpty(ignoredSnapshotTables)) {
            job.setParam("ignoredSnapshotTables", String.join(",", ignoredSnapshotTables));
        }

        KylinConfigExt config = df.getConfig();
        AbstractExecutable resourceDetect = JobStepType.RESOURCE_DETECT.createStep(job, config);
        AbstractExecutable cubing = JobStepType.CUBING.createStep(job, config);
        AbstractExecutable updateMetadata = JobStepType.UPDATE_METADATA.createStep(job, config);
        AbstractExecutable secondStorageDeleteIndex = initSecondStorageDeleteIndex(params.getToBeDeletedLayouts(), jobType, df, job, config);
        AbstractExecutable secondStorage = initSecondStorage(layouts, jobType, df, job, config);
        AbstractExecutable cleanUpTransactionalTable = initCleanUpTransactionalTable(kylinConfig, df, job, config);
        if (SecondStorageUtil.isModelEnable(df.getProject(), job.getTargetSubject())) {
            setDAGRelations(job, config, new NSparkCubingJobStep(resourceDetect, cubing, updateMetadata,
                    secondStorageDeleteIndex, secondStorage, cleanUpTransactionalTable));
        }
        return job;
    }

    /**
     * Creates the SECOND_STORAGE_INDEX_CLEAN step for an INDEX_BUILD job that has layouts to
     * delete, when second storage is enabled for the model; otherwise returns {@code null}.
     */
    private static AbstractExecutable initSecondStorageDeleteIndex(Set<LayoutEntity> toBeDeletedLayouts, JobTypeEnum jobType, NDataflow df, NSparkCubingJob job, KylinConfigExt config) {
        if (!SecondStorageUtil.isModelEnable(df.getProject(), job.getTargetSubject())) {
            return null;
        }
        if (jobType == JobTypeEnum.INDEX_BUILD && CollectionUtils.isNotEmpty(toBeDeletedLayouts)) {
            return JobStepType.SECOND_STORAGE_INDEX_CLEAN.createStep(job, config);
        }
        return null;
    }

    /**
     * Creates the second-storage step when the model has second storage enabled and the
     * layouts include a base table index:
     * <ul>
     *   <li>INDEX_BUILD / INC_BUILD: SECOND_STORAGE_EXPORT</li>
     *   <li>INDEX_REFRESH: SECOND_STORAGE_REFRESH (and records {@code oldSegmentIds})</li>
     * </ul>
     * For INDEX_REFRESH the project LOAD lock is validated first. Returns {@code null} when no
     * step applies.
     */
    private static AbstractExecutable initSecondStorage(Set<LayoutEntity> layouts, JobTypeEnum jobType, NDataflow df, NSparkCubingJob job, KylinConfigExt config) {
        if (!SecondStorageUtil.isModelEnable(df.getProject(), job.getTargetSubject())) {
            return null;
        }
        if (jobType == JobTypeEnum.INDEX_REFRESH) {
            SecondStorageUtil.validateProjectLock(df.getProject(), Collections.singletonList(LockTypeEnum.LOAD.name()));
        }
        boolean hasBaseIndex = layouts.stream().anyMatch(SecondStorageUtil::isBaseTableIndex);
        if (jobType == JobTypeEnum.INDEX_BUILD || jobType == JobTypeEnum.INC_BUILD) {
            return hasBaseIndex ? JobStepType.SECOND_STORAGE_EXPORT.createStep(job, config) : null;
        }
        if (jobType == JobTypeEnum.INDEX_REFRESH && hasBaseIndex) {
            List<String> oldSegs = job.getTargetSegments().stream()
                    .map(segId -> findReplacedSegmentId(df, segId))
                    .collect(Collectors.toList());
            job.setParam("oldSegmentIds", String.join(",", oldSegs));
            return JobStepType.SECOND_STORAGE_REFRESH.createStep(job, config);
        }
        return null;
    }

    /**
     * Returns the id of another segment in {@code df} with the same range as segment
     * {@code segId}.
     *
     * @throws NullPointerException if no such segment exists
     */
    private static String findReplacedSegmentId(NDataflow df, String segId) {
        NDataSegment curSeg = df.getSegment(segId);
        NDataSegment oldSeg = df.getSegments().stream()
                .filter(seg -> seg.getSegRange().equals(curSeg.getSegRange()) && !seg.getId().equals(segId))
                .findFirst()
                .orElse(null);
        return Objects.requireNonNull(oldSeg, "No other segment shares the range of refreshed segment " + segId).getId();
    }

    /**
     * Creates the CLEAN_UP_TRANSACTIONAL_TABLE step when
     * {@link HiveTableRefChecker#isNeedCleanUpTransactionalTableJob} says the model's tables
     * require it; otherwise returns {@code null}.
     */
    private static AbstractExecutable initCleanUpTransactionalTable(KylinConfig kylinConfig, NDataflow df, NSparkCubingJob job, KylinConfigExt config) {
        boolean isRangePartitionTable = df.getModel().getAllTableRefs().stream().anyMatch(tableRef -> tableRef.getTableDesc().isRangePartition());
        boolean isTransactionalTable = df.getModel().getAllTableRefs().stream().anyMatch(tableRef -> tableRef.getTableDesc().isTransactional());
        if (HiveTableRefChecker.isNeedCleanUpTransactionalTableJob(isTransactionalTable, isRangePartitionTable, kylinConfig.isReadTransactionalTableEnabled())) {
            return JobStepType.CLEAN_UP_TRANSACTIONAL_TABLE.createStep(job, config);
        }
        return null;
    }

    /**
     * Wires the job's steps into an explicit DAG and switches the job to DAG scheduling,
     * unless the configured scheduler mode is CHAIN (then nothing is changed).
     *
     * <p>Edges: resourceDetect (via the inherited {@code initResourceDetectDagNode}) → cubing
     * → updateMetadata → [secondStorageDeleteIndex] → [cleanUpTransactionalTable]; bracketed
     * steps are skipped when {@code null}.
     */
    public static void setDAGRelations(AbstractExecutable job, KylinConfig config, NSparkCubingJobStep jobStep) {
        if (StringUtils.equalsIgnoreCase(config.getJobSchedulerMode(), JobSchedulerModeEnum.CHAIN.toString())) {
            return;
        }
        AbstractExecutable cubing = jobStep.getCubing();
        AbstractExecutable updateMetadata = jobStep.getUpdateMetadata();
        AbstractExecutable secondStorageDeleteIndex = jobStep.getSecondStorageDeleteIndex();
        AbstractExecutable cleanUpTransactionalTable = jobStep.getCleanUpTransactionalTable();

        initResourceDetectDagNode(jobStep.getResourceDetect(), cubing, jobStep.getSecondStorage());
        setNextStep(cubing, updateMetadata);
        AbstractExecutable preStep = updateMetadata;
        if (secondStorageDeleteIndex != null) {
            setNextStep(preStep, secondStorageDeleteIndex);
            preStep = secondStorageDeleteIndex;
        }
        if (cleanUpTransactionalTable != null) {
            // Link with next/previous pointers like every other edge; the step's parent id
            // must keep pointing at the owning job.
            setNextStep(preStep, cleanUpTransactionalTable);
        }
        job.setJobSchedulerMode(JobSchedulerModeEnum.DAG);
    }

    /** Makes {@code currentStep} the sole next step of {@code preStep} and links it back. */
    private static void setNextStep(AbstractExecutable preStep, AbstractExecutable currentStep) {
        preStep.setNextSteps(Sets.newHashSet(currentStep.getId()));
        currentStep.setPreviousStep(preStep.getId());
    }

    /**
     * Sets {@code needBuildSnapshots} to "true" for INC_BUILD, INDEX_REFRESH and INDEX_BUILD
     * jobs and to "false" for every other job type.
     */
    public static void checkIfNeedBuildSnapshots(NSparkCubingJob job) {
        switch (job.getJobType()) {
            case INC_BUILD:
            case INDEX_REFRESH:
            case INDEX_BUILD:
                job.setParam("needBuildSnapshots", "true");
                break;
            default:
                job.setParam("needBuildSnapshots", "false");
                break;
        }
    }

    /** Returns the precalculation resources collected by this job's dataflow. */
    public Set<String> getMetadataDumpList(KylinConfig config) {
        String dataflowId = this.getParam("dataflowId");
        return NDataflowManager.getInstance(config, this.getProject()).getDataflow(dataflowId).collectPrecalculationResource();
    }

    public NSparkCubingStep getSparkCubingStep() {
        return (NSparkCubingStep) this.getTask(NSparkCubingStep.class);
    }

    public NResourceDetectStep getResourceDetectStep() {
        return (NResourceDetectStep) this.getTask(NResourceDetectStep.class);
    }

    public SparkCleanupTransactionalTableStep getCleanIntermediateTableStep() {
        return (SparkCleanupTransactionalTableStep) this.getTask(SparkCleanupTransactionalTableStep.class);
    }

    /**
     * Cancels the job.
     *
     * <p>Removes this job's target segments that still exist and are neither READY nor
     * WARNING, then restores sub-partition state via {@link #updatePartitionOnCancelJob()}.
     * Does nothing if the dataflow no longer exists.
     */
    public void cancelJob() {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(this.getConfig(), this.getProject());
        NDataflow dataflow = nDataflowManager.getDataflow(this.getSparkCubingStep().getDataflowId());
        if (dataflow == null) {
            logger.debug("Dataflow is null, maybe model is deleted?");
            return;
        }
        List<NDataSegment> toRemovedSegments = new ArrayList<>();
        for (String id : this.getSparkCubingStep().getSegmentIds()) {
            NDataSegment segment = dataflow.getSegment(id);
            if (segment == null || SegmentStatusEnum.READY == segment.getStatus() || SegmentStatusEnum.WARNING == segment.getStatus()) {
                continue;
            }
            toRemovedSegments.add(segment);
        }
        NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(dataflow.getUuid());
        nDataflowUpdate.setToRemoveSegs(toRemovedSegments.toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate);
        this.updatePartitionOnCancelJob();
    }

    /**
     * For bucket jobs, undoes the partition changes of a cancelled job.
     *
     * <ul>
     *   <li>SUB_PARTITION_BUILD: removes the target partitions from each target segment.</li>
     *   <li>SUB_PARTITION_REFRESH: resets target partitions in REFRESH status back to READY.</li>
     * </ul>
     * Other job types are left untouched. Does nothing if the dataflow no longer exists.
     */
    public void updatePartitionOnCancelJob() {
        if (!this.isBucketJob()) {
            return;
        }
        NDataflowManager dfManager = NDataflowManager.getInstance(this.getConfig(), this.getProject());
        NDataflow current = dfManager.getDataflow(this.getSparkCubingStep().getDataflowId());
        if (current == null) {
            logger.debug("Dataflow is null, maybe model is deleted?");
            return;
        }
        NDataflow df = current.copy();
        Set<String> segmentIds = this.getSparkCubingStep().getSegmentIds();
        Set<Long> partitions = this.getSparkCubingStep().getTargetPartitions();
        switch (this.getJobType()) {
            case SUB_PARTITION_BUILD:
                for (String id : segmentIds) {
                    NDataSegment segment = df.getSegment(id);
                    if (segment == null) {
                        continue;
                    }
                    dfManager.removeLayoutPartition(df.getId(), partitions, Sets.newHashSet(segment.getId()));
                    dfManager.removeSegmentPartition(df.getId(), partitions, Sets.newHashSet(segment.getId()));
                    logger.info("Remove partitions [{}] in segment [{}] cause to cancel job.", partitions, id);
                }
                break;
            case SUB_PARTITION_REFRESH:
                for (String id : segmentIds) {
                    NDataSegment segment = df.getSegment(id);
                    if (segment == null) {
                        continue;
                    }
                    segment.getMultiPartitions().forEach(partition -> {
                        if (partitions.contains(partition.getPartitionId()) && PartitionStatusEnum.REFRESH == partition.getStatus()) {
                            partition.setStatus(PartitionStatusEnum.READY);
                        }
                    });
                    NDataflowUpdate dfUpdate = new NDataflowUpdate(df.getId());
                    dfUpdate.setToUpdateSegs(new NDataSegment[]{segment});
                    dfManager.updateDataflow(dfUpdate);
                    logger.info("Change partitions [{}] in segment [{}] status to READY cause to cancel job.", partitions, id);
                }
                break;
            default:
                break;
        }
    }

    /**
     * Returns whether the job can be discarded without leaving a hole in the segment range.
     *
     * <p>Always true for suicidal, finished or non-INC_BUILD jobs. Otherwise, computes the
     * holes left by the non-target segments and returns false if any hole overlaps or contains
     * one of this job's target segments.
     */
    public boolean safetyIfDiscard() {
        if (this.checkSuicide() || this.getStatus().isFinalState() || this.getJobType() != JobTypeEnum.INC_BUILD) {
            return true;
        }
        String dataflowId = this.getSparkCubingStep().getDataflowId();
        NDataflowManager dfManager = NDataflowManager.getInstance(this.getConfig(), this.getProject());
        NDataflow dataflow = dfManager.getDataflow(dataflowId);
        List<String> targetSegmentIds = this.getTargetSegments();
        Map<Boolean, List<NDataSegment>> byTarget = dataflow.getSegments().stream()
                .collect(Collectors.partitioningBy(seg -> targetSegmentIds.contains(seg.getId())));
        List<NDataSegment> toDeletedSegs = byTarget.get(Boolean.TRUE);
        List<NDataSegment> remainingSegs = byTarget.get(Boolean.FALSE);
        List<NDataSegment> segHoles = dfManager.calculateHoles(dataflowId, remainingSegs);
        for (NDataSegment segHole : segHoles) {
            for (NDataSegment deleteSeg : toDeletedSegs) {
                if (segHole.getSegRange().overlaps(deleteSeg.getSegRange()) || segHole.getSegRange().contains(deleteSeg.getSegRange())) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Sets {@code enablePlanner=true} on {@code job} when the cost based planner applies.
     *
     * <p>It applies when the planner is enabled in config, the rule-based index has
     * aggregation groups and no cost-based layout list yet, and the job is INC_BUILD or
     * INDEX_REFRESH.
     *
     * @throws KylinException if the planner applies but more than one segment is targeted,
     *         other NEW segments exist on the model, or more than 63 dimensions are included
     */
    private static void enableCostBasedPlannerIfNeed(NDataflow df, Set<NDataSegment> segments, NSparkCubingJob job) {
        IndexPlan indexPlan = df.getIndexPlan();
        KylinConfig kylinConfig = indexPlan.getConfig();
        boolean needCostRecommendIndex = indexPlan.getRuleBasedIndex() != null
                && indexPlan.getRuleBasedIndex().getLayoutsOfCostBasedList() == null
                && !indexPlan.getRuleBasedIndex().getAggregationGroups().isEmpty();
        if (!kylinConfig.enableCostBasedIndexPlanner() || !needCostRecommendIndex || !canEnablePlannerJob(job.getJobType())) {
            return;
        }
        if (segments.size() != 1) {
            throw new KylinException(JobErrorCode.COST_BASED_PLANNER_ERROR,
                    "The number of segments to be built or refreshed must be 1, This is the first time to submit build job with enable cost based planner");
        }
        if (!noBuildingSegmentExist(df.getProject(), job.getTargetSubject(), kylinConfig)) {
            throw new KylinException(JobErrorCode.COST_BASED_PLANNER_ERROR,
                    "There are running job for this model when submit the build job with cost based planner, please wait for other jobs to finish or cancel them");
        }
        int includeDimensions = indexPlan.getRuleBasedIndex().countOfIncludeDimension();
        if (includeDimensions > MAX_PLANNER_DIMENSIONS) {
            throw new KylinException(JobErrorCode.COST_BASED_PLANNER_ERROR,
                    String.format(Locale.ROOT, "The count of row key %d can't be larger than 63, when use the cube planner", includeDimensions));
        }
        job.setParam("enablePlanner", Boolean.TRUE.toString());
    }

    /** Returns true when the model's dataflow has at most one segment in NEW status. */
    private static boolean noBuildingSegmentExist(String project, String modelId, KylinConfig kylinConfig) {
        NDataflow dataflow = NDataflowManager.getInstance(kylinConfig, project).getDataflow(modelId);
        return dataflow.getSegments(SegmentStatusEnum.NEW).size() <= 1;
    }

    /** Only INC_BUILD and INDEX_REFRESH jobs may run with the cost based planner. */
    private static boolean canEnablePlannerJob(JobTypeEnum jobType) {
        return jobType == JobTypeEnum.INC_BUILD || jobType == JobTypeEnum.INDEX_REFRESH;
    }

    static {
        JobFactory.register("CUBE_JOB_FACTORY", new CubingJobFactory());
    }

    /** Immutable holder for the steps of a cubing job; optional steps may be {@code null}. */
    static class NSparkCubingJobStep {
        private final AbstractExecutable resourceDetect;
        private final AbstractExecutable cubing;
        private final AbstractExecutable updateMetadata;
        private final AbstractExecutable secondStorageDeleteIndex;
        private final AbstractExecutable secondStorage;
        private final AbstractExecutable cleanUpTransactionalTable;

        @Generated
        public NSparkCubingJobStep(AbstractExecutable resourceDetect, AbstractExecutable cubing, AbstractExecutable updateMetadata, AbstractExecutable secondStorageDeleteIndex, AbstractExecutable secondStorage, AbstractExecutable cleanUpTransactionalTable) {
            this.resourceDetect = resourceDetect;
            this.cubing = cubing;
            this.updateMetadata = updateMetadata;
            this.secondStorageDeleteIndex = secondStorageDeleteIndex;
            this.secondStorage = secondStorage;
            this.cleanUpTransactionalTable = cleanUpTransactionalTable;
        }

        @Generated
        public AbstractExecutable getResourceDetect() {
            return this.resourceDetect;
        }

        @Generated
        public AbstractExecutable getCubing() {
            return this.cubing;
        }

        @Generated
        public AbstractExecutable getUpdateMetadata() {
            return this.updateMetadata;
        }

        @Generated
        public AbstractExecutable getSecondStorageDeleteIndex() {
            return this.secondStorageDeleteIndex;
        }

        @Generated
        public AbstractExecutable getSecondStorage() {
            return this.secondStorage;
        }

        @Generated
        public AbstractExecutable getCleanUpTransactionalTable() {
            return this.cleanUpTransactionalTable;
        }
    }

    /** Factory registered under "CUBE_JOB_FACTORY"; delegates to {@link NSparkCubingJob#create(JobFactory.JobBuildParams)}. */
    static class CubingJobFactory extends JobFactory {
        private CubingJobFactory() {
        }

        protected NSparkCubingJob create(JobFactory.JobBuildParams jobBuildParams) {
            return NSparkCubingJob.create(jobBuildParams);
        }
    }
}

