/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.repair.RepairUtils;
import org.apache.hudi.utilities.HoodieDataTableUtils;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

public class HoodieRepairTool {
    /** Class-wide log4j logger. */
    private static final Logger LOG = LogManager.getLogger(HoodieRepairTool.class);
    /** Name prefix of the backup directory that is generated under /tmp when no backup path is configured. */
    private static final String BACKUP_DIR_PREFIX = "hoodie_repair_backup_";
    /** Parsed command-line options; {@code propsFilePath} and {@code backupPath} may be rewritten by this tool. */
    private final Config cfg;
    /** Properties built from {@code cfg.configs}, or read from {@code cfg.propsFilePath} when that is set. */
    private TypedProperties props;
    /** Engine context wrapping the Spark context; used to parallelize file operations. */
    private final HoodieEngineContext context;
    /** Meta client for the table at {@code cfg.basePath}; provides the timelines and the file system. */
    private final HoodieTableMetaClient metaClient;
    /** File-system-backed table metadata; used to list partition paths and files. */
    private final HoodieTableMetadata tableMetadata;

    /**
     * Builds the repair tool for the table located at {@code cfg.basePath}.
     *
     * <p>When a properties file is configured, its path is first rewritten in place on
     * {@code cfg} via {@code FSUtils.addSchemeIfLocalPath}, and the properties are read from it;
     * otherwise the properties are built from {@code cfg.configs} only.
     *
     * @param jsc Spark context supplying the Hadoop configuration and the engine context
     * @param cfg tool configuration; {@code propsFilePath} may be modified by this constructor
     */
    public HoodieRepairTool(JavaSparkContext jsc, Config cfg) {
        if (cfg.propsFilePath != null) {
            cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
        }
        this.context = new HoodieSparkEngineContext(jsc);
        this.cfg = cfg;
        if (cfg.propsFilePath == null) {
            this.props = UtilHelpers.buildProperties(cfg.configs);
        } else {
            this.props = this.readConfigFromFileSystem(jsc, cfg);
        }
        this.metaClient = HoodieTableMetaClient.builder()
            .setConf(jsc.hadoopConfiguration())
            .setBasePath(cfg.basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        this.tableMetadata = new FileSystemBackedTableMetadata(
            this.context, this.context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
    }

    /**
     * Runs the tool in the mode named by {@code cfg.runningMode} (case-insensitive).
     *
     * <ul>
     *   <li>REPAIR: validates the backup path, then backs up and deletes dangling files.</li>
     *   <li>DRY_RUN: only reports dangling files.</li>
     *   <li>UNDO: validates the backup path, then copies backed-up files back into the table.</li>
     * </ul>
     *
     * @return {@code true} if the selected mode completed successfully; {@code false} if the
     *         mode is missing or unknown, a backup-path check failed, or the operation failed
     * @throws HoodieIOException if an {@link IOException} occurs while running the selected mode
     */
    public boolean run() {
        Option<String> startingInstantOption = Option.ofNullable(this.cfg.startingInstantTime);
        Option<String> endingInstantOption = Option.ofNullable(this.cfg.endingInstantTime);
        if (startingInstantOption.isPresent() && endingInstantOption.isPresent()) {
            LOG.info(String.format("Start repairing completed instants between %s and %s (inclusive)", startingInstantOption.get(), endingInstantOption.get()));
        } else if (startingInstantOption.isPresent()) {
            LOG.info(String.format("Start repairing completed instants from %s (inclusive)", startingInstantOption.get()));
        } else if (endingInstantOption.isPresent()) {
            LOG.info(String.format("Start repairing completed instants till %s (inclusive)", endingInstantOption.get()));
        } else {
            LOG.info("Start repairing all completed instants");
        }
        Mode mode = parseMode(this.cfg.runningMode);
        if (mode == null) {
            // A missing or unknown mode is reported and rejected instead of escaping as an
            // IllegalArgumentException / NullPointerException from Mode.valueOf.
            LOG.info("Unsupported running mode [" + this.cfg.runningMode + "], quit the job directly");
            return false;
        }
        try {
            switch (mode) {
                case REPAIR: {
                    LOG.info(" ****** The repair tool is in REPAIR mode, dangling data and logs files not belonging to any commit are going to be DELETED from the table ******");
                    if (this.checkBackupPathForRepair() < 0) {
                        LOG.error("Backup path check failed.");
                        return false;
                    }
                    return this.doRepair(startingInstantOption, endingInstantOption, false);
                }
                case DRY_RUN: {
                    LOG.info(" ****** The repair tool is in DRY_RUN mode, only LOOKING FOR dangling data and log files from the table ******");
                    return this.doRepair(startingInstantOption, endingInstantOption, true);
                }
                case UNDO: {
                    if (this.checkBackupPathAgainstBasePath() < 0) {
                        LOG.error("Backup path check failed.");
                        return false;
                    }
                    return this.undoRepair();
                }
                default: {
                    // Defensive: only reachable if a Mode constant is added without a case above.
                    LOG.info("Unsupported running mode [" + this.cfg.runningMode + "], quit the job directly");
                    return false;
                }
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("Unable to repair table in " + this.cfg.basePath, e);
        }
    }

    /**
     * Resolves a user-supplied mode name to a {@link Mode} constant.
     *
     * <p>Upper-casing uses {@link Locale#ROOT} so the result does not depend on the JVM default
     * locale (for example, the Turkish dotted capital I would otherwise break "repair").
     *
     * @param runningMode mode name as given on the command line; may be {@code null}
     * @return the matching constant, or {@code null} if the name is {@code null} or unknown
     */
    private static Mode parseMode(String runningMode) {
        if (runningMode == null) {
            return null;
        }
        try {
            return Mode.valueOf(runningMode.toUpperCase(Locale.ROOT));
        }
        catch (IllegalArgumentException e) {
            return null;
        }
    }

    /**
     * Command-line entry point.
     *
     * <p>Parses the arguments into a {@link Config}; prints usage and exits with status 1 when
     * help is requested or no arguments are given. Otherwise builds a Spark context, runs the
     * tool, logs any failure, and always stops the Spark context afterwards.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        Config config = new Config();
        JCommander commander = new JCommander((Object)config, null, args);
        boolean usageRequested = config.help.booleanValue() || args.length == 0;
        if (usageRequested) {
            commander.usage();
            System.exit(1);
        }
        JavaSparkContext sparkContext = UtilHelpers.buildSparkContext("hudi-table-repair", config.sparkMaster, config.sparkMemory);
        try {
            HoodieRepairTool tool = new HoodieRepairTool(sparkContext, config);
            tool.run();
        }
        catch (Throwable failure) {
            String message = "Fail to run table repair for " + config.basePath;
            LOG.error((Object)message, failure);
        }
        finally {
            sparkContext.stop();
        }
    }

    /**
     * Copies files from one base path to another in parallel, keeping their relative paths.
     *
     * <p>A file counts as a failure when the destination already exists (it is never
     * overwritten) or when an {@link IOException} occurs; both cases are logged.
     *
     * @param context           engine context used to parallelize the copy
     * @param relativeFilePaths file paths relative to both base paths
     * @param sourceBasePath    base path to copy from
     * @param destBasePath      base path to copy to; its file system performs the copy
     * @return {@code true} only if at least one file was given and every file was copied;
     *         {@code false} for an empty list or if any file failed
     */
    static boolean copyFiles(HoodieEngineContext context, List<String> relativeFilePaths, String sourceBasePath, String destBasePath) {
        SerializableConfiguration conf = context.getHadoopConf();
        List<Boolean> allResults = context.parallelize(relativeFilePaths).mapPartitions(iterator -> {
            List<Boolean> results = new ArrayList<>();
            FileSystem fs = FSUtils.getFs(destBasePath, conf.get());
            iterator.forEachRemaining(filePath -> {
                boolean success = false;
                Path sourcePath = new Path(sourceBasePath, filePath);
                Path destPath = new Path(destBasePath, filePath);
                try {
                    if (fs.exists(destPath)) {
                        // Not overwriting is deliberate; say so, since this makes the whole call fail.
                        LOG.warn(String.format("Skip copying file: destination already exists: source [%s], destination [%s]", sourcePath, destPath));
                    } else {
                        FileIOUtils.copy(fs, sourcePath, destPath);
                        success = true;
                    }
                }
                catch (IOException e) {
                    // Keep the cause in the log so the failure can be diagnosed.
                    LOG.error(String.format("Copying file fails: source [%s], destination [%s]", sourcePath, destPath), e);
                }
                finally {
                    results.add(success);
                }
            });
            return results.iterator();
        }, true).collectAsList();
        // An empty result list is reported as failure, matching the previous reduce(...).orElse(false).
        return !allResults.isEmpty() && allResults.stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Lists the data files found at a given directory depth below a base path.
     *
     * @param context       engine context used for the listing
     * @param basePathStr   base path to list from
     * @param expectedLevel directory level, relative to the base path, at which to collect file statuses
     * @param parallelism   parallelism passed to the listing
     * @return full path strings of the entries that are files and that {@code FSUtils.isDataFile} accepts
     */
    static List<String> listFilesFromBasePath(HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) {
        FileSystem fileSystem = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
        Path root = new Path(basePathStr);
        return FSUtils.getFileStatusAtLevel(context, fileSystem, root, expectedLevel, parallelism)
            .stream()
            .filter(status -> status.isFile() && FSUtils.isDataFile(status.getPath()))
            .map(status -> status.getPath().toString())
            .collect(Collectors.toList());
    }

    /**
     * Deletes files under a base path in parallel (non-recursive delete per file).
     *
     * <p>A file counts as a failure when the file system reports that nothing was deleted or
     * when an {@link IOException} occurs; both cases are logged.
     *
     * @param context           engine context used to parallelize the deletion
     * @param basePath          base path the relative paths are resolved against
     * @param relativeFilePaths file paths relative to {@code basePath}
     * @return {@code true} if every file was deleted, or if the list is empty;
     *         {@code false} if any deletion failed
     */
    static boolean deleteFiles(HoodieEngineContext context, String basePath, List<String> relativeFilePaths) {
        SerializableConfiguration conf = context.getHadoopConf();
        List<Boolean> allResults = context.parallelize(relativeFilePaths).mapPartitions(iterator -> {
            FileSystem fs = FSUtils.getFs(basePath, conf.get());
            List<Boolean> results = new ArrayList<>();
            iterator.forEachRemaining(relativeFilePath -> {
                boolean success = false;
                try {
                    success = fs.delete(new Path(basePath, relativeFilePath), false);
                    if (!success) {
                        LOG.warn("File was not deleted: " + relativeFilePath);
                    }
                }
                catch (IOException e) {
                    // Keep the cause in the log so the failure can be diagnosed.
                    LOG.warn("Failed to delete file " + relativeFilePath, e);
                }
                finally {
                    results.add(success);
                }
            });
            return results.iterator();
        }, true).collectAsList();
        // allMatch on an empty list is true, matching the previous reduce(...).orElse(true).
        return allResults.stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Finds dangling base and log files and, unless this is a dry run, backs them up and
     * deletes them.
     *
     * <p>Files found on the file system are grouped by the instant time they are tagged with;
     * only instants inside the optional inclusive [start, end] range are examined. For each
     * such instant, {@code RepairUtils.findInstantFilesToRemove} decides which files to remove
     * based on the active and archived timelines.
     *
     * @param startingInstantOption inclusive lower bound on instant time, if present
     * @param endingInstantOption   inclusive upper bound on instant time, if present
     * @param isDryRun              when {@code true}, only report; nothing is copied or deleted
     * @return {@code true} for a dry run, when nothing needs deleting, or when backup and
     *         deletion both succeed; {@code false} if the backup or the deletion fails
     * @throws IOException if listing the table files fails
     */
    boolean doRepair(Option<String> startingInstantOption, Option<String> endingInstantOption, boolean isDryRun) throws IOException {
        List<Path> allFilesInPartitions = HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(this.tableMetadata, this.cfg.basePath);
        Map<String, List<String>> instantToFilesMap = RepairUtils.tagInstantsOfBaseAndLogFiles(this.metaClient.getBasePath(), allFilesInPartitions);
        List<String> instantTimesToRepair = instantToFilesMap.keySet().stream()
            .filter(instant -> (!startingInstantOption.isPresent() || instant.compareTo(startingInstantOption.get()) >= 0)
                && (!endingInstantOption.isPresent() || instant.compareTo(endingInstantOption.get()) <= 0))
            .collect(Collectors.toList());
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        HoodieArchivedTimeline archivedTimeline = this.metaClient.getArchivedTimeline();
        archivedTimeline.loadCompletedInstantDetailsInMemory();
        // The lambda captures only locals (not `this`), as in the original code.
        List<ImmutablePair<String, List<String>>> instantFilesToRemove = this.context.parallelize(instantTimesToRepair)
            .map(instantToRepair -> new ImmutablePair<String, List<String>>(
                instantToRepair,
                RepairUtils.findInstantFilesToRemove(instantToRepair, instantToFilesMap.get(instantToRepair), activeTimeline, archivedTimeline)))
            .collectAsList();
        List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles = instantFilesToRemove.stream()
            .filter(e -> !e.getValue().isEmpty())
            .collect(Collectors.toList());
        this.printRepairInfo(instantTimesToRepair, instantsWithDanglingFiles);
        if (isDryRun) {
            return true;
        }
        List<String> relativeFilePathsToDelete = instantsWithDanglingFiles.stream()
            .flatMap(e -> e.getValue().stream())
            .collect(Collectors.toList());
        if (relativeFilePathsToDelete.isEmpty()) {
            LOG.info(String.format("Table repair on %s is successful", this.cfg.basePath));
            return true;
        }
        // Never delete anything that was not backed up first.
        if (!this.backupFiles(relativeFilePathsToDelete)) {
            LOG.error("Error backing up dangling files. Exiting...");
            return false;
        }
        boolean deleted = HoodieRepairTool.deleteFiles(this.context, this.cfg.basePath, relativeFilePathsToDelete);
        if (deleted) {
            LOG.info(String.format("Table repair on %s is successful", this.cfg.basePath));
        } else {
            LOG.error(String.format("Error deleting dangling files from %s; backed-up copies are in %s", this.cfg.basePath, this.cfg.backupPath));
        }
        return deleted;
    }

    /**
     * Reverts a repair by copying the backed-up files from {@code cfg.backupPath} back into
     * the table base path.
     *
     * <p>The directory depth at which files are looked up in the backup is derived from the
     * first partition path reported by the table metadata.
     *
     * @return {@code false} if the backup path does not exist, the table has no partitions,
     *         or restoring the files fails; otherwise the result of the restore
     * @throws IOException if checking the backup path or listing partitions fails
     */
    boolean undoRepair() throws IOException {
        HoodieWrapperFileSystem fs = this.metaClient.getFs();
        String backupDir = this.cfg.backupPath;
        Path backupRoot = new Path(backupDir);
        if (!fs.exists(backupRoot)) {
            LOG.error("Cannot find backup path: " + backupRoot);
            return false;
        }

        List<String> partitions = this.tableMetadata.getAllPartitionPaths();
        if (partitions.isEmpty()) {
            LOG.error("Cannot get one partition path since there is no partition available");
            return false;
        }

        int depth = this.getExpectedLevelBasedOnPartitionPath(partitions.get(0));
        List<String> backedUpFiles = HoodieRepairTool.listFilesFromBasePath(this.context, backupDir, depth, this.cfg.parallelism);
        List<String> relativeFilePaths = new ArrayList<>();
        for (String backedUpFile : backedUpFiles) {
            relativeFilePaths.add(FSUtils.getRelativePartitionPath(new Path(backupDir), new Path(backedUpFile)));
        }
        return this.restoreFiles(relativeFilePaths);
    }

    /**
     * Counts the "/"-separated segments of a partition path.
     *
     * @param partitionPath relative partition path; may be null or empty
     * @return 0 when {@code StringUtils.isNullOrEmpty} accepts the path, otherwise the number
     *         of parts produced by splitting on "/"
     */
    int getExpectedLevelBasedOnPartitionPath(String partitionPath) {
        return StringUtils.isNullOrEmpty(partitionPath) ? 0 : partitionPath.split("/").length;
    }

    /**
     * Validates the backup path before a repair, generating one if none is configured.
     *
     * <p>When {@code cfg.backupPath} is null it is set to "/tmp/" + {@code BACKUP_DIR_PREFIX} +
     * a random long. The path is rejected if it exists and contains any entry; otherwise the
     * decision is delegated to {@link #checkBackupPathAgainstBasePath()}.
     *
     * @return -1 if the backup path is unusable, otherwise the result of
     *         {@link #checkBackupPathAgainstBasePath()}
     * @throws IOException if the file system cannot be queried
     */
    int checkBackupPathForRepair() throws IOException {
        if (this.cfg.backupPath == null) {
            long suffix = new SecureRandom().nextLong();
            this.cfg.backupPath = "/tmp/" + BACKUP_DIR_PREFIX + suffix;
        }
        Path target = new Path(this.cfg.backupPath);
        boolean occupied = this.metaClient.getFs().exists(target)
            && this.metaClient.getFs().listStatus(target).length > 0;
        if (!occupied) {
            return this.checkBackupPathAgainstBasePath();
        }
        LOG.error(String.format("Cannot use backup path %s: it is not empty", this.cfg.backupPath));
        return -1;
    }

    /**
     * Verifies that a backup path is configured and is not the table base path or a location
     * inside it.
     *
     * <p>The test compares path components rather than raw substrings, so a backup path such as
     * {@code /backup/data/tbl} or {@code /data/tbl_backup} is no longer rejected for a table at
     * {@code /data/tbl}.
     *
     * @return 0 if the backup path is acceptable, -1 otherwise
     */
    int checkBackupPathAgainstBasePath() {
        if (this.cfg.backupPath == null) {
            LOG.error("Backup path is not configured");
            return -1;
        }
        if (isSameOrNestedPath(new Path(this.cfg.basePath), new Path(this.cfg.backupPath))) {
            LOG.error(String.format("Cannot use backup path %s: it resides in the base path %s", this.cfg.backupPath, this.cfg.basePath));
            return -1;
        }
        return 0;
    }

    /**
     * Tells whether {@code candidate} equals {@code base} or lies underneath it.
     *
     * <p>Paths whose schemes (or authorities) are both present and differ are treated as being
     * on different file systems and therefore never nested. If either side omits the scheme or
     * authority, only the path components are compared, which errs on the side of rejecting.
     */
    private static boolean isSameOrNestedPath(Path base, Path candidate) {
        String baseScheme = base.toUri().getScheme();
        String candidateScheme = candidate.toUri().getScheme();
        if (baseScheme != null && candidateScheme != null && !baseScheme.equalsIgnoreCase(candidateScheme)) {
            return false;
        }
        String baseAuthority = base.toUri().getAuthority();
        String candidateAuthority = candidate.toUri().getAuthority();
        if (baseAuthority != null && candidateAuthority != null && !baseAuthority.equalsIgnoreCase(candidateAuthority)) {
            return false;
        }
        String baseDir = base.toUri().getPath();
        String candidateDir = candidate.toUri().getPath();
        if (candidateDir.equals(baseDir)) {
            return true;
        }
        // Append the separator so that "/data/tbl" does not match "/data/tbl_backup".
        String prefix = baseDir.endsWith("/") ? baseDir : baseDir + "/";
        return candidateDir.startsWith(prefix);
    }

    /**
     * Copies the given files from the table base path to the backup path.
     *
     * @param relativeFilePaths file paths relative to the table base path
     * @return the result of {@code copyFiles} for base path to backup path
     */
    boolean backupFiles(List<String> relativeFilePaths) {
        String tableRoot = this.cfg.basePath;
        String backupRoot = this.cfg.backupPath;
        return HoodieRepairTool.copyFiles(this.context, relativeFilePaths, tableRoot, backupRoot);
    }

    /**
     * Copies the given files from the backup path back to the table base path.
     *
     * @param relativeFilePaths file paths relative to the backup path
     * @return the result of {@code copyFiles} for backup path to base path
     */
    boolean restoreFiles(List<String> relativeFilePaths) {
        String backupRoot = this.cfg.backupPath;
        String tableRoot = this.cfg.basePath;
        return HoodieRepairTool.copyFiles(this.context, relativeFilePaths, backupRoot, tableRoot);
    }

    /**
     * Logs, at WARN level, a summary of the examined instants and the files selected for removal.
     *
     * @param instantTimesToRepair      instant times that were examined
     * @param instantsWithDanglingFiles pairs of instant time and its non-empty list of dangling files
     */
    private void printRepairInfo(List<String> instantTimesToRepair, List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles) {
        LOG.warn("Number of instants verified based on the base and log files: " + instantTimesToRepair.size());
        LOG.warn("Instant timestamps: " + instantTimesToRepair);
        LOG.warn("Number of instants to repair: " + instantsWithDanglingFiles.size());
        // Iterating an empty list logs nothing, so no explicit size guard is needed.
        for (ImmutablePair<String, List<String>> entry : instantsWithDanglingFiles) {
            LOG.warn("   ** Removing files: " + entry.getValue());
        }
    }

    /**
     * Reads the properties file at {@code cfg.propsFilePath}, passing {@code cfg.configs} to
     * {@code UtilHelpers.readConfig} alongside it.
     *
     * @param jsc Spark context supplying the Hadoop configuration
     * @param cfg tool configuration; {@code propsFilePath} must be non-null
     * @return the properties obtained via {@code getProps(true)}
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        Path propsFile = new Path(cfg.propsFilePath);
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), propsFile, cfg.configs)
            .getProps(true);
    }

    /**
     * Running modes of the repair tool, selected by name through {@code Config.runningMode}.
     */
    public static enum Mode {
        /** Back up dangling data and log files, then delete them from the table. */
        REPAIR,
        /** Only look for dangling data and log files; nothing is copied or deleted. */
        DRY_RUN,
        /** Undo a repair by copying files from the backup path back to the table. */
        UNDO;

    }

    /**
     * Command-line options of the repair tool, populated by JCommander from the
     * {@code @Parameter} annotations on each field.
     */
    public static class Config
    implements Serializable {
        // Root path of the table to repair (required).
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        // Mode name; matched case-insensitively against the Mode enum constants (required).
        @Parameter(names={"--mode", "-m"}, description="Set job mode: Set \"repair\" means repairing the table by removing dangling data and log files not belonging to any commit; Set \"dry_run\" means only looking for dangling data and log files; Set \"undo\" means undoing the repair by copying back the files from backup directory", required=true)
        public String runningMode = null;
        // Optional inclusive lower bound on the instant times examined.
        @Parameter(names={"--start-instant-time", "-si"}, description="Starting Instant time for repair (inclusive)", required=false)
        public String startingInstantTime = null;
        // Optional inclusive upper bound on the instant times examined.
        @Parameter(names={"--end-instant-time", "-ei"}, description="Ending Instant time for repair (inclusive)", required=false)
        public String endingInstantTime = null;
        // Where removed files are backed up; when null in repair mode, a path under /tmp is generated.
        @Parameter(names={"--backup-path", "-bp"}, description="Backup path for storing dangling data and log files from the table", required=false)
        public String backupPath = null;
        // Parallelism used when listing files under the backup path during undo.
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for repair", required=false)
        public int parallelism = 2;
        // Spark master passed to UtilHelpers.buildSparkContext.
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        // Spark memory setting passed to UtilHelpers.buildSparkContext.
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = "1g";
        // Passed to FileSystemBackedTableMetadata when listing the table.
        @Parameter(names={"--assume-date-partitioning", "-dp"}, description="whether the partition path is date with three levels", required=false)
        public Boolean assumeDatePartitioning = false;
        // When set, main() prints usage and exits.
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        // Optional properties file; when null, properties are built from `configs` only.
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for table repair")
        public String propsFilePath = null;
        // Extra configuration entries supplied on the command line; may be repeated.
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
    }
}

