/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.ConvertingGenericData;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.MetadataRecordsGenerationParams;
import org.apache.hudi.org.apache.avro.AvroTypeException;
import org.apache.hudi.org.apache.avro.LogicalTypes;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieTableMetadataUtil {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableMetadataUtil.class);
    // Partition names used when referring to the individual metadata table partitions.
    public static final String PARTITION_NAME_FILES = "files";
    public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
    public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
    public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
    // NOTE(review): usages are outside this view; presumably a numeric suffix related to the
    // instant time used when initializing a metadata partition - confirm against callers.
    private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10;
    // NOTE(review): presumably the set of suffix strings accepted as "initialization" suffixes -
    // confirm against callers.
    private static final List<String> VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010", "011", "012", "013");

    /**
     * Checks whether the table config reachable from the given meta client lists the
     * {@code files} metadata partition.
     *
     * @param metaClient meta client whose table config is consulted
     * @return {@code true} if the configured metadata partitions contain {@link #PARTITION_NAME_FILES}
     */
    public static boolean isFilesPartitionAvailable(HoodieTableMetaClient metaClient) {
        Collection<String> configuredPartitions = metaClient.getTableConfig().getMetadataPartitions();
        return configuredPartitions.contains(PARTITION_NAME_FILES);
    }

    /**
     * Computes per-column range statistics (min, max, null count, value count) over the given
     * records for each of the target fields.
     *
     * <p>A value is counted as "null" when it is {@code null} after conversion or when its schema
     * is reported as not comparable by {@code canCompare}; such values never contribute to
     * min/max. Total size and total uncompressed size are always reported as {@code 0}.
     *
     * @param records      records to scan; every element is cast to {@link GenericRecord}
     * @param targetFields fields for which statistics are collected
     * @param filePath     file path recorded in each resulting range metadata entry
     * @return map from column name to its range metadata, one entry per target field (fields for
     *         which no record was seen get null min/max and zero counts)
     */
    public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<IndexedRecord> records, List<Schema.Field> targetFields, String filePath) {
        // Mutable accumulator holding the running statistics of a single column.
        class ColumnStats {
            Object minValue;
            Object maxValue;
            long nullCount;
            long valueCount;
        }

        Map<String, ColumnStats> allColumnStats = new HashMap<>();
        for (IndexedRecord record : records) {
            for (Schema.Field field : targetFields) {
                ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), ignored -> new ColumnStats());
                GenericRecord genericRecord = (GenericRecord) record;
                Object fieldVal = HoodieAvroUtils.convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
                Schema fieldSchema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
                colStats.valueCount++;
                if (fieldVal == null || !HoodieTableMetadataUtil.canCompare(fieldSchema)) {
                    // Null and non-comparable values are both accounted for as nulls.
                    colStats.nullCount++;
                    continue;
                }
                if (colStats.minValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.minValue, fieldSchema) < 0) {
                    colStats.minValue = fieldVal;
                }
                if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
                    colStats.maxValue = fieldVal;
                }
            }
        }

        Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
            Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
        return targetFields.stream().map(field -> {
            // colStats is absent only when there were no records to scan.
            ColumnStats colStats = allColumnStats.get(field.name());
            return HoodieColumnRangeMetadata.<Comparable>create(
                filePath,
                field.name(),
                colStats == null ? null : HoodieTableMetadataUtil.coerceToComparable(field.schema(), colStats.minValue),
                colStats == null ? null : HoodieTableMetadataUtil.coerceToComparable(field.schema(), colStats.maxValue),
                colStats == null ? 0L : colStats.nullCount,
                colStats == null ? 0L : colStats.valueCount,
                0L,
                0L);
        }).collect(collector);
    }

    /**
     * Builds a {@link HoodieColumnRangeMetadata} from a column-stats record, unwrapping the
     * wrapped min and max values and copying the remaining fields as they are.
     *
     * @param columnStats column-stats record to convert
     * @return the equivalent range metadata
     */
    public static HoodieColumnRangeMetadata<Comparable> convertColumnStatsRecordToColumnRangeMetadata(HoodieMetadataColumnStats columnStats) {
        return HoodieColumnRangeMetadata.<Comparable>create(
            columnStats.getFileName(),
            columnStats.getColumnName(),
            HoodieAvroUtils.unwrapAvroValueWrapper(columnStats.getMinValue()),
            HoodieAvroUtils.unwrapAvroValueWrapper(columnStats.getMaxValue()),
            columnStats.getNullCount(),
            columnStats.getValueCount(),
            columnStats.getTotalSize(),
            columnStats.getTotalUncompressedSize());
    }

    /**
     * Builds a meta client for the table at {@code basePath} and delegates to the
     * meta-client-based {@code deleteMetadataTable} overload, passing {@code false} as its last
     * argument.
     *
     * @param basePath base path of the data table
     * @param context  engine context supplying the Hadoop configuration
     */
    public static void deleteMetadataTable(String basePath, HoodieEngineContext context) {
        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
            .setBasePath(basePath)
            .setConf(context.getHadoopConf().get())
            .build();
        HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, context, false);
    }

    /**
     * Builds a meta client for the table at {@code basePath} and delegates to
     * {@code deleteMetadataTablePartition} for the given partition type, passing {@code false}
     * as its last argument.
     *
     * @param basePath      base path of the data table
     * @param context       engine context supplying the Hadoop configuration
     * @param partitionType metadata partition to delete
     */
    public static void deleteMetadataPartition(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) {
        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
            .setBasePath(basePath)
            .setConf(context.getHadoopConf().get())
            .build();
        HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, context, partitionType, false);
    }

    /**
     * Checks whether the directory of the given metadata partition exists under the metadata
     * table base path derived from {@code basePath}.
     *
     * @param basePath      base path of the data table
     * @param context       engine context supplying the Hadoop configuration
     * @param partitionType metadata partition to look for
     * @return {@code true} if the partition path exists on the file system
     * @throws HoodieIOException if the existence check fails; the original failure is attached
     *                           as the cause
     */
    public static boolean metadataPartitionExists(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) {
        String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
        FileSystem fs = FSUtils.getFs(metadataTablePath, context.getHadoopConf().get());
        try {
            return fs.exists(new Path(metadataTablePath, partitionType.getPartitionPath()));
        }
        catch (Exception e) {
            String message = String.format("Failed to check metadata partition %s exists.", partitionType.getPartitionPath());
            // Keep the root cause: wrap non-IO failures so they can still be chained.
            IOException cause = e instanceof IOException ? (IOException) e : new IOException(e);
            throw new HoodieIOException(message, cause);
        }
    }

    /**
     * Converts commit metadata into metadata-table records, grouped by metadata partition type.
     *
     * <p>Records for {@code FILES} are always produced (with parallelism 1). Records for
     * {@code BLOOM_FILTERS} and {@code COLUMN_STATS} are produced only when the respective type
     * is among the enabled partition types of {@code recordsGenerationParams}.
     *
     * @param context                 engine context used to parallelize records
     * @param commitMetadata          commit metadata to convert
     * @param instantTime             instant time of the commit
     * @param recordsGenerationParams parameters controlling which partitions are generated
     * @return map from partition type to the records destined for it
     */
    public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieCommitMetadata commitMetadata, String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> recordsByPartitionType = new HashMap<>();

        List<HoodieRecord> filesPartitionRecords = HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(commitMetadata, instantTime);
        recordsByPartitionType.put(MetadataPartitionType.FILES, context.parallelize(filesPartitionRecords, 1));

        if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) {
            HoodieData<HoodieRecord> bloomFilterRecords = HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords(context, commitMetadata, instantTime, recordsGenerationParams);
            recordsByPartitionType.put(MetadataPartitionType.BLOOM_FILTERS, bloomFilterRecords);
        }

        if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
            HoodieData<HoodieRecord> columnStatsRecords = HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords(commitMetadata, context, recordsGenerationParams);
            recordsByPartitionType.put(MetadataPartitionType.COLUMN_STATS, columnStatsRecords);
        }

        return recordsByPartitionType;
    }

    /**
     * Converts commit metadata into records for the {@code files} metadata partition.
     *
     * <p>The result starts with one partition-list record covering every partition present in
     * the commit, followed by one partition-files record per partition. Each partition-files
     * record maps file name to size; when several write stats refer to the same file name the
     * largest reported size wins. CDC paths/sizes reported by a write stat are added as-is.
     * Write stats without a path are skipped with a warning.
     *
     * @param commitMetadata commit metadata to convert
     * @param instantTime    instant time of the commit (used for logging only)
     * @return the generated records
     */
    public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
        // One record per partition plus the leading partition-list record.
        List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size() + 1);
        List<String> partitionsAdded = HoodieTableMetadataUtil.getPartitionsAdded(commitMetadata);
        records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));

        // Single-element array so the lambda below can update the counter.
        long[] newFileCount = new long[]{0L};
        commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
            String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
            // Plain accumulation into a fresh map; Stream.reduce must not be given a mutable identity.
            Map<String, Long> updatedFilesToSizesMapping = new HashMap<>(writeStats.size());
            writeStats.forEach(stat -> {
                String pathWithPartition = stat.getPath();
                if (pathWithPartition == null) {
                    LOG.warn("Unable to find path in write stat to update metadata table " + stat);
                    return;
                }
                String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);
                // The same file may appear in several stats; keep the largest size seen.
                updatedFilesToSizesMapping.merge(fileName, stat.getFileSizeInBytes(), Math::max);
                Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
                if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
                    updatedFilesToSizesMapping.putAll(cdcPathAndSizes);
                }
            });
            newFileCount[0] += updatedFilesToSizesMapping.size();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, updatedFilesToSizesMapping, Collections.emptyList()));
        });

        LOG.info(String.format("Updating at %s from Commit/%s. #partitions_updated=%d, #files_added=%d", instantTime, commitMetadata.getOperationType(), records.size(), newFileCount[0]));
        return records;
    }

    /**
     * Returns the partition identifier of every partition that has write stats in the commit.
     *
     * @param commitMetadata commit metadata whose partitions are listed
     * @return partition identifiers, in the iteration order of the write-stats map
     */
    private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
        List<String> partitionIdentifiers = new ArrayList<>();
        for (String partitionPath : commitMetadata.getPartitionToWriteStats().keySet()) {
            partitionIdentifiers.add(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath));
        }
        return partitionIdentifiers;
    }

    public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieEngineContext context, HoodieCommitMetadata commitMetadata, String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
        List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(entry -> entry.stream()).collect(Collectors.toList());
        if (allWriteStats.isEmpty()) {
            return context.emptyHoodieData();
        }
        int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
        HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats, parallelism);
        return allWriteStatsRDD.flatMap(hoodieWriteStat -> {
            String partition = hoodieWriteStat.getPartitionPath();
            if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
                return Collections.emptyListIterator();
            }
            String pathWithPartition = hoodieWriteStat.getPath();
            if (pathWithPartition == null) {
                LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
                return Collections.emptyListIterator();
            }
            String fileName = FSUtils.getFileName(pathWithPartition, partition);
            if (!FSUtils.isBaseFile(new Path(fileName))) {
                return Collections.emptyListIterator();
            }
            Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
            try {
                Throwable throwable = null;
                try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath);){
                    BloomFilter fileBloomFilter;
                    block29: {
                        fileBloomFilter = fileReader.readBloomFilter();
                        if (fileBloomFilter != null) break block29;
                        LOG.error("Failed to read bloom filter for " + writeFilePath);
                        ListIterator listIterator = Collections.emptyListIterator();
                        fileReader.close();
                        return listIterator;
                    }
                    try {
                        ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
                        HoodieRecord<HoodieMetadataPayload> record = HoodieMetadataPayload.createBloomFilterMetadataRecord(partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
                        Iterator<HoodieRecord<HoodieMetadataPayload>> iterator2 = Collections.singletonList(record).iterator();
                        fileReader.close();
                        return iterator2;
                    }
                    catch (Exception e) {
                        ListIterator listIterator;
                        block30: {
                            block31: {
                                LOG.error("Failed to read bloom filter for " + writeFilePath);
                                listIterator = Collections.emptyListIterator();
                                fileReader.close();
                                if (fileReader == null) break block30;
                                if (throwable == null) break block31;
                                try {
                                    fileReader.close();
                                }
                                catch (Throwable throwable2) {
                                    throwable.addSuppressed(throwable2);
                                }
                                break block30;
                            }
                            fileReader.close();
                        }
                        return listIterator;
                        {
                            catch (Throwable throwable3) {
                                try {
                                    fileReader.close();
                                    throw throwable3;
                                }
                                catch (Throwable throwable4) {
                                    throwable = throwable4;
                                    throw throwable4;
                                }
                            }
                        }
                    }
                }
            }
            catch (IOException e) {
                LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat);
                return Collections.emptyListIterator();
            }
        });
    }

    /**
     * Converts clean metadata into metadata-table records, grouped by metadata partition type.
     *
     * <p>Records for {@code FILES} are always produced (with parallelism 1). Records for
     * {@code BLOOM_FILTERS} and {@code COLUMN_STATS} are produced only when the respective type
     * is among the enabled partition types of {@code recordsGenerationParams}.
     *
     * @param engineContext           engine context used to parallelize records
     * @param cleanMetadata           clean metadata to convert
     * @param recordsGenerationParams parameters controlling which partitions are generated
     * @param instantTime             instant time of the clean action
     * @return map from partition type to the records destined for it
     */
    public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, MetadataRecordsGenerationParams recordsGenerationParams, String instantTime) {
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> recordsByPartitionType = new HashMap<>();

        List<HoodieRecord> filesPartitionRecords = HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime);
        recordsByPartitionType.put(MetadataPartitionType.FILES, engineContext.parallelize(filesPartitionRecords, 1));

        if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) {
            HoodieData<HoodieRecord> bloomFilterRecords = HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams);
            recordsByPartitionType.put(MetadataPartitionType.BLOOM_FILTERS, bloomFilterRecords);
        }

        if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
            HoodieData<HoodieRecord> columnStatsRecords = HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams);
            recordsByPartitionType.put(MetadataPartitionType.COLUMN_STATS, columnStatsRecords);
        }

        return recordsByPartitionType;
    }

    /**
     * Converts clean metadata into records for the {@code files} metadata partition.
     *
     * <p>One partition-files record (no added files, the partition's delete path patterns as
     * deleted files) is produced per cleaned partition. If any partition is flagged as deleted,
     * a trailing partition-list record marking those partitions as deleted is appended.
     *
     * @param cleanMetadata clean metadata to convert
     * @param instantTime   instant time of the clean action (used for logging only)
     * @return the generated records
     */
    public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
        List<HoodieRecord> records = new LinkedList<>();
        List<String> deletedPartitions = new ArrayList<>();
        // Single-element array so the lambda below can update the counter.
        int[] fileDeleteCount = new int[]{0};

        cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
            List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
            String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionName);
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, Collections.emptyMap(), deletedFiles));
            fileDeleteCount[0] += deletedFiles.size();
            boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted();
            if (isPartitionDeleted) {
                // The raw partition name (not the identifier) is what gets listed as deleted.
                deletedPartitions.add(partitionName);
            }
        });

        if (!deletedPartitions.isEmpty()) {
            records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
        }

        LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
            + ", #files_deleted=" + fileDeleteCount[0]
            + ", #partitions_deleted=" + deletedPartitions.size());
        return records;
    }

    /**
     * Builds {@code files} partition records from explicit per-partition file additions and
     * deletions (logged as happening "during Restore").
     *
     * <p>Partitions present in {@code filesAdded} get one record combining their added files with
     * any deletions for the same partition. Partitions present only in {@code filesDeleted} get a
     * deletion-only record. If {@code deletedPartitions} is non-empty, a partition-list record
     * marking them as deleted is appended.
     *
     * @param engineContext     engine context used to parallelize the records
     * @param deletedPartitions partitions to mark as deleted
     * @param filesAdded        partition to (file name to size) of added files
     * @param filesDeleted      partition to deleted file names
     * @param instantTime       instant time (used for logging only)
     * @return single-entry map from {@code FILES} to the generated records
     */
    public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMissingPartitionRecords(HoodieEngineContext engineContext, List<String> deletedPartitions, Map<String, Map<String, Long>> filesAdded, Map<String, List<String>> filesDeleted, String instantTime) {
        List<HoodieRecord> records = new LinkedList<>();
        int filesAddedCount = 0;
        int fileDeleteCount = 0;

        // Partitions with additions: merge in the deletions of the same partition, if any.
        for (Map.Entry<String, Map<String, Long>> addedEntry : filesAdded.entrySet()) {
            String partition = addedEntry.getKey();
            Map<String, Long> filesToAdd = addedEntry.getValue();
            filesAddedCount += filesToAdd.size();
            List<String> filesToDelete = filesDeleted.getOrDefault(partition, Collections.emptyList());
            fileDeleteCount += filesToDelete.size();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, filesToAdd, filesToDelete));
        }

        // Partitions that only have deletions.
        for (Map.Entry<String, List<String>> deletedEntry : filesDeleted.entrySet()) {
            String partition = deletedEntry.getKey();
            if (filesAdded.containsKey(partition)) {
                continue;
            }
            List<String> filesToDelete = deletedEntry.getValue();
            fileDeleteCount += filesToDelete.size();
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, Collections.emptyMap(), filesToDelete));
        }

        if (!deletedPartitions.isEmpty()) {
            records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
        }

        LOG.info("Re-adding missing records at " + instantTime + " during Restore. #partitions_updated=" + records.size()
            + ", #files_added=" + filesAddedCount
            + ", #files_deleted=" + fileDeleteCount
            + ", #partitions_deleted=" + deletedPartitions.size());
        return Collections.singletonMap(MetadataPartitionType.FILES, engineContext.parallelize(records, 1));
    }

    /**
     * Converts clean metadata into bloom-filter records that mark cleaned base files as deleted.
     *
     * <p>Every delete path pattern that is a base file yields one record keyed by partition and
     * file name, created with an empty bloom filter type, an empty buffer and the deleted flag
     * set to {@code true}.
     *
     * @param cleanMetadata           clean metadata to convert
     * @param engineContext           engine context used to parallelize the work
     * @param instantTime             instant time stored in each record
     * @param recordsGenerationParams supplies the bloom index parallelism
     * @return the generated delete records
     */
    public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata, HoodieEngineContext engineContext, String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
        List<Pair<String, String>> deleteFileList = new ArrayList<>();
        cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
            for (String deletedFile : partitionMetadata.getDeletePathPatterns()) {
                Path deletedFilePath = new Path(deletedFile);
                if (FSUtils.isBaseFile(deletedFilePath)) {
                    deleteFileList.add(Pair.of(partition, deletedFilePath.getName()));
                }
            }
        });

        // At least 1, at most the configured bloom index parallelism.
        int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
        HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism);
        return deleteFileListRDD.map(deleteFileInfoPair -> (HoodieRecord) HoodieMetadataPayload.createBloomFilterMetadataRecord(
            deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), instantTime, "", ByteBuffer.allocate(0), true));
    }

    /**
     * Converts clean metadata into column-stats records for the cleaned files.
     *
     * <p>Only delete path patterns ending with the Parquet file extension produce records (via
     * {@code getColumnStatsRecords} with its last argument set to {@code true}); all other
     * entries are ignored. If there are no columns to index, an empty result is returned.
     *
     * @param cleanMetadata           clean metadata to convert
     * @param engineContext           engine context used to parallelize the work
     * @param recordsGenerationParams supplies the data meta client and column-stats parallelism
     * @return the generated column-stats records
     */
    public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata, HoodieEngineContext engineContext, MetadataRecordsGenerationParams recordsGenerationParams) {
        List<Pair<String, String>> deleteFileList = new ArrayList<>();
        cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
            for (String deletedFile : partitionMetadata.getDeletePathPatterns()) {
                deleteFileList.add(Pair.of(partition, deletedFile));
            }
        });

        HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
        // The table schema is passed lazily, wrapped in a supplier.
        List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(recordsGenerationParams, Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)));
        if (columnsToIndex.isEmpty()) {
            return engineContext.emptyHoodieData();
        }

        int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
        return engineContext.parallelize(deleteFileList, parallelism).flatMap(deleteFileInfoPair -> {
            String filePath = deleteFileInfoPair.getRight();
            if (!filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
                return Collections.emptyListIterator();
            }
            String partitionPath = deleteFileInfoPair.getLeft();
            return HoodieTableMetadataUtil.getColumnStatsRecords(partitionPath, filePath, dataTableMetaClient, columnsToIndex, true).iterator();
        });
    }

    /**
     * Converts rollback metadata into records for the {@code files} metadata partition.
     *
     * <p>When records are produced they are parallelized with one slice per record; otherwise an
     * empty data set is returned for {@code FILES}.
     *
     * @param engineContext       engine context used to parallelize the records
     * @param dataTableMetaClient meta client of the data table
     * @param rollbackMetadata    rollback metadata to convert
     * @param instantTime         instant time of the rollback
     * @return single-entry map from {@code FILES} to the generated records
     */
    public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext, HoodieTableMetaClient dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        List<HoodieRecord> rollbackRecords = HoodieTableMetadataUtil.convertMetadataToRollbackRecords(rollbackMetadata, instantTime, dataTableMetaClient);
        HoodieData<HoodieRecord> rollbackRecordsRDD;
        if (rollbackRecords.isEmpty()) {
            rollbackRecordsRDD = engineContext.emptyHoodieData();
        } else {
            rollbackRecordsRDD = engineContext.parallelize(rollbackRecords, rollbackRecords.size());
        }
        return Collections.singletonMap(MetadataPartitionType.FILES, rollbackRecordsRDD);
    }

    /**
     * Reads the rollback plan of the given instant and registers, per partition, every log file
     * named in the plan's "log blocks to be deleted" in {@code partitionToFilesMap}.
     *
     * <p>Each partition mentioned by a rollback request gets an entry in the map (possibly an
     * empty one). Log files are added by file name (the part after the last {@code /}) with a
     * size of {@code 1}.
     *
     * @param dataTableMetaClient meta client used to read the rollback plan from the active timeline
     * @param instantTime         instant time of the rollback
     * @param partitionToFilesMap partition identifier to (file name to size); updated in place
     * @throws HoodieMetadataException if the rollback plan cannot be read or parsed; the
     *                                 underlying {@link IOException} is attached as the cause
     */
    private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
        HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "rollback", instantTime);
        HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(rollbackInstant);
        try {
            HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class);
            rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> {
                String partitionId = HoodieTableMetadataUtil.getPartitionIdentifier(rollbackRequest.getPartitionPath());
                // Always register the partition, even when it contributes no log files.
                Map<String, Long> partitionFiles = partitionToFilesMap.computeIfAbsent(partitionId, ignored -> new HashMap<>());
                rollbackRequest.getLogBlocksToBeDeleted().forEach((logFilePath, ignoredValue) -> {
                    String fileName = logFilePath.substring(logFilePath.lastIndexOf("/") + 1);
                    // The value from the plan is ignored; a fixed size of 1 is recorded.
                    partitionFiles.put(fileName, 1L);
                });
            });
        }
        catch (IOException e) {
            // Chain the cause so the original parse/read failure is not lost.
            throw new HoodieMetadataException("Parsing rollback plan for " + rollbackInstant.toString() + " failed ", e);
        }
    }

    /**
     * Builds {@code files} partition records for a rollback.
     *
     * <p>Appended files are gathered first from the rollback metadata and then from the rollback
     * plan; the result is converted with an empty deleted-files map and the operation label
     * {@code "Rollback"}.
     *
     * @param rollbackMetadata    rollback metadata to convert
     * @param instantTime         instant time of the rollback
     * @param dataTableMetaClient meta client of the data table
     * @return the generated records
     */
    private static List<HoodieRecord> convertMetadataToRollbackRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, HoodieTableMetaClient dataTableMetaClient) {
        Map<String, Map<String, Long>> appendedFilesByPartition = new HashMap<>();
        HoodieTableMetadataUtil.processRollbackMetadata(rollbackMetadata, appendedFilesByPartition);
        HoodieTableMetadataUtil.reAddLogFilesFromRollbackPlan(dataTableMetaClient, instantTime, appendedFilesByPartition);
        return HoodieTableMetadataUtil.convertFilesToFilesPartitionRecords(
            Collections.emptyMap(), appendedFilesByPartition, instantTime, "Rollback");
    }

    /**
     * Collects the rollback log files listed in the rollback metadata into
     * {@code partitionToAppendedFiles}, keyed by partition identifier and file name.
     *
     * <p>When a file name is already present for a partition, the larger of the existing and the
     * new size is kept. Partitions without rollback log files are left untouched.
     *
     * @param rollbackMetadata         rollback metadata to read
     * @param partitionToAppendedFiles partition identifier to (file name to size); updated in place
     */
    private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, Map<String, Map<String, Long>> partitionToAppendedFiles) {
        rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
            if (pm.getRollbackLogFiles() == null || pm.getRollbackLogFiles().isEmpty()) {
                // Nothing to record for this partition.
                return;
            }
            String partitionId = HoodieTableMetadataUtil.getPartitionIdentifier(pm.getPartitionPath());
            if (!partitionToAppendedFiles.containsKey(partitionId)) {
                partitionToAppendedFiles.put(partitionId, new HashMap<>());
            }
            Map<String, Long> appendedFiles = partitionToAppendedFiles.get(partitionId);
            pm.getRollbackLogFiles().forEach((path, size) ->
                appendedFiles.merge(new Path(path).getName(), size, Math::max));
        });
    }

    /**
     * Builds {@code files} partition records from per-partition deleted and appended files.
     *
     * <p>Partitions with deletions get one record each; if the same partition name also has
     * appended files, those are merged into that record and <em>removed from</em>
     * {@code partitionToAppendedFiles}. Every remaining appended-files entry then gets its own
     * record. Note that {@code partitionToAppendedFiles} and its value maps may be mutated.
     *
     * @param partitionToDeletedFiles  partition name to deleted file names
     * @param partitionToAppendedFiles partition name to (file name to size) of appended files
     * @param instantTime              instant time (used for logging only)
     * @param operation                operation label (used for logging only)
     * @return the generated records
     */
    protected static List<HoodieRecord> convertFilesToFilesPartitionRecords(Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime, String operation) {
        List<HoodieRecord> records = new ArrayList<>(partitionToDeletedFiles.size() + partitionToAppendedFiles.size());
        int deletedFileCount = 0;
        int appendedFileCount = 0;

        for (Map.Entry<String, List<String>> deletedEntry : partitionToDeletedFiles.entrySet()) {
            String partitionName = deletedEntry.getKey();
            List<String> deletedFiles = deletedEntry.getValue();
            deletedFileCount += deletedFiles.size();
            String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionName);
            Map<String, Long> filesAdded;
            if (partitionToAppendedFiles.containsKey(partitionName)) {
                // Consumed here so the second pass does not emit a duplicate record.
                filesAdded = partitionToAppendedFiles.remove(partitionName);
            } else {
                filesAdded = Collections.emptyMap();
            }
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, deletedFiles));
        }

        for (Map.Entry<String, Map<String, Long>> appendedEntry : partitionToAppendedFiles.entrySet()) {
            String partition = HoodieTableMetadataUtil.getPartitionIdentifier(appendedEntry.getKey());
            Map<String, Long> appendedFileMap = appendedEntry.getValue();
            appendedFileCount += appendedFileMap.size();
            // removeAll mutates the appended-files key set; it returns true if anything was removed.
            List<String> deletedInPartition = partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList());
            boolean removedAny = appendedFileMap.keySet().removeAll(deletedInPartition);
            ValidationUtils.checkState(!removedAny, "Rollback file cannot both be appended and deleted");
            records.add(HoodieMetadataPayload.createPartitionFilesRecord(partition, appendedFileMap, Collections.emptyList()));
        }

        LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
            + ", #files_deleted=" + deletedFileCount
            + ", #files_appended=" + appendedFileCount);
        return records;
    }

    /**
     * Maps a relative partition path to its identifier: the empty path becomes {@code "."},
     * any other value is returned unchanged.
     *
     * @param relativePartitionPath relative partition path
     * @return {@code "."} for the empty string, otherwise the input itself
     */
    public static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
        if ("".equals(relativePartitionPath)) {
            return ".";
        }
        return relativePartitionPath;
    }

    /**
     * Builds bloom-filter records for the given deleted and appended files.
     *
     * <p>Files that are not base files are skipped with a warning. For non-deleted files the
     * bloom filter is read from the file under the data table base path; if it cannot be read
     * (helper returns {@code null}) the file is skipped with an error log. Deleted files get a
     * record with an empty buffer.
     *
     * @param engineContext            engine context used to parallelize the work
     * @param partitionToDeletedFiles  partition name to deleted file names
     * @param partitionToAppendedFiles partition name to (file name to size) of appended files
     * @param recordsGenerationParams  supplies parallelism, bloom filter type and data meta client
     * @param instantTime              instant time stored in each record
     * @return the generated bloom-filter records
     */
    public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, MetadataRecordsGenerationParams recordsGenerationParams, String instantTime) {
        List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = HoodieTableMetadataUtil.fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
        int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
        return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
            // Tuple layout: (partition name, file name, deleted flag).
            String partitionName = partitionFileFlagTuple.f0;
            String filename = partitionFileFlagTuple.f1;
            boolean isDeleted = partitionFileFlagTuple.f2;

            if (!FSUtils.isBaseFile(new Path(filename))) {
                LOG.warn(String.format("Ignoring file %s as it is not a base file", filename));
                return Stream.<HoodieRecord>empty().iterator();
            }

            ByteBuffer bloomFilterBuffer;
            if (isDeleted) {
                bloomFilterBuffer = ByteBuffer.allocate(0);
            } else {
                String pathWithPartition = partitionName + "/" + filename;
                Path addedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
                bloomFilterBuffer = HoodieTableMetadataUtil.readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), addedFilePath);
                if (bloomFilterBuffer == null) {
                    LOG.error("Failed to read bloom filter from " + addedFilePath);
                    return Stream.<HoodieRecord>empty().iterator();
                }
            }

            String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionName);
            return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(partition, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, isDeleted)).iterator();
        });
    }

    /**
     * Builds column-stats metadata records for files that were added to or deleted from the data table.
     *
     * <p>Returns an empty dataset when there are no columns to index. Files that are not base files, or
     * whose name does not end with the Parquet extension, are skipped with a warning.
     *
     * @param engineContext            engine context used to parallelize the work
     * @param partitionToDeletedFiles  partition name to the names of files deleted from it
     * @param partitionToAppendedFiles partition name to a map keyed by the names of files added to it
     * @param recordsGenerationParams  supplies the data table meta client, target columns and parallelism
     * @return column-stats records for all eligible files
     */
    public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, MetadataRecordsGenerationParams recordsGenerationParams) {
        HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
        List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(recordsGenerationParams, Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)));
        if (columnsToIndex.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));
        List<Tuple3<String, String, Boolean>> fileTriplets = HoodieTableMetadataUtil.fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles);
        int cappedParallelism = Math.min(fileTriplets.size(), recordsGenerationParams.getColumnStatsIndexParallelism());
        int parallelism = Math.max(cappedParallelism, 1);
        return engineContext.parallelize(fileTriplets, parallelism).flatMap(triplet -> {
            String partitionName = (String)triplet.f0;
            String filename = (String)triplet.f1;
            boolean isDeleted = (Boolean)triplet.f2;
            boolean isParquetBaseFile = FSUtils.isBaseFile(new Path(filename)) && filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension());
            if (!isParquetBaseFile) {
                LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", filename));
                return Stream.empty().iterator();
            }
            String filePathWithPartition = partitionName + "/" + filename;
            String partitionId = HoodieTableMetadataUtil.getPartitionIdentifier(partitionName);
            return HoodieTableMetadataUtil.getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator();
        });
    }

    /**
     * Reads the bloom filter stored in the given file and returns it in serialized form.
     *
     * @param conf     Hadoop configuration used to open the file
     * @param filePath path of the file to read
     * @return the bloom filter's string serialization encoded as UTF-8 bytes, or {@code null} when the
     *         file reader returns no bloom filter
     * @throws IOException declared by the original signature; propagated from the file reader calls
     */
    private static ByteBuffer readBloomFilter(Configuration conf, Path filePath) throws IOException {
        try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(conf, filePath)) {
            BloomFilter fileBloomFilter = fileReader.readBloomFilter();
            if (fileBloomFilter == null) {
                return null;
            }
            // Pin the charset so the stored bytes do not depend on the JVM's platform default.
            return ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }

    /**
     * Flattens the deleted-file and appended-file maps into one list of
     * (partition name, file name, isDeleted) triplets.
     *
     * <p>All deleted files come first (flag {@code true}), followed by all appended files
     * (flag {@code false}), each in the iteration order of its map.
     *
     * @param partitionToDeletedFiles  partition name to the names of files deleted from it
     * @param partitionToAppendedFiles partition name to a map keyed by the names of files added to it
     * @return the flattened triplets
     */
    private static List<Tuple3<String, String, Boolean>> fetchPartitionFileInfoTriplets(Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles) {
        int deletedCount = 0;
        for (List<String> deletedFiles : partitionToDeletedFiles.values()) {
            deletedCount += deletedFiles.size();
        }
        int appendedCount = 0;
        for (Map<String, Long> appendedFiles : partitionToAppendedFiles.values()) {
            appendedCount += appendedFiles.size();
        }
        // Presize for every file so the list never has to grow.
        ArrayList<Tuple3<String, String, Boolean>> triplets = new ArrayList<Tuple3<String, String, Boolean>>(deletedCount + appendedCount);
        for (Map.Entry<String, List<String>> entry : partitionToDeletedFiles.entrySet()) {
            for (String deletedFile : entry.getValue()) {
                triplets.add(Tuple3.of(entry.getKey(), deletedFile, true));
            }
        }
        for (Map.Entry<String, Map<String, Long>> entry : partitionToAppendedFiles.entrySet()) {
            for (String addedFile : entry.getValue().keySet()) {
                triplets.add(Tuple3.of(entry.getKey(), addedFile, false));
            }
        }
        return triplets;
    }

    /**
     * Maps a record key to a file group index in the range {@code [0, numFileGroups)}.
     *
     * <p>The key is hashed with an explicit base-31 polynomial over its chars (with int overflow),
     * and the hash is then reduced modulo {@code numFileGroups}.
     *
     * @param recordKey     key to map
     * @param numFileGroups number of file groups; zero causes an {@link ArithmeticException}
     * @return the non-negative index of the file group for the key
     */
    public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
        int hash = 0;
        for (char c : recordKey.toCharArray()) {
            hash = hash * 31 + c;
        }
        // Math.abs(Integer.MIN_VALUE) stays negative, so the remainder can be negative as well;
        // the second abs folds it back into the valid index range.
        int bucket = Math.abs(hash) % numFileGroups;
        return Math.abs(bucket);
    }

    /**
     * Loads the latest merged file slices of a metadata table partition, sorted by file ID.
     *
     * @param metaClient meta client of the table being read
     * @param fsView     file system view to read the slices from
     * @param partition  partition whose file slices are requested
     * @return the latest merged file slices of the partition
     */
    public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, HoodieTableFileSystemView fsView, String partition) {
        LOG.info("Loading latest merged file slices for metadata table partition " + partition);
        Option<HoodieTableFileSystemView> viewOpt = Option.of(fsView);
        return HoodieTableMetadataUtil.getPartitionFileSlices(metaClient, viewOpt, partition, true);
    }

    /**
     * Loads the latest (unmerged) file slices of a metadata table partition, sorted by file ID.
     *
     * @param metaClient meta client of the table being read
     * @param fsView     optional file system view; when absent a view is created from the meta client
     * @param partition  partition whose file slices are requested
     * @return the latest file slices of the partition
     */
    public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fsView, String partition) {
        LOG.info("Loading latest file slices for metadata table partition " + partition);
        final boolean mergeFileSlices = false;
        return HoodieTableMetadataUtil.getPartitionFileSlices(metaClient, fsView, partition, mergeFileSlices);
    }

    /**
     * Creates a file system view over the table's active timeline.
     *
     * <p>When the active timeline is empty, the view is built over a substitute timeline holding a
     * single newly created "deltacommit" instant instead.
     *
     * @param metaClient meta client of the table to view
     * @return a new file system view for the table
     */
    public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient) {
        HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
        if (!activeTimeline.empty()) {
            return new HoodieTableFileSystemView(metaClient, activeTimeline);
        }
        // NOTE(review): the leading `false` is presumably the "is inflight" flag of HoodieInstant - confirm.
        HoodieInstant placeholderInstant = new HoodieInstant(false, "deltacommit", HoodieActiveTimeline.createNewInstantTime());
        HoodieDefaultTimeline placeholderTimeline = new HoodieDefaultTimeline(Stream.of(placeholderInstant), metaClient.getActiveTimeline()::getInstantDetails);
        return new HoodieTableFileSystemView(metaClient, placeholderTimeline);
    }

    /**
     * Returns the latest file slices of a partition, sorted by file ID.
     *
     * @param metaClient      meta client of the table being read
     * @param fileSystemView  optional file system view; when absent one is created from {@code metaClient}
     * @param partition       partition whose file slices are requested
     * @param mergeFileSlices when {@code true}, returns the latest merged file slices before or on the
     *                        last completed instant of the active timeline, or an empty list if no
     *                        instant has completed; when {@code false}, returns the latest file slices
     * @return the file slices ordered by file ID
     */
    private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fileSystemView, String partition, boolean mergeFileSlices) {
        // Build the fallback view only when the caller did not supply one: passing it to
        // Option.orElse would evaluate the argument, and construct a view, unconditionally.
        HoodieTableFileSystemView fsView = fileSystemView.isPresent() ? fileSystemView.get() : HoodieTableMetadataUtil.getFileSystemView(metaClient);
        Stream<FileSlice> fileSliceStream;
        if (mergeFileSlices) {
            // Look the last completed instant up once and reuse it for both the check and the read.
            Option<HoodieInstant> lastCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
            if (!lastCompletedInstant.isPresent()) {
                return Collections.emptyList();
            }
            fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, lastCompletedInstant.get().getTimestamp());
        } else {
            fileSliceStream = fsView.getLatestFileSlices(partition);
        }
        return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
    }

    /**
     * Returns the latest file slices of a partition, including inflight ones, sorted by file ID.
     *
     * @param metaClient     meta client of the table being read
     * @param fileSystemView optional file system view; when absent one is created from {@code metaClient}
     * @param partition      partition whose file slices are requested
     * @return the file slices ordered by file ID
     */
    public static List<FileSlice> getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fileSystemView, String partition) {
        // Build the fallback view only when the caller did not supply one: passing it to
        // Option.orElse would evaluate the argument, and construct a view, unconditionally.
        HoodieTableFileSystemView fsView = fileSystemView.isPresent() ? fileSystemView.get() : HoodieTableMetadataUtil.getFileSystemView(metaClient);
        Stream<FileSlice> fileSliceStream = fsView.fetchLatestFileSlicesIncludingInflight(partition);
        return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
    }

    /**
     * Builds column-stats metadata records from the write stats of a commit.
     *
     * <p>Returns an empty dataset when the commit has no write stats or when there are no columns to
     * index. Any exception raised while preparing the records is wrapped in a {@code HoodieException}.
     *
     * @param commitMetadata          commit metadata supplying the write stats and the "schema" entry
     * @param engineContext           engine context used to parallelize the work
     * @param recordsGenerationParams supplies the data table meta client, target columns and parallelism
     * @return column-stats records derived from every write stat of the commit
     */
    public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, MetadataRecordsGenerationParams recordsGenerationParams) {
        // Flatten the per-partition write stats into a single list.
        List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        if (allWriteStats.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        try {
            // The writer schema is optional: a missing or empty "schema" entry yields an empty Option.
            Option<Schema> writerSchema = Option.ofNullable(commitMetadata.getMetadata("schema")).flatMap(writerSchemaStr -> StringUtils.isNullOrEmpty(writerSchemaStr) ? Option.empty() : Option.of(new Schema.Parser().parse((String)writerSchemaStr)));
            HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
            HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
            // Add the metadata fields to the schema only when the table config says they are populated.
            Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieAvroUtils.addMetadataFields(schema) : schema);
            List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(recordsGenerationParams, Lazy.eagerly(tableSchema));
            if (columnsToIndex.isEmpty()) {
                return engineContext.emptyHoodieData();
            }
            // At least one task, and never more tasks than there are write stats.
            int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
            return engineContext.parallelize(allWriteStats, parallelism).flatMap(writeStat -> HoodieTableMetadataUtil.translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator());
        }
        catch (Exception e) {
            throw new HoodieException("Failed to generate column stats records for metadata table", e);
        }
    }

    /**
     * Determines which columns the column-stats index should cover.
     *
     * <p>Explicitly configured target columns win. Otherwise every field of the writer schema is
     * indexed, and no columns are returned when the schema is unavailable. The lazy schema is only
     * resolved in the latter case.
     *
     * @param recordsGenParams    generation parameters; the column-stats index must be enabled
     * @param lazyWriterSchemaOpt lazily resolved, optional writer schema
     * @return the names of the columns to index, possibly empty
     */
    private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams, Lazy<Option<Schema>> lazyWriterSchemaOpt) {
        ValidationUtils.checkState(recordsGenParams.isColumnStatsIndexEnabled());
        List<String> configuredColumns = recordsGenParams.getTargetColumnsForColumnStatsIndex();
        if (configuredColumns.isEmpty()) {
            Option<Schema> schemaOpt = lazyWriterSchemaOpt.get();
            return schemaOpt.map(schema -> schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())).orElse(Collections.emptyList());
        }
        return configuredColumns;
    }

    /**
     * Converts a single write stat into column-stats records.
     *
     * <p>A delta write stat that already carries column stats is converted directly. Any other write
     * stat falls back to computing the stats from the written file.
     *
     * @param writeStat         write stat to convert
     * @param datasetMetaClient meta client of the data table
     * @param columnsToIndex    columns to produce stats for when reading from the file
     * @return the column-stats records for the write stat
     */
    private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex) {
        boolean carriesColumnStats = writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat)writeStat).getColumnStats().isPresent();
        if (!carriesColumnStats) {
            return HoodieTableMetadataUtil.getColumnStatsRecords(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
        }
        Map<String, HoodieColumnRangeMetadata<Comparable>> rangesByColumn = ((HoodieDeltaWriteStat)writeStat).getColumnStats().get();
        Collection<HoodieColumnRangeMetadata<Comparable>> ranges = rangesByColumn.values();
        return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), ranges, false);
    }

    /**
     * Creates column-stats records for one file.
     *
     * <p>For a deleted file, stub range metadata is generated for each indexed column. Otherwise the
     * ranges are read from the file, addressed relative to the table base path.
     *
     * @param partitionPath     partition the file belongs to
     * @param filePath          path of the file; a leading "/" is dropped before the file is read
     * @param datasetMetaClient meta client of the data table
     * @param columnsToIndex    columns to produce stats for
     * @param isDeleted         whether the file was deleted
     * @return the column-stats records for the file
     */
    private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath, String filePath, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex, boolean isDeleted) {
        String relativeFilePath = filePath;
        if (filePath.startsWith("/")) {
            relativeFilePath = filePath.substring(1);
        }
        String fileName = FSUtils.getFileName(filePath, partitionPath);
        List<HoodieColumnRangeMetadata<Comparable>> ranges;
        if (isDeleted) {
            ranges = columnsToIndex.stream().map(column -> HoodieColumnRangeMetadata.stub(fileName, column)).collect(Collectors.toList());
        } else {
            ranges = HoodieTableMetadataUtil.readColumnRangeMetadataFrom(relativeFilePath, datasetMetaClient, columnsToIndex);
        }
        return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, ranges, isDeleted);
    }

    /**
     * Reads per-column range metadata from a data file, on a best-effort basis.
     *
     * <p>Only files ending with the Parquet extension are supported; other files produce a warning
     * and an empty result. Any failure is logged together with its cause and also yields an empty
     * result instead of propagating.
     *
     * @param filePath          file path relative to the table base path
     * @param datasetMetaClient meta client supplying the base path and Hadoop configuration
     * @param columnsToIndex    columns whose ranges should be read
     * @return the column ranges, or an empty list when unsupported or unreadable
     */
    private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String filePath, HoodieTableMetaClient datasetMetaClient, List<String> columnsToIndex) {
        try {
            if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
                Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePath);
                return new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
            }
            LOG.warn("Column range index not supported for: " + filePath);
            return Collections.emptyList();
        }
        catch (Exception e) {
            // Still best effort, but keep the cause in the log so the failure can be diagnosed.
            LOG.error("Failed to fetch column range metadata for: " + filePath, (Throwable)e);
            return Collections.emptyList();
        }
    }

    /**
     * Adjusts a decimal value to the scale of the given decimal logical type without rounding, and
     * verifies that it fits the type's precision.
     *
     * @param value   value to adjust
     * @param decimal decimal logical type supplying the required scale and maximum precision
     * @return the value at the required scale (the same instance when the scale already matches)
     * @throws AvroTypeException if the scale cannot be changed without rounding, or if the resulting
     *                           precision exceeds the maximum precision of the type
     */
    public static BigDecimal tryUpcastDecimal(BigDecimal value, LogicalTypes.Decimal decimal) {
        final int requiredScale = decimal.getScale();
        final int originalScale = value.scale();
        final boolean rescaled = originalScale != requiredScale;
        BigDecimal result = value;
        if (rescaled) {
            try {
                // UNNECESSARY makes setScale throw rather than silently round.
                result = value.setScale(requiredScale, RoundingMode.UNNECESSARY);
            }
            catch (ArithmeticException aex) {
                throw new AvroTypeException("Cannot encode decimal with scale " + originalScale + " as scale " + requiredScale + " without rounding");
            }
        }
        final int maxPrecision = decimal.getPrecision();
        final int actualPrecision = result.precision();
        if (actualPrecision <= maxPrecision) {
            return result;
        }
        String message = "Cannot encode decimal with precision " + actualPrecision + " as max precision " + maxPrecision;
        if (rescaled) {
            message = message + ". This is after safely adjusting scale from " + originalScale + " to required " + requiredScale;
        }
        throw new AvroTypeException(message);
    }

    /**
     * Resolves the Avro schema of the data table, if the table has any completed commit.
     *
     * @param dataTableMetaClient meta client of the data table
     * @return the table schema, or an empty option when the commits timeline has no completed instant
     * @throws HoodieException if the schema cannot be resolved
     */
    private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
        boolean hasCompletedCommit = dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() != 0;
        if (!hasCompletedCommit) {
            return Option.empty();
        }
        try {
            Schema tableSchema = new TableSchemaResolver(dataTableMetaClient).getTableAvroSchema();
            return Option.of(tableSchema);
        }
        catch (Exception e) {
            throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
        }
    }

    /**
     * Casts or converts a value to a {@link Comparable} according to its Avro schema type.
     *
     * @param schema Avro schema describing {@code val}
     * @param val    value to coerce; may be {@code null}
     * @return the value as a {@code Comparable}; {@code null} when {@code val} is {@code null} or the
     *         schema type is ENUM, MAP, NULL, RECORD or ARRAY
     * @throws IllegalStateException if the schema type is not handled by any case
     */
    private static Comparable<?> coerceToComparable(Schema schema, Object val) {
        if (val == null) {
            return null;
        }
        switch (schema.getType()) {
            case UNION: {
                // Recurse on the schema resolved from the union by AvroSchemaUtils.
                return HoodieTableMetadataUtil.coerceToComparable(AvroSchemaUtils.resolveNullableSchema(schema), val);
            }
            case FIXED: 
            case BYTES: {
                // Decimal logical values are passed through as-is; anything else must be a ByteBuffer.
                if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
                    return (Comparable)val;
                }
                return (ByteBuffer)val;
            }
            case INT: {
                // NOTE(review): the logical types are compared by reference, which assumes
                // LogicalTypes.date()/timeMillis() return shared instances - confirm.
                if (schema.getLogicalType() == LogicalTypes.date() || schema.getLogicalType() == LogicalTypes.timeMillis()) {
                    return (Comparable)val;
                }
                return (Integer)val;
            }
            case LONG: {
                if (schema.getLogicalType() == LogicalTypes.timeMicros() || schema.getLogicalType() == LogicalTypes.timestampMicros() || schema.getLogicalType() == LogicalTypes.timestampMillis()) {
                    return (Comparable)val;
                }
                return (Long)val;
            }
            case STRING: {
                // Normalize whatever CharSequence-like object was supplied to a java.lang.String.
                return val.toString();
            }
            case FLOAT: 
            case DOUBLE: 
            case BOOLEAN: {
                return (Comparable)val;
            }
            case ENUM: 
            case MAP: 
            case NULL: 
            case RECORD: 
            case ARRAY: {
                // These types have no comparable representation here.
                return null;
            }
        }
        throw new IllegalStateException("Unexpected type: " + (Object)((Object)schema.getType()));
    }

    /**
     * Tells whether values of the given schema can be compared.
     *
     * @param schema schema to check
     * @return {@code false} for MAP schemas, {@code true} for every other type
     */
    private static boolean canCompare(Schema schema) {
        Schema.Type type = schema.getType();
        return type != Schema.Type.MAP;
    }

    /**
     * Returns a mutable copy of the metadata partitions recorded as inflight in the table config.
     *
     * @param tableConfig table config to read
     * @return a new set holding the inflight metadata partitions
     */
    public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
        Set<String> inflightPartitions = new HashSet<>();
        inflightPartitions.addAll(tableConfig.getMetadataPartitionsInflight());
        return inflightPartitions;
    }

    /**
     * Returns the union of the inflight metadata partitions and the metadata partitions listed in the
     * table config.
     *
     * @param tableConfig table config to read
     * @return a new set holding both groups of metadata partitions
     */
    public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
        // Start from the (mutable) inflight copy and fold the configured partitions into it.
        Set<String> allPartitions = HoodieTableMetadataUtil.getInflightMetadataPartitions(tableConfig);
        allPartitions.addAll(tableConfig.getMetadataPartitions());
        return allPartitions;
    }

    /**
     * Collects the instant timestamps that are considered valid, drawing on both the data table and
     * the metadata table timelines.
     *
     * @param dataMetaClient     meta client of the data table
     * @param metadataMetaClient meta client of the metadata table
     * @return the set of valid instant timestamps
     */
    public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient, HoodieTableMetaClient metadataMetaClient) {
        HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
        // 1. Every completed instant of the data table.
        Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
        // 2. Completed metadata table instants accepted by isValidInstant.
        validInstantTimestamps.addAll(metadataMetaClient.getActiveTimeline().filter(instant -> instant.isCompleted() && HoodieTableMetadataUtil.isValidInstant(instant)).getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
        // Smallest timestamp gathered so far; falls back to the all-zero timestamp when nothing was found.
        String earliestInstantTime = validInstantTimestamps.isEmpty() ? "00000000000000" : (String)Collections.min(validInstantTimestamps);
        // 3. Commits rolled back by completed data table rollbacks/restores newer than that earliest instant.
        datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime)).forEach(instant -> validInstantTimestamps.addAll(HoodieTableMetadataUtil.getRollbackedCommits(instant, datasetTimeline)));
        // 4. Completed rollback and restore instants of the metadata table itself.
        metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants().filter(instant -> instant.getAction().equals("restore") || instant.getAction().equals("rollback")).getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));
        // 5. Completed metadata table delta commits whose timestamp starts with the all-zero prefix.
        metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().filter(instant -> instant.getTimestamp().startsWith("00000000000000")).getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));
        return validInstantTimestamps;
    }

    /**
     * Checks whether a metadata table instant is a delta commit whose timestamp carries a recognized
     * three-character suffix.
     *
     * <p>The timestamp must be as long as a millisecond instant id plus an operation suffix. Its last
     * three characters must either be one of the {@code OperationSuffix} values or compare greater
     * than or equal to {@code "010"}, the smallest suffix produced by
     * {@code createIndexInitTimestamp}.
     *
     * @param instant instant to check
     * @return {@code true} if the instant is a delta commit with a valid suffix
     */
    public static boolean isValidInstant(HoodieInstant instant) {
        if (!instant.getAction().equals("deltacommit")) {
            return false;
        }
        String instantTime = instant.getTimestamp();
        if (instantTime.length() != HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH + OperationSuffix.METADATA_INDEXER.getSuffix().length()) {
            return false;
        }
        String suffix = instantTime.substring(instantTime.length() - 3);
        if (OperationSuffix.isValidSuffix(suffix)) {
            return true;
        }
        // Format with Locale.ROOT: the default locale may render %d with non-ASCII digits, which
        // would make this lexicographic comparison against ASCII suffixes meaningless.
        return suffix.compareTo(String.format(java.util.Locale.ROOT, "%03d", 10)) >= 0;
    }

    /**
     * Checks whether an instant time has the shape of a metadata indexer commit.
     *
     * @param instantTime instant time to check
     * @return {@code true} if the time is a millisecond instant id followed by the
     *         {@code METADATA_INDEXER} suffix
     */
    public static boolean isIndexingCommit(String instantTime) {
        String indexerSuffix = OperationSuffix.METADATA_INDEXER.getSuffix();
        int expectedLength = HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH + indexerSuffix.length();
        if (instantTime.length() != expectedLength) {
            return false;
        }
        return instantTime.endsWith(indexerSuffix);
    }

    /**
     * Returns the commit times that were rolled back by a rollback or restore instant.
     *
     * @param instant  rollback or restore instant; any other action yields an empty list
     * @param timeline timeline used to read the instant's details
     * @return the rolled-back commit times
     * @throws HoodieMetadataException if the instant's metadata cannot be read
     */
    private static List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
        try {
            if (instant.getAction().equals("rollback")) {
                List<String> commitsToRollback;
                try {
                    // Preferred source: the metadata stored with the completed rollback instant.
                    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get());
                    commitsToRollback = rollbackMetadata.getCommitsRollback();
                }
                catch (IOException e) {
                    // Fallback: read the rollback plan of the corresponding requested instant, which
                    // names the single instant being rolled back.
                    HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, "rollback", instant.getTimestamp())).get(), HoodieRollbackPlan.class);
                    commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime());
                    LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString());
                }
                return commitsToRollback;
            }
            LinkedList<String> rollbackedCommits = new LinkedList<String>();
            if (instant.getAction().equals("restore")) {
                // A restore aggregates several rollbacks; collect the commits of every one of them.
                HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(timeline.getInstantDetails(instant).get());
                restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback())));
            }
            return rollbackedCommits;
        }
        catch (IOException e) {
            throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
        }
    }

    /**
     * Deletes the metadata table of a data table, optionally moving it to a backup location instead.
     *
     * <p>The metadata partitions are cleared from the table config before the file system is touched.
     *
     * @param dataMetaClient meta client of the data table
     * @param context        engine context supplying the Hadoop configuration
     * @param backup         when {@code true}, first try to rename the metadata directory to a
     *                       timestamped backup path
     * @return the backup path when the rename succeeded; {@code null} when the metadata table did not
     *         exist or was deleted
     * @throws HoodieMetadataException if the existence check or the delete fails
     */
    public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) {
        Path metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
        FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), context.getHadoopConf().get());
        dataMetaClient.getTableConfig().clearMetadataPartitions(dataMetaClient);
        try {
            if (!fs.exists(metadataTablePath)) {
                return null;
            }
        }
        catch (FileNotFoundException e) {
            // A missing path is treated the same as "nothing to delete".
            return null;
        }
        catch (IOException e) {
            throw new HoodieMetadataException("Failed to check metadata table existence", e);
        }
        if (backup) {
            Path metadataBackupPath = new Path(metadataTablePath.getParent(), ".metadata_" + HoodieActiveTimeline.createNewInstantTime());
            LOG.info("Backing up metadata directory to " + metadataBackupPath + " before deletion");
            try {
                if (fs.rename(metadataTablePath, metadataBackupPath)) {
                    return metadataBackupPath.toString();
                }
            }
            catch (Exception e) {
                LOG.error("Failed to backup metadata table using rename", (Throwable)e);
            }
            // A failed rename (false or exception) falls through to the delete below.
        }
        LOG.info("Deleting metadata table from " + metadataTablePath);
        try {
            fs.delete(metadataTablePath, true);
        }
        catch (Exception e) {
            throw new HoodieMetadataException("Failed to delete metadata table from path " + metadataTablePath, e);
        }
        return null;
    }

    /**
     * Deletes one partition of the metadata table, optionally moving it to a backup location instead.
     *
     * <p>Deleting the FILES partition delegates to {@code deleteMetadataTable}. For any other
     * partition, its state in the table config is set to {@code false} before the file system is
     * touched.
     *
     * @param dataMetaClient meta client of the data table
     * @param context        engine context supplying the Hadoop configuration
     * @param partitionType  metadata partition to delete
     * @param backup         when {@code true}, rename the partition directory to a timestamped backup
     *                       path instead of deleting it
     * @return the backup path when the rename succeeded, otherwise {@code null}
     * @throws HoodieMetadataException if the existence check or the delete fails
     */
    public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, MetadataPartitionType partitionType, boolean backup) {
        if (partitionType.equals((Object)MetadataPartitionType.FILES)) {
            return HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, context, backup);
        }
        Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionType.getPartitionPath());
        FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), context.getHadoopConf().get());
        dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false);
        try {
            if (!fs.exists(metadataTablePartitionPath)) {
                return null;
            }
        }
        catch (FileNotFoundException e) {
            LOG.debug("Metadata table partition " + (Object)((Object)partitionType) + " not found at path " + metadataTablePartitionPath);
            return null;
        }
        catch (Exception e) {
            throw new HoodieMetadataException(String.format("Failed to check existence of MDT partition %s at path %s: ", new Object[]{partitionType, metadataTablePartitionPath}), e);
        }
        if (backup) {
            // The backup is placed two levels above the partition directory.
            Path metadataPartitionBackupPath = new Path(metadataTablePartitionPath.getParent().getParent(), String.format(".metadata_%s_%s", partitionType.getPartitionPath(), HoodieActiveTimeline.createNewInstantTime()));
            LOG.info(String.format("Backing up MDT partition %s to %s before deletion", new Object[]{partitionType, metadataPartitionBackupPath}));
            try {
                if (fs.rename(metadataTablePartitionPath, metadataPartitionBackupPath)) {
                    return metadataPartitionBackupPath.toString();
                }
            }
            catch (Exception e) {
                LOG.error(String.format("Failed to backup MDT partition %s using rename", new Object[]{partitionType}), (Throwable)e);
            }
            // NOTE(review): unlike deleteMetadataTable, a failed rename does not fall back to a delete
            // here, because the delete sits in the else branch - confirm this is intended.
        } else {
            LOG.info("Deleting metadata table partition from " + metadataTablePartitionPath);
            try {
                fs.delete(metadataTablePartitionPath, true);
            }
            catch (Exception e) {
                throw new HoodieMetadataException("Failed to delete metadata table partition from path " + metadataTablePartitionPath, e);
            }
        }
        return null;
    }

    /**
     * Builds the file ID of a metadata table file group.
     *
     * <p>The ID is the partition type's file ID prefix, the index zero-padded to four digits, and a
     * trailing {@code "-0"}.
     *
     * @param partitionType metadata partition the file group belongs to
     * @param index         index of the file group within the partition
     * @return the file ID for the file group
     */
    public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale; the parsing
        // helpers in this class look for a literal "-0" suffix.
        return String.format(java.util.Locale.ROOT, "%s%04d-%d", partitionType.getFileIdPrefix(), index, 0);
    }

    /**
     * Extracts the file group index from a file ID.
     *
     * <p>The index is the number between the last "-" separator and the end of the ID, after an
     * optional trailing {@code "-0"} has been ignored.
     *
     * @param fileId file ID to parse
     * @return the file group index
     * @throws NumberFormatException if that part of the ID is not an integer
     */
    public static int getFileGroupIndexFromFileId(String fileId) {
        int indexEnd = HoodieTableMetadataUtil.getFileIdLengthWithoutFileIndex(fileId);
        int separatorPos = fileId.lastIndexOf("-", indexEnd - 1);
        String indexDigits = fileId.substring(separatorPos + 1, indexEnd);
        return Integer.parseInt(indexDigits);
    }

    /**
     * Returns a file ID without its trailing {@code "-0"}, if it has one.
     *
     * @param fileId file ID to shorten
     * @return the file ID with a trailing {@code "-0"} removed, or the whole ID otherwise
     */
    public static String getFileGroupPrefix(String fileId) {
        int prefixLength = HoodieTableMetadataUtil.getFileIdLengthWithoutFileIndex(fileId);
        return fileId.substring(0, prefixLength);
    }

    /**
     * Computes the length of a file ID with a trailing {@code "-0"} excluded.
     *
     * @param fileId file ID to measure
     * @return the ID's length minus two when it ends with {@code "-0"}, otherwise its full length
     */
    private static int getFileIdLengthWithoutFileIndex(String fileId) {
        int fullLength = fileId.length();
        if (fileId.endsWith("-0")) {
            return fullLength - 2;
        }
        return fullLength;
    }

    /**
     * Derives the metadata table instant time for a clean operation.
     *
     * @param timestamp base instant time
     * @return the base time followed by the {@code CLEAN} suffix
     */
    public static String createCleanTimestamp(String timestamp) {
        final String cleanSuffix = OperationSuffix.CLEAN.getSuffix();
        return timestamp + cleanSuffix;
    }

    /**
     * Derives the metadata table instant time for a rollback operation.
     *
     * @param timestamp base instant time
     * @return the base time followed by the {@code ROLLBACK} suffix
     */
    public static String createRollbackTimestamp(String timestamp) {
        final String rollbackSuffix = OperationSuffix.ROLLBACK.getSuffix();
        return timestamp + rollbackSuffix;
    }

    /**
     * Derives the metadata table instant time for a restore operation.
     *
     * @param timestamp base instant time
     * @return the base time followed by the {@code RESTORE} suffix
     */
    public static String createRestoreTimestamp(String timestamp) {
        final String restoreSuffix = OperationSuffix.RESTORE.getSuffix();
        return timestamp + restoreSuffix;
    }

    /**
     * Derives the metadata table instant time for an async indexer operation.
     *
     * @param timestamp base instant time
     * @return the base time followed by the {@code METADATA_INDEXER} suffix
     */
    public static String createAsyncIndexerTimestamp(String timestamp) {
        final String indexerSuffix = OperationSuffix.METADATA_INDEXER.getSuffix();
        return timestamp + indexerSuffix;
    }

    /**
     * Derives the metadata table instant time for a compaction operation.
     *
     * @param timestamp base instant time
     * @return the base time followed by the {@code COMPACTION} suffix
     */
    public static String createCompactionTimestamp(String timestamp) {
        final String compactionSuffix = OperationSuffix.COMPACTION.getSuffix();
        return timestamp + compactionSuffix;
    }

    /**
     * Derives the metadata table instant time used when initializing an index.
     *
     * <p>The suffix is {@code 10 + offset}, zero-padded to at least three digits, so offsets start
     * at {@code "010"}.
     *
     * @param timestamp base instant time
     * @param offset    offset added to the base suffix value of 10
     * @return the base time followed by the numeric suffix
     */
    public static String createIndexInitTimestamp(String timestamp, int offset) {
        // Locale.ROOT keeps the suffix digits ASCII; with the default locale %d may be rendered
        // with locale-specific digits, producing an instant time other code cannot match.
        return String.format(java.util.Locale.ROOT, "%s%03d", timestamp, 10 + offset);
    }

    /**
     * Derives the metadata table instant time for a log compaction operation.
     *
     * @param timestamp base instant time
     * @return the base time followed by the {@code LOG_COMPACTION} suffix
     */
    public static String createLogCompactionTimestamp(String timestamp) {
        final String logCompactionSuffix = OperationSuffix.LOG_COMPACTION.getSuffix();
        return timestamp + logCompactionSuffix;
    }

    /**
     * Estimates how many file groups a metadata table partition needs.
     *
     * <p>When the minimum and maximum are equal and non-zero, that fixed count is used. Otherwise the
     * record count is scaled by the growth factor and divided by the number of records that fit in
     * one file group; the result is clamped to {@code [minFileGroupCount, maxFileGroupCount]} and is
     * at least 1 when it falls strictly between the two bounds.
     *
     * @param partitionType         partition the estimate is for (used for logging only)
     * @param recordCount           number of records currently expected
     * @param averageRecordSize     average record size in bytes; values below 1 are treated as 1
     * @param minFileGroupCount     lower bound for the result
     * @param maxFileGroupCount     upper bound for the result
     * @param growthFactor          multiplier applied to {@code recordCount}
     * @param maxFileGroupSizeBytes maximum size of one file group in bytes
     * @return the estimated file group count
     */
    public static int estimateFileGroupCount(MetadataPartitionType partitionType, long recordCount, int averageRecordSize, int minFileGroupCount, int maxFileGroupCount, float growthFactor, int maxFileGroupSizeBytes) {
        int fileGroupCount;
        if (minFileGroupCount == maxFileGroupCount && minFileGroupCount != 0) {
            // A fixed number of file groups was requested.
            fileGroupCount = minFileGroupCount;
        } else {
            long expectedNumRecords = (long)Math.ceil((float)recordCount * growthFactor);
            long maxRecordsPerFileGroup = (long)maxFileGroupSizeBytes / Math.max((long)averageRecordSize, 1L);
            if (maxRecordsPerFileGroup == 0L) {
                // A record larger than the size limit would otherwise cause a division by zero;
                // plan for one record per file group instead.
                maxRecordsPerFileGroup = 1L;
            }
            long estimatedFileGroupCount = expectedNumRecords / maxRecordsPerFileGroup;
            if (estimatedFileGroupCount >= (long)maxFileGroupCount) {
                fileGroupCount = maxFileGroupCount;
            } else if (estimatedFileGroupCount <= (long)minFileGroupCount) {
                fileGroupCount = minFileGroupCount;
            } else {
                fileGroupCount = Math.max(1, (int)estimatedFileGroupCount);
            }
        }
        LOG.info(String.format("Estimated file group count for MDT partition %s is %d [recordCount=%d, avgRecordSize=%d, minFileGroupCount=%d, maxFileGroupCount=%d, growthFactor=%f, maxFileGroupSizeBytes=%d]", partitionType.name(), fileGroupCount, recordCount, averageRecordSize, minFileGroupCount, maxFileGroupCount, Float.valueOf(growthFactor), maxFileGroupSizeBytes));
        return fileGroupCount;
    }

    /**
     * Decides whether write status tracking is needed.
     *
     * <p>Tracking is needed when any partition type that requires it is available in the table
     * config, or is listed as inflight there, or when the record index is enabled in the metadata
     * config. The checks run in that order and stop at the first match.
     *
     * @param config     metadata config consulted for the record index setting
     * @param metaClient meta client supplying the table config
     * @return {@code true} if write statuses need to be tracked
     */
    public static boolean getMetadataPartitionsNeedingWriteStatusTracking(HoodieMetadataConfig config, HoodieTableMetaClient metaClient) {
        boolean trackedPartitionAvailable = MetadataPartitionType.getMetadataPartitionsNeedingWriteStatusTracking().stream().anyMatch(type -> metaClient.getTableConfig().isMetadataPartitionAvailable(type));
        if (trackedPartitionAvailable) {
            return true;
        }
        Set<String> inflightPartitions = metaClient.getTableConfig().getMetadataPartitionsInflight();
        boolean trackedPartitionInflight = MetadataPartitionType.getMetadataPartitionsNeedingWriteStatusTracking().stream().anyMatch(type -> inflightPartitions.contains(type.getPartitionPath()));
        return trackedPartitionInflight || config.enableRecordIndex();
    }

    /**
     * Builds the global record location described by a record index entry.
     *
     * @param recordIndexInfo record index entry to convert
     * @return the location built from the entry's fields
     */
    public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo(HoodieRecordIndexInfo recordIndexInfo) {
        return HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
            recordIndexInfo.getPartitionName(),
            recordIndexInfo.getFileIdEncoding(),
            recordIndexInfo.getFileIdHighBits(),
            recordIndexInfo.getFileIdLowBits(),
            recordIndexInfo.getFileIndex(),
            recordIndexInfo.getFileId(),
            recordIndexInfo.getInstantTime());
    }

    /**
     * Builds a global record location from the individual fields of a record index entry.
     *
     * <p>With encoding 0 the file ID is reconstructed from the two UUID halves, with
     * {@code "-" + fileIndex} appended unless the index is -1. With any other encoding
     * {@code originalFileId} is used as-is.
     *
     * @param partition      partition of the record
     * @param fileIdEncoding 0 for the UUID encoding, anything else for the plain file ID
     * @param fileIdHighBits most significant bits of the UUID
     * @param fileIdLowBits  least significant bits of the UUID
     * @param fileIndex      index appended to the UUID, or -1 for none
     * @param originalFileId file ID used when the encoding is not 0
     * @param instantTime    instant time as epoch milliseconds
     * @return the record's global location
     */
    public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo(String partition, int fileIdEncoding, long fileIdHighBits, long fileIdLowBits, int fileIndex, String originalFileId, Long instantTime) {
        String fileId;
        if (fileIdEncoding != 0) {
            fileId = originalFileId;
        } else {
            String uuidText = new UUID(fileIdHighBits, fileIdLowBits).toString();
            fileId = fileIndex == -1 ? uuidText : uuidText + "-" + fileIndex;
        }
        String formattedInstant = HoodieActiveTimeline.formatDate(new Date(instantTime));
        return new HoodieRecordGlobalLocation(partition, formattedInstant, fileId);
    }

    /**
     * Three-character suffixes appended to instant times to tell apart the operations that write to
     * the metadata table.
     */
    private static enum OperationSuffix {
        COMPACTION("001"),
        CLEAN("002"),
        RESTORE("003"),
        METADATA_INDEXER("004"),
        LOG_COMPACTION("005"),
        ROLLBACK("006");

        /** The suffixes of all constants, for membership checks. */
        static final Set<String> ALL_SUFFIXES = collectSuffixes();
        private final String suffix;

        private OperationSuffix(String suffix) {
            this.suffix = suffix;
        }

        /** Returns the suffix string of this operation. */
        String getSuffix() {
            return this.suffix;
        }

        /** Returns whether the given string is the suffix of one of the constants. */
        static boolean isValidSuffix(String suffix) {
            return ALL_SUFFIXES.contains(suffix);
        }

        /** Gathers the suffix of every constant into a set. */
        private static Set<String> collectSuffixes() {
            Set<String> suffixes = new HashSet<>();
            for (OperationSuffix operation : OperationSuffix.values()) {
                suffixes.add(operation.getSuffix());
            }
            return suffixes;
        }
    }
}

