/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
import org.apache.kylin.engine.spark.job.SegmentJob;
import org.apache.kylin.engine.spark.job.StageType;
import org.apache.kylin.engine.spark.job.exec.BuildExec;
import org.apache.kylin.engine.spark.job.stage.BuildParam;
import org.apache.kylin.engine.spark.job.stage.StageExec;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.apache.spark.tracker.BuildContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentBuildJob
extends SegmentJob {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SegmentBuildJob.class);
    private boolean usePlanner = false;

    public static void main(String[] args) {
        SegmentBuildJob segmentBuildJob = new SegmentBuildJob();
        segmentBuildJob.execute(args);
    }

    @Override
    protected final void extraInit() {
        super.extraInit();
        String enablePlanner = this.getParam("enablePlanner");
        if (enablePlanner != null && Boolean.valueOf(enablePlanner).booleanValue()) {
            this.usePlanner = true;
        }
    }

    @Override
    protected String generateInfo() {
        return LogJobInfoUtils.dfBuildJobInfo();
    }

    @Override
    protected void waiteForResourceSuccess() throws Exception {
        if (this.config.isBuildCheckPartitionColEnabled()) {
            this.checkDateFormatIfExist(this.project, this.dataflowId);
        }
        StageExec waiteForResource = StageType.WAITE_FOR_RESOURCE.create(this, null, null);
        waiteForResource.onStageFinished(true);
        this.infos.recordStageId("");
    }

    @Override
    protected final void doExecute() throws Exception {
        log.info("Start sub stage {}" + StageType.REFRESH_SNAPSHOTS.name());
        StageType.REFRESH_SNAPSHOTS.create(this, null, null).toWork();
        log.info("End sub stage {}" + StageType.REFRESH_SNAPSHOTS.name());
        this.buildContext = new BuildContext(this.getSparkSession().sparkContext(), this.config);
        this.buildContext.appStatusTracker().startMonitorBuildResourceState();
        this.build();
        this.updateSegmentSourceBytesSize();
    }

    @Override
    protected final String calculateRequiredCores() throws Exception {
        if (this.config.getSparkEngineTaskImpactInstanceEnabled().booleanValue()) {
            String maxLeafTasksNums = this.maxLeafTasksNums();
            int factor = this.config.getSparkEngineTaskCoreFactor();
            int requiredCore = (int)Double.parseDouble(maxLeafTasksNums) / factor;
            log.info("The maximum number of tasks required to run the job is {}, require cores: {}", (Object)maxLeafTasksNums, (Object)requiredCore);
            return String.valueOf(requiredCore);
        }
        return "1";
    }

    private String maxLeafTasksNums() throws IOException {
        if (Objects.isNull(this.rdSharedPath)) {
            this.rdSharedPath = this.config.getJobTmpShareDir(this.project, this.jobId);
        }
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        FileStatus[] fileStatuses = fs.listStatus(this.rdSharedPath, path -> path.toString().endsWith(ResourceDetectUtils.cubingDetectItemFileSuffix()));
        return ResourceDetectUtils.selectMaxValueInFiles((FileStatus[])fileStatuses);
    }

    protected void build() throws IOException {
        Stream segmentStream = this.config.isSegmentParallelBuildEnabled() ? this.readOnlySegments.parallelStream() : this.readOnlySegments.stream();
        AtomicLong finishedSegmentCount = new AtomicLong(0L);
        int segmentsCount = this.readOnlySegments.size();
        segmentStream.forEach(seg -> {
            try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig.setAndUnsetThreadLocalConfig((KylinConfig)this.config);){
                this.infos.clearCuboidsNumPerLayer(seg.getId());
                String jobStepId = StringUtils.replace((String)this.infos.getJobStepId(), (String)"job_step_", (String)"");
                BuildExec exec = new BuildExec(jobStepId);
                BuildParam buildParam = new BuildParam();
                StageType.MATERIALIZED_FACT_TABLE.createStage(this, (NDataSegment)seg, buildParam, exec);
                StageType.BUILD_DICT.createStage(this, (NDataSegment)seg, buildParam, exec);
                StageType.GENERATE_FLAT_TABLE.createStage(this, (NDataSegment)seg, buildParam, exec);
                if (this.usePlanner) {
                    StageType.COST_BASED_PLANNER.createStage(this, (NDataSegment)seg, buildParam, exec);
                }
                StageType.GATHER_FLAT_TABLE_STATS.createStage(this, (NDataSegment)seg, buildParam, exec);
                StageType.BUILD_LAYER.createStage(this, (NDataSegment)seg, buildParam, exec);
                this.buildSegment((NDataSegment)seg, exec);
                StageExec refreshColumnBytes = StageType.REFRESH_COLUMN_BYTES.createStage(this, (NDataSegment)seg, buildParam, exec);
                refreshColumnBytes.toWorkWithoutFinally();
                if (finishedSegmentCount.incrementAndGet() < (long)segmentsCount) {
                    refreshColumnBytes.onStageFinished(true);
                }
            }
            catch (IOException e) {
                Throwables.propagate((Throwable)e);
            }
        });
    }

    private void buildSegment(NDataSegment dataSegment, BuildExec exec) throws IOException {
        log.info("Encoding segment {}", (Object)dataSegment.getId());
        exec.buildSegment();
    }

    public void tryRefreshSnapshots(StageExec stageExec) throws Exception {
        SnapshotBuilder snapshotBuilder = new SnapshotBuilder(this.getJobId());
        if (this.config.isSnapshotManualManagementEnabled()) {
            log.info("Skip snapshot build in snapshot manual mode, dataflow: {}, only calculate total rows", (Object)this.dataflowId);
            snapshotBuilder.calculateTotalRows(this.getSparkSession(), this.getDataflow(this.dataflowId).getModel(), this.getIgnoredSnapshotTables());
            stageExec.onStageSkipped();
            return;
        }
        if (!this.needBuildSnapshots()) {
            log.info("Skip snapshot build, dataflow {}, only calculate total rows", (Object)this.dataflowId);
            snapshotBuilder.calculateTotalRows(this.getSparkSession(), this.getDataflow(this.dataflowId).getModel(), this.getIgnoredSnapshotTables());
            stageExec.onStageSkipped();
            return;
        }
        log.info("Refresh SNAPSHOT.");
        snapshotBuilder.buildSnapshot(this.getSparkSession(), this.getDataflow(this.dataflowId).getModel(), this.getIgnoredSnapshotTables());
        if (this.config.isSnapshotSpecifiedSparkConf()) {
            log.info("exchange sparkSession using maintained sparkConf");
            this.exchangeSparkSession();
        }
        log.info("Finished SNAPSHOT.");
    }

    private void updateSegmentSourceBytesSize() {
        Map segmentSourceSize = ResourceDetectUtils.getSegmentSourceSize((Path)this.rdSharedPath);
        UnitOfWork.doInTransactionWithRetry(() -> {
            NDataflowManager dataflowManager = NDataflowManager.getInstance((KylinConfig)this.config, (String)this.project);
            NDataflow dataflow = dataflowManager.getDataflow(this.dataflowId);
            NDataflow newDF = dataflow.copy();
            NDataflowUpdate update = new NDataflowUpdate(dataflow.getUuid());
            ArrayList nDataSegments = Lists.newArrayList();
            for (Map.Entry entry : segmentSourceSize.entrySet()) {
                NDataSegment segment = newDF.getSegment((String)entry.getKey());
                segment.setSourceBytesSize(((Long)entry.getValue()).longValue());
                nDataSegments.add(segment);
            }
            update.setToUpdateSegs(nDataSegments.toArray(new NDataSegment[0]));
            dataflowManager.updateDataflow(update);
            NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance((KylinConfig)this.config, (String)this.project);
            indexPlanManager.updateIndexPlan(this.dataflowId, copyForWrite -> copyForWrite.setLayoutBucketNumMapping(indexPlanManager.getIndexPlan(this.dataflowId).getLayoutBucketNumMapping()));
            return null;
        }, (String)this.project);
    }
}

