/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.merger;

import com.clearspring.analytics.util.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.engine.spark.ExecutableUtils;
import org.apache.kylin.engine.spark.merger.SparkJobMetadataMerger;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metadata merger used after a segment merge or refresh job.
 *
 * <p>Each merge reads the job's view of a dataflow through an {@link NDataflowManager} bound to
 * the remote {@link ResourceStore}'s config, assembles an {@link NDataflowUpdate} (segments to
 * update, segments to remove, layouts to add or update) and submits it through the
 * {@link NDataflowManager} bound to this merger's own config and project.
 */
public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AfterMergeOrRefreshResourceMerger.class);

    /**
     * Creates a merger for the given config and project.
     *
     * @param config  config handed to the superclass
     * @param project project name handed to the superclass
     */
    public AfterMergeOrRefreshResourceMerger(KylinConfig config, String project) {
        super(config, project);
    }

    /**
     * Merges job results for a model whose segments carry multiple partitions.
     *
     * <p>For {@link JobTypeEnum#INDEX_MERGE} the remote segment is taken as the merged segment:
     * all of its partitions are marked {@link PartitionStatusEnum#READY}, build times are
     * stamped with the current time and all of its layouts are scheduled for update. For any
     * other job type the remote partitions are upserted into the local segment and, for every
     * given segment and every available layout, into the local layout.
     *
     * @param dataflowId          id of the dataflow to update
     * @param segmentIds          ids of the segments touched by the job; the first id returned
     *                            by the set's iterator selects the merged segment
     * @param layoutIds           layout ids passed to {@code getAvailableLayoutIds}
     * @param remoteResourceStore store holding the metadata written by the job
     * @param jobType             type of the finished job
     * @param partitions          partition ids passed to the upsert helpers
     * @return the layouts that were added or updated
     * @throws java.util.NoSuchElementException if {@code segmentIds} is empty
     */
    public NDataLayout[] mergeMultiPartitionModel(String dataflowId, Set<String> segmentIds, Set<Long> layoutIds, ResourceStore remoteResourceStore, JobTypeEnum jobType, Set<Long> partitions) {
        NDataflowManager localMgr = NDataflowManager.getInstance(this.getConfig(), this.getProject());
        NDataflowUpdate update = new NDataflowUpdate(dataflowId);
        // Work on copies so the objects handed out by the managers are not mutated directly.
        NDataflow localDataflow = localMgr.getDataflow(dataflowId).copy();
        NDataflowManager distMgr = NDataflowManager.getInstance(remoteResourceStore.getConfig(), this.getProject());
        NDataflow distDataflow = distMgr.getDataflow(update.getDataflowId()).copy();

        List<NDataSegment> toUpdateSegments = new ArrayList<>();
        List<NDataLayout> toUpdateCuboids = new ArrayList<>();

        String firstSegmentId = segmentIds.iterator().next();
        NDataSegment remoteSegment = distDataflow.getSegment(firstSegmentId);
        NDataSegment localSegment = localDataflow.getSegment(firstSegmentId);
        Set<Long> availableLayoutIds = this.getAvailableLayoutIds(localDataflow, layoutIds);

        // A sub-partition refresh removes no segments; every other job type asks the remote manager.
        List<NDataSegment> toRemoveSegments = new ArrayList<>();
        if (JobTypeEnum.SUB_PARTITION_REFRESH != jobType) {
            toRemoveSegments = distMgr.getToRemoveSegs(distDataflow, remoteSegment);
        }

        NDataSegment mergedSegment;
        if (JobTypeEnum.INDEX_MERGE == jobType) {
            mergedSegment = remoteSegment;
            long lastBuildTime = System.currentTimeMillis();
            mergedSegment.getMultiPartitions().forEach(partition -> {
                partition.setStatus(PartitionStatusEnum.READY);
                partition.setLastBuildTime(lastBuildTime);
            });
            mergedSegment.setLastBuildTime(lastBuildTime);
            toUpdateCuboids.addAll(mergedSegment.getSegDetails().getLayouts());
        } else {
            mergedSegment = this.upsertSegmentPartition(localSegment, remoteSegment, partitions);
            for (String segId : segmentIds) {
                NDataSegment remoteSeg = distDataflow.getSegment(segId);
                NDataSegment localSeg = localDataflow.getSegment(segId);
                for (long layoutId : availableLayoutIds) {
                    NDataLayout remoteLayout = remoteSeg.getLayout(layoutId);
                    NDataLayout localLayout = localSeg.getLayout(layoutId);
                    toUpdateCuboids.add(this.upsertLayoutPartition(localLayout, remoteLayout, partitions));
                }
            }
        }

        if (mergedSegment.getStatus() == SegmentStatusEnum.NEW) {
            mergedSegment.setStatus(SegmentStatusEnum.READY);
        }
        toUpdateSegments.add(mergedSegment);
        if (JobTypeEnum.INDEX_MERGE == jobType) {
            AfterMergeOrRefreshResourceMerger.inheritFromRemovedSegments(mergedSegment, toRemoveSegments);
        }

        update.setToAddOrUpdateLayouts(toUpdateCuboids.toArray(new NDataLayout[0]));
        update.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[0]));
        update.setToUpdateSegs(toUpdateSegments.toArray(new NDataSegment[0]));
        localMgr.updateDataflow(update);
        this.updateIndexPlan(dataflowId, remoteResourceStore);
        return update.getToAddOrUpdateLayouts();
    }

    /**
     * Merges job results for a model without multi-partition handling.
     *
     * <p>The merged segment is read from the remote dataflow, promoted from
     * {@link SegmentStatusEnum#NEW} to {@link SegmentStatusEnum#READY} if necessary, and its
     * last build time is set to the last-modified value of its segment details. All layouts of
     * the merged segment are scheduled for update and the segments reported by
     * {@code getToRemoveSegs} are scheduled for removal.
     *
     * @param dataflowId          id of the dataflow to update
     * @param segmentIds          ids of the segments touched by the job; the first id returned
     *                            by the set's iterator selects the merged segment
     * @param layoutIds           not read by this method
     * @param remoteResourceStore store holding the metadata written by the job
     * @param jobType             type of the finished job
     * @param partitions          not read by this method
     * @return the layouts that were added or updated
     * @throws java.util.NoSuchElementException if {@code segmentIds} is empty
     */
    public NDataLayout[] mergeNormalModel(String dataflowId, Set<String> segmentIds, Set<Long> layoutIds, ResourceStore remoteResourceStore, JobTypeEnum jobType, Set<Long> partitions) {
        NDataflowManager localMgr = NDataflowManager.getInstance(this.getConfig(), this.getProject());
        NDataflowUpdate update = new NDataflowUpdate(dataflowId);
        NDataflowManager distMgr = NDataflowManager.getInstance(remoteResourceStore.getConfig(), this.getProject());
        // Work on a copy so the object handed out by the remote manager is not mutated directly.
        NDataflow distDataflow = distMgr.getDataflow(update.getDataflowId()).copy();

        List<NDataSegment> toUpdateSegments = new ArrayList<>();
        List<NDataLayout> toUpdateCuboids = new ArrayList<>();

        NDataSegment mergedSegment = distDataflow.getSegment(segmentIds.iterator().next());
        if (mergedSegment.getStatus() == SegmentStatusEnum.NEW) {
            mergedSegment.setStatus(SegmentStatusEnum.READY);
        }
        mergedSegment.setLastBuildTime(mergedSegment.getSegDetails().getLastModified());
        toUpdateSegments.add(mergedSegment);

        List<NDataSegment> toRemoveSegments = distMgr.getToRemoveSegs(distDataflow, mergedSegment);
        if (JobTypeEnum.INDEX_MERGE == jobType) {
            AfterMergeOrRefreshResourceMerger.inheritFromRemovedSegments(mergedSegment, toRemoveSegments);
        }
        toUpdateCuboids.addAll(mergedSegment.getSegDetails().getLayouts());

        update.setToAddOrUpdateLayouts(toUpdateCuboids.toArray(new NDataLayout[0]));
        update.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[0]));
        update.setToUpdateSegs(toUpdateSegments.toArray(new NDataSegment[0]));
        localMgr.updateDataflow(update);
        this.updateIndexPlan(dataflowId, remoteResourceStore);
        return update.getToAddOrUpdateLayouts();
    }

    /**
     * Dispatches to {@link #mergeMultiPartitionModel} when {@code partitions} is non-empty and
     * to {@link #mergeNormalModel} otherwise.
     *
     * @return the layouts that were added or updated
     */
    @Override
    public NDataLayout[] merge(String dataflowId, Set<String> segmentIds, Set<Long> layoutIds, ResourceStore remoteResourceStore, JobTypeEnum jobType, Set<Long> partitions) {
        if (CollectionUtils.isNotEmpty(partitions)) {
            return this.mergeMultiPartitionModel(dataflowId, segmentIds, layoutIds, remoteResourceStore, jobType, partitions);
        }
        return this.mergeNormalModel(dataflowId, segmentIds, layoutIds, remoteResourceStore, jobType, partitions);
    }

    /**
     * Merges the results of the given executable.
     *
     * <p>Opens the executable's remote store, reads the dataflow, segment, layout and partition
     * ids through {@link ExecutableUtils}, runs
     * {@link #merge(String, Set, Set, ResourceStore, JobTypeEnum, Set)}, then calls
     * {@code mergeSnapshotMeta} (only when {@code ExecutableUtils.needBuildSnapshots} is true),
     * {@code mergeTableExtMeta}, {@code recordDownJobStats} and finally
     * {@code notifyUserIfNecessary} on the executable. The remote store is closed when the
     * method exits, whether normally or by exception.
     *
     * @param abstractExecutable the finished job step whose output is merged
     */
    @Override
    public void merge(AbstractExecutable abstractExecutable) {
        try (ResourceStore buildResourceStore = ExecutableUtils.getRemoteStore(this.getConfig(), abstractExecutable)) {
            String dataFlowId = ExecutableUtils.getDataflowId(abstractExecutable);
            Set<String> segmentIds = ExecutableUtils.getSegmentIds(abstractExecutable);
            Set<Long> layoutIds = ExecutableUtils.getLayoutIds(abstractExecutable);
            Set<Long> partitionIds = ExecutableUtils.getPartitionIds(abstractExecutable);
            NDataLayout[] nDataLayouts = this.merge(dataFlowId, segmentIds, layoutIds, buildResourceStore, abstractExecutable.getJobType(), partitionIds);
            // Re-read the dataflow after the update has been applied.
            NDataflow dataflow = NDataflowManager.getInstance(this.getConfig(), this.getProject()).getDataflow(dataFlowId);
            if (ExecutableUtils.needBuildSnapshots(abstractExecutable)) {
                this.mergeSnapshotMeta(dataflow, buildResourceStore);
            }
            this.mergeTableExtMeta(dataflow, buildResourceStore);
            this.recordDownJobStats(abstractExecutable, nDataLayouts);
            abstractExecutable.notifyUserIfNecessary(nDataLayouts);
        }
    }

    /**
     * Carries source size and WARNING status over from the removed segments to the merged one.
     *
     * <p>Source byte sizes of the removed segments are summed, skipping segments that report
     * {@code -1}. If at least one size remains, the sum becomes the merged segment's source
     * size and its last build time is set to the current time. If any removed segment is in
     * {@link SegmentStatusEnum#WARNING}, the merged segment is put into that status too.
     *
     * @param mergedSegment   segment that receives the values
     * @param removedSegments segments being replaced by {@code mergedSegment}
     */
    private static void inheritFromRemovedSegments(NDataSegment mergedSegment, List<NDataSegment> removedSegments) {
        Optional<Long> totalSourceSize = removedSegments.stream()
                .map(NDataSegment::getSourceBytesSize)
                .filter(size -> size != -1L)
                .reduce(Long::sum);
        if (totalSourceSize.isPresent()) {
            mergedSegment.setSourceBytesSize(totalSourceSize.get());
            mergedSegment.setLastBuildTime(System.currentTimeMillis());
        }
        if (removedSegments.stream().anyMatch(seg -> seg.getStatus() == SegmentStatusEnum.WARNING)) {
            mergedSegment.setStatus(SegmentStatusEnum.WARNING);
        }
    }
}

