/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.IValueValidator;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.com.beust.jcommander.ParameterException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.SqlTransformerConfig;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieSnapshotExporter {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieSnapshotExporter.class);

    /**
     * Exports the latest completed snapshot of the source Hudi table to the target output path.
     *
     * <p>The export refuses to run when the target path already exists, when the source table has
     * no completed write instant, or when no partitions are found. Depending on
     * {@code cfg.outputFormat} the data is either copied file-by-file as a Hudi table or rewritten
     * through Spark in the requested format. A {@code _SUCCESS} marker is created under the target
     * path once the export has finished.
     *
     * @param jsc Spark context used to run the export and to obtain the Hadoop configuration
     * @param cfg exporter options (source path, target path, output format, ...)
     * @throws IOException if a file system operation fails
     * @throws HoodieSnapshotExporterException if the target path exists, there is no completed
     *         commit, or the source table has no partitions
     */
    public void export(JavaSparkContext jsc, Config cfg) throws IOException {
        FileSystem outputFs = HadoopFSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
        if (outputFs.exists(new Path(cfg.targetOutputPath))) {
            throw new HoodieSnapshotExporterException("The target output path already exists.");
        }
        FileSystem sourceFs = HadoopFSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
        HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy(sourceFs.getConf())).setBasePath(cfg.sourceBasePath).build();
        // The supplier hands the exception back to orElseThrow instead of throwing from inside the lambda.
        String latestCommitTimestamp = this.getLatestCommitTimestamp(tableMetadata)
                .orElseThrow(() -> new HoodieSnapshotExporterException("No commits present. Nothing to snapshot."));
        LOG.info("Starting to snapshot latest version files which are also no-late-than {}.", latestCommitTimestamp);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        List<String> partitions = this.getPartitions(engineContext, cfg, new HoodieHadoopStorage(sourceFs));
        if (partitions.isEmpty()) {
            throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
        }
        LOG.info("The job needs to export {} partitions.", partitions.size());
        // Use the shared constant rather than repeating the "hudi" literal.
        if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
            this.exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp, tableMetadata);
        } else {
            this.exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
        }
        this.createSuccessTag(outputFs, cfg);
    }

    /**
     * Finds the requested time of the last completed instant on the active write timeline.
     *
     * @param tableMetadata meta client of the source table
     * @return the requested time of the last completed write instant, or an empty option when the
     *         timeline has no completed write instant
     */
    private Option<String> getLatestCommitTimestamp(HoodieTableMetaClient tableMetadata) {
        Option<HoodieInstant> lastCompleted = tableMetadata.getActiveTimeline()
                .getWriteTimeline()
                .filterCompletedInstants()
                .lastInstant();
        if (!lastCompleted.isPresent()) {
            return Option.empty();
        }
        return Option.of(lastCompleted.get().requestedTime());
    }

    /**
     * Lists all partition paths of the source table via {@code FSUtils.getAllPartitionPaths}.
     *
     * @param engineContext engine context used for the listing
     * @param cfg exporter options; only {@code sourceBasePath} is read
     * @param storage2 storage handle for the source table
     * @return the partition paths reported for the source base path
     */
    private List<String> getPartitions(HoodieEngineContext engineContext, Config cfg, HoodieStorage storage2) {
        String sourceBasePath = cfg.sourceBasePath;
        // NOTE(review): the trailing `true` flag is passed through as-is; confirm its meaning against FSUtils.
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, storage2, sourceBasePath, true);
        return partitionPaths;
    }

    /**
     * Creates an empty {@code _SUCCESS} marker file under the target output path unless one is
     * already there.
     *
     * @param fs file system of the target output path
     * @param cfg exporter options; only {@code targetOutputPath} is read
     * @throws IOException if checking for or creating the marker fails
     */
    private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
        Path marker = new Path(cfg.targetOutputPath + "/_SUCCESS");
        if (fs.exists(marker)) {
            // Marker already present: nothing to do.
            return;
        }
        LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
        fs.createNewFile(marker);
    }

    /**
     * Exports the snapshot by reading the latest base files as parquet through Spark and writing
     * them out in {@code cfg.outputFormat}.
     *
     * <p>Steps: pick a {@link Partitioner} (a custom class named by {@code cfg.outputPartitioner},
     * otherwise a default that drops the Hudi meta columns and optionally repartitions by
     * {@code cfg.outputPartitionField}); collect the paths of the latest base files on or before
     * {@code latestCommitTimestamp} for every partition; load them as one dataset; optionally run
     * the configured transformer; write with {@code SaveMode.ErrorIfExists}.
     *
     * @param jsc Spark context
     * @param sourceFs file system of the source table
     * @param cfg exporter options
     * @param partitions partition paths to export; also used as the number of RDD slices
     * @param latestCommitTimestamp upper bound (inclusive) for the base files to export
     * @throws IOException declared by the signature
     */
    private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
        Partitioner partitioner;
        if (StringUtils.isNullOrEmpty(cfg.outputPartitioner)) {
            // Default behaviour: strip Hudi meta columns, then optionally repartition by the configured field.
            partitioner = dataset -> {
                Dataset withoutMetaColumns = dataset.drop(JavaScalaConverters.convertJavaIteratorToScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
                if (StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
                    return withoutMetaColumns.write();
                }
                return withoutMetaColumns.repartition(new Column[]{new Column(cfg.outputPartitionField)}).write().partitionBy(new String[]{cfg.outputPartitionField});
            };
        } else {
            // User-supplied partitioner, instantiated reflectively by class name.
            partitioner = (Partitioner)ReflectionUtils.loadClass(cfg.outputPartitioner);
        }

        HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);

        // One RDD slice per partition; each task resolves the latest base file paths of its partition.
        TableFileSystemView.BaseFileOnlyView fsView = this.getBaseFileOnlyView(sourceFs, cfg);
        Iterator baseFilePaths = jsc.parallelize(partitions, partitions.size())
                .flatMap((FlatMapFunction & Serializable)partition -> fsView.getLatestBaseFilesBeforeOrOn((String)partition, latestCommitTimestamp).map(BaseFile::getPath).iterator())
                .toLocalIterator();
        Dataset<Row> sourceDataset = new SQLContext(jsc).read().parquet(JavaScalaConverters.convertJavaIteratorToScalaIterator(baseFilePaths).toSeq());

        if (!StringUtils.isNullOrEmpty(cfg.transformerClassName)) {
            Option<Transformer> transformer = UtilHelpers.createTransformer(Option.of(Collections.singletonList(cfg.transformerClassName)), Option::empty, false);
            if (transformer.isPresent()) {
                // Only the SQL-related transformer settings are forwarded; null values are skipped.
                TypedProperties transformerProps = new TypedProperties();
                transformerProps.setPropertyIfNonNull(SqlTransformerConfig.TRANSFORMER_SQL.key(), cfg.transformerSql);
                transformerProps.setPropertyIfNonNull(SqlTransformerConfig.TRANSFORMER_SQL_FILE.key(), cfg.transformerSqlFile);
                sourceDataset = transformer.get().apply(jsc, SparkSession.builder().getOrCreate(), sourceDataset, transformerProps);
            }
        }

        DataFrameWriter<Row> writer = partitioner.partition(sourceDataset);
        writer.format(cfg.outputFormat).mode(SaveMode.ErrorIfExists).save(cfg.targetOutputPath);
    }

    /**
     * Exports the snapshot as a Hudi table by copying files from the source table to the target
     * output path.
     *
     * <p>Two groups of files are copied:
     * <ol>
     *   <li>for every partition, the latest base files on or before {@code latestCommitTimestamp}
     *       plus the partition metafile when it exists;</li>
     *   <li>the files under {@code .hoodie/timeline} whose instant timestamp is on or before
     *       {@code latestCommitTimestamp}, plus {@code .hoodie/hoodie.properties}.</li>
     * </ol>
     *
     * @param jsc Spark context
     * @param sourceFs file system of the source table
     * @param cfg exporter options
     * @param partitions partition paths to export
     * @param latestCommitTimestamp upper bound (inclusive) for base files and timeline files
     * @param metaClient meta client of the source table; used to parse timeline file names
     * @throws IOException if listing the source timeline directory fails
     */
    private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs, Config cfg, List<String> partitions, String latestCommitTimestamp, HoodieTableMetaClient metaClient) throws IOException {
        // A configured parallelism of 0 means "use Spark's default parallelism".
        int parallelism = cfg.parallelism == 0 ? jsc.defaultParallelism() : cfg.parallelism;
        TableFileSystemView.BaseFileOnlyView fsView = this.getBaseFileOnlyView(sourceFs, cfg);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        StorageConfiguration<?> storageConf = context.getStorageConf();
        ((HoodieEngineContext)context).setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
        // Step 1: build (partition, file path) pairs for every data file that has to be copied.
        List partitionAndFileList = ((HoodieEngineContext)context).flatMap(partitions, partition -> {
            StoragePath partitionMetaFile;
            List filePaths = fsView.getLatestBaseFilesBeforeOrOn((String)partition, latestCommitTimestamp).map(f -> Pair.of(partition, f.getPath())).collect(Collectors.toList());
            HoodieStorage storage2 = HoodieStorageUtils.getStorage(cfg.sourceBasePath, storageConf);
            // Include the partition metafile as well, when the partition has one.
            if (storage2.exists(partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(storage2, FSUtils.constructAbsolutePath(cfg.sourceBasePath, partition)).get())) {
                filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
            }
            return filePaths.stream();
        }, parallelism);
        // Step 2: copy each file into the matching partition directory under the target path.
        ((HoodieEngineContext)context).foreach(partitionAndFileList, partitionAndFile -> {
            String partition = (String)partitionAndFile.getLeft();
            Path sourceFilePath = new Path((String)partitionAndFile.getRight());
            Path toPartitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(cfg.targetOutputPath, partition);
            // File system handles are obtained inside the task from a fresh configuration instance.
            FileSystem executorSourceFs = HadoopFSUtils.getFs(cfg.sourceBasePath, storageConf.newInstance());
            FileSystem executorOutputFs = HadoopFSUtils.getFs(cfg.targetOutputPath, storageConf.newInstance());
            if (!executorOutputFs.exists(toPartitionPath)) {
                executorOutputFs.mkdirs(toPartitionPath);
            }
            // Copy keeps the source file (deleteSource=false) and overwrites an existing target (overwrite=true).
            FileUtil.copy((FileSystem)executorSourceFs, (Path)sourceFilePath, (FileSystem)executorOutputFs, (Path)new Path(toPartitionPath, sourceFilePath.getName()), (boolean)false, (boolean)true, (Configuration)executorOutputFs.getConf());
        }, parallelism);
        // Step 3: select the timeline files that belong to the snapshot.
        LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
        List commitFilesListToCopy = Arrays.stream(sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + ".hoodie" + "/" + "timeline"))).filter(fileStatus -> {
            Path path = fileStatus.getPath();
            // A file named hoodie.properties is always kept.
            if (path.getName().equals("hoodie.properties")) {
                return true;
            }
            // Sub-directories of the timeline folder are not copied.
            if (fileStatus.isDirectory()) {
                return false;
            }
            // Keep only instants whose timestamp is on or before the snapshot timestamp.
            String instantTime = metaClient.getInstantFileNameParser().extractTimestamp(path.getName());
            return InstantComparison.compareTimestamps(instantTime, InstantComparison.LESSER_THAN_OR_EQUALS, latestCommitTimestamp);
        }).collect(Collectors.toList());
        // The table properties file sits directly under .hoodie, so it is listed separately.
        commitFilesListToCopy.addAll(Arrays.stream(sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + ".hoodie" + "/" + "hoodie.properties"))).collect(Collectors.toList()));
        FileStatus[] commitFilesToCopy = (FileStatus[])commitFilesListToCopy.stream().toArray(FileStatus[]::new);
        // Step 4: copy the selected metadata files into the target .hoodie folder.
        ((HoodieEngineContext)context).foreach(Arrays.asList(commitFilesToCopy), commitFile -> {
            // hoodie.properties goes to <target>/.hoodie/, everything else to <target>/.hoodie/timeline/.
            Path targetFilePath = commitFile.getPath().getName().endsWith("hoodie.properties") ? new Path(cfg.targetOutputPath + "/" + ".hoodie" + "/" + commitFile.getPath().getName()) : new Path(cfg.targetOutputPath + "/" + ".hoodie" + "/" + "timeline" + "/" + commitFile.getPath().getName());
            FileSystem executorSourceFs = HadoopFSUtils.getFs(cfg.sourceBasePath, storageConf.unwrapCopyAs(Configuration.class));
            FileSystem executorOutputFs = HadoopFSUtils.getFs(cfg.targetOutputPath, storageConf.unwrapCopyAs(Configuration.class));
            if (!executorOutputFs.exists(targetFilePath.getParent())) {
                executorOutputFs.mkdirs(targetFilePath.getParent());
            }
            // Same copy semantics as for data files: keep the source, overwrite the target.
            FileUtil.copy((FileSystem)executorSourceFs, (Path)commitFile.getPath(), (FileSystem)executorOutputFs, (Path)targetFilePath, (boolean)false, (boolean)true, (Configuration)executorOutputFs.getConf());
        }, parallelism);
    }

    /**
     * Builds a base-file-only file system view of the source table, restricted to completed
     * instants of the write timeline.
     *
     * <p>A new meta client is built on every call from a copy of the source file system's
     * configuration.
     *
     * @param sourceFs file system of the source table
     * @param cfg exporter options; only {@code sourceBasePath} is read
     * @return a view over the source table's base files
     */
    private TableFileSystemView.BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
        StorageConfiguration<?> sourceStorageConf = HadoopFSUtils.getStorageConfWithCopy(sourceFs.getConf());
        HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
                .setConf(sourceStorageConf)
                .setBasePath(cfg.sourceBasePath)
                .build();
        return new HoodieTableFileSystemView(
                sourceMetaClient,
                sourceMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, validates the
     * transformer options, and runs the export on a newly created Spark context.
     *
     * <p>The JVM exits with status 1 when the transformer options are invalid. The Spark context
     * is always stopped, whether the export succeeds or fails.
     *
     * @param args2 command-line arguments
     * @throws IOException if the export fails with an I/O error
     */
    public static void main(String[] args2) throws IOException {
        Config exporterConfig = new Config();
        new JCommander((Object)exporterConfig, null, args2);
        boolean transformerOptionsOk = HoodieSnapshotExporter.areTransformerOptionsValid(exporterConfig);
        if (!transformerOptionsOk) {
            System.exit(1);
        }
        JavaSparkContext sparkContext = new JavaSparkContext(UtilHelpers.buildSparkConf("Hoodie-snapshot-exporter", "local[*]"));
        LOG.info("Initializing spark job.");
        HoodieSnapshotExporter exporter = new HoodieSnapshotExporter();
        try {
            exporter.export(sparkContext, exporterConfig);
        }
        finally {
            sparkContext.stop();
        }
    }

    /**
     * Checks that the option required by the selected SQL transformer has been supplied.
     *
     * <p>{@code SqlQueryBasedTransformer} requires {@code --transformer-sql} and
     * {@code SqlFileBasedTransformer} requires {@code --transformer-sql-file}. Any other
     * transformer class, or no transformer at all, is accepted. A missing option is reported
     * through the error log.
     *
     * @param config exporter options to validate
     * @return {@code true} when the options are consistent, {@code false} otherwise
     */
    public static boolean areTransformerOptionsValid(Config config) {
        if (StringUtils.isNullOrEmpty(config.transformerClassName)) {
            // No transformer requested: nothing to validate.
            return true;
        }
        String transformerClass = config.transformerClassName;
        if ("org.apache.hudi.utilities.transform.SqlQueryBasedTransformer".equals(transformerClass)
                && StringUtils.isNullOrEmpty(config.transformerSql)) {
            LOG.error("--transformer-sql is required when using SqlQueryBasedTransformer");
            return false;
        }
        if ("org.apache.hudi.utilities.transform.SqlFileBasedTransformer".equals(transformerClass)
                && StringUtils.isNullOrEmpty(config.transformerSqlFile)) {
            LOG.error("--transformer-sql-file is required when using SqlFileBasedTransformer");
            return false;
        }
        return true;
    }

    /**
     * Options of the snapshot exporter, populated from the command line by JCommander.
     * Serializable because instances are captured by the Spark closures used during the export.
     */
    public static class Config implements Serializable {
        /** Base path of the source Hudi table. */
        @Parameter(names = "--source-base-path", description = "Base path for the source Hudi dataset to be snapshotted", required = true)
        public String sourceBasePath;

        /** Base path the snapshot is written to; the export fails when it already exists. */
        @Parameter(names = "--target-output-path", description = "Base path for the target output files (snapshots)", required = true)
        public String targetOutputPath;

        /** Output format; validated by {@link OutputFormatValidator}. */
        @Parameter(names = "--output-format", description = "Output format for the exported dataset; accept these values: json|parquet|orc|hudi", required = true, validateValueWith = OutputFormatValidator.class)
        public String outputFormat;

        /** Optional field used by the default partitioner of the non-Hudi export. */
        @Parameter(names = "--output-partition-field", description = "A field to be used by Spark repartitioning")
        public String outputPartitionField;

        /** Optional class name of a custom {@link Partitioner} for the non-Hudi export. */
        @Parameter(names = "--output-partitioner", description = "A class to facilitate custom repartitioning")
        public String outputPartitioner;

        /** Parallelism of the Hudi export; 0 (the default) falls back to Spark's default parallelism. */
        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for file listing")
        public int parallelism;

        /** Optional transformer class applied to the dataset in the non-Hudi export. */
        @Parameter(names = "--transformer-class", description = "A subclass of org.apache.hudi.utilities.transform.Transformer. Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before writing. Default: Not set. Available transformers: org.apache.hudi.utilities.transform.SqlQueryBasedTransformer, org.apache.hudi.utilities.transform.SqlFileBasedTransformer, org.apache.hudi.utilities.transform.FlatteningTransformer, org.apache.hudi.utilities.transform.AWSDmsTransformer.")
        public String transformerClassName;

        /** SQL query handed to the transformer; required for {@code SqlQueryBasedTransformer}. */
        @Parameter(names = "--transformer-sql", description = "sql-query template be used to transform the source before writing to Hudi data-set. The query should reference the source as a table named \"<SRC>\".")
        public String transformerSql;

        /** SQL file handed to the transformer; required for {@code SqlFileBasedTransformer}. */
        @Parameter(names = "--transformer-sql-file", description = "File with a SQL query to be executed during write. The query should reference the source as a table named \"<SRC>\".")
        public String transformerSqlFile;
    }

    /**
     * JCommander value validator that accepts only the output formats listed in {@link #FORMATS}.
     */
    public static class OutputFormatValidator implements IValueValidator<String> {
        /** Output format name that selects the file-copy (Hudi) export. */
        public static final String HUDI = "hudi";
        /** All supported output format names. */
        public static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", "orc", HUDI);

        /**
         * Rejects a null or unsupported output format.
         *
         * @param name2 name of the option being validated (not used)
         * @param value value supplied for the option
         * @throws ParameterException if {@code value} is null or not one of {@link #FORMATS}
         */
        @Override
        public void validate(String name2, String value) {
            boolean supported = value != null && FORMATS.contains(value);
            if (supported) {
                return;
            }
            String message = String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS);
            throw new ParameterException(message);
        }
    }

    /**
     * Strategy that turns the dataset to export into a configured {@link DataFrameWriter}.
     * Used by the non-Hudi export; a custom implementation can be selected by class name through
     * the {@code --output-partitioner} option.
     */
    @FunctionalInterface
    public interface Partitioner {
        /**
         * Prepares the writer for the given dataset.
         *
         * @param var1 dataset about to be exported
         * @return the writer the exporter will apply the output format, save mode and target path to
         */
        DataFrameWriter<Row> partition(Dataset<Row> var1);
    }
}

