/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
import org.apache.kylin.engine.spark.builder.SnapshotPartitionBuilder;
import org.apache.kylin.engine.spark.job.StageType;
import org.apache.kylin.engine.spark.job.exec.SnapshotExec;
import org.apache.kylin.engine.spark.utils.FileNames;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark application that builds the snapshot of a single table, either as a
 * whole-table snapshot or partition by partition when a partition column has
 * been selected.
 */
public class SnapshotBuildJob extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(SnapshotBuildJob.class);

    /**
     * Splits a comma separated list into an immutable set.
     *
     * @param tableListStr comma separated values, may be blank
     * @return the distinct values, or {@code null} when the input is blank
     */
    private static Set<String> toPartitions(String tableListStr) {
        if (StringUtils.isBlank(tableListStr)) {
            return null;
        }
        return ImmutableSet.<String>builder()
                .addAll(Arrays.asList(StringSplitter.split(tableListStr, ",")))
                .build();
    }

    /**
     * Moves {@code source} to {@code target} and fails loudly when the file
     * system reports that the rename did not happen.
     *
     * @throws IOException if the rename throws or returns {@code false}
     */
    private static void renameOrFail(FileSystem fs, Path source, Path target) throws IOException {
        if (!fs.rename(source, target)) {
            throw new IOException(String.format(Locale.ROOT, "Failed to rename %s to %s", source, target));
        }
    }

    /**
     * Command line entry point: creates the job and hands the arguments to
     * {@code execute}.
     */
    public static void main(String[] args) {
        SnapshotBuildJob snapshotBuildJob = new SnapshotBuildJob();
        snapshotBuildJob.execute(args);
    }

    /**
     * Creates the {@code SNAPSHOT_BUILD} stage on a {@link SnapshotExec} keyed by
     * the job step id (with its {@code job_step_} prefix stripped) and runs it.
     */
    @Override
    protected void doExecute() throws Exception {
        String jobStepId = StringUtils.replace(this.infos.getJobStepId(), "job_step_", "");
        SnapshotExec exec = new SnapshotExec(jobStepId);
        StageType.SNAPSHOT_BUILD.createStage(this, null, null, exec);
        exec.buildSnapshot();
    }

    /**
     * Builds the snapshot for the table named by the {@code table} job parameter.
     *
     * <p>Without a {@code selectedPartitionCol} parameter the whole table is
     * snapshotted. Otherwise the partition metadata is initialized, the partitions
     * listed in {@code selectedPartition} (or, when absent, the table's not-ready
     * partitions) are built, and for an incremental build the result is moved
     * from the temporary snapshot path into the last snapshot path.
     *
     * @throws IOException if the {@code selectedPartition} parameter cannot be parsed
     */
    public void buildSnapshot() throws IOException {
        String tableName = this.getParam("table");
        String selectedPartCol = this.getParam("selectedPartitionCol");
        TableDesc tableDesc = NTableMetadataManager.getInstance(this.config, this.project).getTableDesc(tableName);
        boolean incrementalBuild = "true".equals(this.getParam("incrementalBuild"));
        String partitionToBuildString = this.getParam("selectedPartition");
        Set<String> partitionToBuild = null;
        if (partitionToBuildString != null) {
            partitionToBuild = JsonUtil.readValueAsSet(partitionToBuildString);
        }

        if (selectedPartCol == null) {
            new SnapshotBuilder().buildSnapshot(this.ss, Sets.newHashSet(tableDesc));
            return;
        }

        this.initialize(tableDesc, selectedPartCol, incrementalBuild, partitionToBuild);
        // initialize() may have written new table metadata, so read the table again.
        tableDesc = NTableMetadataManager.getInstance(this.config, this.project).getTableDesc(tableName);
        if (partitionToBuild == null) {
            partitionToBuild = tableDesc.getNotReadyPartitions();
        }
        logger.info("{} need build partitions: {}", tableDesc.getIdentity(), partitionToBuild);
        new SnapshotPartitionBuilder().buildSnapshot(this.ss, tableDesc, selectedPartCol, partitionToBuild);
        if (incrementalBuild) {
            this.moveIncrementalPartitions(tableDesc.getLastSnapshotPath(), tableDesc.getTempSnapshotPath());
        }
    }

    /**
     * Prepares the table metadata for a partitioned snapshot build inside a
     * metadata transaction. Does nothing when the table already has a temporary
     * snapshot path.
     *
     * <p>Incremental builds add either {@code partitionToBuild} or, when that is
     * {@code null}, the source partitions not yet present in the snapshot. Full
     * builds reset the snapshot partitions and zero the row counters. In both
     * cases a fresh temporary snapshot path is recorded on the table.
     */
    private void initialize(TableDesc table, String selectedPartCol, boolean incrementBuild,
            Set<String> partitionToBuild) {
        if (table.getTempSnapshotPath() != null) {
            logger.info("snapshot partition has been initialed, so skip.");
            return;
        }
        Set<String> partitions = this.getTablePartitions(table, selectedPartCol);
        Set<String> curPartitions = table.getSnapshotPartitions().keySet();
        String resourcePath = FileNames.snapshotFile(table) + "/" + RandomUtil.randomUUID();
        UnitOfWork.doInTransactionWithRetry(() -> {
            NTableMetadataManager tableMetadataManager = NTableMetadataManager
                    .getInstance(KylinConfig.getInstanceFromEnv(), this.project);
            TableDesc copy = tableMetadataManager.copyForWrite(table);
            if (incrementBuild) {
                if (partitionToBuild == null) {
                    copy.addSnapshotPartitions(Sets.difference(partitions, curPartitions));
                } else {
                    copy.addSnapshotPartitions(partitionToBuild);
                }
            } else {
                copy.resetSnapshotPartitions(partitions);
                copy.setSnapshotTotalRows(0L);
                TableExtDesc copyExt = tableMetadataManager
                        .copyForWrite(tableMetadataManager.getOrCreateTableExt(table));
                copyExt.setTotalRows(0L);
                tableMetadataManager.saveTableExt(copyExt);
            }
            copy.setTempSnapshotPath(resourcePath);
            tableMetadataManager.updateTableDesc(copy);
            return null;
        }, this.project);
    }

    /**
     * Resolves the partition values of {@code tableDesc} for the selected column.
     *
     * <p>In a UT environment the values come from the {@code partitions} job
     * parameter. For a range partitioned table whose partition column equals
     * {@code selectPartitionCol} (ignoring case) the partition column itself is
     * returned. Otherwise the source metadata explorer is queried.
     *
     * @return the partition values; may be {@code null} when the string handed to
     *         {@code toPartitions} is blank
     */
    protected Set<String> getTablePartitions(TableDesc tableDesc, String selectPartitionCol) {
        if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
            return SnapshotBuildJob.toPartitions(this.getParam("partitions"));
        }
        if (tableDesc.isRangePartition() && tableDesc.getPartitionColumn().equalsIgnoreCase(selectPartitionCol)) {
            logger.info("The\u3010{}\u3011column is range partition table,so return partition column.", tableDesc.getName());
            return SnapshotBuildJob.toPartitions(tableDesc.getPartitionColumn());
        }
        ISourceMetadataExplorer explr = SourceFactory.getSource((ISourceAware) tableDesc).getSourceMetadataExplorer();
        Set<String> curPartitions = explr.getTablePartitions(tableDesc.getDatabase(), tableDesc.getName(),
                tableDesc.getProject(), selectPartitionCol);
        logger.info("{} current partitions: {}", tableDesc.getIdentity(), curPartitions);
        return curPartitions;
    }

    /**
     * Moves every entry of the incremental snapshot directory into the origin
     * snapshot directory, replacing entries of the same name, and then deletes the
     * incremental directory.
     *
     * <p>A rename that reports failure aborts the move before the incremental
     * directory is deleted, so partitions that were not moved are not lost. Any
     * failure is logged and rethrown through {@link Throwables#propagate}.
     */
    private void moveIncrementalPartitions(String originSnapshotPath, String incrementalSnapshotPath) {
        String target = this.getSnapshotDir(originSnapshotPath);
        Path targetDir = new Path(target);
        Path sourcePath = new Path(this.getSnapshotDir(incrementalSnapshotPath));
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        try {
            if (!fs.exists(sourcePath)) {
                return;
            }
            for (FileStatus fileStatus : fs.listStatus(sourcePath)) {
                Path targetFilePath = new Path(target + "/" + fileStatus.getPath().getName());
                if (fs.exists(targetFilePath)) {
                    logger.info("delete non-effective partition {} ", targetFilePath);
                    fs.delete(targetFilePath, true);
                }
                if (StringUtils.equalsIgnoreCase(fs.getScheme(), "s3a") && fs.isDirectory(fileStatus.getPath())) {
                    // On s3a, directories are recreated and their content moved entry by entry.
                    fs.mkdirs(targetFilePath);
                    this.renameS3A(fs, fileStatus, targetFilePath);
                } else {
                    renameOrFail(fs, fileStatus.getPath(), targetDir);
                }
            }
            // Only reached when every entry was moved successfully.
            fs.delete(sourcePath, true);
        } catch (Exception e) {
            logger.error(String.format(Locale.ROOT, "from %s to %s move file fail:", incrementalSnapshotPath, originSnapshotPath), e);
            Throwables.propagate(e);
        }
    }

    /**
     * Recursively moves the content of the {@code source} directory into the
     * already existing {@code target} directory: files are renamed into
     * {@code target}, sub directories are recreated (replacing an existing one of
     * the same name) and processed in turn.
     *
     * @throws IOException if a file system call fails or a rename reports failure
     */
    private void renameS3A(FileSystem fs, FileStatus source, Path target) throws IOException {
        for (FileStatus sourceInner : fs.listStatus(source.getPath())) {
            if (!fs.exists(sourceInner.getPath())) {
                continue;
            }
            if (sourceInner.isFile()) {
                renameOrFail(fs, sourceInner.getPath(), target);
            }
            if (!sourceInner.isDirectory()) {
                continue;
            }
            String targetInnerString = target + "/" + sourceInner.getPath().getName();
            Path targetInner = new Path(targetInnerString);
            if (fs.exists(targetInner)) {
                logger.info("delete non-effective partition {} ", targetInnerString);
                fs.delete(targetInner, true);
            }
            fs.mkdirs(targetInner);
            this.renameS3A(fs, sourceInner, targetInner);
        }
    }

    /**
     * Prefixes {@code snapshotPath} with the metadata working directory of the
     * environment's Kylin configuration.
     */
    private String getSnapshotDir(String snapshotPath) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        String workingDir = KapConfig.wrap(config).getMetadataWorkingDirectory();
        return workingDir + "/" + snapshotPath;
    }

    /**
     * Returns the general Spark config override with the snapshot building
     * override applied on top, so snapshot specific entries win on key clashes.
     */
    @Override
    protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
        Map<String, String> snapshotConfig = config.getSnapshotBuildingConfigOverride();
        Map<String, String> generalBuildConfig = config.getSparkConfigOverride();
        generalBuildConfig.putAll(snapshotConfig);
        return generalBuildConfig;
    }

    /**
     * Intentionally empty: this job does not choose a content size.
     */
    @Override
    protected void chooseContentSize(SparkConfHelper helper) {
    }
}

