/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import io.kyligence.kap.secondstorage.enums.LockTypeEnum;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.engine.spark.job.JobStepType;
import org.apache.kylin.engine.spark.job.NResourceDetectStep;
import org.apache.kylin.engine.spark.job.NSparkCleanupAfterMergeStep;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.engine.spark.job.NSparkMergingStep;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultExecutable;
import org.apache.kylin.job.execution.DefaultExecutableOnModel;
import org.apache.kylin.job.execution.ExecutableParams;
import org.apache.kylin.job.execution.JobSchedulerModeEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.factory.JobFactory;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.job.JobBucket;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NSparkMergingJob
extends DefaultExecutableOnModel {
    private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);

    public NSparkMergingJob() {
    }

    public NSparkMergingJob(Object notSetId) {
        super(notSetId);
    }

    public static NSparkMergingJob merge(NDataSegment mergedSegment, Set<LayoutEntity> layouts, String submitter, String jobId) {
        return NSparkMergingJob.merge(mergedSegment, layouts, submitter, jobId, null, null);
    }

    public static NSparkMergingJob merge(NDataSegment mergedSegment, Set<LayoutEntity> layouts, String submitter, String jobId, Set<Long> partitions, Set<JobBucket> buckets) {
        Preconditions.checkArgument((mergedSegment != null ? 1 : 0) != 0);
        Preconditions.checkArgument((submitter != null ? 1 : 0) != 0);
        NDataflow df = mergedSegment.getDataflow();
        if (layouts == null) {
            layouts = Sets.newHashSet((Iterable)df.getIndexPlan().getAllLayouts());
        }
        NSparkMergingJob job = new NSparkMergingJob();
        job.setName(JobTypeEnum.INDEX_MERGE.toString());
        job.setJobType(JobTypeEnum.INDEX_MERGE);
        job.setId(jobId);
        job.setTargetSubject(mergedSegment.getModel().getUuid());
        job.setTargetSegments(Lists.newArrayList((Object[])new String[]{String.valueOf(mergedSegment.getId())}));
        job.setProject(mergedSegment.getProject());
        job.setSubmitter(submitter);
        if (CollectionUtils.isNotEmpty(partitions)) {
            job.setTargetPartitions(partitions);
            job.setParam("partitionIds", job.getTargetPartitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
        }
        if (CollectionUtils.isNotEmpty(buckets)) {
            job.setParam("buckets", ExecutableParams.toBucketParam(buckets));
        }
        job.setParam("jobId", jobId);
        job.setParam("project", df.getProject());
        job.setParam("targetModel", job.getTargetSubject());
        job.setParam("dataflowId", df.getId());
        job.setParam("layoutIds", NSparkCubingUtil.ids2Str((Set)NSparkCubingUtil.toLayoutIds((Set)layouts)));
        job.setParam("segmentIds", String.join((CharSequence)",", job.getTargetSegments()));
        job.setParam("dataRangeStart", mergedSegment.getSegRange().getStart().toString());
        job.setParam("dataRangeEnd", mergedSegment.getSegRange().getEnd().toString());
        KylinConfigExt config = df.getConfig();
        AbstractExecutable resourceDetect = JobStepType.RESOURCE_DETECT.createStep((DefaultExecutable)job, (KylinConfig)config);
        AbstractExecutable merging = JobStepType.MERGING.createStep((DefaultExecutable)job, (KylinConfig)config);
        AbstractExecutable cleanStep = JobStepType.CLEAN_UP_AFTER_MERGE.createStep((DefaultExecutable)job, (KylinConfig)config);
        Segments mergingSegments = df.getMergingSegments(mergedSegment);
        cleanStep.setParam("segmentIds", String.join((CharSequence)",", NSparkCubingUtil.toSegmentIds((Segments)mergingSegments)));
        AbstractExecutable mergeStep = NSparkMergingJob.initSecondMergeStep(mergedSegment, layouts, df, job, (KylinConfig)config, (Segments<NDataSegment>)mergingSegments);
        AbstractExecutable updateMetadata = JobStepType.UPDATE_METADATA.createStep((DefaultExecutable)job, (KylinConfig)config);
        if (SecondStorageUtil.isModelEnable((String)df.getProject(), (String)job.getTargetSubject())) {
            NSparkMergingJob.setDAGRelations((AbstractExecutable)job, (KylinConfig)config, resourceDetect, merging, cleanStep, mergeStep, updateMetadata);
        }
        return job;
    }

    private static AbstractExecutable initSecondMergeStep(NDataSegment mergedSegment, Set<LayoutEntity> layouts, NDataflow df, NSparkMergingJob job, KylinConfig config, Segments<NDataSegment> mergingSegments) {
        AbstractExecutable mergeStep = null;
        if (SecondStorageUtil.isModelEnable((String)df.getProject(), (String)job.getTargetSubject()) && layouts.stream().anyMatch(SecondStorageUtil::isBaseTableIndex)) {
            SecondStorageUtil.validateProjectLock((String)df.getProject(), Collections.singletonList(LockTypeEnum.LOAD.name()));
            mergeStep = JobStepType.SECOND_STORAGE_MERGE.createStep((DefaultExecutable)job, config);
            mergeStep.setParam("mergedSegmentId", mergedSegment.getId());
            mergeStep.setParam("segmentIds", String.join((CharSequence)",", NSparkCubingUtil.toSegmentIds(mergingSegments)));
        }
        return mergeStep;
    }

    public static void setDAGRelations(AbstractExecutable job, KylinConfig config, AbstractExecutable resourceDetect, AbstractExecutable merging, AbstractExecutable clean, AbstractExecutable secondStorageMerge, AbstractExecutable updateMetadata) {
        if (!StringUtils.equalsIgnoreCase((CharSequence)config.getJobSchedulerMode(), (CharSequence)JobSchedulerModeEnum.CHAIN.toString())) {
            NSparkMergingJob.initResourceDetectDagNode((AbstractExecutable)resourceDetect, (AbstractExecutable)merging, (AbstractExecutable)secondStorageMerge);
            merging.setNextSteps((Set)Sets.newHashSet((Object[])new String[]{clean.getId()}));
            clean.setPreviousStep(merging.getId());
            clean.setNextSteps((Set)Sets.newHashSet((Object[])new String[]{updateMetadata.getId()}));
            updateMetadata.setPreviousStep(clean.getId());
            job.setJobSchedulerMode(JobSchedulerModeEnum.DAG);
        }
    }

    public Set<String> getMetadataDumpList(KylinConfig config) {
        String dataflowId = this.getParam("dataflowId");
        return NDataflowManager.getInstance((KylinConfig)config, (String)this.getProject()).getDataflow(dataflowId).collectPrecalculationResource();
    }

    public NSparkMergingStep getSparkMergingStep() {
        return (NSparkMergingStep)this.getTask(NSparkMergingStep.class);
    }

    public NResourceDetectStep getResourceDetectStep() {
        return (NResourceDetectStep)this.getTask(NResourceDetectStep.class);
    }

    public NSparkCleanupAfterMergeStep getCleanUpAfterMergeStep() {
        return (NSparkCleanupAfterMergeStep)this.getTask(NSparkCleanupAfterMergeStep.class);
    }

    public void cancelJob() {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance((KylinConfig)this.getConfig(), (String)this.getProject());
        NDataflow dataflow = nDataflowManager.getDataflow(this.getSparkMergingStep().getDataflowId());
        if (dataflow == null) {
            logger.debug("Dataflow is null, maybe model is deleted?");
            return;
        }
        ArrayList<NDataSegment> toRemovedSegments = new ArrayList<NDataSegment>();
        for (String id : this.getSparkMergingStep().getSegmentIds()) {
            NDataSegment segment = dataflow.getSegment(id);
            if (segment == null || SegmentStatusEnum.READY == segment.getStatus() || SegmentStatusEnum.WARNING == segment.getStatus()) continue;
            toRemovedSegments.add(segment);
        }
        NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(dataflow.getUuid());
        nDataflowUpdate.setToRemoveSegs(toRemovedSegments.toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate);
    }

    static {
        JobFactory.register((String)"MERGE_JOB_FACTORY", (JobFactory)new MergingJobFactory());
    }

    static class MergingJobFactory
    extends JobFactory {
        private MergingJobFactory() {
        }

        protected NSparkMergingJob create(JobFactory.JobBuildParams jobBuildParams) {
            if (jobBuildParams.getSegments() == null || jobBuildParams.getSegments().size() != 1) {
                return null;
            }
            return NSparkMergingJob.merge((NDataSegment)jobBuildParams.getSegments().iterator().next(), jobBuildParams.getLayouts(), jobBuildParams.getSubmitter(), jobBuildParams.getJobId(), jobBuildParams.getPartitions(), jobBuildParams.getBuckets());
        }
    }
}

