/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jline.internal.Log;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.util.BloomFilterData;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class HoodieMetadataTableValidator
implements Serializable {
    /** Logger used by this validator; the nested validation context also logs through it. */
    private static final Logger LOG = LogManager.getLogger(HoodieMetadataTableValidator.class);
    // Spark context; declared transient, so it is excluded from this object's serialized form.
    private transient JavaSparkContext jsc;
    // Validator configuration; assigned by the (jsc, cfg) constructor.
    private Config cfg;
    // Properties built from cfg.configs, or read from cfg.propsFilePath when that path is set.
    private TypedProperties props;
    // Meta client of the table being validated.
    private HoodieTableMetaClient metaClient;
    // Service used for continuous validation; holds a service when cfg.continuous is set.
    protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
    // Comma-joined labels of the enabled validation tasks; appended to log messages.
    private final String taskLabels;

    /**
     * Creates a validator bound to an existing meta client, without a Spark context or config.
     *
     * <p>The async validation service is initialized to an empty option so that code querying it
     * (for example the cleanup step of {@link #run()}) never dereferences {@code null}.
     *
     * @param metaClient meta client of the table to validate
     */
    public HoodieMetadataTableValidator(HoodieTableMetaClient metaClient) {
        this.metaClient = metaClient;
        this.asyncMetadataTableValidateService = Option.empty();
        this.taskLabels = "";
    }

    /**
     * Creates a validator from a Spark context and a parsed configuration.
     *
     * <p>Properties come from {@code cfg.configs} unless a properties file path is configured,
     * in which case they are read through the Hadoop configuration of {@code jsc}. The async
     * validation service is only created when continuous mode is requested.
     *
     * @param jsc Spark context used for Hadoop configuration and distributed execution
     * @param cfg validator configuration
     */
    public HoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
        this.jsc = jsc;
        this.cfg = cfg;

        if (cfg.propsFilePath == null) {
            this.props = UtilHelpers.buildProperties(cfg.configs);
        } else {
            this.props = readConfigFromFileSystem(jsc, cfg);
        }

        this.metaClient = HoodieTableMetaClient.builder()
            .setConf(jsc.hadoopConfiguration())
            .setBasePath(cfg.basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();

        if (cfg.continuous) {
            this.asyncMetadataTableValidateService = Option.of(new AsyncMetadataTableValidateService());
        } else {
            this.asyncMetadataTableValidateService = Option.empty();
        }

        this.taskLabels = generateValidationTaskLabels();
    }

    /**
     * Builds a comma-separated list naming every validation task enabled in the config.
     *
     * @return the joined labels, or an empty string when no task is enabled
     */
    private String generateValidationTaskLabels() {
        final Config config = this.cfg;
        List<String> enabledTasks = new ArrayList<>();

        if (config.validateLatestBaseFiles) {
            enabledTasks.add("validate-latest-base-files");
        }
        if (config.validateLatestFileSlices) {
            enabledTasks.add("validate-latest-file-slices");
        }
        if (config.validateAllFileGroups) {
            enabledTasks.add("validate-all-file-groups");
        }
        if (config.validateAllColumnStats) {
            enabledTasks.add("validate-all-column-stats");
        }
        if (config.validateBloomFilters) {
            enabledTasks.add("validate-bloom-filters");
        }

        return String.join(",", enabledTasks);
    }

    /**
     * Reads the properties file named by {@code cfg.propsFilePath}, passing {@code cfg.configs}
     * along to the reader.
     *
     * @param jsc Spark context supplying the Hadoop configuration
     * @param cfg validator configuration holding the file path and extra configs
     * @return the resulting properties
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        Path propsFile = new Path(cfg.propsFilePath);
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), propsFile, cfg.configs)
            .getProps(true);
    }

    /**
     * Command-line entry point: parses arguments, starts a Spark context, and runs the validator.
     *
     * <p>Prints usage and exits with status 1 when help is requested or no arguments are given.
     * Failures raised by {@link #run()} are logged and not rethrown. The Spark context is stopped
     * on every path once it has been created, including when constructing the validator fails
     * (that failure still propagates to the caller).
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg, null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }

        SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Metadata-Table-Validator", cfg.sparkMaster);
        sparkConf.set("spark.executor.memory", cfg.sparkMemory);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            // Construct inside the try/finally so a constructor failure cannot leak the Spark context.
            HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, cfg);
            try {
                validator.run();
            } catch (Throwable throwable) {
                LOG.error("Fail to do hoodie metadata table validation for " + cfg, throwable);
            }
        } finally {
            jsc.stop();
        }
    }

    /**
     * Runs the validation either once or continuously, depending on {@code cfg.continuous}.
     *
     * <p>Any exception is wrapped in a {@link HoodieException}. The async service, when present,
     * is always shut down afterwards.
     *
     * @throws HoodieException if validation fails with any exception
     */
    public void run() {
        try {
            LOG.info(this.cfg);
            if (!this.cfg.continuous) {
                LOG.info(" ****** do hoodie metadata table validation once ******");
                doHoodieMetadataTableValidationOnce();
            } else {
                LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS mode ******");
                doHoodieMetadataTableValidationContinuous();
            }
        } catch (Exception e) {
            throw new HoodieException("Unable to do hoodie metadata table validation in " + this.cfg.basePath, e);
        } finally {
            this.asyncMetadataTableValidateService.ifPresent(service -> service.shutdown(true));
        }
    }

    /**
     * Runs a single validation pass. A {@link HoodieValidationException} is always logged and is
     * rethrown unless {@code cfg.ignoreFailed} is set.
     */
    private void doHoodieMetadataTableValidationOnce() {
        try {
            doMetadataTableValidation();
        } catch (HoodieValidationException e) {
            LOG.error("Metadata table validation failed to HoodieValidationException", e);
            if (!this.cfg.ignoreFailed) {
                throw e;
            }
        }
    }

    /**
     * Starts the async validation service, if one is present, and blocks until it shuts down.
     *
     * @throws HoodieException wrapping any exception raised while waiting for shutdown
     */
    private void doHoodieMetadataTableValidationContinuous() {
        if (!this.asyncMetadataTableValidateService.isPresent()) {
            return;
        }
        AsyncMetadataTableValidateService service = this.asyncMetadataTableValidateService.get();
        service.start(null);
        try {
            service.waitForShutdown();
        } catch (Exception e) {
            throw new HoodieException(e.getMessage(), e);
        }
    }

    /**
     * Validates the metadata table against the file system for every partition of the table.
     *
     * <p>Steps: reload the active timeline; return early when the metadata table is not available;
     * optionally collect the names of base files referenced by inflight clean plans so they can be
     * excluded; compare the partition lists; then validate each partition in parallel through the
     * engine context.
     *
     * @throws HoodieValidationException when a partition fails validation and
     *     {@code cfg.ignoreFailed} is not set, or when the partition lists differ
     * @throws HoodieIOException when a cleaner plan cannot be read
     */
    public void doMetadataTableValidation() {
        this.metaClient.reloadActiveTimeline();
        String basePath = this.metaClient.getBasePath();
        if (!checkMetadataTableIsAvailable()) {
            return;
        }

        Set<String> baseFilesForCleaning = Collections.emptySet();
        if (this.cfg.skipDataFilesForCleaning) {
            HoodieTimeline inflightCleaningTimeline = this.metaClient.getActiveTimeline().getCleanerTimeline().filterInflights();
            baseFilesForCleaning = inflightCleaningTimeline.getInstants().flatMap(instant -> {
                // The cleaner plan is looked up through the REQUESTED form of the inflight instant.
                HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp());
                try {
                    HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(this.metaClient, requestedInstant);
                    return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream()
                        .flatMap(cleanerFileInfoList -> cleanerFileInfoList.stream()
                            .map(fileInfo -> new Path(fileInfo.getFilePath()).getName()));
                } catch (IOException e) {
                    // Keep the original IOException as the cause so the root failure stays visible.
                    throw new HoodieIOException("Error reading cleaner metadata for " + requestedInstant, e);
                }
            }).filter(path -> {
                // Only base files are of interest; other files named in the plan are dropped.
                String fileExtension = FSUtils.getFileExtension(path);
                return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(fileExtension);
            }).collect(Collectors.toSet());
        }

        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        List<String> allPartitions = validatePartitions(engineContext, basePath);
        if (allPartitions.isEmpty()) {
            LOG.warn("The result of getting all partitions is null or empty, skip current validation.");
            return;
        }

        HoodieMetadataValidationContext metadataTableBasedContext = new HoodieMetadataValidationContext(engineContext, this.cfg, this.metaClient, true);
        HoodieMetadataValidationContext fsBasedContext = new HoodieMetadataValidationContext(engineContext, this.cfg, this.metaClient, false);
        // Effectively-final copy for capture by the lambda below.
        Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
        List<Boolean> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
            try {
                validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
                LOG.info(String.format("Metadata table validation succeeded for partition %s (%s)", partitionPath, this.taskLabels));
                return true;
            } catch (HoodieValidationException e) {
                LOG.error(String.format("Metadata table validation failed for partition %s due to HoodieValidationException (%s)", partitionPath, this.taskLabels), e);
                if (!this.cfg.ignoreFailed) {
                    throw e;
                }
                return false;
            }
        }).collectAsList();

        boolean finalResult = true;
        for (Boolean res : result) {
            finalResult &= res;
        }
        if (finalResult) {
            LOG.info(String.format("Metadata table validation succeeded (%s).", this.taskLabels));
        } else {
            LOG.warn(String.format("Metadata table validation failed (%s).", this.taskLabels));
        }
    }

    /**
     * Checks whether the metadata table exists and has at least one completed instant.
     *
     * <p>Returns {@code false} (skip validation) when the metadata table is missing, cannot be
     * read, or when neither the metadata table nor the data table has a completed instant.
     *
     * @return {@code true} when the metadata table has completed instants
     * @throws HoodieValidationException when the data table has completed instants but the
     *     metadata table has none
     */
    private boolean checkMetadataTableIsAvailable() {
        try {
            HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
                .setConf(this.jsc.hadoopConfiguration())
                .setBasePath(new Path(this.cfg.basePath, ".hoodie/metadata").toString())
                .setLoadActiveTimelineOnLoad(true)
                .build();
            int finishedInstants = mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants();
            if (finishedInstants == 0) {
                if (this.metaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) {
                    LOG.info("There is no completed instant both in metadata table and corresponding data table.");
                    return false;
                }
                throw new HoodieValidationException("There is no completed instant for metadata table.");
            }
            return true;
        } catch (HoodieValidationException ve) {
            // A genuine validation failure must reach the caller; without this clause the generic
            // handler below would downgrade it to a warning and skip validation.
            throw ve;
        } catch (TableNotFoundException tbe) {
            LOG.warn("Metadata table is not found. Skip current validation.");
            return false;
        } catch (Exception ex) {
            LOG.warn("Metadata table is not available to read for now, ", ex);
            return false;
        }
    }

    /**
     * Compares the partition list obtained by file-system listing with the one obtained from the
     * metadata table.
     *
     * <p>Partitions from the file-system listing are kept only if their recorded creation commit
     * time is contained in, or precedes the start of, the completed timeline. Both lists are
     * sorted before comparison.
     *
     * @param engineContext engine context used for listing
     * @param basePath      base path of the table
     * @return the sorted partition list from the metadata table
     * @throws HoodieValidationException when the two lists differ
     */
    private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
        List<String> listedPartitions = FSUtils.getAllPartitionPaths(engineContext, basePath, false, this.cfg.assumeDatePartitioning);
        HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();

        List<String> allPartitionPathsFromFS = listedPartitions.stream()
            .parallel()
            .filter(part -> {
                HoodiePartitionMetadata partitionMetadata =
                    new HoodiePartitionMetadata(this.metaClient.getFs(), FSUtils.getPartitionPath(basePath, part));
                Option<String> createdCommitTime = partitionMetadata.readPartitionCreatedCommitTime();
                // Partitions without a recorded creation commit are excluded from the comparison.
                return createdCommitTime.isPresent()
                    && completedTimeline.containsOrBeforeTimelineStarts(createdCommitTime.get());
            })
            .collect(Collectors.toList());

        List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, this.cfg.assumeDatePartitioning);
        Collections.sort(allPartitionPathsFromFS);
        Collections.sort(allPartitionPathsMeta);

        // List equality already covers the size check.
        if (allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
            return allPartitionPathsMeta;
        }
        String message = "Compare Partitions Failed! AllPartitionPathsFromFS : " + allPartitionPathsFromFS + " and allPartitionPathsMeta : " + allPartitionPathsMeta;
        LOG.error(message);
        throw new HoodieValidationException(message);
    }

    /**
     * Runs every validation task enabled in the config for one partition, in a fixed order:
     * latest file slices, latest base files, all file groups, column stats, bloom filters.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by the file system
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names to exclude from the comparison
     */
    private void validateFilesInPartition(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        final Config config = this.cfg;

        if (config.validateLatestFileSlices) {
            validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (config.validateLatestBaseFiles) {
            validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (config.validateAllFileGroups) {
            validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (config.validateAllColumnStats) {
            validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (config.validateBloomFilters) {
            validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }
    }

    /**
     * Compares all file slices of all file groups in a partition between the metadata-table view
     * and the file-system view. When the exclusion set is non-empty, slices whose base file is in
     * it are removed from both sides first.
     */
    private void validateAllFileGroups(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext.getSortedAllFileGroupList(partitionPath).stream()
            .flatMap(HoodieFileGroup::getAllFileSlices)
            .sorted(new FileSliceComparator())
            .collect(Collectors.toList());
        List<FileSlice> allFileSlicesFromFS = fsBasedContext.getSortedAllFileGroupList(partitionPath).stream()
            .flatMap(HoodieFileGroup::getAllFileSlices)
            .sorted(new FileSliceComparator())
            .collect(Collectors.toList());

        if (!baseDataFilesForCleaning.isEmpty()) {
            allFileSlicesFromMeta = filterFileSliceBasedOnInflightCleaning(allFileSlicesFromMeta, baseDataFilesForCleaning);
            allFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(allFileSlicesFromFS, baseDataFilesForCleaning);
        }

        LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
        LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
        validateFileSlices(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, fsBasedContext.getMetaClient(), "all file groups");
    }

    /**
     * Compares the latest base files of a partition between the metadata-table view and the
     * file-system view, excluding base files in the exclusion set when it is non-empty.
     */
    private void validateLatestBaseFiles(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        final boolean excludeCleaned = !baseDataFilesForCleaning.isEmpty();

        List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (excludeCleaned) {
            latestFilesFromMetadata = filterBaseFileBasedOnInflightCleaning(latestFilesFromMetadata, baseDataFilesForCleaning);
        }

        List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (excludeCleaned) {
            latestFilesFromFS = filterBaseFileBasedOnInflightCleaning(latestFilesFromFS, baseDataFilesForCleaning);
        }

        LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
        LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
        validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, "latest base files");
    }

    /**
     * Compares the latest file slices of a partition between the metadata-table view and the
     * file-system view, excluding slices whose base file is in the exclusion set when it is
     * non-empty.
     */
    private void validateLatestFileSlices(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        final boolean excludeCleaned = !baseDataFilesForCleaning.isEmpty();

        List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
        if (excludeCleaned) {
            latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(latestFileSlicesFromMetadataTable, baseDataFilesForCleaning);
        }

        List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
        if (excludeCleaned) {
            latestFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(latestFileSlicesFromFS, baseDataFilesForCleaning);
        }

        LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
        LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
        validateFileSlices(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, fsBasedContext.getMetaClient(), "latest file slices");
    }

    /**
     * Returns the file slices whose base file is either absent or not named in the exclusion set.
     *
     * @param sortedLatestFileSliceList file slices to filter; input order is preserved
     * @param baseDataFilesForCleaning  base file names to exclude
     * @return a new list with the retained slices
     */
    private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
        return sortedLatestFileSliceList.stream()
            .filter(slice -> !slice.getBaseFile().isPresent()
                || !baseDataFilesForCleaning.contains(slice.getBaseFile().get().getFileName()))
            .collect(Collectors.toList());
    }

    /**
     * Returns the base files whose file name is not in the exclusion set.
     *
     * @param sortedBaseFileList       base files to filter; input order is preserved
     * @param baseDataFilesForCleaning base file names to exclude
     * @return a new list with the retained base files
     */
    private List<HoodieBaseFile> filterBaseFileBasedOnInflightCleaning(List<HoodieBaseFile> sortedBaseFileList, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> retained = new ArrayList<>();
        for (HoodieBaseFile candidate : sortedBaseFileList) {
            if (!baseDataFilesForCleaning.contains(candidate.getFileName())) {
                retained.add(candidate);
            }
        }
        return retained;
    }

    /**
     * Compares column stats for the latest base files of a partition between the metadata-table
     * context and the file-system context. The file names come from the file-system context.
     */
    private void validateAllColumnStats(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<String> baseFileNames = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);

        List<HoodieColumnRangeMetadata<Comparable>> statsFromMetadataTable =
            metadataTableBasedContext.getSortedColumnStatsList(partitionPath, baseFileNames);
        List<HoodieColumnRangeMetadata<Comparable>> statsFromFileSystem =
            fsBasedContext.getSortedColumnStatsList(partitionPath, baseFileNames);

        validate(statsFromMetadataTable, statsFromFileSystem, partitionPath, "column stats");
    }

    /**
     * Compares bloom filters for the latest base files of a partition between the metadata-table
     * context and the file-system context. The file names come from the file-system context.
     */
    private void validateBloomFilters(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<String> baseFileNames = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);

        List<BloomFilterData> filtersFromMetadataTable =
            metadataTableBasedContext.getSortedBloomFilterList(partitionPath, baseFileNames);
        List<BloomFilterData> filtersFromFileSystem =
            fsBasedContext.getSortedBloomFilterList(partitionPath, baseFileNames);

        validate(filtersFromMetadataTable, filtersFromFileSystem, partitionPath, "bloom filters");
    }

    /**
     * Returns the file names of the latest base files in a partition, as seen by the given
     * context, excluding names in the exclusion set when it is non-empty.
     *
     * @return base file names in the order of the sorted base file list
     */
    private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> latestBaseFiles = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (!baseDataFilesForCleaning.isEmpty()) {
            latestBaseFiles = filterBaseFileBasedOnInflightCleaning(latestBaseFiles, baseDataFilesForCleaning);
        }
        return latestBaseFiles.stream()
            .map(BaseFile::getFileName)
            .collect(Collectors.toList());
    }

    /**
     * Asserts that two lists are equal, logging the outcome.
     *
     * @param infoListFromMetadataTable values obtained through the metadata table
     * @param infoListFromFS            values obtained from the file system
     * @param partitionPath             partition being validated (for messages)
     * @param label                     human-readable name of what is compared (for messages)
     * @throws HoodieValidationException when the lists differ in size or content
     */
    private <T> void validate(List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
        boolean sameSize = infoListFromMetadataTable.size() == infoListFromFS.size();
        if (sameSize && infoListFromMetadataTable.equals(infoListFromFS)) {
            LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
            return;
        }

        String message = String.format("Validation of %s for partition %s failed.\n%s from metadata: %s\n%s from file system and base files: %s", label, partitionPath, label, infoListFromMetadataTable, label, infoListFromFS);
        LOG.error(message);
        throw new HoodieValidationException(message);
    }

    /**
     * Compares two equally ordered lists of file slices.
     *
     * <p>Lists of different size are a mismatch. When the lists are not equal, each pair is
     * compared by file group id, base instant time and base file; pairs that agree on those are
     * then checked for committed log files present on only one side.
     *
     * @throws HoodieValidationException when a mismatch is found
     */
    private void validateFileSlices(List<FileSlice> fileSliceListFromMetadataTable, List<FileSlice> fileSliceListFromFS, String partitionPath, HoodieTableMetaClient metaClient, String label) {
        final int sliceCount = fileSliceListFromMetadataTable.size();
        boolean mismatch = sliceCount != fileSliceListFromFS.size();

        if (!mismatch && !fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
            // Stop at the first mismatching pair.
            for (int idx = 0; idx < sliceCount && !mismatch; idx++) {
                FileSlice fromMetadata = fileSliceListFromMetadataTable.get(idx);
                FileSlice fromFs = fileSliceListFromFS.get(idx);

                boolean sameIdentity = Objects.equals(fromMetadata.getFileGroupId(), fromFs.getFileGroupId())
                    && Objects.equals(fromMetadata.getBaseInstantTime(), fromFs.getBaseInstantTime())
                    && Objects.equals(fromMetadata.getBaseFile(), fromFs.getBaseFile());

                // The log-file comparison only runs when the slice identities agree.
                if (!sameIdentity || !areFileSliceCommittedLogFilesMatching(fromMetadata, fromFs, metaClient)) {
                    mismatch = true;
                } else {
                    LOG.warn(String.format("There are uncommitted log files in the latest file slices but the committed log files match: %s %s", fromMetadata, fromFs));
                }
            }
        }

        if (!mismatch) {
            LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
            return;
        }
        String message = String.format("Validation of %s for partition %s failed.\n%s from metadata: %s\n%s from file system and base files: %s", label, partitionPath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS);
        LOG.error(message);
        throw new HoodieValidationException(message);
    }

    /**
     * Checks that the log files present in only one of the two file slices are all uncommitted.
     *
     * @param fs1        first file slice
     * @param fs2        second file slice
     * @param metaClient meta client supplying the file system and the commits timeline
     * @return {@code false} if either slice has a committed log file the other lacks
     */
    private boolean areFileSliceCommittedLogFilesMatching(FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient) {
        Set<String> onlyInFirst = fs1.getLogFiles().map(logFile -> logFile.getPath().toString()).collect(Collectors.toSet());
        Set<String> onlyInSecond = fs2.getLogFiles().map(logFile -> logFile.getPath().toString()).collect(Collectors.toSet());

        // Reduce each set to the paths the other slice does not have. The snapshot keeps the
        // first slice's full path set available after it has been reduced.
        Set<String> allPathsOfFirst = new HashSet<>(onlyInFirst);
        onlyInFirst.removeAll(onlyInSecond);
        onlyInSecond.removeAll(allPathsOfFirst);

        HoodieWrapperFileSystem fileSystem = metaClient.getFs();
        HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();

        if (hasCommittedLogFiles(fileSystem, onlyInFirst, commitsTimeline)) {
            LOG.error("The first file slice has committed log files that cause mismatching: " + fs1);
            return false;
        }
        if (hasCommittedLogFiles(fileSystem, onlyInSecond, commitsTimeline)) {
            LOG.error("The second file slice has committed log files that cause mismatching: " + fs2);
            return false;
        }
        return true;
    }

    /**
     * Tells whether any of the given log files was written by a committed instant.
     *
     * <p>For each file, the instant time is taken from the header of its first log block. A file
     * counts as committed when that instant is contained in (or precedes the start of) the
     * completed timeline and is not inflight. Files whose schema or content cannot be read are
     * skipped with a warning.
     *
     * @param fs              file system used to read the log files
     * @param logFilePathSet  log file paths to inspect
     * @param commitsTimeline commits timeline of the table
     * @return {@code true} as soon as one committed log file is found, otherwise {@code false}
     */
    private boolean hasCommittedLogFiles(FileSystem fs, Set<String> logFilePathSet, HoodieTimeline commitsTimeline) {
        if (logFilePathSet.isEmpty()) {
            return false;
        }

        AvroSchemaConverter converter = new AvroSchemaConverter();
        HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
        HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();

        for (String logFilePathStr : logFilePathSet) {
            HoodieLogFormat.Reader reader = null;
            try {
                MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr));
                if (messageType == null) {
                    LOG.warn(String.format("Cannot read schema from log file %s. Skip the check as it's likely being written by an inflight instant.", logFilePathStr));
                    continue;
                }
                Schema readerSchema = converter.convert(messageType);
                reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
                if (!reader.hasNext()) {
                    LOG.warn("There is no log block in " + logFilePathStr);
                    continue;
                }
                // Only the first block is inspected.
                HoodieLogBlock block = reader.next();
                String instantTime = block.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
                if (completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
                    && !inflightInstantsTimeline.containsInstant(instantTime)) {
                    LOG.warn("Log file is committed: " + logFilePathStr);
                    return true;
                }
                LOG.warn("Log file is uncommitted: " + logFilePathStr);
            } catch (IOException e) {
                LOG.warn(String.format("Cannot read log file %s: %s. Skip the check as it's likely being written by an inflight instant.", logFilePathStr, e.getMessage()), e);
            } finally {
                // The reader is closed here only, on every exit path (return, continue, exception).
                FileIOUtils.closeQuietly(reader);
            }
        }
        return false;
    }

    private static class HoodieMetadataValidationContext
    implements Serializable {
        // Meta client of the table under validation.
        private HoodieTableMetaClient metaClient;
        // In-memory file system view built in the constructor from the metadata config.
        private HoodieTableFileSystemView fileSystemView;
        // Table metadata accessor built in the constructor from the same metadata config.
        private HoodieTableMetadata tableMetadata;
        // Whether column stats and bloom filters are served from the metadata table rather than read from files.
        private boolean enableMetadataTable;
        // Column names used for column stats; set in the constructor only when the table has completed commits.
        private List<String> allColumnNameList;

        /**
         * Builds a validation context that reads through either the metadata table or the file
         * system, depending on {@code enableMetadataTable}.
         *
         * <p>The flag switches the metadata table, its bloom filter index and its column stats
         * index on or off together. Column names are resolved only when the table has at least
         * one completed commit.
         */
        public HoodieMetadataValidationContext(HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient, boolean enableMetadataTable) {
            this.metaClient = metaClient;
            this.enableMetadataTable = enableMetadataTable;

            HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .enable(enableMetadataTable)
                .withMetadataIndexBloomFilter(enableMetadataTable)
                .withMetadataIndexColumnStats(enableMetadataTable)
                .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
                .build();

            this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig);
            this.tableMetadata = HoodieTableMetadata.create(
                engineContext,
                metadataConfig,
                metaClient.getBasePath(),
                FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());

            boolean hasCompletedCommits = metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0;
            if (hasCompletedCommits) {
                this.allColumnNameList = getAllColumnNames();
            }
        }

        /** Returns the meta client this context was created with. */
        public HoodieTableMetaClient getMetaClient() {
            return metaClient;
        }

        /**
         * Returns the latest base files of a partition from the file system view, ordered by
         * {@code HoodieBaseFileComparator}.
         */
        public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
            Stream<HoodieBaseFile> latestBaseFiles = this.fileSystemView.getLatestBaseFiles(partitionPath);
            return latestBaseFiles
                .sorted(new HoodieBaseFileComparator())
                .collect(Collectors.toList());
        }

        /**
         * Returns the latest file slices of a partition from the file system view, ordered by
         * {@code FileSliceComparator}.
         */
        public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
            Stream<FileSlice> latestFileSlices = this.fileSystemView.getLatestFileSlices(partitionPath);
            return latestFileSlices
                .sorted(new FileSliceComparator())
                .collect(Collectors.toList());
        }

        /**
         * Returns all file groups of a partition from the file system view, ordered by
         * {@code HoodieFileGroupComparator}.
         */
        public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
            Stream<HoodieFileGroup> fileGroups = this.fileSystemView.getAllFileGroups(partitionPath);
            return fileGroups
                .sorted(new HoodieFileGroupComparator())
                .collect(Collectors.toList());
        }

        /**
         * Returns the column range metadata for the given base files, ordered by
         * {@code HoodieColumnRangeMetadataComparator}.
         *
         * <p>With the metadata table enabled, stats are looked up there per column name;
         * otherwise they are read from each file through {@code ParquetUtils}.
         *
         * @param partitionPath    partition holding the base files
         * @param baseFileNameList base file names to fetch stats for
         */
        public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(String partitionPath, List<String> baseFileNameList) {
            LOG.info("All column names for getting column stats: " + this.allColumnNameList);

            if (!this.enableMetadataTable) {
                return baseFileNameList.stream()
                    .flatMap(filename -> new ParquetUtils()
                        .readRangeFromParquetMetadata(
                            this.metaClient.getHadoopConf(),
                            new Path(FSUtils.getPartitionPath(this.metaClient.getBasePath(), partitionPath), filename),
                            this.allColumnNameList)
                        .stream())
                    .sorted(new HoodieColumnRangeMetadataComparator())
                    .collect(Collectors.toList());
            }

            List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
                .map(filename -> Pair.of(partitionPath, filename))
                .collect(Collectors.toList());
            return this.allColumnNameList.stream()
                .flatMap(columnName -> this.tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
                    .map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata)
                    .collect(Collectors.toList())
                    .stream())
                .sorted(new HoodieColumnRangeMetadataComparator())
                .collect(Collectors.toList());
        }

        /**
         * Returns bloom filter data for the given base files of a partition, in the natural order of
         * {@code BloomFilterData}.
         *
         * <p>With the metadata table enabled, the filters come from {@code tableMetadata.getBloomFilters}. Otherwise
         * each file is read through {@code readBloomFilterFromFile} and files that yield no filter are dropped.
         *
         * @param partitionPath partition the files belong to
         * @param baseFileNameList names of the base files to collect bloom filters for
         * @return sorted list of bloom filter data
         */
        public List<BloomFilterData> getSortedBloomFilterList(String partitionPath, List<String> baseFileNameList) {
            if (!this.enableMetadataTable) {
                return baseFileNameList.stream()
                    .map(name -> this.readBloomFilterFromFile(partitionPath, name))
                    .filter(Option::isPresent)
                    .map(Option::get)
                    .sorted()
                    .collect(Collectors.toList());
            }

            List<Pair<String, String>> partitionAndFileNames = baseFileNameList.stream()
                .map(name -> Pair.of(partitionPath, name))
                .collect(Collectors.toList());
            return this.tableMetadata.getBloomFilters(partitionAndFileNames).entrySet().stream()
                .map(entry -> BloomFilterData.builder()
                    .setPartitionPath((String)((Pair)entry.getKey()).getKey())
                    .setFilename((String)((Pair)entry.getKey()).getValue())
                    .setBloomFilter(ByteBuffer.wrap(((BloomFilter)entry.getValue()).serializeToString().getBytes()))
                    .build())
                .sorted()
                .collect(Collectors.toList());
        }

        /**
         * Resolves the table's Avro schema and returns the names of its fields.
         *
         * @return the field names, in the order the schema's field list yields them
         * @throws HoodieException if resolving the schema fails; the underlying failure is attached as the cause
         */
        private List<String> getAllColumnNames() {
            TableSchemaResolver schemaResolver = new TableSchemaResolver(this.metaClient);
            try {
                return schemaResolver.getTableAvroSchema().getFields().stream()
                    .map(Schema.Field::name)
                    .collect(Collectors.toList());
            }
            catch (Exception e) {
                // Attach the original exception so the actual reason for the failure is not lost.
                throw new HoodieException("Failed to get all column names for " + this.metaClient.getBasePath(), e);
            }
        }

        /**
         * Reads the bloom filter stored in a single base file.
         *
         * <p>Failures are logged through the class logger and reported as an empty {@code Option} rather than
         * thrown, so one unreadable file does not abort the surrounding listing.
         *
         * @param partitionPath partition the file belongs to
         * @param filename name of the base file inside the partition
         * @return the bloom filter data, or an empty {@code Option} if the reader could not be opened or the file
         *         yielded no bloom filter
         */
        private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
            Path path = new Path(FSUtils.getPartitionPath(this.metaClient.getBasePath(), partitionPath), filename);
            HoodieFileReader fileReader;
            try {
                fileReader = HoodieFileReaderFactory.getFileReader(this.metaClient.getHadoopConf(), path);
            }
            catch (IOException e) {
                // Use the class logger (not jline's console Log) and pass the exception so the stack trace is kept.
                LOG.error("Failed to get file reader for " + path, e);
                return Option.empty();
            }
            BloomFilter fileBloomFilter;
            try {
                fileBloomFilter = fileReader.readBloomFilter();
            }
            finally {
                // The reader is only needed for this one lookup; release it whether or not the read succeeded.
                fileReader.close();
            }
            if (fileBloomFilter == null) {
                LOG.error("Failed to read bloom filter for " + path);
                return Option.empty();
            }
            return Option.of(BloomFilterData.builder()
                .setPartitionPath(partitionPath)
                .setFilename(filename)
                .setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
                .build());
        }
    }

    /**
     * Orders column range metadata by comparing the {@code toString()} representations of the two objects.
     */
    public static class HoodieColumnRangeMetadataComparator
    implements Comparator<HoodieColumnRangeMetadata<Comparable>>,
    Serializable {
        @Override
        public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMetadata<Comparable> o2) {
            String left = o1.toString();
            String right = o2.toString();
            return left.compareTo(right);
        }
    }

    /**
     * Orders file groups by their file group id, using that id's own {@code compareTo}.
     */
    public static class HoodieFileGroupComparator
    implements Comparator<HoodieFileGroup>,
    Serializable {
        @Override
        public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
            final int order = o1.getFileGroupId().compareTo(o2.getFileGroupId());
            return order;
        }
    }

    /**
     * Orders base files by the value returned from {@code getPath()}, using that value's own {@code compareTo}.
     */
    public static class HoodieBaseFileComparator
    implements Comparator<HoodieBaseFile>,
    Serializable {
        @Override
        public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
            final int order = o1.getPath().compareTo(o2.getPath());
            return order;
        }
    }

    /**
     * Orders file slices by the string formed from partition path, file id and base instant time, concatenated in
     * that order.
     */
    public static class FileSliceComparator
    implements Comparator<FileSlice>,
    Serializable {
        @Override
        public int compare(FileSlice o1, FileSlice o2) {
            return FileSliceComparator.sortKey(o1).compareTo(FileSliceComparator.sortKey(o2));
        }

        /** Builds the concatenated key a slice is ordered by. */
        private static String sortKey(FileSlice slice) {
            return slice.getPartitionPath() + slice.getFileId() + slice.getBaseInstantTime();
        }
    }

    public class AsyncMetadataTableValidateService
    extends HoodieAsyncService {
        private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                while (true) {
                    try {
                        while (true) {
                            long start2 = System.currentTimeMillis();
                            HoodieMetadataTableValidator.this.doMetadataTableValidation();
                            long toSleepMs = (long)(((HoodieMetadataTableValidator)HoodieMetadataTableValidator.this).cfg.minValidateIntervalSeconds * 1000) - (System.currentTimeMillis() - start2);
                            if (toSleepMs <= 0L) continue;
                            LOG.info((Object)("Last validate ran less than min validate interval: " + ((HoodieMetadataTableValidator)HoodieMetadataTableValidator.this).cfg.minValidateIntervalSeconds + " s, sleep: " + toSleepMs + " ms."));
                            Thread.sleep(toSleepMs);
                        }
                    }
                    catch (HoodieValidationException e) {
                        LOG.error((Object)"Shutting down AsyncMetadataTableValidateService due to HoodieValidationException", (Throwable)e);
                        if (((HoodieMetadataTableValidator)HoodieMetadataTableValidator.this).cfg.ignoreFailed) continue;
                        throw e;
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
            }, this.executor), this.executor);
        }
    }

    /**
     * Command-line configuration of the metadata table validator. Each annotated field is bound to the option
     * names listed in its {@code @Parameter} annotation.
     *
     * <p>{@link #equals(Object)} and {@link #hashCode()} cover the same set of fields; the {@code help} flag is
     * excluded from both.
     */
    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--continuous"}, description="Running MetadataTableValidator in continuous. Can use --min-validate-interval-seconds to control validation frequency", required=false)
        public boolean continuous = false;
        @Parameter(names={"--skip-data-files-for-cleaning"}, description="Skip to compare the data files which are under deletion by cleaner", required=false)
        public boolean skipDataFilesForCleaning = false;
        @Parameter(names={"--validate-latest-file-slices"}, description="Validate latest file slices for all partitions.", required=false)
        public boolean validateLatestFileSlices = false;
        @Parameter(names={"--validate-latest-base-files"}, description="Validate latest base files for all partitions.", required=false)
        public boolean validateLatestBaseFiles = false;
        @Parameter(names={"--validate-all-file-groups"}, description="Validate all file groups, and all file slices within file groups.", required=false)
        public boolean validateAllFileGroups = false;
        @Parameter(names={"--validate-all-column-stats"}, description="Validate column stats for all columns in the schema", required=false)
        public boolean validateAllColumnStats = false;
        @Parameter(names={"--validate-bloom-filters"}, description="Validate bloom filters of base files", required=false)
        public boolean validateBloomFilters = false;
        @Parameter(names={"--min-validate-interval-seconds"}, description="the min validate interval of each validate when set --continuous, default is 10 minutes.")
        public Integer minValidateIntervalSeconds = 600;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for valuation", required=false)
        public int parallelism = 200;
        @Parameter(names={"--ignore-failed", "-ig"}, description="Ignore metadata validate failure and continue.", required=false)
        public boolean ignoreFailed = false;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = "1g";
        @Parameter(names={"--assume-date-partitioning"}, description="Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path.This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually", required=false)
        public Boolean assumeDatePartitioning = false;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;

        /**
         * Renders the configuration as a multi-line listing, one option per line, each labelled with its
         * command-line option name.
         */
        @Override
        public String toString() {
            return "MetadataTableValidatorConfig {\n"
                + "   --base-path " + this.basePath + ", \n"
                + "   --validate-latest-file-slices " + this.validateLatestFileSlices + ", \n"
                + "   --validate-latest-base-files " + this.validateLatestBaseFiles + ", \n"
                + "   --validate-all-file-groups " + this.validateAllFileGroups + ", \n"
                + "   --validate-all-column-stats " + this.validateAllColumnStats + ", \n"
                + "   --validate-bloom-filters " + this.validateBloomFilters + ", \n"
                + "   --continuous " + this.continuous + ", \n"
                + "   --skip-data-files-for-cleaning " + this.skipDataFilesForCleaning + ", \n"
                + "   --ignore-failed " + this.ignoreFailed + ", \n"
                + "   --min-validate-interval-seconds " + this.minValidateIntervalSeconds + ", \n"
                + "   --parallelism " + this.parallelism + ", \n"
                + "   --spark-master " + this.sparkMaster + ", \n"
                + "   --spark-memory " + this.sparkMemory + ", \n"
                // Label matches the actual option name (was a copy-paste of the "-memory" suffix).
                + "   --assume-date-partitioning " + this.assumeDatePartitioning + ", \n"
                + "   --props " + this.propsFilePath + ", \n"
                + "   --hoodie-conf " + this.configs + "\n"
                + "}";
        }

        /**
         * Compares every option except {@code help}. Null-safe for all reference-typed fields, including
         * {@code basePath}, which is null until it has been set.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Config other = (Config)o;
            return Objects.equals(this.basePath, other.basePath)
                && this.continuous == other.continuous
                && this.skipDataFilesForCleaning == other.skipDataFilesForCleaning
                && this.validateLatestFileSlices == other.validateLatestFileSlices
                && this.validateLatestBaseFiles == other.validateLatestBaseFiles
                && this.validateAllFileGroups == other.validateAllFileGroups
                && this.validateAllColumnStats == other.validateAllColumnStats
                && this.validateBloomFilters == other.validateBloomFilters
                && Objects.equals(this.minValidateIntervalSeconds, other.minValidateIntervalSeconds)
                && this.parallelism == other.parallelism
                && this.ignoreFailed == other.ignoreFailed
                && Objects.equals(this.sparkMaster, other.sparkMaster)
                && Objects.equals(this.sparkMemory, other.sparkMemory)
                && Objects.equals(this.assumeDatePartitioning, other.assumeDatePartitioning)
                && Objects.equals(this.propsFilePath, other.propsFilePath)
                && Objects.equals(this.configs, other.configs);
        }

        /**
         * Hashes exactly the fields compared by {@link #equals(Object)}, so equal configs always share a hash.
         */
        @Override
        public int hashCode() {
            return Objects.hash(this.basePath, this.continuous, this.skipDataFilesForCleaning, this.validateLatestFileSlices, this.validateLatestBaseFiles, this.validateAllFileGroups, this.validateAllColumnStats, this.validateBloomFilters, this.minValidateIntervalSeconds, this.parallelism, this.ignoreFailed, this.sparkMaster, this.sparkMemory, this.assumeDatePartitioning, this.propsFilePath, this.configs);
        }
    }
}

