/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.IValueValidator;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.com.beust.jcommander.ParameterException;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import scala.collection.JavaConversions;

public class HoodieSnapshotExporter {
    private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);

    /**
     * Exports a snapshot of the source Hudi table to {@code cfg.targetOutputPath}.
     *
     * <p>The export is pinned to the timestamp of the last completed write instant of the
     * source table. Depending on {@code cfg.outputFormat} the data is either copied as a
     * Hudi table or rewritten through Spark in the requested format. A {@code _SUCCESS}
     * marker is created under the target path once the export has finished.
     *
     * @param jsc Spark context used to run the export
     * @param cfg export configuration (source/target paths, output format, ...)
     * @throws IOException if a file system operation fails
     * @throws HoodieSnapshotExporterException if the target path already exists, the source
     *         has no completed write instant, or the source has no partitions
     */
    public void export(JavaSparkContext jsc, Config cfg) throws IOException {
        FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
        // Refuse to write into an existing location rather than mixing with old output.
        if (outputFs.exists(new Path(cfg.targetOutputPath))) {
            throw new HoodieSnapshotExporterException("The target output path already exists.");
        }

        FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
        // The supplier returns the exception; orElseThrow is responsible for throwing it.
        String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
            .orElseThrow(() -> new HoodieSnapshotExporterException("No commits present. Nothing to snapshot."));
        LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));

        HoodieEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        List<String> partitions = getPartitions(engineContext, cfg);
        if (partitions.isEmpty()) {
            throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
        }
        LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));

        // Use the validator's constant so the format name is defined in a single place.
        if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
            exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
        } else {
            exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
        }
        createSuccessTag(outputFs, cfg);
    }

    /**
     * Looks up the timestamp of the last completed instant on the write timeline of the
     * table located at {@code cfg.sourceBasePath}.
     *
     * @param fs  file system whose configuration is used to build the meta client
     * @param cfg export configuration providing the source base path
     * @return the timestamp of the last completed write instant, or an empty option when
     *         there is none
     */
    private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(fs.getConf())
            .setBasePath(cfg.sourceBasePath)
            .build();
        Option<HoodieInstant> lastCompleted = metaClient.getActiveTimeline()
            .getWriteTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (!lastCompleted.isPresent()) {
            return Option.empty();
        }
        return Option.of(lastCompleted.get().getTimestamp());
    }

    /**
     * Lists the partition paths of the source table via {@link FSUtils#getAllPartitionPaths}.
     *
     * @param engineContext engine context handed to the listing call
     * @param cfg           export configuration providing the source base path
     * @return the partition paths reported for {@code cfg.sourceBasePath}
     */
    private List<String> getPartitions(HoodieEngineContext engineContext, Config cfg) {
        final String basePath = cfg.sourceBasePath;
        // NOTE(review): the meaning of the two boolean flags is defined by FSUtils and is
        // not visible from this file - confirm before changing them.
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, basePath, true, false);
        return partitionPaths;
    }

    /**
     * Creates an empty {@code _SUCCESS} file directly under {@code cfg.targetOutputPath}
     * unless one is already there.
     *
     * @param fs  file system of the target output path
     * @param cfg export configuration providing the target output path
     * @throws IOException if the existence check or the file creation fails
     */
    private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
        Path marker = new Path(cfg.targetOutputPath + "/_SUCCESS");
        if (fs.exists(marker)) {
            // Already present; nothing to do.
            return;
        }
        LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
        fs.createNewFile(marker);
    }

    /**
     * Exports the snapshot by reading the selected base files as parquet through Spark and
     * writing them to {@code cfg.targetOutputPath} in {@code cfg.outputFormat}.
     *
     * <p>The writer is prepared by a {@link Partitioner}: either the class named by
     * {@code cfg.outputPartitioner} (loaded reflectively) or a default one that drops the
     * Hudi meta columns and, when {@code cfg.outputPartitionField} is set, repartitions and
     * partitions the output by that field.
     *
     * @param jsc                   Spark context used to list and read the files
     * @param sourceFs              file system of the source table
     * @param cfg                   export configuration
     * @param partitions            partition paths to export; also used as the listing parallelism
     * @param latestCommitTimestamp upper bound passed to {@code getLatestBaseFilesBeforeOrOn}
     */
    private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs, Config cfg, List<String> partitions, String latestCommitTimestamp) {
        Partitioner defaultPartitioner = dataset -> {
            Dataset<Row> hoodieDroppedDataset =
                dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
            if (StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
                return hoodieDroppedDataset.write();
            }
            return hoodieDroppedDataset
                .repartition(new Column(cfg.outputPartitionField))
                .write()
                .partitionBy(cfg.outputPartitionField);
        };

        Partitioner partitioner;
        if (StringUtils.isNullOrEmpty(cfg.outputPartitioner)) {
            partitioner = defaultPartitioner;
        } else {
            partitioner = (Partitioner) ReflectionUtils.loadClass(cfg.outputPartitioner);
        }

        HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);

        final TableFileSystemView.BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
        // One Spark partition per table partition; each task lists that partition's base files.
        Iterator<String> exportingFilePaths = jsc
            .parallelize(partitions, partitions.size())
            .flatMap(partition -> fsView
                .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
                .map(BaseFile::getPath)
                .iterator())
            .toLocalIterator();

        Dataset<Row> sourceDataset = new SQLContext(jsc)
            .read()
            .parquet(JavaConversions.asScalaIterator(exportingFilePaths).toSeq());
        partitioner.partition(sourceDataset)
            .format(cfg.outputFormat)
            .mode(SaveMode.ErrorIfExists)
            .save(cfg.targetOutputPath);
    }

    /**
     * Exports the snapshot as a Hudi table by copying files from the source to the target.
     *
     * <p>Three steps are performed:
     * <ol>
     *   <li>for every partition, collect the base files returned by
     *       {@code getLatestBaseFilesBeforeOrOn} plus the partition metafile, when one exists;</li>
     *   <li>copy each collected file into the matching partition directory under
     *       {@code cfg.targetOutputPath};</li>
     *   <li>copy {@code hoodie.properties} and every entry of the source {@code .hoodie}
     *       folder whose instant time is at or before {@code latestCommitTimestamp}.</li>
     * </ol>
     *
     * @param jsc                   Spark context used to distribute the work
     * @param sourceFs              file system of the source table
     * @param cfg                   export configuration
     * @param partitions            partition paths to export
     * @param latestCommitTimestamp upper bound for the files and instants to copy
     * @throws IOException if listing the source meta folder fails
     */
    private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
        // A configured parallelism of 0 means "use Spark's default".
        final int parallelism = cfg.parallelism == 0 ? jsc.defaultParallelism() : cfg.parallelism;
        final TableFileSystemView.BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
        final HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        final SerializableConfiguration serConf = context.getHadoopConf();
        context.setJobStatus(getClass().getSimpleName(), "Exporting as HUDI dataset");

        List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> {
            List<Pair<String, String>> filePaths = fsView
                .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
                .map(f -> Pair.of(partition, f.getPath()))
                .collect(Collectors.toList());
            // The partition metafile has to be copied as well, but only when there is one:
            // check for presence before unwrapping instead of calling get() blindly.
            FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
            Option<Path> partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(
                fs, FSUtils.getPartitionPath(cfg.sourceBasePath, partition));
            if (partitionMetaFile.isPresent() && fs.exists(partitionMetaFile.get())) {
                filePaths.add(Pair.of(partition, partitionMetaFile.get().toString()));
            }
            return filePaths.stream();
        }, parallelism);

        context.foreach(partitionAndFileList, partitionAndFile -> {
            String partition = partitionAndFile.getLeft();
            Path sourceFilePath = new Path(partitionAndFile.getRight());
            Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
            // File systems are re-created from a fresh copy of the configuration inside the task.
            FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
            FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
            if (!executorOutputFs.exists(toPartitionPath)) {
                executorOutputFs.mkdirs(toPartitionPath);
            }
            FileUtil.copy(
                executorSourceFs,
                sourceFilePath,
                executorOutputFs,
                new Path(toPartitionPath, sourceFilePath.getName()),
                false,
                false,
                executorOutputFs.getConf());
        }, parallelism);

        LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
        FileStatus[] commitFilesToCopy = sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + ".hoodie"), commitFilePath -> {
            // The table properties file is always part of the snapshot.
            if (commitFilePath.getName().equals("hoodie.properties")) {
                return true;
            }
            String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
            return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp);
        });

        context.foreach(Arrays.asList(commitFilesToCopy), commitFile -> {
            Path targetFilePath = new Path(cfg.targetOutputPath + "/" + ".hoodie" + "/" + commitFile.getPath().getName());
            FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
            FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
            if (!executorOutputFs.exists(targetFilePath.getParent())) {
                executorOutputFs.mkdirs(targetFilePath.getParent());
            }
            FileUtil.copy(
                executorSourceFs,
                commitFile.getPath(),
                executorOutputFs,
                targetFilePath,
                false,
                false,
                executorOutputFs.getConf());
        }, parallelism);
    }

    /**
     * Builds a file system view over the source table, restricted to the completed
     * instants of its write timeline.
     *
     * @param sourceFs file system whose configuration is used to build the meta client
     * @param cfg      export configuration providing the source base path
     * @return a view for looking up base files of the source table
     */
    private TableFileSystemView.BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(sourceFs.getConf())
            .setBasePath(cfg.sourceBasePath)
            .build();
        return new HoodieTableFileSystemView(
            metaClient,
            metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, creates a
     * Spark context and runs the export. The Spark context is stopped whether or not the
     * export succeeds.
     *
     * @param args2 command-line arguments, see the {@code @Parameter} fields of {@link Config}
     * @throws IOException if the export fails with an I/O error
     */
    public static void main(String[] args2) throws IOException {
        final Config config = new Config();
        // Constructing the JCommander with the arguments populates the config object.
        new JCommander(config, null, args2);

        final JavaSparkContext sparkContext =
            new JavaSparkContext(UtilHelpers.buildSparkConf("Hoodie-snapshot-exporter", "local[*]"));
        LOG.info("Initializing spark job.");
        try {
            HoodieSnapshotExporter exporter = new HoodieSnapshotExporter();
            exporter.export(sparkContext, config);
        } finally {
            sparkContext.stop();
        }
    }

    /**
     * Command-line configuration of the exporter. Fields are populated by JCommander from
     * the options named in their {@code @Parameter} annotations.
     */
    public static class Config implements Serializable {

        /** Base path of the Hudi table to export. */
        @Parameter(names = "--source-base-path", description = "Base path for the source Hudi dataset to be snapshotted", required = true)
        public String sourceBasePath;

        /** Path the snapshot is written to. */
        @Parameter(names = "--target-output-path", description = "Base path for the target output files (snapshots)", required = true)
        public String targetOutputPath;

        /** Output format; checked by {@link OutputFormatValidator}. */
        @Parameter(names = "--output-format", description = "Output format for the exported dataset; accept these values: json|parquet|orc|hudi", required = true, validateValueWith = OutputFormatValidator.class)
        public String outputFormat;

        /** Optional field used to repartition the output; {@code null} unless set. */
        @Parameter(names = "--output-partition-field", description = "A field to be used by Spark repartitioning")
        public String outputPartitionField;

        /** Optional class name of a custom {@link Partitioner}; {@code null} unless set. */
        @Parameter(names = "--output-partitioner", description = "A class to facilitate custom repartitioning")
        public String outputPartitioner;

        /** Requested parallelism; stays {@code 0} unless set on the command line. */
        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for file listing")
        public int parallelism;
    }

    /**
     * JCommander value validator that accepts only the output formats listed in
     * {@link #FORMATS}.
     */
    public static class OutputFormatValidator implements IValueValidator<String> {

        /** Name of the format that exports the snapshot as a Hudi table. */
        public static final String HUDI = "hudi";

        /** All accepted output format names. */
        public static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", "orc", HUDI);

        /**
         * Rejects {@code null} and any value not contained in {@link #FORMATS}.
         *
         * @param name  name of the option being validated (not used)
         * @param value value supplied for the option
         * @throws ParameterException if the value is not a supported format
         */
        @Override
        public void validate(String name, String value) {
            boolean supported = value != null && FORMATS.contains(value);
            if (supported) {
                return;
            }
            throw new ParameterException(String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
        }
    }

    /**
     * Strategy that turns the dataset to export into the {@link DataFrameWriter} used to
     * write it, e.g. after repartitioning it.
     */
    @FunctionalInterface
    public interface Partitioner {

        /**
         * Prepares a writer for the given dataset.
         *
         * @param var1 the dataset that is going to be exported
         * @return the writer to use for saving the dataset
         */
        DataFrameWriter<Row> partition(Dataset<Row> var1);
    }
}

