/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.HoodieSparkIOFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.util.BloomFilterData;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class HoodieMetadataTableValidator
implements Serializable {
    // Serialization version for this Serializable class.
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataTableValidator.class);
    // Spark context used to build meta clients and read properties; transient, so it is not serialized with the validator.
    private transient JavaSparkContext jsc;
    // Configuration object whose flags select which validations run.
    private Config cfg;
    // Properties built from cfg.configs, or read from cfg.propsFilePath when that path is set.
    private TypedProperties props;
    // Meta client for the data table located at cfg.basePath.
    private final HoodieTableMetaClient metaClient;
    // Populated only when cfg.continuous is set; otherwise an empty Option. Transient.
    protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
    // Comma-separated base path plus the names of the enabled validations; used in log messages.
    private final String taskLabels;
    // Failures that were recorded instead of rethrown because cfg.ignoreFailed was set.
    private List<Throwable> throwables = new ArrayList<Throwable>();

    /**
     * Creates a validator for the table at {@code cfg.basePath}.
     *
     * <p>Properties come from {@code cfg.configs} alone when no properties file is configured,
     * otherwise from the properties file (with {@code cfg.configs} passed along to the reader).
     * The async validation service is only created when {@code cfg.continuous} is set.
     *
     * @param jsc Spark context supplying the Hadoop configuration
     * @param cfg validator configuration
     */
    public HoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
        this.jsc = jsc;
        this.cfg = cfg;
        if (cfg.propsFilePath == null) {
            this.props = UtilHelpers.buildProperties(cfg.configs);
        } else {
            this.props = readConfigFromFileSystem(jsc, cfg);
        }
        this.metaClient = HoodieTableMetaClient.builder()
            .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()))
            .setBasePath(cfg.basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        if (cfg.continuous) {
            this.asyncMetadataTableValidateService = Option.of(new AsyncMetadataTableValidateService());
        } else {
            this.asyncMetadataTableValidateService = Option.empty();
        }
        // Labels are derived from this.cfg, so this must run after cfg has been assigned.
        this.taskLabels = generateValidationTaskLabels();
    }

    /**
     * Returns the failures recorded by this validator.
     *
     * @return the internal list itself (not a copy)
     */
    public List<Throwable> getThrowables() {
        return throwables;
    }

    /**
     * Tells whether any recorded failure is a {@link HoodieValidationException}.
     *
     * @return {@code true} if at least one recorded throwable is a validation exception
     */
    public boolean hasValidationFailure() {
        return throwables.stream().anyMatch(recorded -> recorded instanceof HoodieValidationException);
    }

    /**
     * Builds the label string used in log messages: the base path followed by the name of
     * every enabled validation, joined with commas.
     *
     * @return comma-separated labels, always starting with {@code cfg.basePath}
     */
    private String generateValidationTaskLabels() {
        List<String> labels = new ArrayList<>();
        labels.add(cfg.basePath);
        if (cfg.validateLatestBaseFiles) {
            labels.add("validate-latest-base-files");
        }
        if (cfg.validateLatestFileSlices) {
            labels.add("validate-latest-file-slices");
        }
        if (cfg.validateAllFileGroups) {
            labels.add("validate-all-file-groups");
        }
        if (cfg.validateAllColumnStats) {
            labels.add("validate-all-column-stats");
        }
        if (cfg.validateBloomFilters) {
            labels.add("validate-bloom-filters");
        }
        if (cfg.validateRecordIndexCount) {
            labels.add("validate-record-index-count");
        }
        if (cfg.validateRecordIndexContent) {
            labels.add("validate-record-index-content");
        }
        return String.join(",", labels);
    }

    /**
     * Reads the properties file named by {@code cfg.propsFilePath}, passing {@code cfg.configs}
     * to the reader as well.
     *
     * @param jsc Spark context supplying the Hadoop configuration
     * @param cfg validator configuration
     * @return the resulting properties
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        Path propsFile = new Path(cfg.propsFilePath);
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), propsFile, cfg.configs).getProps(true);
    }

    /**
     * Command-line entry point: parses arguments, starts a Spark context, runs the validator
     * and always stops the Spark context afterwards.
     *
     * <p>Prints usage and exits with status 1 when help is requested or no arguments are given.
     * A missing data table is logged as a warning; any other failure is logged as an error.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param args2 command-line arguments
     */
    public static void main(String[] args2) {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args2);
        boolean usageRequested = cfg.help.booleanValue() || args2.length == 0;
        if (usageRequested) {
            cmd.usage();
            System.exit(1);
        }

        SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Metadata-Table-Validator", cfg.sparkMaster);
        sparkConf.set("spark.executor.memory", cfg.sparkMemory);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            new HoodieMetadataTableValidator(jsc, cfg).run();
        } catch (TableNotFoundException e) {
            String warning = String.format("The Hudi data table is not found: [%s]. Skipping the validation of the metadata table.", cfg.basePath);
            LOG.warn(warning, e);
        } catch (Throwable throwable) {
            LOG.error("Fail to do hoodie metadata table validation for " + cfg, throwable);
        } finally {
            jsc.stop();
        }
    }

    /**
     * Runs the validation in the mode selected by {@code cfg.continuous}.
     *
     * <p>In continuous mode the async service is started and this call blocks until it shuts
     * down; the method then returns {@code false}. Otherwise a single validation pass is run
     * and its result is returned.
     *
     * <p>The async service, when present, is always shut down before this method exits.
     *
     * @return the result of the single validation pass, or {@code false} in continuous mode
     * @throws HoodieException wrapping any {@link Exception} raised while validating
     */
    public boolean run() {
        try {
            LOG.info(this.cfg.toString());
            if (this.cfg.continuous) {
                LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS mode - {} ******", (Object)this.taskLabels);
                this.doHoodieMetadataTableValidationContinuous();
                // The two modes are mutually exclusive: do not fall through into a one-off pass.
                return false;
            }
            LOG.info(" ****** do hoodie metadata table validation once - {} ******", (Object)this.taskLabels);
            return this.doHoodieMetadataTableValidationOnce();
        }
        catch (Exception e) {
            throw new HoodieException("Unable to do hoodie metadata table validation in " + this.cfg.basePath, e);
        }
        finally {
            // No return here: returning from finally would discard the exception thrown above.
            if (this.asyncMetadataTableValidateService.isPresent()) {
                this.asyncMetadataTableValidateService.get().shutdown(true);
            }
        }
    }

    /**
     * Runs one validation pass.
     *
     * <p>Any failure is logged. When {@code cfg.ignoreFailed} is set the failure is recorded in
     * {@code throwables} and {@code false} is returned; otherwise it is rethrown unchanged.
     *
     * @return the result of {@link #doMetadataTableValidation()}, or {@code false} on an ignored failure
     */
    private boolean doHoodieMetadataTableValidationOnce() {
        try {
            return doMetadataTableValidation();
        } catch (Throwable failure) {
            LOG.error("Metadata table validation failed to HoodieValidationException {} {}", taskLabels, failure);
            if (cfg.ignoreFailed) {
                throwables.add(failure);
                return false;
            }
            throw failure;
        }
    }

    /**
     * Starts the async validation service, if one was created, and blocks until it shuts down.
     *
     * @throws HoodieException wrapping any exception raised while waiting for shutdown
     */
    private void doHoodieMetadataTableValidationContinuous() {
        if (!asyncMetadataTableValidateService.isPresent()) {
            return;
        }
        asyncMetadataTableValidateService.get().start(null);
        try {
            asyncMetadataTableValidateService.get().waitForShutdown();
        } catch (Exception e) {
            throw new HoodieException(e.getMessage(), e);
        }
    }

    /*
     * Exception decompiling.
     *
     * The decompiler (CFR) could not recover this method's body, so what follows is a
     * placeholder: it unconditionally throws IllegalStateException. The real logic must be
     * restored from the original sources before this class can be used.
     */
    public boolean doMetadataTableValidation() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Checks whether the metadata table under {@code <basePath>/.hoodie/metadata} can be read
     * and has at least one completed instant on its commits timeline.
     *
     * @return {@code true} when the metadata table has completed instants; {@code false} when
     *         neither table has completed commits, when the metadata table is not found, or when
     *         any other exception occurs while checking
     */
    private boolean checkMetadataTableIsAvailable() {
        try {
            String mdtBasePath = new Path(cfg.basePath, ".hoodie/metadata").toString();
            HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
                .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()))
                .setBasePath(mdtBasePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            int mdtCompletedInstants = mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
            if (mdtCompletedInstants != 0) {
                return true;
            }
            int dataTableCompletedInstants = metaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
            if (dataTableCompletedInstants == 0) {
                LOG.info("There is no completed commit in both metadata table and corresponding data table: {}", taskLabels);
                return false;
            }
            // NOTE(review): this exception is unchecked and not a TableNotFoundException, so it
            // appears to land in the catch-all below and become a `false` result with a warning
            // rather than a validation failure - confirm that is intended.
            throw new HoodieValidationException("There is no completed instant for metadata table: " + cfg.basePath);
        } catch (TableNotFoundException tbe) {
            LOG.warn("Metadata table is not found for table: {}. Skip current validation.", cfg.basePath);
            return false;
        } catch (Exception ex) {
            LOG.warn("Metadata table is not available to read for now for table: {}, ", cfg.basePath, ex);
            return false;
        }
    }

    /**
     * Compares the partition list obtained from file-system listing with the one obtained from
     * the metadata table (MDT).
     *
     * <p>Both lists are sorted in place and compared. When they differ, a partition that exists
     * only in the MDT is tolerated if its creation instant is not in the completed timeline and
     * is later than the timeline's last instant; this tolerance only applies when the file
     * system has no extra partitions of its own.
     *
     * @param engineContext engine context passed to the listing helpers
     * @param basePath      base path of the data table
     * @param metaClient    meta client supplying the storage and the completed commits timeline
     * @return the sorted partition list from the MDT
     * @throws HoodieValidationException when the two listings differ beyond the tolerated cases
     */
    @VisibleForTesting
    List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath, HoodieTableMetaClient metaClient) {
        HoodieTimeline completedTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
        List<String> allPartitionPathsFromFS = getPartitionsFromFileSystem(engineContext, basePath, metaClient.getStorage(), completedTimeline);
        List<String> allPartitionPathsMeta = getPartitionsFromMDT(engineContext, basePath, metaClient.getStorage());
        Collections.sort(allPartitionPathsFromFS);
        Collections.sort(allPartitionPathsMeta);

        // List equality already implies equal size, so a single check covers both conditions.
        if (allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
            return allPartitionPathsMeta;
        }

        List<String> onlyInFS = new ArrayList<>(allPartitionPathsFromFS);
        onlyInFS.removeAll(allPartitionPathsMeta);
        List<String> onlyInMDT = new ArrayList<>(allPartitionPathsMeta);
        onlyInMDT.removeAll(allPartitionPathsFromFS);
        List<String> unexplainedInMDT = new ArrayList<>(onlyInMDT);

        boolean misMatch = true;
        if (onlyInFS.isEmpty() && !onlyInMDT.isEmpty()) {
            for (String partitionFromMDT : onlyInMDT) {
                Option<String> creationTimeOpt = getPartitionCreationInstant(metaClient.getStorage(), basePath, partitionFromMDT);
                if (!creationTimeOpt.isPresent() || completedTimeline.containsInstant(creationTimeOpt.get())) {
                    continue;
                }
                Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
                if (lastInstant.isPresent()
                    && HoodieTimeline.compareTimestamps(creationTimeOpt.get(), HoodieTimeline.GREATER_THAN, lastInstant.get().getTimestamp())) {
                    LOG.warn("Ignoring additional partition {}, as it was deduced to be part of a latest completed commit which was inflight when FS based listing was polled.", partitionFromMDT);
                    unexplainedInMDT.remove(partitionFromMDT);
                }
            }
            misMatch = !unexplainedInMDT.isEmpty();
        }

        if (misMatch) {
            String message = "Compare Partitions Failed!  Additional partitions from FS, but missing from MDT : \"" + onlyInFS + "\" and additional partitions from MDT, but missing from FS listing : \"" + unexplainedInMDT + "\".\n All partitions from FS listing " + allPartitionPathsFromFS;
            LOG.error(message);
            throw new HoodieValidationException(message);
        }
        return allPartitionPathsMeta;
    }

    /**
     * Reads the commit time recorded in the partition metadata of the given partition.
     *
     * @param storage   storage used to read the partition metadata
     * @param basePath  base path of the data table
     * @param partition partition path relative to {@code basePath}
     * @return the recorded creation commit time, if any
     */
    @VisibleForTesting
    Option<String> getPartitionCreationInstant(HoodieStorage storage, String basePath, String partition) {
        return new HoodiePartitionMetadata(storage, FSUtils.constructAbsolutePath(basePath, partition))
            .readPartitionCreatedCommitTime();
    }

    /**
     * Lists all partition paths via {@code FSUtils.getAllPartitionPaths} with the fourth
     * argument set to {@code true} (the file-system variant of this lookup passes {@code false}).
     *
     * @param engineContext engine context used for the listing
     * @param basePath      base path of the data table
     * @param storage       storage to list from
     * @return the partition paths
     */
    @VisibleForTesting
    List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath, HoodieStorage storage) {
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, storage, basePath, true, false);
        return partitionPaths;
    }

    /**
     * Lists partition paths from the file system and keeps only those whose recorded creation
     * commit is consistent with the completed timeline.
     *
     * <p>A partition is kept when its partition metadata has a creation commit time and that
     * time is either contained in (or before the start of) the completed timeline, or is not
     * later than the timeline's last instant. Partitions without a creation commit time are
     * dropped. The filtering runs on a parallel stream.
     *
     * @param engineContext     engine context used for the listing
     * @param basePath          base path of the data table
     * @param storage           storage to list from and to read partition metadata with
     * @param completedTimeline completed commits timeline of the data table
     * @return the filtered partition paths
     */
    @VisibleForTesting
    List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath, HoodieStorage storage, HoodieTimeline completedTimeline) {
        List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, storage, basePath, false, false);
        // Keep the stream typed as Stream<String>: a raw Stream cast would erase the element
        // type, leaving the lambda parameter untyped and collect(...) returning Object.
        return allPartitionPathsFromFS.stream().parallel().filter(part -> {
            HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(storage, FSUtils.constructAbsolutePath(basePath, part));
            Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
            if (!instantOption.isPresent()) {
                return false;
            }
            String instantTime = instantOption.get();
            if (completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
                return true;
            }
            Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
            return lastInstant.isPresent() && HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
        }).collect(Collectors.toList());
    }

    /**
     * Runs every per-partition file validation enabled in the configuration, in a fixed order:
     * latest file slices, latest base files, all file groups, column stats, bloom filters.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by file-system listing
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names to exclude from the comparisons
     */
    private void validateFilesInPartition(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        if (cfg.validateLatestFileSlices) {
            validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (cfg.validateLatestBaseFiles) {
            validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (cfg.validateAllFileGroups) {
            validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (cfg.validateAllColumnStats) {
            validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }

        if (cfg.validateBloomFilters) {
            validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
        }
    }

    /**
     * Compares all file slices of all file groups in a partition, as seen by the metadata table
     * and by file-system listing.
     *
     * <p>When {@code baseDataFilesForCleaning} is non-empty, slices whose base file is in that
     * set are removed from both sides before comparing.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by file-system listing
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names to exclude from the comparison
     */
    private void validateAllFileGroups(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext.getSortedAllFileGroupList(partitionPath).stream()
            .flatMap(HoodieFileGroup::getAllFileSlices)
            .sorted(new FileSliceComparator())
            .collect(Collectors.toList());
        List<FileSlice> allFileSlicesFromFS = fsBasedContext.getSortedAllFileGroupList(partitionPath).stream()
            .flatMap(HoodieFileGroup::getAllFileSlices)
            .sorted(new FileSliceComparator())
            .collect(Collectors.toList());

        if (!baseDataFilesForCleaning.isEmpty()) {
            allFileSlicesFromMeta = filterFileSliceBasedOnInflightCleaning(allFileSlicesFromMeta, baseDataFilesForCleaning);
            allFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(allFileSlicesFromFS, baseDataFilesForCleaning);
        }

        LOG.debug("All file slices from metadata: {}. For partitions {}", allFileSlicesFromMeta, partitionPath);
        LOG.debug("All file slices from direct listing: {}. For partitions {}", allFileSlicesFromFS, partitionPath);
        validateFileSlices(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, fsBasedContext.getMetaClient(), "all file groups");
    }

    /**
     * Compares the latest base files of a partition as seen by the metadata table and by
     * file-system listing.
     *
     * <p>When {@code baseDataFilesForCleaning} is non-empty, base files named in that set are
     * removed from both sides before comparing.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by file-system listing
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names to exclude from the comparison
     */
    private void validateLatestBaseFiles(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        boolean hasFilesToExclude = !baseDataFilesForCleaning.isEmpty();

        List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (hasFilesToExclude) {
            latestFilesFromMetadata = filterBaseFileBasedOnInflightCleaning(latestFilesFromMetadata, baseDataFilesForCleaning);
        }

        List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (hasFilesToExclude) {
            latestFilesFromFS = filterBaseFileBasedOnInflightCleaning(latestFilesFromFS, baseDataFilesForCleaning);
        }

        LOG.debug("Latest base file from metadata: {}. For partitions {}", latestFilesFromMetadata, partitionPath);
        LOG.debug("Latest base file from direct listing: {}. For partitions {}", latestFilesFromFS, partitionPath);
        validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, "latest base files");
    }

    /**
     * Compares the latest file slices of a partition as seen by the metadata table and by
     * file-system listing.
     *
     * <p>When {@code baseDataFilesForCleaning} is non-empty, slices whose base file is named in
     * that set are removed from both sides before comparing.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by file-system listing
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names to exclude from the comparison
     */
    private void validateLatestFileSlices(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        boolean hasFilesToExclude = !baseDataFilesForCleaning.isEmpty();

        List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
        if (hasFilesToExclude) {
            latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(latestFileSlicesFromMetadataTable, baseDataFilesForCleaning);
        }

        List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
        if (hasFilesToExclude) {
            latestFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(latestFileSlicesFromFS, baseDataFilesForCleaning);
        }

        LOG.debug("Latest file list from metadata: {}. For partition {}", latestFileSlicesFromMetadataTable, partitionPath);
        LOG.debug("Latest file list from direct listing: {}. For partition {}", latestFileSlicesFromFS, partitionPath);
        validateFileSlices(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, fsBasedContext.getMetaClient(), "latest file slices");
    }

    /**
     * Drops file slices whose base file name is in {@code baseDataFilesForCleaning}.
     * Slices without a base file are always kept.
     *
     * @param sortedLatestFileSliceList slices to filter; order is preserved
     * @param baseDataFilesForCleaning  base file names to exclude
     * @return a new list with the remaining slices
     */
    private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
        return sortedLatestFileSliceList.stream()
            .filter(slice -> !slice.getBaseFile().isPresent()
                || !baseDataFilesForCleaning.contains(slice.getBaseFile().get().getFileName()))
            .collect(Collectors.toList());
    }

    /**
     * Drops base files whose file name is in {@code baseDataFilesForCleaning}.
     *
     * @param sortedBaseFileList       base files to filter; order is preserved
     * @param baseDataFilesForCleaning base file names to exclude
     * @return a new list with the remaining base files
     */
    private List<HoodieBaseFile> filterBaseFileBasedOnInflightCleaning(List<HoodieBaseFile> sortedBaseFileList, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> retained = new ArrayList<>();
        for (HoodieBaseFile candidate : sortedBaseFileList) {
            if (!baseDataFilesForCleaning.contains(candidate.getFileName())) {
                retained.add(candidate);
            }
        }
        return retained;
    }

    /**
     * Compares column stats for the latest base files of a partition, as produced by the
     * metadata-table-backed context and by the file-system-backed context.
     *
     * <p>The base file names to check are taken from the file-system-backed context.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by file-system listing
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names passed to the file-name lookup
     */
    private void validateAllColumnStats(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<String> baseFileNames = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
        List<HoodieColumnRangeMetadata<Comparable>> statsFromMetadata = metadataTableBasedContext.getSortedColumnStatsList(partitionPath, baseFileNames);
        List<HoodieColumnRangeMetadata<Comparable>> statsFromFS = fsBasedContext.getSortedColumnStatsList(partitionPath, baseFileNames);
        validate(statsFromMetadata, statsFromFS, partitionPath, "column stats");
    }

    /**
     * Compares bloom filters for the latest base files of a partition, as produced by the
     * metadata-table-backed context and by the file-system-backed context.
     *
     * <p>The base file names to check are taken from the file-system-backed context.
     *
     * @param metadataTableBasedContext context backed by the metadata table
     * @param fsBasedContext            context backed by file-system listing
     * @param partitionPath             partition to validate
     * @param baseDataFilesForCleaning  base file names passed to the file-name lookup
     */
    private void validateBloomFilters(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<String> baseFileNames = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
        List<BloomFilterData> filtersFromMetadata = metadataTableBasedContext.getSortedBloomFilterList(partitionPath, baseFileNames);
        List<BloomFilterData> filtersFromFS = fsBasedContext.getSortedBloomFilterList(partitionPath, baseFileNames);
        validate(filtersFromMetadata, filtersFromFS, partitionPath, "bloom filters");
    }

    /**
     * Validates the record index when that metadata partition is available.
     *
     * <p>Content validation takes precedence: when both content and count validation are
     * enabled, only the content validation runs.
     *
     * @param sparkEngineContext Spark engine context used for reading
     * @param metaClient         meta client of the data table
     */
    private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext, HoodieTableMetaClient metaClient) {
        boolean recordIndexAvailable = metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX);
        if (!recordIndexAvailable) {
            return;
        }
        if (cfg.validateRecordIndexContent) {
            validateRecordIndexContent(sparkEngineContext, metaClient);
            return;
        }
        if (cfg.validateRecordIndexCount) {
            validateRecordIndexCount(sparkEngineContext, metaClient);
        }
    }

    /**
     * Checks that the number of record keys in the data table equals the number of
     * {@code type = 5} entries in the metadata table, both read as of the latest completed
     * commit of the data table.
     *
     * <p>NOTE(review): {@code type = 5} presumably selects record-index entries - confirm.
     * NOTE(review): {@code lastInstant().get()} assumes at least one completed instant exists.
     *
     * @param sparkEngineContext Spark engine context used for reading
     * @param metaClient         meta client of the data table
     * @throws HoodieValidationException when the two counts differ
     */
    private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext, HoodieTableMetaClient metaClient) {
        String basePath = metaClient.getBasePathV2().toString();
        String latestCompletedCommit = metaClient.getActiveTimeline()
            .getCommitsAndCompactionTimeline()
            .filterCompletedInstants()
            .lastInstant()
            .get()
            .getTimestamp();
        String asOfInstantKey = DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key();

        long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi")
            .option(asOfInstantKey, latestCompletedCommit)
            .load(basePath)
            .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, new String[0])
            .count();
        long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi")
            .option(asOfInstantKey, latestCompletedCommit)
            .load(HoodieTableMetadata.getMetadataTableBasePath(basePath))
            .select("key", new String[0])
            .filter("type = 5")
            .count();

        if (countKeyFromTable == countKeyFromRecordIndex) {
            LOG.info("Validation of record index count succeeded: {} entries. Table: {}", countKeyFromRecordIndex, cfg.basePath);
            return;
        }
        String message = String.format("Validation of record index count failed: %s entries from record index metadata, %s keys from the data table: %s", countKeyFromRecordIndex, countKeyFromTable, cfg.basePath);
        LOG.error(message);
        throw new HoodieValidationException(message);
    }

    /**
     * Checks that every record key maps to the same location in the data table and in the
     * record index.
     *
     * <p>Both sides are loaded as key-to-location pair RDDs, full-outer-joined on the key, and
     * each joined row is turned into an (error count, sample messages) pair. The pairs are then
     * reduced into a total error count plus a bounded list of sample mismatch messages.
     *
     * <p>NOTE(review): {@code lastInstant().get()} assumes at least one completed instant exists.
     * NOTE(review): {@code reduce} presumably fails on an empty join result - confirm behaviour
     * for a table with no records.
     *
     * @param sparkEngineContext Spark engine context used for reading
     * @param metaClient         meta client of the data table
     * @throws HoodieValidationException when at least one key has a mismatching location
     */
    private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext, HoodieTableMetaClient metaClient) {
        String basePath = metaClient.getBasePathV2().toString();
        String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
        JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd = this.getRecordLocationsFromFSBasedListing(sparkEngineContext, basePath, latestCompletedCommit);
        JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd = this.getRecordLocationsFromRLI(sparkEngineContext, basePath, latestCompletedCommit);
        int numErrorSamples = this.cfg.numRecordIndexErrorSamples;
        // Map step: one (errorCount, samples) pair per key. errorCount is 0 when both sides are
        // present with equal left and right components, and 1 when they differ or when only
        // one side has the key.
        Pair result = (Pair)keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, this.cfg.recordIndexParallelism).map((Function & Serializable)e -> {
            String recordKey = (String)e._1;
            Optional locationOnFs = (Optional)((Tuple2)e._2)._1;
            Optional locationFromRecordIndex = (Optional)((Tuple2)e._2)._2;
            ArrayList<String> errorSampleList = new ArrayList<String>();
            if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) {
                if (((String)((Pair)locationOnFs.get()).getLeft()).equals(((Pair)locationFromRecordIndex.get()).getLeft()) && ((String)((Pair)locationOnFs.get()).getRight()).equals(((Pair)locationFromRecordIndex.get()).getRight())) {
                    return Pair.of(0L, errorSampleList);
                }
                errorSampleList.add(this.constructLocationInfoString(recordKey, (Optional<Pair<String, String>>)locationOnFs, (Optional<Pair<String, String>>)locationFromRecordIndex));
                return Pair.of(1L, errorSampleList);
            }
            if (!locationOnFs.isPresent() && !locationFromRecordIndex.isPresent()) {
                return Pair.of(0L, errorSampleList);
            }
            errorSampleList.add(this.constructLocationInfoString(recordKey, (Optional<Pair<String, String>>)locationOnFs, (Optional<Pair<String, String>>)locationFromRecordIndex));
            return Pair.of(1L, errorSampleList);
        // Reduce step: sum the error counts and merge the sample lists. A list that already
        // holds numErrorSamples entries is returned as-is; otherwise the longer list is extended
        // with entries from the shorter one until numErrorSamples is reached.
        }).reduce((Function2 & Serializable)(pair1, pair2) -> {
            long errorCount = (Long)pair1.getLeft() + (Long)pair2.getLeft();
            List list1 = (List)pair1.getRight();
            List list2 = (List)pair2.getRight();
            if (!list1.isEmpty() && !list2.isEmpty()) {
                if (list1.size() >= numErrorSamples) {
                    return Pair.of(errorCount, list1);
                }
                if (list2.size() >= numErrorSamples) {
                    return Pair.of(errorCount, list2);
                }
                ArrayList<String> resultList = new ArrayList<String>();
                if (list1.size() > list2.size()) {
                    resultList.addAll(list1);
                    for (String item : list2) {
                        resultList.add(item);
                        if (resultList.size() < numErrorSamples) continue;
                        break;
                    }
                } else {
                    resultList.addAll(list2);
                    for (String item : list1) {
                        resultList.add(item);
                        if (resultList.size() < numErrorSamples) continue;
                        break;
                    }
                }
                return Pair.of(errorCount, resultList);
            }
            if (!list1.isEmpty()) {
                return Pair.of(errorCount, list1);
            }
            return Pair.of(errorCount, list2);
        });
        // Total key count from the data-table side, used only for reporting.
        long countKey = keyToLocationOnFsRdd.count();
        // NOTE(review): unpersist is not in a finally block, so it is skipped if the reduce or
        // count above throws.
        keyToLocationOnFsRdd.unpersist();
        long diffCount = (Long)result.getLeft();
        if (diffCount > 0L) {
            String message = String.format("Validation of record index content failed: %s keys (total %s) from the data table have wrong location in record index metadata. Table: %s   Sample mismatches: %s", diffCount, countKey, this.cfg.basePath, String.join((CharSequence)";", (Iterable)result.getRight()));
            LOG.error(message);
            throw new HoodieValidationException(message);
        }
        LOG.info("Validation of record index content succeeded: {} entries. Table: {}", (Object)countKey, (Object)this.cfg.basePath);
    }

    /**
     * Reads the data table through the "hudi" Spark data source as of the given instant and
     * maps every row to {@code recordKey -> (partitionPath, fileId)}.
     *
     * <p>The file id is derived from the file-name meta column via {@link FSUtils#getFileId}.
     * The resulting pair RDD is cached before it is returned.
     *
     * @param sparkEngineContext    engine context providing the SQL context used for the read
     * @param basePath              path handed to the data source {@code load} call
     * @param latestCompletedCommit instant passed as the time-travel "as of" option
     * @return cached pair RDD keyed by record key
     */
    @VisibleForTesting
    JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromFSBasedListing(HoodieSparkEngineContext sparkEngineContext, String basePath, String latestCompletedCommit) {
        String asOfInstantOption = DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key();
        JavaPairRDD<String, Pair<String, String>> keyToLocation = sparkEngineContext.getSqlContext()
            .read()
            .format("hudi")
            .option(asOfInstantOption, latestCompletedCommit)
            .load(basePath)
            .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD)
            .toJavaRDD()
            .mapToPair(row -> {
                String recordKey = row.getString(row.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD));
                String partitionPath = row.getString(row.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
                String fileId = FSUtils.getFileId(row.getString(row.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)));
                return new Tuple2<>(recordKey, Pair.of(partitionPath, fileId));
            });
        return keyToLocation.cache();
    }

    /**
     * Reads the metadata table through the "hudi" Spark data source, keeps rows matching
     * {@code type = 5}, and maps each one to {@code key -> (partitionPath, fileId)} using the
     * location decoded from its {@code recordIndexMetadata} columns.
     *
     * <p>Entries whose decoded instant time compares greater than {@code latestCompletedCommit}
     * are dropped. The resulting pair RDD is cached before it is returned.
     *
     * @param sparkEngineContext    engine context providing the SQL context used for the read
     * @param basePath              data table base path; the metadata table path is derived from it
     * @param latestCompletedCommit upper bound (inclusive) on the instant time of retained entries
     * @return cached pair RDD keyed by record key
     */
    @VisibleForTesting
    JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromRLI(HoodieSparkEngineContext sparkEngineContext, String basePath, String latestCompletedCommit) {
        Column[] recordIndexColumns = new Column[]{
            functions.col("key"),
            functions.col("recordIndexMetadata.partitionName").as("partitionName"),
            functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
            functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
            functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
            functions.col("recordIndexMetadata.fileId").as("fileId"),
            functions.col("recordIndexMetadata.instantTime").as("instantTime"),
            functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding")
        };
        JavaPairRDD<String, Pair<String, String>> keyToLocation = sparkEngineContext.getSqlContext()
            .read()
            .format("hudi")
            .load(HoodieTableMetadata.getMetadataTableBasePath(basePath))
            .filter("type = 5")
            .select(recordIndexColumns)
            .toJavaRDD()
            .map(row -> {
                // Columns are read in the same order the decoder takes its arguments.
                String partitionName = row.getString(row.fieldIndex("partitionName"));
                int fileIdEncoding = row.getInt(row.fieldIndex("fileIdEncoding"));
                long fileIdHighBits = row.getLong(row.fieldIndex("fileIdHighBits"));
                long fileIdLowBits = row.getLong(row.fieldIndex("fileIdLowBits"));
                int fileIndex = row.getInt(row.fieldIndex("fileIndex"));
                String fileId = row.getString(row.fieldIndex("fileId"));
                long instantTime = row.getLong(row.fieldIndex("instantTime"));
                HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(partitionName, fileIdEncoding, fileIdHighBits, fileIdLowBits, fileIndex, fileId, instantTime);
                boolean newerThanCutoff = HoodieTimeline.compareTimestamps(location.getInstantTime(), HoodieTimeline.GREATER_THAN, latestCompletedCommit);
                // An empty option marks the row for removal by the filter stage below.
                Option<HoodieRecordGlobalLocation> retainedLocation = newerThanCutoff ? Option.<HoodieRecordGlobalLocation>empty() : Option.of(location);
                return new Tuple2<Row, Option<HoodieRecordGlobalLocation>>(row, retainedLocation);
            })
            .filter(rowAndLocation -> rowAndLocation._2.isPresent())
            .mapToPair(rowAndLocation -> {
                Row row = rowAndLocation._1;
                HoodieRecordGlobalLocation location = rowAndLocation._2.get();
                return new Tuple2<>(row.getString(row.fieldIndex("key")), Pair.of(location.getPartitionPath(), location.getFileId()));
            });
        return keyToLocation.cache();
    }

    /**
     * Renders one record key together with its location from the file system and from the
     * record index, printing {@code <empty>} for a side that is absent.
     *
     * @param recordKey               record key being described
     * @param locationOnFs            location found via the file-system based read, if any
     * @param locationFromRecordIndex location found in the record index, if any
     * @return text of the form {@code Record key K -> FS: X, Record Index: Y}
     */
    private String constructLocationInfoString(String recordKey, Optional<Pair<String, String>> locationOnFs, Optional<Pair<String, String>> locationFromRecordIndex) {
        String fsLocationText = locationOnFs.isPresent() ? String.valueOf(locationOnFs.get()) : "<empty>";
        String indexLocationText = locationFromRecordIndex.isPresent() ? String.valueOf(locationFromRecordIndex.get()) : "<empty>";
        return "Record key " + recordKey + " -> " + "FS: " + fsLocationText + ", Record Index: " + indexLocationText;
    }

    /**
     * Returns the file names of the sorted latest base files of a partition.
     *
     * <p>When {@code baseDataFilesForCleaning} is non-empty, the listing is first passed through
     * {@code filterBaseFileBasedOnInflightCleaning} together with that set.
     *
     * @param fsBasedContext           context used to list the latest base files
     * @param partitionPath            partition to list
     * @param baseDataFilesForCleaning base file names handed to the inflight-cleaning filter
     * @return base file names, in the order produced by the listing (and filter, if applied)
     */
    private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
        List<HoodieBaseFile> sortedLatestBaseFiles = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
        if (baseDataFilesForCleaning.isEmpty()) {
            return sortedLatestBaseFiles.stream()
                .map(BaseFile::getFileName)
                .collect(Collectors.toList());
        }
        return this.filterBaseFileBasedOnInflightCleaning(sortedLatestBaseFiles, baseDataFilesForCleaning).stream()
            .map(BaseFile::getFileName)
            .collect(Collectors.toList());
    }

    /**
     * Compares two lists of the same kind of information and fails when they differ.
     *
     * @param infoListFromMetadataTable entries obtained through the metadata table
     * @param infoListFromFS            entries obtained from the file system
     * @param partitionPath             partition the entries belong to (used in messages)
     * @param label                     human-readable name of what is being compared
     * @throws HoodieValidationException if the lists differ in size or content
     */
    private <T> void validate(List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
        boolean sameSize = infoListFromMetadataTable.size() == infoListFromFS.size();
        if (sameSize && infoListFromMetadataTable.equals(infoListFromFS)) {
            LOG.info("Validation of {} succeeded for partition {} for table: {}", new Object[]{label, partitionPath, this.cfg.basePath});
            return;
        }
        String failure = String.format("Validation of %s for partition %s failed for table: %s \n%s from metadata: %s\n%s from file system and base files: %s", label, partitionPath, this.cfg.basePath, label, infoListFromMetadataTable, label, infoListFromFS);
        LOG.error(failure);
        throw new HoodieValidationException(failure);
    }

    /**
     * Compares the file slices from the metadata table with those from the file system.
     *
     * <p>Lists of different size are a mismatch. Lists that are not equal are compared pairwise
     * by position: file group id, base instant time and base file must be equal, and
     * {@code areFileSliceCommittedLogFilesMatching} must accept the pair. Comparison stops at
     * the first pair that fails.
     *
     * @param fileSliceListFromMetadataTable file slices obtained through the metadata table
     * @param fileSliceListFromFS            file slices obtained from the file system
     * @param partitionPath                  partition the slices belong to (used in messages)
     * @param metaClient                     meta client passed on to the log-file comparison
     * @param label                          human-readable name of what is being compared
     * @throws HoodieValidationException if a mismatch is found
     */
    private void validateFileSlices(List<FileSlice> fileSliceListFromMetadataTable, List<FileSlice> fileSliceListFromFS, String partitionPath, HoodieTableMetaClient metaClient, String label) {
        boolean mismatch = fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size();
        if (!mismatch && !fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
            // Shared across pairs so commit metadata is looked up at most once per instant.
            Map<String, Set<String>> committedFilesByInstant = new HashMap<>();
            int sliceCount = fileSliceListFromMetadataTable.size();
            for (int idx = 0; idx < sliceCount && !mismatch; idx++) {
                FileSlice sliceFromMetadata = fileSliceListFromMetadataTable.get(idx);
                FileSlice sliceFromFs = fileSliceListFromFS.get(idx);
                boolean sameIdentityAndBase = Objects.equals(sliceFromMetadata.getFileGroupId(), sliceFromFs.getFileGroupId())
                    && Objects.equals(sliceFromMetadata.getBaseInstantTime(), sliceFromFs.getBaseInstantTime())
                    && Objects.equals(sliceFromMetadata.getBaseFile(), sliceFromFs.getBaseFile());
                if (sameIdentityAndBase && this.areFileSliceCommittedLogFilesMatching(sliceFromMetadata, sliceFromFs, metaClient, committedFilesByInstant)) {
                    LOG.warn("There are uncommitted log files in the latest file slices but the committed log files match: {} {}", (Object)sliceFromMetadata, (Object)sliceFromFs);
                } else {
                    mismatch = true;
                }
            }
        }
        if (!mismatch) {
            LOG.info("Validation of {} succeeded for partition {} for table: {}", new Object[]{label, partitionPath, this.cfg.basePath});
            return;
        }
        String failure = String.format("Validation of %s for partition %s failed for table: %s \n%s from metadata: %s\n%s from file system and base files: %s", label, partitionPath, this.cfg.basePath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS);
        LOG.error(failure);
        throw new HoodieValidationException(failure);
    }

    /**
     * Checks whether two file slices agree on their committed log files.
     *
     * <p>Log file paths present in both slices are discarded; the slices are considered
     * matching when neither side's remaining paths contain a committed log file according to
     * {@code hasCommittedLogFiles}. The second slice is only inspected if the first one passes.
     *
     * @param fs1               first file slice
     * @param fs2               second file slice
     * @param metaClient        meta client supplying the storage and timelines
     * @param committedFilesMap cache handed through to {@code hasCommittedLogFiles}
     * @return {@code true} if no committed log file is exclusive to either slice
     */
    private boolean areFileSliceCommittedLogFilesMatching(FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient, Map<String, Set<String>> committedFilesMap) {
        Set<String> onlyInFirst = fs1.getLogFiles()
            .map(HoodieLogFile::getPath)
            .map(Object::toString)
            .collect(Collectors.toSet());
        Set<String> onlyInSecond = fs2.getLogFiles()
            .map(HoodieLogFile::getPath)
            .map(Object::toString)
            .collect(Collectors.toSet());

        // Strip the shared paths so each set holds only what is exclusive to its slice.
        Set<String> sharedPaths = new HashSet<>(onlyInFirst);
        sharedPaths.retainAll(onlyInSecond);
        onlyInFirst.removeAll(sharedPaths);
        onlyInSecond.removeAll(sharedPaths);

        HoodieStorage storage = metaClient.getStorage();
        if (this.hasCommittedLogFiles(storage, onlyInFirst, metaClient, committedFilesMap)) {
            LOG.error("The first file slice has committed log files that cause mismatching: {}; Different log files are: {}", (Object)fs1, onlyInFirst);
            return false;
        }
        if (this.hasCommittedLogFiles(storage, onlyInSecond, metaClient, committedFilesMap)) {
            LOG.error("The second file slice has committed log files that cause mismatching: {}; Different log files are: {}", (Object)fs2, onlyInSecond);
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private boolean hasCommittedLogFiles(HoodieStorage storage, Set<String> logFilePathSet, HoodieTableMetaClient metaClient, Map<String, Set<String>> committedFilesMap) {
        if (logFilePathSet.isEmpty()) {
            return false;
        }
        basePath = metaClient.getBasePathV2().toString();
        commitsTimeline = metaClient.getCommitsTimeline();
        completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
        inflightInstantsTimeline = commitsTimeline.filterInflights();
        for (String logFilePathStr : logFilePathSet) {
            block14: {
                block15: {
                    block13: {
                        block12: {
                            reader = null;
                            readerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePathStr));
                            if (readerSchema != null) break block12;
                            HoodieMetadataTableValidator.LOG.warn("Cannot read schema from log file {}. Skip the check as it's likely being written by an inflight instant.", (Object)logFilePathStr);
                            FileIOUtils.closeQuietly(reader);
                            continue;
                        }
                        reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema, false);
                        if (!reader.hasNext()) ** GOTO lbl48
                        block = (HoodieLogBlock)reader.next();
                        instantTime = block.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
                        if (!completedInstantsTimeline.containsInstant(instantTime)) ** GOTO lbl36
                        if (!committedFilesMap.containsKey(instantTime)) {
                            commitMetadata = HoodieCommitMetadata.fromBytes(completedInstantsTimeline.getInstantDetails(completedInstantsTimeline.filter((Predicate<HoodieInstant>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$hasCommittedLogFiles$11(java.lang.String org.apache.hudi.common.table.timeline.HoodieInstant ), (Lorg/apache/hudi/common/table/timeline/HoodieInstant;)Z)((String)instantTime)).firstInstant().get()).get(), HoodieCommitMetadata.class);
                            committedFilesMap.put(instantTime, commitMetadata.getWriteStats().stream().map((java.util.function.Function<HoodieWriteStat, String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, getPath(), (Lorg/apache/hudi/common/model/HoodieWriteStat;)Ljava/lang/String;)()).collect(Collectors.toSet()));
                        }
                        relativeLogFilePathStr = this.getRelativePath(basePath, logFilePathStr);
                        if (!committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) break block13;
                        HoodieMetadataTableValidator.LOG.warn("Log file is committed in an instant in active timeline: instantTime={} {}", (Object)instantTime, (Object)logFilePathStr);
                        var16_18 = true;
                        FileIOUtils.closeQuietly(reader);
                        return var16_18;
                    }
                    HoodieMetadataTableValidator.LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: instantTime={} {}", (Object)instantTime, (Object)logFilePathStr);
                    break block14;
lbl36:
                    // 1 sources

                    if (!completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) break block15;
                    HoodieMetadataTableValidator.LOG.warn("Log file is committed in an instant in archived timeline: instantTime={} {}", (Object)instantTime, (Object)logFilePathStr);
                    var15_17 = true;
                    FileIOUtils.closeQuietly(reader);
                    return var15_17;
                }
                try {
                    if (inflightInstantsTimeline.containsInstant(instantTime)) {
                        HoodieMetadataTableValidator.LOG.warn("Log file is uncommitted because of an inflight instant: instantTime={} {}", (Object)instantTime, (Object)logFilePathStr);
                    } else {
                        HoodieMetadataTableValidator.LOG.warn("Log file is uncommitted because the instant is after the start of the active timeline but absent or in requested in the active timeline: instantTime={} {}", (Object)instantTime, (Object)logFilePathStr);
                    }
                    break block14;
lbl48:
                    // 1 sources

                    HoodieMetadataTableValidator.LOG.warn("There is no log block in {}", (Object)logFilePathStr);
                }
                catch (IOException e) {
                    try {
                        HoodieMetadataTableValidator.LOG.warn(String.format("Cannot read log file %s: %s. Skip the check as it's likely being written by an inflight instant.", new Object[]{logFilePathStr, e.getMessage()}), (Throwable)e);
                    }
                    catch (Throwable var17_19) {
                        FileIOUtils.closeQuietly(reader);
                        throw var17_19;
                    }
                    FileIOUtils.closeQuietly(reader);
                    continue;
                }
            }
            FileIOUtils.closeQuietly(reader);
        }
        return false;
    }

    /**
     * Returns {@code absoluteFilePath} relative to {@code basePath}, comparing both without
     * scheme and authority.
     *
     * <p>The base path must match on a path-segment boundary: {@code /tbl2/f} is not treated as
     * being under {@code /tbl}. A file path equal to the base path yields the empty string.
     *
     * @param basePath         base path the file is expected to live under
     * @param absoluteFilePath absolute path of the file
     * @return the relative path, without a leading {@code /}
     * @throws IllegalArgumentException if the file path is not located under the base path
     */
    private String getRelativePath(String basePath, String absoluteFilePath) {
        String basePathStr = new StoragePath(basePath).getPathWithoutSchemeAndAuthority().toString();
        String absoluteFilePathStr = new StoragePath(absoluteFilePath).getPathWithoutSchemeAndAuthority().toString();
        if (absoluteFilePathStr.equals(basePathStr)) {
            return "";
        }
        // Require a separator after the base so sibling directories that merely share a
        // name prefix with the base path are rejected instead of yielding a bogus path.
        String basePrefix = basePathStr.endsWith("/") ? basePathStr : basePathStr + "/";
        if (!absoluteFilePathStr.startsWith(basePrefix)) {
            throw new IllegalArgumentException("File path does not belong to the base path! basePath=" + basePathStr + " absoluteFilePathStr=" + absoluteFilePathStr);
        }
        return absoluteFilePathStr.substring(basePrefix.length());
    }

    /**
     * Predicate body: does the instant carry exactly the given timestamp?
     *
     * @param instantTime timestamp to look for
     * @param i           instant being tested
     * @return {@code true} if the instant's timestamp equals {@code instantTime}
     */
    private static /* synthetic */ boolean lambda$hasCommittedLogFiles$11(String instantTime, HoodieInstant i) {
        String candidateTimestamp = i.getTimestamp();
        return candidateTimestamp.equals(instantTime);
    }

    /**
     * Validates one partition and reports the outcome as a pair.
     *
     * @return {@code (true, null)} on success; {@code (false, exception)} when validation fails
     *         and {@code cfg.ignoreFailed} is set
     * @throws HoodieValidationException when validation fails and {@code cfg.ignoreFailed} is
     *         not set
     */
    private /* synthetic */ Pair lambda$doMetadataTableValidation$49b7b3f0$1(HoodieMetadataValidationContext metadataTableBasedContext, HoodieMetadataValidationContext fsBasedContext, Set finalBaseFilesForCleaning, String partitionPath) throws Exception {
        try {
            this.validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
        }
        catch (HoodieValidationException validationFailure) {
            LOG.error(String.format("Metadata table validation failed for partition %s due to HoodieValidationException (partition %s)", partitionPath, this.taskLabels), (Throwable)validationFailure);
            if (this.cfg.ignoreFailed) {
                // Hand the failure back to the caller instead of aborting the whole run.
                return Pair.of(false, new HoodieValidationException(validationFailure.getMessage() + " for partition: " + partitionPath, validationFailure));
            }
            throw validationFailure;
        }
        LOG.info("Metadata table validation succeeded for partition {} (partition {})", (Object)partitionPath, (Object)this.taskLabels);
        return Pair.of(true, null);
    }

    /**
     * Predicate body: does the path carry one of the known base-file extensions?
     *
     * @param path file path or name to test
     * @return {@code true} if the extension is in {@code HoodieFileFormat.BASE_FILE_EXTENSIONS}
     */
    private static /* synthetic */ boolean lambda$doMetadataTableValidation$4(String path) {
        return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(FSUtils.getFileExtension(path));
    }

    /**
     * Streams the names of all files listed for deletion in the cleaner plan of an instant.
     *
     * <p>The plan is looked up through a {@code REQUESTED}-state copy of the given instant.
     *
     * @param instant clean instant whose plan should be read
     * @return stream of file names (final path component only)
     * @throws HoodieIOException if the cleaner plan cannot be read; the original
     *         {@link IOException} is attached as the cause
     */
    private /* synthetic */ Stream lambda$doMetadataTableValidation$3(HoodieInstant instant) {
        HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp());
        try {
            HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(this.metaClient, requestedInstant);
            return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> cleanerFileInfoList.stream().map(fileInfo -> new Path(fileInfo.getFilePath()).getName()));
        }
        catch (IOException e) {
            // Keep the cause so the underlying I/O failure stays visible in the stack trace.
            throw new HoodieIOException("Error reading cleaner metadata for " + requestedInstant, e);
        }
    }

    /**
     * Holds the meta client, an in-memory file-system view and a table metadata reader that
     * are all built from one {@link HoodieMetadataConfig}, and exposes sorted listings that
     * can be compared between a metadata-table-backed and a file-system-backed instance.
     *
     * <p>{@code enableMetadataTable} drives the metadata config: it switches the metadata
     * table, the bloom filter index, the column stats index and the record index on or off
     * together.
     */
    private static class HoodieMetadataValidationContext
    implements AutoCloseable,
    Serializable {
        private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataValidationContext.class);
        private final Properties props;
        private final HoodieTableMetaClient metaClient;
        private final HoodieTableFileSystemView fileSystemView;
        private final HoodieTableMetadata tableMetadata;
        private final boolean enableMetadataTable;
        // Stays null when the table has no completed instant on its commits timeline.
        private List<String> allColumnNameList;

        /**
         * @param engineContext          engine context used to build the view and metadata reader
         * @param props                  properties consulted when opening base files
         * @param metaClient             meta client of the table under validation
         * @param enableMetadataTable    whether listings and indexes are served from the metadata table
         * @param assumeDatePartitioning forwarded to the metadata config
         */
        public HoodieMetadataValidationContext(HoodieEngineContext engineContext, Properties props, HoodieTableMetaClient metaClient, boolean enableMetadataTable, boolean assumeDatePartitioning) {
            this.props = props;
            this.metaClient = metaClient;
            this.enableMetadataTable = enableMetadataTable;
            HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).withMetadataIndexBloomFilter(enableMetadataTable).withMetadataIndexColumnStats(enableMetadataTable).withEnableRecordIndex(enableMetadataTable).withAssumeDatePartitioning(assumeDatePartitioning).build();
            this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig);
            this.tableMetadata = HoodieTableMetadata.create(engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePathV2().toString());
            // The schema is only resolved once at least one instant has completed.
            if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
                this.allColumnNameList = this.getAllColumnNames();
            }
        }

        public HoodieTableMetaClient getMetaClient() {
            return this.metaClient;
        }

        public HoodieTableMetadata getTableMetadata() {
            return this.tableMetadata;
        }

        /** Latest base files of the partition, ordered by {@link HoodieBaseFileComparator}. */
        public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
            return this.fileSystemView.getLatestBaseFiles(partitionPath).sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
        }

        /** Latest file slices of the partition, ordered by {@link FileSliceComparator}. */
        public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
            return this.fileSystemView.getLatestFileSlices(partitionPath).sorted(new FileSliceComparator()).collect(Collectors.toList());
        }

        /** All file groups of the partition, ordered by {@link HoodieFileGroupComparator}. */
        public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
            return this.fileSystemView.getAllFileGroups(partitionPath).sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
        }

        /**
         * Column statistics for the given base files, ordered by
         * {@link HoodieColumnRangeMetadataComparator}.
         *
         * <p>With the metadata table enabled the stats are fetched per column from the table
         * metadata; otherwise they are read from each file via the Parquet file-format utils.
         *
         * @param partitionPath    partition containing the files
         * @param baseFileNameList names of the base files to collect stats for
         * @return sorted column range metadata
         */
        public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(String partitionPath, List<String> baseFileNameList) {
            LOG.info("All column names for getting column stats: {}", this.allColumnNameList);
            if (this.enableMetadataTable) {
                List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream().map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
                return this.allColumnNameList.stream().flatMap(columnName -> this.tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream().map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata).collect(Collectors.toList()).stream()).sorted(new HoodieColumnRangeMetadataComparator()).collect(Collectors.toList());
            }
            FileFormatUtils formatUtils = HoodieIOFactory.getIOFactory(this.metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET);
            return baseFileNameList.stream().flatMap(filename -> formatUtils.readColumnStatsFromMetadata(this.metaClient.getStorage(), new StoragePath(FSUtils.constructAbsolutePath(this.metaClient.getBasePathV2(), partitionPath), filename), this.allColumnNameList).stream()).sorted(new HoodieColumnRangeMetadataComparator()).collect(Collectors.toList());
        }

        /**
         * Bloom filters for the given base files in their natural sort order.
         *
         * <p>With the metadata table enabled the filters come from the table metadata;
         * otherwise each file is opened and files without a readable filter are left out.
         *
         * @param partitionPath    partition containing the files
         * @param baseFileNameList names of the base files to collect bloom filters for
         * @return sorted bloom filter data
         */
        public List<BloomFilterData> getSortedBloomFilterList(String partitionPath, List<String> baseFileNameList) {
            if (this.enableMetadataTable) {
                List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream().map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
                return this.tableMetadata.getBloomFilters(partitionFileNameList).entrySet().stream().map(entry -> BloomFilterData.builder().setPartitionPath(entry.getKey().getKey()).setFilename(entry.getKey().getValue()).setBloomFilter(ByteBuffer.wrap(StringUtils.getUTF8Bytes(entry.getValue().serializeToString()))).build()).sorted().collect(Collectors.toList());
            }
            return baseFileNameList.stream().map(filename -> this.readBloomFilterFromFile(partitionPath, filename)).filter(Option::isPresent).map(Option::get).sorted().collect(Collectors.toList());
        }

        /**
         * Resolves the table's Avro schema and returns its field names.
         *
         * @throws HoodieException if the schema cannot be resolved; the original failure is
         *         attached as the cause
         */
        private List<String> getAllColumnNames() {
            TableSchemaResolver schemaResolver = new TableSchemaResolver(this.metaClient);
            try {
                return schemaResolver.getTableAvroSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
            }
            catch (Exception e) {
                // Attach the cause; without it the reason for the failure is lost.
                throw new HoodieException("Failed to get all column names for " + this.metaClient.getBasePathV2(), e);
            }
        }

        /**
         * Opens one base file and reads its bloom filter.
         *
         * @param partitionPath partition containing the file
         * @param filename      base file name
         * @return the bloom filter data, or an empty option if the file has no bloom filter or
         *         cannot be read (both cases are logged as errors)
         */
        private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
            StoragePath path = new StoragePath(FSUtils.constructAbsolutePath(this.metaClient.getBasePathV2(), partitionPath).toString(), filename);
            HoodieConfig hoodieConfig = new HoodieConfig();
            hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER, Boolean.toString(ConfigUtils.getBooleanWithAltKeys(this.props, HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
            try (HoodieFileReader fileReader = HoodieSparkIOFactory.getHoodieSparkIOFactory(this.metaClient.getStorage()).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieConfig, path)) {
                BloomFilter bloomFilter = fileReader.readBloomFilter();
                if (bloomFilter == null) {
                    LOG.error("Failed to read bloom filter for {}", path);
                    return Option.empty();
                }
                return Option.of(BloomFilterData.builder().setPartitionPath(partitionPath).setFilename(filename).setBloomFilter(ByteBuffer.wrap(StringUtils.getUTF8Bytes(bloomFilter.serializeToString()))).build());
            }
            catch (IOException e) {
                LOG.error("Failed to get file reader for {} {}", path, e.getMessage());
                return Option.empty();
            }
        }

        /**
         * Closes the table metadata reader and the file-system view. The view is closed even
         * if closing the metadata reader throws.
         */
        @Override
        public void close() throws Exception {
            try {
                this.tableMetadata.close();
            }
            finally {
                this.fileSystemView.close();
            }
        }
    }

    /**
     * Orders column range metadata by the lexicographic order of their {@code toString()}
     * representations.
     */
    public static class HoodieColumnRangeMetadataComparator
    implements Comparator<HoodieColumnRangeMetadata<Comparable>>,
    Serializable {
        @Override
        public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMetadata<Comparable> o2) {
            String leftText = o1.toString();
            String rightText = o2.toString();
            return leftText.compareTo(rightText);
        }
    }

    /** Orders file groups by the natural order of their file group ids. */
    public static class HoodieFileGroupComparator
    implements Comparator<HoodieFileGroup>,
    Serializable {
        @Override
        public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
            return o1.getFileGroupId()
                .compareTo(o2.getFileGroupId());
        }
    }

    /** Orders base files by comparing the values returned by {@code getPath()}. */
    public static class HoodieBaseFileComparator
    implements Comparator<HoodieBaseFile>,
    Serializable {
        @Override
        public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
            String leftPath = o1.getPath();
            String rightPath = o2.getPath();
            return leftPath.compareTo(rightPath);
        }
    }

    /**
     * Orders file slices by the string formed from partition path, file id and base instant
     * time concatenated in that order.
     */
    public static class FileSliceComparator
    implements Comparator<FileSlice>,
    Serializable {
        @Override
        public int compare(FileSlice o1, FileSlice o2) {
            return sortKey(o1).compareTo(sortKey(o2));
        }

        /** Builds the concatenated key a slice is ordered by. */
        private static String sortKey(FileSlice slice) {
            return slice.getPartitionPath() + slice.getFileId() + slice.getBaseInstantTime();
        }
    }

    /**
     * Async service that runs the enclosing validator's {@code doMetadataTableValidation()} in
     * an endless loop on a single-thread executor, sleeping between runs so that consecutive
     * runs start at least {@code cfg.minValidateIntervalSeconds} apart.
     */
    public class AsyncMetadataTableValidateService
    extends HoodieAsyncService {
        private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

        /**
         * Submits the validation loop to the executor.
         *
         * <p>A {@link HoodieValidationException} ends the loop (and fails the future) unless
         * {@code cfg.ignoreFailed} is set, in which case it is recorded in {@code throwables}
         * and the loop continues.
         *
         * @return the future running the loop, paired with the executor it runs on
         */
        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                while (true) {
                    try {
                        long startMs = System.currentTimeMillis();
                        HoodieMetadataTableValidator.this.doMetadataTableValidation();
                        // Multiply as long: an int product overflows for intervals above ~24.8 days.
                        long minIntervalMs = HoodieMetadataTableValidator.this.cfg.minValidateIntervalSeconds * 1000L;
                        long toSleepMs = minIntervalMs - (System.currentTimeMillis() - startMs);
                        if (toSleepMs > 0L) {
                            LOG.info("Last validate ran less than min validate interval: {} s, sleep: {} ms.", (Object)HoodieMetadataTableValidator.this.cfg.minValidateIntervalSeconds, (Object)toSleepMs);
                            Thread.sleep(toSleepMs);
                        }
                    }
                    catch (HoodieValidationException e) {
                        LOG.error("Shutting down AsyncMetadataTableValidateService due to HoodieValidationException", (Throwable)e);
                        if (!HoodieMetadataTableValidator.this.cfg.ignoreFailed) {
                            throw e;
                        }
                        HoodieMetadataTableValidator.this.throwables.add(e);
                    }
                    catch (InterruptedException ignored) {
                        // Deliberately ignored: an interrupted sleep just starts the next run early.
                    }
                }
            }, this.executor), this.executor);
        }
    }

    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--continuous"}, description="Running MetadataTableValidator in continuous. Can use --min-validate-interval-seconds to control validation frequency", required=false)
        public boolean continuous = false;
        @Parameter(names={"--skip-data-files-for-cleaning"}, description="Skip to compare the data files which are under deletion by cleaner", required=false)
        public boolean skipDataFilesForCleaning = false;
        @Parameter(names={"--validate-latest-file-slices"}, description="Validate latest file slices for all partitions.", required=false)
        public boolean validateLatestFileSlices = false;
        @Parameter(names={"--validate-latest-base-files"}, description="Validate latest base files for all partitions.", required=false)
        public boolean validateLatestBaseFiles = false;
        @Parameter(names={"--validate-all-file-groups"}, description="Validate all file groups, and all file slices within file groups.", required=false)
        public boolean validateAllFileGroups = false;
        @Parameter(names={"--validate-all-column-stats"}, description="Validate column stats for all columns in the schema", required=false)
        public boolean validateAllColumnStats = false;
        @Parameter(names={"--validate-bloom-filters"}, description="Validate bloom filters of base files", required=false)
        public boolean validateBloomFilters = false;
        @Parameter(names={"--validate-record-index-count"}, description="Validate the number of entries in the record index, which should be equal to the number of record keys in the latest snapshot of the table", required=false)
        public boolean validateRecordIndexCount = false;
        @Parameter(names={"--validate-record-index-content"}, description="Validate the content of the record index so that each record key should have the correct location, and there is no additional or missing entry", required=false)
        public boolean validateRecordIndexContent = false;
        @Parameter(names={"--num-record-index-error-samples"}, description="Number of error samples to show for record index validation", required=false)
        public int numRecordIndexErrorSamples = 100;
        @Parameter(names={"--min-validate-interval-seconds"}, description="the min validate interval of each validate when set --continuous, default is 10 minutes.")
        public Integer minValidateIntervalSeconds = 600;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for valuation", required=false)
        public int parallelism = 200;
        @Parameter(names={"--record-index-parallelism", "-rpl"}, description="Parallelism for validating record index", required=false)
        public int recordIndexParallelism = 100;
        @Parameter(names={"--ignore-failed", "-ig"}, description="Ignore metadata validate failure and continue.", required=false)
        public boolean ignoreFailed = false;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master", required=false)
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = "1g";
        @Parameter(names={"--assume-date-partitioning"}, description="Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path.This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually", required=false)
        public Boolean assumeDatePartitioning = false;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;

        /**
         * Renders the configuration as one {@code --flag value} entry per line, using the
         * command-line flag names.
         *
         * @return multi-line description of this configuration
         */
        @Override
        public String toString() {
            return "MetadataTableValidatorConfig {\n"
                + "   --base-path " + this.basePath + ", \n"
                + "   --validate-latest-file-slices " + this.validateLatestFileSlices + ", \n"
                + "   --validate-latest-base-files " + this.validateLatestBaseFiles + ", \n"
                + "   --validate-all-file-groups " + this.validateAllFileGroups + ", \n"
                + "   --validate-all-column-stats " + this.validateAllColumnStats + ", \n"
                + "   --validate-bloom-filters " + this.validateBloomFilters + ", \n"
                + "   --validate-record-index-count " + this.validateRecordIndexCount + ", \n"
                + "   --validate-record-index-content " + this.validateRecordIndexContent + ", \n"
                + "   --num-record-index-error-samples " + this.numRecordIndexErrorSamples + ", \n"
                + "   --continuous " + this.continuous + ", \n"
                + "   --skip-data-files-for-cleaning " + this.skipDataFilesForCleaning + ", \n"
                + "   --ignore-failed " + this.ignoreFailed + ", \n"
                + "   --min-validate-interval-seconds " + this.minValidateIntervalSeconds + ", \n"
                + "   --parallelism " + this.parallelism + ", \n"
                + "   --record-index-parallelism " + this.recordIndexParallelism + ", \n"
                + "   --spark-master " + this.sparkMaster + ", \n"
                + "   --spark-memory " + this.sparkMemory + ", \n"
                // Label corrected to the actual flag name (was "--assumeDatePartitioning-memory").
                + "   --assume-date-partitioning " + this.assumeDatePartitioning + ", \n"
                + "   --props " + this.propsFilePath + ", \n"
                + "   --hoodie-conf " + this.configs + "\n}";
        }

        /**
         * Compares this configuration with another object field by field.
         *
         * <p>Two configs are equal when they are of the exact same class and all validator
         * options match. The {@code help} flag is not part of the comparison.
         *
         * @param o the object to compare against; may be {@code null}
         * @return {@code true} if {@code o} is a config with identical option values
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Config config = (Config)o;
            // basePath is compared null-safely like every other field: it is null on an
            // instance whose --base-path has not been populated.
            return Objects.equals(this.basePath, config.basePath)
                && Objects.equals(this.continuous, config.continuous)
                && Objects.equals(this.skipDataFilesForCleaning, config.skipDataFilesForCleaning)
                && Objects.equals(this.validateLatestFileSlices, config.validateLatestFileSlices)
                && Objects.equals(this.validateLatestBaseFiles, config.validateLatestBaseFiles)
                && Objects.equals(this.validateAllFileGroups, config.validateAllFileGroups)
                && Objects.equals(this.validateAllColumnStats, config.validateAllColumnStats)
                && Objects.equals(this.validateBloomFilters, config.validateBloomFilters)
                && Objects.equals(this.validateRecordIndexCount, config.validateRecordIndexCount)
                && Objects.equals(this.validateRecordIndexContent, config.validateRecordIndexContent)
                && Objects.equals(this.numRecordIndexErrorSamples, config.numRecordIndexErrorSamples)
                && Objects.equals(this.minValidateIntervalSeconds, config.minValidateIntervalSeconds)
                && Objects.equals(this.parallelism, config.parallelism)
                && Objects.equals(this.recordIndexParallelism, config.recordIndexParallelism)
                && Objects.equals(this.ignoreFailed, config.ignoreFailed)
                && Objects.equals(this.sparkMaster, config.sparkMaster)
                && Objects.equals(this.sparkMemory, config.sparkMemory)
                && Objects.equals(this.assumeDatePartitioning, config.assumeDatePartitioning)
                && Objects.equals(this.propsFilePath, config.propsFilePath)
                && Objects.equals(this.configs, config.configs);
        }

        /**
         * Computes a hash over the option values that take part in {@code equals}.
         *
         * <p>The {@code help} flag is deliberately left out: {@code equals} does not compare it,
         * so hashing it would let two equal configs produce different hash codes.
         *
         * @return a hash code consistent with {@code equals}
         */
        @Override
        public int hashCode() {
            return Objects.hash(
                this.basePath,
                this.continuous,
                this.skipDataFilesForCleaning,
                this.validateLatestFileSlices,
                this.validateLatestBaseFiles,
                this.validateAllFileGroups,
                this.validateAllColumnStats,
                this.validateBloomFilters,
                this.validateRecordIndexCount,
                this.validateRecordIndexContent,
                this.numRecordIndexErrorSamples,
                this.minValidateIntervalSeconds,
                this.parallelism,
                this.recordIndexParallelism,
                this.ignoreFailed,
                this.sparkMaster,
                this.sparkMemory,
                this.assumeDatePartitioning,
                this.propsFilePath,
                this.configs);
        }
    }
}

