/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.engine.spark.job.CuboidAggregator;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.datasource.storage.StorageStore;
import org.apache.spark.sql.datasource.storage.StorageStoreFactory;
import org.apache.spark.sql.datasource.storage.WriteTaskStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark application that merges several segments of a dataflow into one new segment.
 *
 * <p>{@link #doExecute()} runs three steps in order: it sums the per-column source byte sizes of
 * the merging segments into the new segment, makes a best-effort attempt to merge the persisted
 * flat tables, and finally merges every layout of the merging segments and stores the result.
 */
@Deprecated
public class DFMergeJob
extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(DFMergeJob.class);
    protected BuildLayoutWithUpdate buildLayoutWithUpdate;

    /**
     * Groups the layouts of all merging segments by layout id.
     *
     * @param mergingSegments segments whose layouts are to be merged
     * @param ss              Spark session handed to every created assist
     * @param mergedSeg       the new segment that receives the merged data
     * @return a concurrent map from layout id to the assist holding every layout data with that id
     */
    public static Map<Long, DFLayoutMergeAssist> generateMergeAssist(List<NDataSegment> mergingSegments, SparkSession ss, NDataSegment mergedSeg) {
        ConcurrentMap<Long, DFLayoutMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap();
        for (NDataSegment seg : mergingSegments) {
            for (NDataLayout cuboid : seg.getSegDetails().getLayouts()) {
                // The first layout data seen for an id creates and configures the assist;
                // every layout data (including the first) is then added to it.
                DFLayoutMergeAssist assist = mergeCuboidsAssist.computeIfAbsent(cuboid.getLayoutId(), id -> {
                    DFLayoutMergeAssist created = new DFLayoutMergeAssist();
                    created.setSs(ss);
                    created.setNewSegment(mergedSeg);
                    created.setLayout(cuboid.getLayout());
                    created.setToMergeSegments(mergingSegments);
                    return created;
                });
                assist.addCuboid(cuboid);
            }
        }
        return mergeCuboidsAssist;
    }

    /**
     * Command-line entry point: creates a job instance and executes it with the given arguments.
     *
     * @param args arguments forwarded to {@code execute}
     */
    public static void main(String[] args) {
        DFMergeJob job = new DFMergeJob();
        job.execute(args);
    }

    /**
     * Reads the {@code dataflowId}, {@code segmentIds} and {@code layoutIds} parameters and runs
     * the merge steps. A failure while merging flat tables is logged and does not abort the job.
     *
     * @throws Exception if merging column sizes or merging the segments fails
     */
    @Override
    protected void doExecute() throws Exception {
        buildLayoutWithUpdate = new BuildLayoutWithUpdate();
        String dataflowId = getParam("dataflowId");
        String newSegmentId = getParam("segmentIds");
        Set<Long> layoutIds = NSparkCubingUtil.str2Longs(getParam("layoutIds"));
        mergeColumnSize(dataflowId, newSegmentId);
        try {
            mergeFlatTable(dataflowId, newSegmentId);
        } catch (Exception e) {
            // Flat table merging is best effort: log and continue with the layout merge.
            logger.warn("Merge flat table failed.", e);
        }
        mergeSegments(dataflowId, newSegmentId, layoutIds);
    }

    /**
     * Records the (sorted) merging segments in the job infos, sums their column source bytes
     * into a copy of the new segment and persists that copy through the dataflow manager.
     *
     * @param dataflowId id of the dataflow owning the segments
     * @param segmentId  id of the new (merged) segment
     */
    private void mergeColumnSize(String dataflowId, String segmentId) {
        NDataflowManager mgr = NDataflowManager.getInstance(config, project);
        NDataflow dataflow = mgr.getDataflow(dataflowId);
        NDataSegment mergedSeg = dataflow.getSegment(segmentId);
        List<NDataSegment> mergingSegments = dataflow.getMergingSegments(mergedSeg);
        Collections.sort(mergingSegments);
        infos.clearMergingSegments();
        infos.recordMergingSegments(mergingSegments);

        // Work on a copy of the dataflow so that the update is applied through the manager.
        NDataflow flowCopy = dataflow.copy();
        NDataSegment segCopy = flowCopy.getSegment(segmentId);
        mergeColumnSizeForNewSegment(segCopy, mergingSegments);
        NDataflowUpdate update = new NDataflowUpdate(dataflowId);
        update.setToUpdateSegs(new NDataSegment[]{segCopy});
        mgr.updateDataflow(update);
    }

    /**
     * Sets the column source bytes of {@code segCopy} to the per-column sum over all merging
     * segments. A segment with an empty byte-size map contributes the value returned by
     * {@code SourceUsageManager.calcAvgColumnSourceBytes} instead.
     *
     * @param segCopy         segment copy that receives the summed sizes
     * @param mergingSegments segments whose sizes are summed
     */
    private void mergeColumnSizeForNewSegment(NDataSegment segCopy, List<NDataSegment> mergingSegments) {
        SourceUsageManager usageManager = SourceUsageManager.getInstance(config);
        Map<String, Long> result = Maps.newHashMap();
        for (NDataSegment seg : mergingSegments) {
            Map<String, Long> newByteSizeMap = MapUtils.isEmpty(seg.getColumnSourceBytes())
                    ? usageManager.calcAvgColumnSourceBytes(seg)
                    : seg.getColumnSourceBytes();
            mergeByteSizeMap(result, newByteSizeMap);
        }
        segCopy.setColumnSourceBytes(result);
    }

    /**
     * Adds every entry of {@code newByteSizeMap} onto {@code result}, summing values of equal keys.
     *
     * @param result         accumulator that is modified in place
     * @param newByteSizeMap sizes to add
     */
    private void mergeByteSizeMap(Map<String, Long> result, Map<String, Long> newByteSizeMap) {
        for (Map.Entry<String, Long> entry : newByteSizeMap.entrySet()) {
            result.merge(entry.getKey(), entry.getValue(), Long::sum);
        }
    }

    /**
     * Returns the segments that are merged into {@code mergedSeg}; subclasses may override.
     *
     * @param dataflow  dataflow owning the segments
     * @param mergedSeg the new (merged) segment
     * @return the result of {@code dataflow.getMergingSegments(mergedSeg)}
     */
    protected List<NDataSegment> getMergingSegments(NDataflow dataflow, NDataSegment mergedSeg) {
        return dataflow.getMergingSegments(mergedSeg);
    }

    /**
     * Merges every layout found in the merging segments and submits one save task per layout to
     * {@link #buildLayoutWithUpdate}. Table-index layouts are only sorted within partitions by
     * their dimensions; other layouts are aggregated first and then sorted.
     *
     * @param dataflowId       id of the dataflow owning the segments
     * @param segmentId        id of the new (merged) segment
     * @param specifiedCuboids layout ids passed to the job; not read by this implementation
     * @throws IOException declared for implementations and callees that perform storage I/O
     */
    protected void mergeSegments(String dataflowId, String segmentId, Set<Long> specifiedCuboids) throws IOException {
        NDataflowManager mgr = NDataflowManager.getInstance(config, project);
        NDataflow dataflow = mgr.getDataflow(dataflowId);
        final NDataSegment mergedSeg = dataflow.getSegment(segmentId);
        List<NDataSegment> mergingSegments = getMergingSegments(dataflow, mergedSeg);
        Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegments, ss, mergedSeg);
        for (final DFLayoutMergeAssist assist : mergeCuboidsAssist.values()) {
            Dataset<Row> afterMerge = assist.merge();
            final LayoutEntity layout = assist.getLayout();
            Column[] dimsCols = NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet());
            final Dataset<Row> afterSort;
            if (IndexEntity.isTableIndex(layout.getIndex().getId())) {
                afterSort = afterMerge.sortWithinPartitions(dimsCols);
            } else {
                Dataset<Row> afterAgg = CuboidAggregator.agg(afterMerge, layout.getOrderedDimensions().keySet(), layout.getOrderedMeasures(), mergedSeg, null);
                afterSort = afterAgg.sortWithinPartitions(dimsCols);
            }
            buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity(){

                @Override
                public long getIndexId() {
                    return layout.getIndexId();
                }

                @Override
                public String getName() {
                    return "merge-layout-" + layout.getId();
                }

                @Override
                public List<NDataLayout> build() throws IOException {
                    return Lists.newArrayList(DFMergeJob.this.saveAndUpdateCuboid(afterSort, mergedSeg, layout, assist));
                }
            }, config);
        }
        buildLayoutWithUpdate.updateLayout(mergedSeg, config, project);
    }

    /**
     * Writes the merged dataset of one layout to storage and builds the matching layout metadata
     * from the write statistics.
     *
     * @param dataset merged (and sorted) data of the layout
     * @param seg     the new (merged) segment
     * @param layout  layout being saved
     * @param assist  assist holding the source layout data; their source rows are summed
     * @return a new {@link NDataLayout} populated with row count, source row count, partition
     *         information, file count and byte size
     * @throws IOException declared for storage failures
     */
    protected NDataLayout saveAndUpdateCuboid(Dataset<Row> dataset, NDataSegment seg, LayoutEntity layout, DFLayoutMergeAssist assist) throws IOException {
        ss.sparkContext().setLocalProperty("spark.scheduler.pool", "merge");
        long layoutId = layout.getId();
        long sourceCount = 0L;
        for (NDataLayout cuboid : assist.getCuboids()) {
            sourceCount += cuboid.getSourceRows();
        }
        NDataLayout dataLayout = NDataLayout.newDataLayout(seg.getDataflow(), seg.getId(), layoutId);
        String path = NSparkCubingUtil.getStoragePath(seg, layoutId);
        int storageType = layout.getModel().getStorageType();
        StorageStore storage = StorageStoreFactory.create(storageType);

        WriteTaskStats taskStats;
        ss.sparkContext().setJobDescription("Merge layout " + layoutId);
        try {
            taskStats = storage.save(layout, new Path(path), KapConfig.wrap(config), dataset);
        } finally {
            // Always clear the description so a failed save does not leave a stale label behind.
            ss.sparkContext().setJobDescription(null);
        }

        dataLayout.setBuildJobId(jobId);
        long rowCount = taskStats.numRows();
        if (rowCount == -1L) {
            // NOTE(review): the recorded message mentions count(), but no count() is executed
            // here; the layout keeps the value -1 reported by the write statistics.
            KylinBuildEnv.get().buildJobInfos().recordAbnormalLayouts(layout.getId(), "Job metrics seems null, use count() to collect cuboid rows.");
            logger.info("Can not get cuboid row cnt.");
        }
        dataLayout.setRows(rowCount);
        dataLayout.setSourceRows(sourceCount);
        dataLayout.setPartitionNum(taskStats.numBucket());
        dataLayout.setPartitionValues(taskStats.partitionValues());
        dataLayout.setFileCount(taskStats.numFiles());
        dataLayout.setByteSize(taskStats.numBytes());
        return dataLayout;
    }

    /**
     * Returns the ids of all segments in {@code sources} that match {@code predicate}.
     *
     * @param predicate filter applied to each segment
     * @param sources   segments to test
     * @return ids of the matching segments, in encounter order
     */
    private List<String> predicatedSegments(Predicate<NDataSegment> predicate, List<NDataSegment> sources) {
        return sources.stream().filter(predicate).map(NDataSegment::getId).collect(Collectors.toList());
    }

    /**
     * Resolves the flat table directory of every segment.
     *
     * @param dataFlowId id of the dataflow owning the segments
     * @param segments   segments whose flat tables are wanted
     * @return one path per segment, or an empty list if any segment's flat table is not marked
     *         ready, does not exist, or cannot be checked because of an {@link IOException}
     */
    private List<Path> getSegmentFlatTables(String dataFlowId, List<NDataSegment> segments) {
        List<String> notReadies = predicatedSegments(segment -> !segment.isFlatTableReady(), segments);
        if (CollectionUtils.isNotEmpty(notReadies)) {
            logger.warn("[UNEXPECTED_THINGS_HAPPENED] Plan to merge segments' flat table, but found that some's flat table were not ready like [{}]", String.join(",", notReadies));
            return Lists.newArrayList();
        }

        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        List<String> notExists = predicatedSegments(segment -> {
            try {
                Path p = config.getFlatTableDir(project, dataFlowId, segment.getId());
                return !fs.exists(p);
            } catch (IOException ioe) {
                // A failed existence check is treated the same as a missing flat table.
                logger.warn("[UNEXPECTED_THINGS_HAPPENED] When checking segment's flat table exists, segment id: {}", segment.getId(), ioe);
                return true;
            }
        }, segments);
        if (CollectionUtils.isNotEmpty(notExists)) {
            logger.warn("[UNEXPECTED_THINGS_HAPPENED] Plan to merge segments' flat table, but found that some's flat table were not exists like [{}]", String.join(",", notExists));
            return Lists.newArrayList();
        }

        return segments.stream()
                .map(segment -> config.getFlatTableDir(project, dataFlowId, segment.getId()))
                .collect(Collectors.toList());
    }

    /**
     * Unions the persisted flat tables of the merging segments and writes the result as the flat
     * table of the new segment, then marks that segment's flat table as ready.
     *
     * <p>Nothing is written when flat table persisting is disabled, when there are no merging
     * segments, when any source flat table is unavailable, or when the (sorted) field names of
     * the source flat tables differ.
     *
     * @param dataFlowId id of the dataflow owning the segments
     * @param segmentId  id of the new (merged) segment
     */
    private void mergeFlatTable(String dataFlowId, String segmentId) {
        if (!config.isPersistFlatTableEnabled()) {
            logger.info("project {} flat table persisting is not enabled.", project);
            return;
        }
        NDataflowManager dfMgr = NDataflowManager.getInstance(config, project);
        NDataflow dataFlow = dfMgr.getDataflow(dataFlowId);
        NDataSegment mergedSeg = dataFlow.getSegment(segmentId);
        List<NDataSegment> mergingSegments = getMergingSegments(dataFlow, mergedSeg);
        if (mergingSegments.isEmpty()) {
            return;
        }
        Collections.sort(mergingSegments);
        List<Path> flatTables = getSegmentFlatTables(dataFlowId, mergingSegments);
        if (CollectionUtils.isEmpty(flatTables)) {
            return;
        }

        Dataset<Row> flatTableDs = null;
        // Sorted field names of the first flat table; every other table must match them.
        String[] names = null;
        for (Path p : flatTables) {
            Dataset<Row> newDs = ss.read().parquet(p.toString());
            String[] fieldNames = newDs.schema().fieldNames();
            Arrays.sort(fieldNames);
            if (names == null) {
                names = fieldNames;
                flatTableDs = newDs;
                continue;
            }
            if (!Arrays.equals(names, fieldNames)) {
                logger.info("Schema: {} in path: {} is conflict with others: {}. Skip merge flat table.", Arrays.toString(fieldNames), p, Arrays.toString(names));
                return;
            }
            flatTableDs = flatTableDs.union(newDs);
        }
        if (Objects.isNull(flatTableDs)) {
            return;
        }

        Path newPath = config.getFlatTableDir(project, dataFlowId, segmentId);
        ss.sparkContext().setLocalProperty("spark.scheduler.pool", "merge");
        ss.sparkContext().setJobDescription("Persist flat table.");
        try {
            flatTableDs.write().mode(SaveMode.Overwrite).parquet(newPath.toString());
        } finally {
            // Clear the description so later Spark jobs are not labelled as flat table persisting.
            ss.sparkContext().setJobDescription(null);
        }
        logger.info("Persist merged flat tables to path {} with schema [{}], new segment id: {}, dataFlowId: {}", newPath, Arrays.toString(names), segmentId, dataFlowId);

        NDataflow dfCopied = dataFlow.copy();
        NDataSegment segmentCopied = dfCopied.getSegment(segmentId);
        segmentCopied.setFlatTableReady(true);
        NDataflowUpdate update = new NDataflowUpdate(dataFlowId);
        update.setToUpdateSegs(new NDataSegment[]{segmentCopied});
        dfMgr.updateDataflow(update);
    }

    /**
     * Returns the job information text produced by {@code LogJobInfoUtils.dfMergeJobInfo()}.
     */
    @Override
    protected String generateInfo() {
        return LogJobInfoUtils.dfMergeJobInfo();
    }
}

