/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.repair.RepairUtils;
import org.apache.hudi.utilities.HoodieDataTableUtils;
import org.apache.hudi.utilities.HoodieMetadataTableValidator;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class HoodieDataTableValidator
implements Serializable {
    // Class-wide logger used for all validation progress and failure messages.
    private static final Logger LOG = LogManager.getLogger(HoodieDataTableValidator.class);
    // Spark context; transient, and only assigned by the (jsc, cfg) constructor.
    private transient JavaSparkContext jsc;
    // Parsed command-line options; only assigned by the (jsc, cfg) constructor.
    private Config cfg;
    // Properties built from cfg.configs, or read from cfg.propsFilePath when that path is set.
    private TypedProperties props;
    // Meta client for the table being validated; assigned by both constructors.
    private HoodieTableMetaClient metaClient;
    // Background validation service; the (jsc, cfg) constructor fills it only when cfg.continuous is true.
    protected transient Option<AsyncDataTableValidateService> asyncDataTableValidateService;

    /**
     * Creates a validator bound to an already-built meta client.
     *
     * <p>This constructor sets neither the Spark context nor the {@link Config}; only
     * {@code metaClient} and an empty async-service holder are initialised.
     *
     * @param metaClient meta client of the table to validate
     */
    public HoodieDataTableValidator(HoodieTableMetaClient metaClient) {
        this.metaClient = metaClient;
        // Keep the holder non-null so code that checks isPresent() on it does not hit a null reference.
        this.asyncDataTableValidateService = Option.empty();
    }

    /**
     * Creates a validator from a Spark context and parsed command-line options.
     *
     * <p>Properties come from the props file when {@code cfg.propsFilePath} is set, otherwise they
     * are built from {@code cfg.configs}. The async validation service is only created when
     * {@code cfg.continuous} is enabled.
     *
     * @param jsc Spark context supplying the Hadoop configuration
     * @param cfg parsed validator options
     */
    public HoodieDataTableValidator(JavaSparkContext jsc, Config cfg) {
        this.jsc = jsc;
        this.cfg = cfg;

        if (cfg.propsFilePath != null) {
            this.props = this.readConfigFromFileSystem(jsc, cfg);
        } else {
            this.props = UtilHelpers.buildProperties(cfg.configs);
        }

        this.metaClient = HoodieTableMetaClient.builder()
            .setConf(jsc.hadoopConfiguration())
            .setBasePath(cfg.basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();

        if (cfg.continuous) {
            this.asyncDataTableValidateService = Option.of(new AsyncDataTableValidateService());
        } else {
            this.asyncDataTableValidateService = Option.empty();
        }
    }

    /**
     * Loads properties from the file at {@code cfg.propsFilePath}, passing {@code cfg.configs}
     * along to the reader.
     *
     * @param jsc Spark context supplying the Hadoop configuration used to read the file
     * @param cfg options holding the props file path and extra configs
     * @return the properties obtained from the loaded configuration
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        Path propsPath = new Path(cfg.propsFilePath);
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), propsPath, cfg.configs)
            .getProps(true);
    }

    /**
     * Command-line entry point: parses options, starts a Spark context and runs the validator.
     *
     * <p>Prints usage and exits with status 1 when help is requested or no arguments are given.
     * Any failure thrown by {@link #run()} is logged and not rethrown. The Spark context is
     * stopped in all cases, including when constructing the validator itself fails.
     *
     * @param args command-line arguments, see {@link Config}
     */
    public static void main(String[] args) {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args);
        if (cfg.help.booleanValue() || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Data-Table-Validator", cfg.sparkMaster);
        sparkConf.set("spark.executor.memory", cfg.sparkMemory);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            // Built inside the try so the context is still stopped if construction throws;
            // a construction failure itself continues to propagate to the caller.
            HoodieDataTableValidator validator = new HoodieDataTableValidator(jsc, cfg);
            try {
                validator.run();
            }
            catch (Throwable throwable) {
                LOG.error((Object)("Fail to do hoodie Data table validation for " + validator.cfg), throwable);
            }
        }
        finally {
            jsc.stop();
        }
    }

    /**
     * Runs the validation in the mode selected by {@code cfg.continuous}.
     *
     * <p>Whatever the outcome, the async validation service (when present) is force-shut-down
     * before this method returns.
     *
     * @throws HoodieException wrapping any exception raised while validating
     */
    public void run() {
        try {
            LOG.info(this.cfg);
            if (!this.cfg.continuous) {
                LOG.info(" ****** do hoodie data table validation once ******");
                this.doHoodieDataTableValidationOnce();
            } else {
                LOG.info(" ****** do hoodie data table validation in CONTINUOUS mode ******");
                this.doHoodieDataTableValidationContinuous();
            }
        } catch (Exception e) {
            String message = "Unable to do hoodie data table validation in " + this.cfg.basePath;
            throw new HoodieException(message, e);
        } finally {
            this.asyncDataTableValidateService.ifPresent(service -> service.shutdown(true));
        }
    }

    /**
     * Runs a single validation pass.
     *
     * <p>A {@link HoodieValidationException} is always logged; it is rethrown unless
     * {@code cfg.ignoreFailed} is set.
     */
    private void doHoodieDataTableValidationOnce() {
        try {
            this.doDataTableValidation();
        } catch (HoodieValidationException e) {
            // This class validates the data table, so the message names it rather than the metadata table.
            LOG.error("Data table validation failed due to HoodieValidationException", e);
            if (!this.cfg.ignoreFailed) {
                throw e;
            }
        }
    }

    /**
     * Starts the async validation service, if one was created, and blocks until it shuts down.
     *
     * @throws HoodieException wrapping any exception raised while waiting for shutdown
     */
    private void doHoodieDataTableValidationContinuous() {
        if (!this.asyncDataTableValidateService.isPresent()) {
            return;
        }
        AsyncDataTableValidateService service = this.asyncDataTableValidateService.get();
        service.start(null);
        try {
            service.waitForShutdown();
        } catch (Exception e) {
            throw new HoodieException(e.getMessage(), e);
        }
    }

    /**
     * Runs one validation pass over the data table and logs the outcome.
     *
     * <p>Two checks are performed, and only when the active timeline has at least one instant:
     * <ol>
     *   <li>no base/log file on the file system carries a commit time earlier than the first
     *       instant of the active timeline;</li>
     *   <li>for every completed instant, every file on the file system tagged with that instant
     *       is also listed by the timeline for that instant.</li>
     * </ol>
     *
     * <p>When {@code cfg.ignoreFailed} is set, failures (including unexpected exceptions) are
     * logged and the pass is reported as failed instead of throwing.
     *
     * @throws HoodieValidationException if a check fails, or an unexpected exception occurs,
     *         and {@code cfg.ignoreFailed} is not set
     */
    public void doDataTableValidation() {
        boolean finalResult = true;
        this.metaClient.reloadActiveTimeline();
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        try {
            FileSystemBackedTableMetadata tableMetadata = new FileSystemBackedTableMetadata(engineContext, engineContext.getHadoopConf(), this.cfg.basePath, this.cfg.assumeDatePartitioning);
            List<Path> allDataFilePaths = HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(tableMetadata, this.cfg.basePath);
            HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
            if (activeTimeline.firstInstant().isPresent()) {
                String earliestInstant = activeTimeline.firstInstant().get().getTimestamp();

                // Check 1: files whose commit time precedes the start of the active timeline.
                List<Path> danglingFilePaths = allDataFilePaths.stream()
                    .filter(path -> HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(path.getName()), HoodieTimeline.LESSER_THAN, earliestInstant))
                    .collect(Collectors.toList());
                if (!danglingFilePaths.isEmpty()) {
                    LOG.error("Data table validation failed due to dangling files count " + danglingFilePaths.size() + ", found before active timeline");
                    danglingFilePaths.forEach(entry -> LOG.error("Dangling file: " + entry.toString()));
                    finalResult = false;
                    if (!this.cfg.ignoreFailed) {
                        throw new HoodieValidationException("Data table validation failed due to dangling files " + danglingFilePaths.size());
                    }
                }

                // Check 2: files tagged with a completed instant that the timeline does not list.
                Map<String, List<String>> instantToFilesMap = RepairUtils.tagInstantsOfBaseAndLogFiles(this.metaClient.getBasePath(), allDataFilePaths);
                List<HoodieInstant> completedInstants = activeTimeline.filterCompletedInstants().getInstants().collect(Collectors.toList());
                List<String> danglingFiles = Collections.emptyList();
                // Skip the distributed pass when there is nothing to check; the parallelism
                // argument (the instant count) would otherwise be 0.
                if (!completedInstants.isEmpty()) {
                    danglingFiles = engineContext.flatMap(completedInstants, instant -> {
                        List<String> filesFromFs = instantToFilesMap.getOrDefault(instant.getTimestamp(), Collections.emptyList());
                        if (filesFromFs.isEmpty()) {
                            return Stream.empty();
                        }
                        Set<String> unaccounted = new HashSet<>(filesFromFs);
                        Option<Set<String>> filesFromTimeline = RepairUtils.getBaseAndLogFilePathsFromTimeline(activeTimeline, instant);
                        if (filesFromTimeline.isPresent()) {
                            unaccounted.removeAll(filesFromTimeline.get());
                        }
                        return unaccounted.stream();
                    }, completedInstants.size());
                }
                if (!danglingFiles.isEmpty()) {
                    LOG.error("Data table validation failed due to extra files found for completed commits " + danglingFiles.size());
                    danglingFiles.forEach(entry -> LOG.error("Dangling file: " + entry));
                    finalResult = false;
                    if (!this.cfg.ignoreFailed) {
                        throw new HoodieValidationException("Data table validation failed due to dangling files " + danglingFiles.size());
                    }
                }
            }
        } catch (HoodieValidationException e) {
            // Propagate validation failures unchanged instead of wrapping them a second time,
            // which duplicated the message prefix and the log entry.
            if (!this.cfg.ignoreFailed) {
                throw e;
            }
            LOG.error("Data table validation failed due to " + e.getMessage(), e);
            finalResult = false;
        } catch (Exception e) {
            LOG.error("Data table validation failed due to " + e.getMessage(), e);
            // An aborted pass must not be reported as a success below.
            finalResult = false;
            if (!this.cfg.ignoreFailed) {
                throw new HoodieValidationException("Data table validation failed due to " + e.getMessage(), e);
            }
        }
        if (finalResult) {
            LOG.info("Data table validation succeeded.");
        } else {
            LOG.warn("Data table validation failed.");
        }
    }

    public class AsyncDataTableValidateService
    extends HoodieAsyncService {
        private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                while (true) {
                    try {
                        while (true) {
                            long start2 = System.currentTimeMillis();
                            HoodieDataTableValidator.this.doDataTableValidation();
                            long toSleepMs = (long)(((HoodieDataTableValidator)HoodieDataTableValidator.this).cfg.minValidateIntervalSeconds * 1000) - (System.currentTimeMillis() - start2);
                            if (toSleepMs <= 0L) continue;
                            LOG.info((Object)("Last validate ran less than min validate interval: " + ((HoodieDataTableValidator)HoodieDataTableValidator.this).cfg.minValidateIntervalSeconds + " s, sleep: " + toSleepMs + " ms."));
                            Thread.sleep(toSleepMs);
                        }
                    }
                    catch (HoodieValidationException e) {
                        LOG.error((Object)"Shutting down AsyncDataTableValidateService due to HoodieValidationException", (Throwable)e);
                        if (((HoodieDataTableValidator)HoodieDataTableValidator.this).cfg.ignoreFailed) continue;
                        throw e;
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
            }, this.executor), this.executor);
        }
    }

    /**
     * Command-line options of the data table validator, populated by JCommander.
     *
     * <p>{@link #equals(Object)} and {@link #hashCode()} cover every option except {@code help}.
     */
    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--continuous"}, description="Running DataTableValidator in continuous mode. Can use --min-validate-interval-seconds to control validation frequency", required=false)
        public boolean continuous = false;
        @Parameter(names={"--min-validate-interval-seconds"}, description="the min validate interval of each validate when set --continuous, default is 10 minutes.")
        public Integer minValidateIntervalSeconds = 600;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for validation", required=false)
        public int parallelism = 200;
        @Parameter(names={"--ignore-failed", "-ig"}, description="Ignore data table validate failure and continue.", required=false)
        public boolean ignoreFailed = false;
        @Parameter(names={"--assume-date-partitioning"}, description="Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path.This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually", required=false)
        public Boolean assumeDatePartitioning = false;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = "1g";
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;

        /**
         * Renders the options (all but {@code help}) in a multi-line, flag-labelled form.
         */
        @Override
        public String toString() {
            return "DataTableValidatorConfig {\n   --base-path " + this.basePath
                + ", \n   --continuous " + this.continuous
                + ", \n   --ignore-failed " + this.ignoreFailed
                + ", \n   --min-validate-interval-seconds " + this.minValidateIntervalSeconds
                + ", \n   --parallelism " + this.parallelism
                + ", \n   --spark-master " + this.sparkMaster
                + ", \n   --spark-memory " + this.sparkMemory
                + ", \n   --assume-date-partitioning " + this.assumeDatePartitioning
                + ", \n   --props " + this.propsFilePath
                + ", \n   --hoodie-conf " + this.configs + "\n}";
        }

        /**
         * Compares all options except {@code help}; null-safe for every reference field.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            // Cast to this class: the class check above guarantees it, whereas casting to the
            // metadata validator's Config would always fail.
            Config config = (Config)o;
            return Objects.equals(this.basePath, config.basePath)
                && this.continuous == config.continuous
                && Objects.equals(this.minValidateIntervalSeconds, config.minValidateIntervalSeconds)
                && this.parallelism == config.parallelism
                && this.ignoreFailed == config.ignoreFailed
                && Objects.equals(this.sparkMaster, config.sparkMaster)
                && Objects.equals(this.sparkMemory, config.sparkMemory)
                && Objects.equals(this.assumeDatePartitioning, config.assumeDatePartitioning)
                && Objects.equals(this.propsFilePath, config.propsFilePath)
                && Objects.equals(this.configs, config.configs);
        }

        /**
         * Hashes exactly the fields compared by {@link #equals(Object)}, so equal configs always
         * share a hash code.
         */
        @Override
        public int hashCode() {
            return Objects.hash(this.basePath, this.continuous, this.minValidateIntervalSeconds, this.parallelism, this.ignoreFailed, this.sparkMaster, this.sparkMemory, this.assumeDatePartitioning, this.propsFilePath, this.configs);
        }
    }
}

