/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieSnapshotExporter {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieSnapshotExporter.class);

    public void export(JavaSparkContext jsc, Config cfg) throws IOException {
        FileSystem outputFs = HadoopFSUtils.getFs((String)cfg.targetOutputPath, (Configuration)jsc.hadoopConfiguration());
        if (outputFs.exists(new Path(cfg.targetOutputPath))) {
            throw new HoodieSnapshotExporterException("The target output path already exists.");
        }
        FileSystem sourceFs = HadoopFSUtils.getFs((String)cfg.sourceBasePath, (Configuration)jsc.hadoopConfiguration());
        String latestCommitTimestamp = (String)this.getLatestCommitTimestamp(sourceFs, cfg).orElseThrow(() -> {
            throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
        });
        LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        List<String> partitions = this.getPartitions((HoodieEngineContext)engineContext, cfg, (HoodieStorage)new HoodieHadoopStorage(sourceFs));
        if (partitions.isEmpty()) {
            throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
        }
        LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
        if (cfg.outputFormat.equals("hudi")) {
            this.exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
        } else {
            this.exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
        }
        this.createSuccessTag(outputFs, cfg);
    }

    private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
        HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy((Configuration)fs.getConf())).setBasePath(cfg.sourceBasePath).build();
        Option latestCommit = tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
        return latestCommit.isPresent() ? Option.of((Object)((HoodieInstant)latestCommit.get()).getTimestamp()) : Option.empty();
    }

    private List<String> getPartitions(HoodieEngineContext engineContext, Config cfg, HoodieStorage storage) {
        return FSUtils.getAllPartitionPaths((HoodieEngineContext)engineContext, (HoodieStorage)storage, (String)cfg.sourceBasePath, (boolean)true, (boolean)false);
    }

    private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
        Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
        if (!fs.exists(successTagPath)) {
            LOG.info(String.format("Creating _SUCCESS under target output path: %s", cfg.targetOutputPath));
            fs.createNewFile(successTagPath);
        }
    }

    private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs, Config cfg, List<String> partitions, String latestCommitTimestamp) {
        Partitioner defaultPartitioner = dataset -> {
            Dataset hoodieDroppedDataset = dataset.drop(JavaScalaConverters.convertJavaIteratorToScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
            return StringUtils.isNullOrEmpty((String)cfg.outputPartitionField) ? hoodieDroppedDataset.write() : hoodieDroppedDataset.repartition(new Column[]{new Column(cfg.outputPartitionField)}).write().partitionBy(new String[]{cfg.outputPartitionField});
        };
        Partitioner partitioner = StringUtils.isNullOrEmpty((String)cfg.outputPartitioner) ? defaultPartitioner : (Partitioner)ReflectionUtils.loadClass((String)cfg.outputPartitioner);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
        TableFileSystemView.BaseFileOnlyView fsView = this.getBaseFileOnlyView(sourceFs, cfg);
        Iterator exportingFilePaths = jsc.parallelize(partitions, partitions.size()).flatMap((FlatMapFunction & Serializable)partition -> fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(BaseFile::getPath).iterator()).toLocalIterator();
        Dataset sourceDataset = new SQLContext(jsc).read().parquet(JavaScalaConverters.convertJavaIteratorToScalaIterator((Iterator)exportingFilePaths).toSeq());
        partitioner.partition((Dataset<Row>)sourceDataset).format(cfg.outputFormat).mode(SaveMode.ErrorIfExists).save(cfg.targetOutputPath);
    }

    private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
        int parallelism = cfg.parallelism == 0 ? jsc.defaultParallelism() : cfg.parallelism;
        TableFileSystemView.BaseFileOnlyView fsView = this.getBaseFileOnlyView(sourceFs, cfg);
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        StorageConfiguration storageConf = context.getStorageConf();
        context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
        List partitionAndFileList = context.flatMap(partitions, (SerializableFunction & Serializable)partition -> {
            StoragePath partitionMetaFile;
            List filePaths = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> Pair.of((Object)partition, (Object)f.getPath())).collect(Collectors.toList());
            HoodieStorage storage = HoodieStorageUtils.getStorage((String)cfg.sourceBasePath, (StorageConfiguration)storageConf);
            if (storage.exists(partitionMetaFile = (StoragePath)HoodiePartitionMetadata.getPartitionMetafilePath((HoodieStorage)storage, (StoragePath)FSUtils.constructAbsolutePath((String)cfg.sourceBasePath, (String)partition)).get())) {
                filePaths.add(Pair.of((Object)partition, (Object)partitionMetaFile.toString()));
            }
            return filePaths.stream();
        }, parallelism);
        context.foreach(partitionAndFileList, (SerializableConsumer & Serializable)partitionAndFile -> {
            String partition = (String)partitionAndFile.getLeft();
            Path sourceFilePath = new Path((String)partitionAndFile.getRight());
            Path toPartitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath((String)cfg.targetOutputPath, (String)partition);
            FileSystem executorSourceFs = HadoopFSUtils.getFs((String)cfg.sourceBasePath, (StorageConfiguration)storageConf.newInstance());
            FileSystem executorOutputFs = HadoopFSUtils.getFs((String)cfg.targetOutputPath, (StorageConfiguration)storageConf.newInstance());
            if (!executorOutputFs.exists(toPartitionPath)) {
                executorOutputFs.mkdirs(toPartitionPath);
            }
            FileUtil.copy((FileSystem)executorSourceFs, (Path)sourceFilePath, (FileSystem)executorOutputFs, (Path)new Path(toPartitionPath, sourceFilePath.getName()), (boolean)false, (boolean)false, (Configuration)executorOutputFs.getConf());
        }, parallelism);
        LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
        FileStatus[] commitFilesToCopy = sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + ".hoodie"), commitFilePath -> {
            if (commitFilePath.getName().equals("hoodie.properties")) {
                return true;
            }
            String instantTime = FSUtils.getCommitFromCommitFile((String)commitFilePath.getName());
            return HoodieTimeline.compareTimestamps((String)instantTime, (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)latestCommitTimestamp);
        });
        context.foreach(Arrays.asList(commitFilesToCopy), (SerializableConsumer & Serializable)commitFile -> {
            Path targetFilePath = new Path(cfg.targetOutputPath + "/" + ".hoodie" + "/" + commitFile.getPath().getName());
            FileSystem executorSourceFs = HadoopFSUtils.getFs((String)cfg.sourceBasePath, (Configuration)((Configuration)storageConf.unwrapCopyAs(Configuration.class)));
            FileSystem executorOutputFs = HadoopFSUtils.getFs((String)cfg.targetOutputPath, (Configuration)((Configuration)storageConf.unwrapCopyAs(Configuration.class)));
            if (!executorOutputFs.exists(targetFilePath.getParent())) {
                executorOutputFs.mkdirs(targetFilePath.getParent());
            }
            FileUtil.copy((FileSystem)executorSourceFs, (Path)commitFile.getPath(), (FileSystem)executorOutputFs, (Path)targetFilePath, (boolean)false, (boolean)false, (Configuration)executorOutputFs.getConf());
        }, parallelism);
    }

    private TableFileSystemView.BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
        HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy((Configuration)sourceFs.getConf())).setBasePath(cfg.sourceBasePath).build();
        return new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws IOException {
        Config cfg = new Config();
        new JCommander((Object)cfg, null, args);
        SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-snapshot-exporter", "local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        LOG.info("Initializing spark job.");
        try {
            new HoodieSnapshotExporter().export(jsc, cfg);
        }
        finally {
            jsc.stop();
        }
    }

    public static class Config
    implements Serializable {
        @Parameter(names={"--source-base-path"}, description="Base path for the source Hudi dataset to be snapshotted", required=true)
        public String sourceBasePath;
        @Parameter(names={"--target-output-path"}, description="Base path for the target output files (snapshots)", required=true)
        public String targetOutputPath;
        @Parameter(names={"--output-format"}, description="Output format for the exported dataset; accept these values: json|parquet|orc|hudi", required=true, validateValueWith={OutputFormatValidator.class})
        public String outputFormat;
        @Parameter(names={"--output-partition-field"}, description="A field to be used by Spark repartitioning")
        public String outputPartitionField = null;
        @Parameter(names={"--output-partitioner"}, description="A class to facilitate custom repartitioning")
        public String outputPartitioner = null;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for file listing")
        public int parallelism = 0;
    }

    public static class OutputFormatValidator
    implements IValueValidator<String> {
        public static final String HUDI = "hudi";
        public static final List<String> FORMATS = CollectionUtils.createImmutableList((Object[])new String[]{"json", "parquet", "orc", "hudi"});

        public void validate(String name, String value) {
            if (value == null || !FORMATS.contains(value)) {
                throw new ParameterException(String.format("Invalid output format: value:%s: supported formats:%s", value, FORMATS));
            }
        }
    }

    @FunctionalInterface
    public static interface Partitioner {
        public DataFrameWriter<Row> partition(Dataset<Row> var1);
    }
}

