/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
import org.apache.kylin.engine.spark.job.SegmentJob;
import org.apache.kylin.engine.spark.job.StageType;
import org.apache.kylin.engine.spark.job.exec.MergeExec;
import org.apache.kylin.engine.spark.job.stage.BuildParam;
import org.apache.kylin.engine.spark.job.stage.StageExec;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.spark.tracker.BuildContext;

/**
 * Spark job that merges every segment held in {@code readOnlySegments}.
 *
 * <p>For each segment the job creates the {@code MERGE_FLAT_TABLE}, {@code MERGE_INDICES} and
 * {@code MERGE_COLUMN_BYTES} stages against one {@link MergeExec}, in that order.
 */
public class SegmentMergeJob extends SegmentJob {

    /**
     * Supplies the job description text.
     *
     * @return the value of {@link LogJobInfoUtils#dfMergeJobInfo()}
     */
    @Override
    protected String generateInfo() {
        return LogJobInfoUtils.dfMergeJobInfo();
    }

    /**
     * Creates the {@link BuildContext} for this job, starts build-resource monitoring on its
     * application status tracker, and then merges the segments.
     *
     * @throws Exception if the merge fails
     */
    @Override
    protected final void doExecute() throws Exception {
        this.buildContext = new BuildContext(this.getSparkSession().sparkContext(), this.config);
        this.buildContext.appStatusTracker().startMonitorBuildResourceState();
        this.merge();
    }

    /**
     * Runs the merge stages for each segment, in parallel when
     * {@code isSegmentParallelBuildEnabled()} is set on the config and sequentially otherwise.
     *
     * <p>An {@link IOException} raised while processing a segment is rethrown from the stream
     * pipeline as a {@link RuntimeException} whose cause is the original exception.
     *
     * @throws IOException declared on the signature; failures inside the per-segment work are
     *     wrapped as described above
     */
    private void merge() throws IOException {
        // The wildcard keeps the declaration assignable whether the segment collection is
        // declared with NDataSegment itself or with a subtype.
        Stream<? extends NDataSegment> segments = this.config.isSegmentParallelBuildEnabled()
                ? this.readOnlySegments.parallelStream()
                : this.readOnlySegments.stream();

        // Atomic because segments may be processed concurrently by the parallel stream.
        AtomicLong finishedSegmentCount = new AtomicLong(0L);
        int segmentsCount = this.readOnlySegments.size();

        segments.forEach(segment -> {
            // The resource is held only so that it is closed again when this segment is done.
            try (KylinConfig.SetAndUnsetThreadLocalConfig ignored =
                    KylinConfig.setAndUnsetThreadLocalConfig((KylinConfig) this.config)) {
                // MergeExec receives the step id with the "job_step_" marker removed.
                String jobStepId = StringUtils.replace((String) this.infos.getJobStepId(), "job_step_", "");
                MergeExec exec = new MergeExec(jobStepId);
                BuildParam buildParam = new BuildParam();

                StageType.MERGE_FLAT_TABLE.createStage(this, segment, buildParam, exec);
                StageType.MERGE_INDICES.createStage(this, segment, buildParam, exec);
                exec.mergeSegment();

                StageExec mergeColumnBytes = StageType.MERGE_COLUMN_BYTES.createStage(this, segment, buildParam, exec);
                mergeColumnBytes.toWorkWithoutFinally();

                // Every segment except the last one to get here reports the stage as finished.
                // NOTE(review): presumably the final completion is reported elsewhere - confirm.
                if (finishedSegmentCount.incrementAndGet() < (long) segmentsCount) {
                    mergeColumnBytes.onStageFinished(true);
                }
            } catch (IOException e) {
                // Explicit throw in place of the deprecated Throwables.propagate(e); for a checked
                // exception that helper also threw new RuntimeException(e).
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Command-line entry point: creates a {@link SegmentMergeJob} and executes it.
     *
     * @param args arguments passed through to {@code execute}
     */
    public static void main(String[] args) {
        SegmentMergeJob segmentMergeJob = new SegmentMergeJob();
        segmentMergeJob.execute(args);
    }
}

