/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.SerializableConfiguration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

/**
 * Spark job that copies a snapshot of a Hoodie table into an output directory.
 *
 * <p>The snapshot consists of the data files the table's file-system view reports
 * for the latest completed commit, each partition's metadata file (when present),
 * the files of the table's meta folder whose commit time is not later than that
 * commit, and a {@code _SUCCESS} marker.
 */
public class HoodieSnapshotCopier implements Serializable {
    private static final Logger logger = LogManager.getLogger(HoodieSnapshotCopier.class);

    /** Name of the properties file in the meta folder; it is copied regardless of any commit time. */
    private static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";

    /** Name of the metadata file looked up inside every partition directory. */
    private static final String PARTITION_METAFILE = ".hoodie_partition_metadata";

    /**
     * Copies the latest snapshot of the table at {@code baseDir} into {@code outputDir}.
     *
     * <p>When the table has no completed commit, a warning is logged and nothing is
     * written (not even the {@code _SUCCESS} marker). Otherwise, if the table has
     * partitions, an already existing output directory is deleted recursively before
     * the data and meta files are copied. The {@code _SUCCESS} marker is created at the
     * end if it does not exist yet, also when there were no partitions to copy.
     *
     * @param jsc Spark context used to distribute the per-partition copy work
     * @param baseDir base path of the source Hoodie table
     * @param outputDir directory that receives the snapshot
     * @param shouldAssumeDatePartitioning forwarded to {@code FSUtils.getAllPartitionPaths}
     * @throws IOException if a file-system operation on the driver fails
     */
    public void snapshot(JavaSparkContext jsc, String baseDir, String outputDir, boolean shouldAssumeDatePartitioning) throws IOException {
        FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
        // Hadoop's Configuration is wrapped so it can be shipped to the executors inside the lambdas.
        SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
        HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
        TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView(
                tableMetadata, tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());

        Option<HoodieInstant> latestCommit =
                tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
        if (!latestCommit.isPresent()) {
            logger.warn("No commits present. Nothing to snapshot");
            return;
        }
        String latestCommitTimestamp = latestCommit.get().getTimestamp();
        logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));

        List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
        if (!partitions.isEmpty()) {
            logger.info(String.format("The job needs to copy %d partitions.", partitions.size()));

            // Start from a clean output directory.
            Path outputPath = new Path(outputDir);
            if (fs.exists(outputPath)) {
                logger.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
                fs.delete(outputPath, true);
            }

            // One Spark partition per table partition: first list the files, then copy each of them.
            jsc.parallelize(partitions, partitions.size())
                    .flatMap(partition -> listFilesToCopy(baseDir, serConf, fsView, latestCommitTimestamp, partition))
                    .foreach(tuple -> {
                        String partition = tuple._1();
                        Path sourceFilePath = new Path(tuple._2());
                        Path toPartitionPath = new Path(outputDir, partition);
                        // A FileSystem handle is obtained on the executor from a copy of the shipped configuration.
                        FileSystem ifs = FSUtils.getFs(baseDir, serConf.newCopy());
                        if (!ifs.exists(toPartitionPath)) {
                            ifs.mkdirs(toPartitionPath);
                        }
                        FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false, ifs.getConf());
                    });

            logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
            copyMetaFiles(fs, baseDir, outputDir, latestCommitTimestamp);
        } else {
            logger.info("The job has 0 partition to copy.");
        }

        Path successTagPath = new Path(outputDir + "/_SUCCESS");
        if (!fs.exists(successTagPath)) {
            logger.info(String.format("Creating _SUCCESS under targetBasePath: %s", outputDir));
            fs.createNewFile(successTagPath);
        }
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, starts a
     * Spark context and runs {@link #snapshot}. The Spark context is stopped even when
     * the snapshot fails.
     *
     * @param args command-line arguments, see {@link Config}
     * @throws IOException if the snapshot fails with a file-system error
     */
    public static void main(String[] args) throws IOException {
        Config cfg = new Config();
        new JCommander(cfg, args);
        logger.info(String.format("Snapshot hoodie table from %s targetBasePath to %s targetBasePath", cfg.basePath, cfg.outputPath));
        SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            logger.info("Initializing spark job.");
            HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
            copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning);
        } finally {
            // Always release the Spark context, also when the snapshot throws.
            jsc.stop();
        }
    }

    /**
     * Lists the files of one partition that belong to the snapshot: the data files
     * returned by {@code fsView.getLatestDataFilesBeforeOrOn} for the given commit
     * timestamp, plus the partition metadata file when it exists.
     *
     * @return an iterator over {@code (partition, source file path)} pairs
     */
    private static Iterator<Tuple2<String, String>> listFilesToCopy(String baseDir, SerializableConfiguration serConf, TableFileSystemView.ReadOptimizedView fsView, String latestCommitTimestamp, String partition) throws Exception {
        FileSystem fs = FSUtils.getFs(baseDir, serConf.newCopy());
        List<Tuple2<String, String>> filePaths = new ArrayList<>();
        fsView.getLatestDataFilesBeforeOrOn(partition, latestCommitTimestamp)
                .forEach(dataFile -> filePaths.add(new Tuple2<>(partition, dataFile.getPath())));

        Path partitionMetaFile = new Path(new Path(baseDir, partition), PARTITION_METAFILE);
        if (fs.exists(partitionMetaFile)) {
            filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
        }
        return filePaths.iterator();
    }

    /**
     * Copies the properties file and every meta-folder file whose commit time is not
     * later than {@code latestCommitTimestamp} from the source table's meta folder to
     * the meta folder under {@code outputDir}.
     */
    private static void copyMetaFiles(FileSystem fs, String baseDir, String outputDir, String latestCommitTimestamp) throws IOException {
        FileStatus[] commitFilesToCopy = fs.listStatus(new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), commitFilePath -> {
            if (commitFilePath.getName().equals(HOODIE_PROPERTIES_FILE)) {
                return true;
            }
            String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
            return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp, HoodieTimeline.LESSER_OR_EQUAL);
        });
        for (FileStatus commitStatus : commitFilesToCopy) {
            Path targetFilePath = new Path(outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
            if (!fs.exists(targetFilePath.getParent())) {
                fs.mkdirs(targetFilePath.getParent());
            }
            if (fs.exists(targetFilePath)) {
                // Only reported; the copy below is still attempted.
                logger.error(String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
            }
            FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
        }
    }

    /** Command-line options of the snapshot copier, populated by JCommander. */
    static class Config implements Serializable {
        @Parameter(names={"--base-path", "-bp"}, description="Hoodie table base path", required=true)
        String basePath = null;
        @Parameter(names={"--output-path", "-op"}, description="The snapshot output path", required=true)
        String outputPath = null;
        @Parameter(names={"--date-partitioned", "-dp"}, description="Can we assume date partitioning?")
        boolean shouldAssumeDatePartitioning = false;

        Config() {
        }
    }
}

