/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser;
import org.apache.hudi.metadata.DefaultMetadataTableFileGroupIndexParser;
import org.apache.hudi.metadata.EmptyHoodieRecordPayloadWithPartition;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataIndexMapper;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.MetadataTableFileGroupIndexParser;
import org.apache.hudi.metadata.RecordIndexMapper;
import org.apache.hudi.metadata.SecondaryIndexMapper;
import org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.util.Lazy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTableMetadataWriter<I, O> {
    // Class-wide SLF4J logger (package-private rather than private).
    static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
    // NOTE(review): not referenced in the visible part of this file — confirm where it is used.
    private static final String RECORD_KEY_FIELD_NAME = "key";
    // NOTE(review): initializeSecondaryIndexPartition passes the literal 48 to estimateFileGroupCount;
    // presumably that is this constant inlined by the compiler — confirm. Unit is presumably bytes.
    private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
    // Excluded from Java serialization (transient); not assigned by the constructors shown in this file.
    private transient BaseHoodieWriteClient<?, I, ?, O> writeClient;
    // Assigned by the constructor only when writeConfig.isMetadataTableEnabled() is true.
    protected HoodieWriteConfig metadataWriteConfig;
    // The write config handed to the constructor.
    protected HoodieWriteConfig dataWriteConfig;
    // Metadata table reader; (re)created by initMetadataReader().
    protected HoodieBackedTableMetadata metadata;
    // Meta client of the metadata table; set by initMetadataReader(), metadataTableExists()
    // and initializeFromFilesystem().
    protected HoodieTableMetaClient metadataMetaClient;
    // Meta client of the data table; built in the constructor from dataWriteConfig's base path.
    protected HoodieTableMetaClient dataMetaClient;
    // Set to Option.empty() by the constructor; metrics calls are no-ops unless it is populated elsewhere.
    protected Option<HoodieMetadataMetrics> metrics;
    // Storage configuration handed to the constructor; newInstance() copies are given to meta client builders.
    protected StorageConfiguration<?> storageConf;
    // Engine context handed to the constructor; excluded from Java serialization (transient).
    protected final transient HoodieEngineContext engineContext;
    // Result of getEnabledPartitions(...) at construction time. initializeFromFilesystem() may remove
    // PARTITION_STATS from this list when the data table is not partitioned.
    protected final List<MetadataPartitionType> enabledPartitionTypes;
    // Set by the constructor from the return value of initializeIfNeeded(...); stays false otherwise.
    boolean initialized = false;
    // File system view cached by getMetadataView().
    private HoodieTableFileSystemView metadataView;
    // Constructor flag; the five-argument constructor passes false.
    private final boolean streamingWritesEnabled;

    /**
     * Creates a writer with streaming writes disabled.
     *
     * <p>Pure delegation to the six-argument constructor with
     * {@code streamingWritesEnabled = false}.
     *
     * @param storageConf                storage configuration for the data and metadata tables
     * @param writeConfig                write config of the data table
     * @param failedWritesCleaningPolicy policy forwarded when the metadata write config is created
     * @param engineContext              engine context used for metadata operations
     * @param inflightInstantTimestamp   forwarded to {@code initializeIfNeeded}
     */
    protected HoodieBackedTableMetadataWriter(
            StorageConfiguration<?> storageConf,
            HoodieWriteConfig writeConfig,
            HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
            HoodieEngineContext engineContext,
            Option<String> inflightInstantTimestamp) {
        this(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, false);
    }

    /**
     * Builds the writer and, when the metadata table is enabled in {@code writeConfig},
     * attempts to initialize it.
     *
     * <p>An {@link IOException} raised during initialization is logged and not rethrown; the
     * writer is then left with {@code initialized == false}. After the initialization attempt
     * the constructor validates that a writer reporting itself as initialized also has an open
     * metadata reader.
     *
     * @param storageConf                storage configuration; a fresh instance is given to the meta client
     * @param writeConfig                write config of the data table
     * @param failedWritesCleaningPolicy policy forwarded to the metadata write config factory
     * @param engineContext              engine context used for metadata operations
     * @param inflightInstantTimestamp   forwarded to {@code initializeIfNeeded}
     * @param streamingWritesEnabled     stored as-is; assigned last, after initialization has run
     */
    protected HoodieBackedTableMetadataWriter(
            StorageConfiguration<?> storageConf,
            HoodieWriteConfig writeConfig,
            HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
            HoodieEngineContext engineContext,
            Option<String> inflightInstantTimestamp,
            boolean streamingWritesEnabled) {
        this.dataWriteConfig = writeConfig;
        this.engineContext = engineContext;
        this.storageConf = storageConf;
        this.metrics = Option.empty();
        this.dataMetaClient = HoodieTableMetaClient.builder()
                .setConf(storageConf.newInstance())
                .setBasePath(this.dataWriteConfig.getBasePath())
                .setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig())
                .build();
        this.enabledPartitionTypes = this.getEnabledPartitions(this.dataWriteConfig.getMetadataConfig(), this.dataMetaClient);

        if (writeConfig.isMetadataTableEnabled()) {
            this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(
                    writeConfig,
                    failedWritesCleaningPolicy,
                    this.dataMetaClient.getTableConfig().getTableVersion());
            try {
                this.initRegistry();
                this.initialized = this.initializeIfNeeded(this.dataMetaClient, inflightInstantTimestamp);
            } catch (IOException e) {
                // Deliberately not rethrown: the writer stays uninitialized.
                LOG.error("Failed to initialize metadata table", e);
            }
        }

        // An initialized writer must have an open reader; an uninitialized one may not.
        boolean readerOpenIfInitialized = !this.initialized || this.metadata != null;
        ValidationUtils.checkArgument(readerOpenIfInitialized, "MDT Reader should have been opened post initialization");
        this.streamingWritesEnabled = streamingWritesEnabled;
    }

    /**
     * Resolves the metadata partition types enabled for the given configuration and table.
     *
     * <p>Thin instance-level wrapper around
     * {@code MetadataPartitionType.getEnabledPartitions}; called from the constructor.
     *
     * @param metadataConfig metadata configuration of the data table writer
     * @param metaClient     meta client of the data table
     * @return whatever {@code MetadataPartitionType.getEnabledPartitions} returns for the arguments
     */
    List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
        List<MetadataPartitionType> enabled = MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient);
        return enabled;
    }

    /**
     * Re-opens the metadata reader unless the reader, the metadata meta client and the reader's
     * file system view are all present.
     */
    private void mayBeReinitMetadataReader() {
        // Checks run left to right and stop at the first missing piece, so the view is
        // only requested from a non-null reader.
        boolean readerUsable = this.metadata != null
                && this.metadataMetaClient != null
                && this.metadata.getMetadataFileSystemView() != null;
        if (!readerUsable) {
            this.initMetadataReader();
        }
    }

    /**
     * Closes the current metadata reader, if any, and opens a new one.
     *
     * <p>On success both {@code metadata} and {@code metadataMetaClient} are replaced.
     *
     * @throws HoodieException wrapping any exception raised while opening the new reader;
     *                         a failure while closing the previous reader is not wrapped
     */
    private void initMetadataReader() {
        HoodieBackedTableMetadata previousReader = this.metadata;
        if (previousReader != null) {
            previousReader.close();
        }
        try {
            this.metadata = new HoodieBackedTableMetadata(
                    this.engineContext,
                    this.dataMetaClient.getStorage(),
                    this.dataWriteConfig.getMetadataConfig(),
                    this.dataWriteConfig.getBasePath(),
                    true);
            this.metadataMetaClient = this.metadata.getMetadataMetaClient();
        } catch (Exception e) {
            throw new HoodieException("Could not open MDT for reads", (Throwable) e);
        }
    }

    /**
     * Returns the cached file system view, rebuilding it when there is no cached view or when
     * the cached one is not equal to the view currently exposed by the metadata reader.
     *
     * <p>The metadata reader is validated <em>before</em> it is dereferenced for the staleness
     * comparison. Previously the validation sat inside the rebuild branch, so a cached view
     * combined with a missing reader failed with a bare {@code NullPointerException} in the
     * condition instead of the intended "Metadata table not initialized" state check.
     *
     * @return the (possibly freshly built) cached view
     */
    private HoodieTableFileSystemView getMetadataView() {
        // Must precede the comparison below, which calls a method on the reader.
        ValidationUtils.checkState(this.metadata != null, "Metadata table not initialized");
        boolean rebuild = this.metadataView == null
                || !this.metadataView.equals(this.metadata.getMetadataFileSystemView());
        if (rebuild) {
            // Only needed when a new view is actually constructed, as before.
            ValidationUtils.checkState(this.dataMetaClient != null, "Data table meta client not initialized");
            this.metadataView = new HoodieTableFileSystemView((HoodieTableMetadata) this.metadata, this.dataMetaClient, (HoodieTimeline) this.dataMetaClient.getActiveTimeline());
        }
        return this.metadataView;
    }

    /**
     * Implementation-specific registry setup.
     *
     * <p>Invoked by the constructor, immediately before {@code initializeIfNeeded}, and only
     * when the metadata table is enabled in the write config.
     * NOTE(review): presumably sets up the metrics registry — confirm against implementations.
     */
    protected abstract void initRegistry();

    /**
     * Returns the write config of the metadata table.
     *
     * @return {@code metadataWriteConfig}; the constructor assigns it only when the metadata
     *         table is enabled
     */
    public HoodieWriteConfig getWriteConfig() {
        return metadataWriteConfig;
    }

    /**
     * Returns the metadata table reader currently held by this writer.
     *
     * @return the {@code metadata} field as-is (no re-initialization is attempted here)
     */
    public HoodieBackedTableMetadata getTableMetadata() {
        return metadata;
    }

    /**
     * Returns the metadata partition types enabled for this writer.
     *
     * @return the internal list itself, not a copy
     */
    public List<MetadataPartitionType> getEnabledPartitionTypes() {
        return enabledPartitionTypes;
    }

    /**
     * Works out which metadata partitions still need to be built and initializes them.
     *
     * <p>{@code FILES} is scheduled when {@code metadataTableExists} reports no usable table.
     * When async indexing is disabled, every enabled partition type other than {@code FILES}
     * that is not yet listed in the table config's metadata partitions is scheduled too. If
     * nothing is scheduled, only the metadata reader is opened.
     *
     * @param dataMetaClient           meta client of the data table
     * @param inflightInstantTimestamp forwarded to {@code initializeFromFilesystem}
     * @return {@code true} when nothing needed initializing or initialization succeeded;
     *         {@code false} when initialization was declined or failed with an
     *         {@link IOException} / {@code HoodieIOException} (logged, not rethrown)
     * @throws IOException declared by the signature; the body catches the ones it raises
     */
    protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) throws IOException {
        HoodieTimer timer = HoodieTimer.start();
        List<MetadataPartitionType> pendingInit = new ArrayList<>(MetadataPartitionType.getValidValues().length);
        try {
            if (!this.metadataTableExists(dataMetaClient)) {
                pendingInit.add(MetadataPartitionType.FILES);
            }

            if (!this.dataWriteConfig.isMetadataAsyncIndex()) {
                Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
                LOG.info("Async metadata indexing disabled and following partitions already initialized: {}", completedPartitions);
                for (MetadataPartitionType candidate : this.enabledPartitionTypes) {
                    boolean alreadyBuilt = completedPartitions.contains(candidate.getPartitionPath());
                    if (!alreadyBuilt && !MetadataPartitionType.FILES.equals(candidate)) {
                        pendingInit.add(candidate);
                    }
                }
            }

            if (pendingInit.isEmpty()) {
                this.initMetadataReader();
                return true;
            }

            // Requested time of the last completed data-table instant, or the fixed fallback value.
            String dataTableInstantTime = dataMetaClient.getActiveTimeline()
                    .filterCompletedInstants()
                    .lastInstant()
                    .map(HoodieInstant::requestedTime)
                    .orElse("00000000000000");

            boolean bootstrapped = this.initializeFromFilesystem(dataTableInstantTime, pendingInit, inflightInstantTimestamp);
            if (!bootstrapped) {
                LOG.error("Failed to initialize MDT from filesystem");
                return false;
            }
            this.metrics.ifPresent(m -> m.updateMetrics("initialize", timer.endTimer()));
            return true;
        } catch (IOException | HoodieIOException e) {
            LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
            return false;
        }
    }

    /**
     * Tells whether a usable metadata table already exists, deleting it when it has to be
     * rebuilt.
     *
     * <p>When the data table config reports the metadata table as available, this method
     * (re)creates {@code metadataMetaClient}; if that table's config has
     * {@code populateMetaFields()} set, the meta client is re-initialized via
     * {@code initializeMetaClient()}. A table that exists but needs bootstrapping (see
     * {@code isBootstrapNeeded}) is deleted and reported as absent.
     *
     * @param dataMetaClient meta client of the data table
     * @return {@code true} only when the table is available, can be opened and does not need
     *         re-bootstrapping
     * @throws IOException declared by the signature for the operations invoked here
     */
    private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws IOException {
        if (!dataMetaClient.getTableConfig().isMetadataTableAvailable()) {
            return false;
        }

        try {
            this.metadataMetaClient = HoodieTableMetaClient.builder()
                    .setConf(this.storageConf.newInstance())
                    .setBasePath(this.metadataWriteConfig.getBasePath())
                    .setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig())
                    .build();
            if (this.metadataMetaClient.getTableConfig().populateMetaFields()) {
                LOG.info("Re-initiating metadata table properties since populate meta fields have changed");
                this.metadataMetaClient = this.initializeMetaClient();
            }
        } catch (TableNotFoundException e) {
            // Config says "available" but the table cannot be found: treat as absent.
            return false;
        }

        Option<HoodieInstant> latestCompleted = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        if (!this.isBootstrapNeeded(latestCompleted)) {
            return true;
        }

        this.metrics.ifPresent(m -> m.incrementMetric("rebootstrap_count", 1L));
        LOG.info("Deleting Metadata Table directory so that it can be re-initialized");
        HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, this.engineContext, false);
        return false;
    }

    /**
     * Decides whether the metadata table has to be bootstrapped again.
     *
     * @param latestMetadataInstant last completed instant of the metadata table, if any
     * @return {@code true} (after logging a warning) when no instant is present, otherwise {@code false}
     */
    private boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant) {
        if (latestMetadataInstant.isPresent()) {
            return false;
        }
        LOG.warn("Metadata Table will need to be re-initialized as no instants were found");
        return true;
    }

    /**
     * Gate consulted by {@code initializeFromFilesystem} before any work is done.
     *
     * <p>This implementation ignores both arguments and always allows initialization.
     *
     * @param pendingDataInstants      pending instants of the data table (unused here)
     * @param inflightInstantTimestamp in-flight instant, if any (unused here)
     * @return always {@code true}
     */
    boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, Option<String> inflightInstantTimestamp) {
        final boolean proceed = true;
        return proceed;
    }

    /**
     * Initializes the requested metadata partitions, one commit per partition.
     *
     * <p>Outline: consult {@code shouldInitializeFromFilesystem}; make sure {@code FILES} is
     * handled first when it is not yet available; drop partitions that are already available;
     * list data-table partitions and files; drop {@code PARTITION_STATS} for non-partitioned
     * tables; then build and commit each remaining partition type in list order.
     *
     * <p>{@code partitionsToInit} is mutated in place (reordered and pruned), and
     * {@code enabledPartitionTypes} may lose {@code PARTITION_STATS}.
     *
     * @param dataTableInstantTime     data-table instant time used for listing and as the seed for
     *                                 {@code generateUniqueInstantTime}
     * @param partitionsToInit         partition types to initialize; modified by this method
     * @param inflightInstantTimestamp forwarded to {@code shouldInitializeFromFilesystem}
     * @return {@code false} when {@code shouldInitializeFromFilesystem} declines, otherwise {@code true}
     * @throws IOException             declared by the signature for the listing/initialization calls
     * @throws HoodieMetadataException when initializing any single partition fails (wraps the cause)
     */
    private boolean initializeFromFilesystem(String dataTableInstantTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
        Set<String> pendingDataInstants = this.getPendingDataInstants(this.dataMetaClient);
        if (!this.shouldInitializeFromFilesystem(pendingDataInstants, inflightInstantTimestamp)) {
            return false;
        }
        boolean filesPartitionAvailable = this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
        if (!filesPartitionAvailable) {
            // Force FILES to the front of the list (remove any existing entry first to avoid a duplicate).
            partitionsToInit.remove(MetadataPartitionType.FILES);
            partitionsToInit.add(0, MetadataPartitionType.FILES);
            this.metadataMetaClient = this.initializeMetaClient();
        } else {
            // FILES already exists: open the reader, and build a meta client if the reader did not provide one.
            this.initMetadataReader();
            if (this.metadataMetaClient == null) {
                this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.storageConf.newInstance()).setBasePath(this.metadataWriteConfig.getBasePath()).setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig()).build();
            }
        }
        // Partitions the table config already reports as available need no initialization.
        partitionsToInit.removeIf(metadataPartition -> this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(metadataPartition));
        // Listing source: the metadata table when FILES is available; otherwise the file system, but only
        // when auto-initialization is enabled; otherwise an empty listing.
        List<Object> partitionInfoList = filesPartitionAvailable ? this.listAllPartitionsFromMDT(dataTableInstantTime, pendingDataInstants) : (this.dataWriteConfig.getMetadataConfig().shouldAutoInitialize() ? this.listAllPartitionsFromFilesystem(dataTableInstantTime, pendingDataInstants) : Collections.emptyList());
        // Partition identifier -> (file name -> size). Collectors.toMap rejects duplicate keys.
        Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream().map(p -> {
            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition((String)p.getRelativePath());
            return Pair.of((Object)partitionName, (Object)p.getFileNameToSizeMap());
        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        // Drop PARTITION_STATS when the data table is not partitioned, both from this run and from the
        // enabled partition types. NOTE(review): the removal is silent (nothing is logged) — confirm intended.
        Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
        while (iterator.hasNext()) {
            MetadataPartitionType partitionType = iterator.next();
            if (partitionType != MetadataPartitionType.PARTITION_STATS || this.dataMetaClient.getTableConfig().isTablePartitioned()) continue;
            iterator.remove();
            this.enabledPartitionTypes.remove(partitionType);
        }
        // Created once before the loop and handed to every partition initializer that needs file slices.
        Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList = this.getLazyLatestMergedPartitionFileSliceList();
        // `continue block12` below skips the current partition type without committing anything for it.
        block12: for (MetadataPartitionType partitionType : partitionsToInit) {
            // Each partition gets its own instant time.
            String instantTimeForPartition = this.generateUniqueInstantTime(dataTableInstantTime);
            String partitionTypeName = partitionType.name();
            LOG.info("Initializing MDT partition {} at instant {}", (Object)partitionTypeName, (Object)instantTimeForPartition);
            // A new lazy schema holder is created per iteration; resolution happens only if a case calls get().
            Lazy tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable((HoodieTableMetaClient)this.dataMetaClient));
            try {
                switch (partitionType) {
                    case FILES: {
                        Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = this.initializeFilesPartition(partitionIdToAllFilesMap);
                        this.initializeFilegroupsAndCommit(partitionType, MetadataPartitionType.FILES.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition);
                        break;
                    }
                    case BLOOM_FILTERS: {
                        Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = this.initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap);
                        this.initializeFilegroupsAndCommit(partitionType, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition);
                        break;
                    }
                    case COLUMN_STATS: {
                        // The indexed column names are passed along to the commit step as well.
                        Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> colStatsColumnsAndRecord = this.initializeColumnStatsPartition(partitionIdToAllFilesMap, (Lazy<Option<Schema>>)tableSchema);
                        Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = (Pair<Integer, HoodieData<HoodieRecord>>)colStatsColumnsAndRecord.getValue();
                        this.initializeFilegroupsAndCommit(partitionType, MetadataPartitionType.COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition, (List)colStatsColumnsAndRecord.getKey());
                        break;
                    }
                    case RECORD_INDEX: {
                        // NOTE(review): the flag is named "partitioned" but is read from isRecordLevelIndexEnabled() — confirm.
                        boolean isPartitionedRLI = this.dataWriteConfig.isRecordLevelIndexEnabled();
                        this.initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, lazyLatestMergedPartitionFileSliceList, isPartitionedRLI);
                        break;
                    }
                    case EXPRESSION_INDEX: {
                        Set expressionIndexPartitionsToInit = HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit((MetadataPartitionType)partitionType, (HoodieMetadataConfig)this.dataWriteConfig.getMetadataConfig(), (HoodieTableMetaClient)this.dataMetaClient);
                        // Exactly one index is bootstrapped per run: none -> skip quietly, several -> warn and skip.
                        if (expressionIndexPartitionsToInit.size() != 1) {
                            if (expressionIndexPartitionsToInit.size() <= 1) continue block12;
                            LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", (Object)expressionIndexPartitionsToInit);
                            continue block12;
                        }
                        String relativePartitionPath = (String)expressionIndexPartitionsToInit.iterator().next();
                        Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = this.initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, lazyLatestMergedPartitionFileSliceList, (Lazy<Option<Schema>>)tableSchema);
                        this.initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
                        break;
                    }
                    case PARTITION_STATS: {
                        // Partition stats are only built when the column stats index is enabled.
                        if (!this.dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
                            LOG.debug("Skipping partition stats initialization as column stats index is not enabled. Please enable {}", (Object)HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
                            continue block12;
                        }
                        Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = this.initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, (Lazy<Option<Schema>>)tableSchema);
                        this.initializeFilegroupsAndCommit(partitionType, MetadataPartitionType.PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition);
                        break;
                    }
                    case SECONDARY_INDEX: {
                        Set secondaryIndexPartitionsToInit = HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit((MetadataPartitionType)partitionType, (HoodieMetadataConfig)this.dataWriteConfig.getMetadataConfig(), (HoodieTableMetaClient)this.dataMetaClient);
                        // Same one-at-a-time rule as for expression indexes.
                        if (secondaryIndexPartitionsToInit.size() != 1) {
                            if (secondaryIndexPartitionsToInit.size() <= 1) continue block12;
                            LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", (Object)secondaryIndexPartitionsToInit);
                            continue block12;
                        }
                        String relativePartitionPath = (String)secondaryIndexPartitionsToInit.iterator().next();
                        Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair = this.initializeSecondaryIndexPartition(relativePartitionPath, lazyLatestMergedPartitionFileSliceList);
                        this.initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
                        break;
                    }
                    default: {
                        throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType));
                    }
                }
            }
            catch (Exception e) {
                // Any failure (including the "unsupported type" exception above) sets a per-partition error
                // metric, is logged, and aborts the whole initialization; later partitions are not attempted.
                String metricKey = partitionType.getPartitionPath() + "_" + "bootstrap_error";
                this.metrics.ifPresent(m -> m.setMetric(metricKey, 1L));
                String errMsg = String.format("Bootstrap on %s partition failed for %s", partitionType.getPartitionPath(), this.metadataMetaClient.getBasePath());
                LOG.error(errMsg, (Throwable)e);
                throw new HoodieMetadataException(errMsg, e);
            }
        }
        return true;
    }

    /**
     * Implementation-specific hook receiving a list of column names.
     *
     * <p>NOTE(review): no caller is visible in this part of the file. Judging by the name, it
     * presumably records the columns indexed by the column stats partition — confirm.
     *
     * @param var1 column names (decompiled parameter name; presumably the columns to index)
     */
    protected abstract void updateColumnsToIndexWithColStats(List<String> var1);

    /**
     * Picks the instant time to use for initializing one metadata partition.
     *
     * <p>If {@code initializationTime} is an indexing commit on the data table timeline it is
     * returned unchanged. Otherwise candidates are generated from the fixed base timestamp
     * plus an increasing millisecond offset (starting at 0), and the first candidate not yet
     * contained in the metadata table's commits timeline is returned.
     *
     * @param initializationTime instant time the initialization was requested for
     * @return {@code initializationTime} or the first unused generated instant time
     */
    String generateUniqueInstantTime(String initializationTime) {
        HoodieTimeline indexingInstants = this.dataMetaClient.getActiveTimeline()
                .filter(instant -> instant.getAction().equals("indexing"));
        if (HoodieTableMetadataUtil.isIndexingCommit(indexingInstants, initializationTime)) {
            return initializationTime;
        }
        for (int offset = 0; ; offset++) {
            String candidate = HoodieInstantTimeGenerator.instantTimePlusMillis("00000000000000", (long) offset);
            boolean taken = this.metadataMetaClient.getCommitsTimeline().containsInstant(candidate);
            if (!taken) {
                return candidate;
            }
        }
    }

    /**
     * Produces the initial records for the partition stats index.
     *
     * @param lazyLatestMergedPartitionFileSliceList lazily computed (partition, latest merged file slice) pairs;
     *                                               forced here
     * @param tableSchemaOpt                         lazily resolved table schema, passed through to the converter
     * @return the configured partition-stats file group count paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, Lazy<Option<Schema>> tableSchemaOpt) {
        List<Pair<String, FileSlice>> latestFileSlices = lazyLatestMergedPartitionFileSliceList.get();
        HoodieData<HoodieRecord> partitionStatsRecords = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
                this.engineContext,
                latestFileSlices,
                this.dataWriteConfig.getMetadataConfig(),
                this.dataMetaClient,
                tableSchemaOpt,
                Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()));
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
        return Pair.of(fileGroupCount, partitionStatsRecords);
    }

    /**
     * Produces the initial records for the column stats index together with the indexed columns.
     *
     * <p>No records are generated when there are no files to index or when no column
     * qualifies for indexing.
     *
     * @param partitionIdToAllFilesMap partition identifier to (file name to size) listing
     * @param tableSchema              lazily resolved table schema used to pick the columns
     * @return the columns to index, paired with (configured file group count, records)
     */
    private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionIdToAllFilesMap, Lazy<Option<Schema>> tableSchema) {
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();

        // Nothing listed: skip column resolution entirely.
        if (partitionIdToAllFilesMap.isEmpty()) {
            return Pair.of(Collections.emptyList(), Pair.of(fileGroupCount, this.engineContext.emptyHoodieData()));
        }

        HoodieIndexVersion columnStatsIndexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault("column_stats", this.dataMetaClient);
        Map<String, ?> indexableColumns = HoodieTableMetadataUtil.getColumnsToIndex(
                this.dataMetaClient.getTableConfig(),
                this.dataWriteConfig.getMetadataConfig(),
                tableSchema,
                true,
                Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()),
                columnStatsIndexVersion);
        List<String> columnsToIndex = new ArrayList<>(indexableColumns.keySet());

        if (columnsToIndex.isEmpty()) {
            return Pair.of(columnsToIndex, Pair.of(fileGroupCount, this.engineContext.emptyHoodieData()));
        }

        LOG.info("Indexing {} columns for column stats index", columnsToIndex.size());
        HoodieData<HoodieRecord> columnStatsRecords = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
                this.engineContext,
                Collections.emptyMap(),
                partitionIdToAllFilesMap,
                this.dataMetaClient,
                this.dataWriteConfig.getMetadataConfig(),
                (int) this.dataWriteConfig.getColumnStatsIndexParallelism(),
                (int) this.dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
                columnsToIndex);
        return Pair.of(columnsToIndex, Pair.of(fileGroupCount, columnStatsRecords));
    }

    /**
     * Produces the initial records for the bloom filters index.
     *
     * @param createInstantTime        instant time passed to the record converter
     * @param partitionIdToAllFilesMap partition identifier to (file name to size) listing
     * @return the configured bloom-filter file group count paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionIdToAllFilesMap) {
        HoodieData<HoodieRecord> bloomFilterRecords = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
                this.engineContext,
                Collections.emptyMap(),
                partitionIdToAllFilesMap,
                createInstantTime,
                this.dataMetaClient,
                (int) this.dataWriteConfig.getBloomIndexParallelism(),
                this.dataWriteConfig.getBloomFilterType());
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
        return Pair.of(fileGroupCount, bloomFilterRecords);
    }

    /**
     * Implementation-specific generation of expression index records.
     *
     * <p>The decompiled parameter names carry no meaning; the roles below are taken from the
     * call in {@code initializeExpressionIndexPartition}.
     *
     * @param var1 (partition, (file path, file size)) entries for the files to index
     * @param var2 definition of the expression index being built
     * @param var3 meta client of the data table
     * @param var4 parallelism to use
     * @param var5 table schema
     * @param var6 reader schema projected for the expression index
     * @param var7 storage configuration
     * @param var8 data-table instant time the index is initialized at
     * @return the generated expression index records
     */
    protected abstract HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> var1, HoodieIndexDefinition var2, HoodieTableMetaClient var3, int var4, Schema var5, Schema var6, StorageConfiguration<?> var7, String var8);

    /**
     * @return the {@code EngineType} reported by the concrete writer implementation
     */
    protected abstract EngineType getEngineType();

    /**
     * Produces the initial records for one expression index.
     *
     * <p>Every base file and log file of the latest merged file slices is handed to
     * {@code getExpressionIndexRecords} as a (partition, (path, size)) entry. When there are no
     * file slices at all, an empty record set is returned and the table schema is not resolved.
     *
     * @param indexName                              name of the index; its definition must exist
     * @param dataTableInstantTime                   data-table instant time forwarded to the record generator
     * @param lazyLatestMergedPartitionFileSliceList lazily computed (partition, file slice) pairs; forced here
     * @param tableSchemaOpt                         lazily resolved table schema
     * @return the configured expression-index file group count paired with the generated records
     * @throws HoodieMetadataException when file slices exist but the table schema is unavailable
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeExpressionIndexPartition(String indexName, String dataTableInstantTime, Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, Lazy<Option<Schema>> tableSchemaOpt) {
        HoodieIndexDefinition indexDefinition = this.getIndexDefinition(indexName);
        ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName);

        List<Pair<String, FileSlice>> latestFileSlices = lazyLatestMergedPartitionFileSliceList.get();
        List<Pair<String, Pair<String, Long>>> filesToIndex = new ArrayList<>();
        for (Pair<String, FileSlice> partitionAndSlice : latestFileSlices) {
            String partition = partitionAndSlice.getKey();
            FileSlice slice = partitionAndSlice.getValue();
            if (slice.getBaseFile().isPresent()) {
                HoodieBaseFile baseFile = slice.getBaseFile().get();
                filesToIndex.add(Pair.of(partition, Pair.of(baseFile.getPath(), baseFile.getFileLen())));
            }
            slice.getLogFiles().forEach(logFile ->
                    filesToIndex.add(Pair.of(partition, Pair.of(logFile.getPath().toString(), logFile.getFileSize()))));
        }

        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
        if (latestFileSlices.isEmpty()) {
            return Pair.of(fileGroupCount, this.engineContext.emptyHoodieData());
        }

        int parallelism = Math.min(filesToIndex.size(), this.dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
        Schema tableSchema = tableSchemaOpt.get()
                .orElseThrow(() -> new HoodieMetadataException("Table schema is not available for expression index initialization"));
        Schema readerSchema = HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex(indexDefinition, this.dataMetaClient, tableSchema);
        HoodieData<HoodieRecord> expressionIndexRecords = this.getExpressionIndexRecords(
                filesToIndex, indexDefinition, this.dataMetaClient, parallelism, tableSchema, readerSchema, this.storageConf, dataTableInstantTime);
        return Pair.of(fileGroupCount, expressionIndexRecords);
    }

    /**
     * Looks up the definition of the named index on the data table.
     *
     * @param indexName name of the index
     * @return the result of {@code HoodieTableMetadataUtil.getHoodieIndexDefinition}; callers in
     *         this class treat {@code null} as "definition not present"
     */
    HoodieIndexDefinition getIndexDefinition(String indexName) {
        HoodieIndexDefinition definition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, this.dataMetaClient);
        return definition;
    }

    /**
     * Produces the initial records for one secondary index and estimates its file group count.
     *
     * <p>The file group count is estimated with the record index partition type and the global
     * record-level-index sizing settings of the data write config.
     *
     * @param indexName                              name of the index; its definition must exist
     * @param lazyLatestMergedPartitionFileSliceList lazily computed (partition, file slice) pairs; forced here
     * @return the estimated file group count paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition(String indexName, Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList) {
        HoodieIndexDefinition indexDefinition = this.getIndexDefinition(indexName);
        ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName);

        List<Pair<String, FileSlice>> latestFileSlices = lazyLatestMergedPartitionFileSliceList.get();
        int parallelism = Math.min(latestFileSlices.size(), this.dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
        HoodieData<HoodieRecord> records = SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices(
                this.engineContext,
                latestFileSlices,
                parallelism,
                this.getClass().getSimpleName(),
                this.dataMetaClient,
                indexDefinition,
                this.dataWriteConfig.getProps());

        // 48 is the same value as this class's RECORD_INDEX_AVERAGE_RECORD_SIZE constant.
        int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(
                MetadataPartitionType.RECORD_INDEX,
                () -> records.count(),
                48,
                (int) this.dataWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount(),
                (int) this.dataWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount(),
                (float) this.dataWriteConfig.getRecordIndexGrowthFactor(),
                (int) this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
        return Pair.of(fileGroupCount, records);
    }

    /**
     * Builds a lazy holder for the (partition, file slice) pairs of the latest merged file slices.
     *
     * <p>When evaluated, the supplier takes the requested time of the last completed-or-compaction
     * instant on the data timeline (falling back to {@code "00000000000000"}), loads all partitions
     * into the metadata file system view and collects, for every partition path known to the
     * metadata reader, the merged file slices before or on that instant.
     *
     * @return lazy list of (partition path, file slice) pairs
     */
    private Lazy<List<Pair<String, FileSlice>>> getLazyLatestMergedPartitionFileSliceList() {
        return Lazy.lazily(() -> {
            String latestInstant = this.dataMetaClient.getActiveTimeline()
                .filterCompletedAndCompactionInstants()
                .lastInstant()
                .map(HoodieInstant::requestedTime)
                .orElse("00000000000000");
            try (HoodieTableFileSystemView fsView = this.getMetadataView()) {
                List<String> partitionPaths = this.metadata.getAllPartitionPaths();
                fsView.loadAllPartitions();
                List<Pair<String, FileSlice>> mergedSlices = new ArrayList<>();
                for (String partitionPath : partitionPaths) {
                    fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestInstant)
                        .forEach(slice -> mergedSlices.add(Pair.of(partitionPath, slice)));
                }
                return mergedSlices;
            } catch (IOException e) {
                throw new HoodieIOException("Cannot get the latest merged file slices", e);
            }
        });
    }

    /**
     * Convenience overload that initializes a metadata partition with no columns to index.
     *
     * @param partitionType metadata partition type being initialized
     * @param relativePartitionPath path of the partition relative to the metadata table base path
     * @param fileGroupCountAndRecordsPair pair of (file group count, initial records)
     * @param instantTimeForPartition instant time used for the initial commit
     * @throws IOException propagated from the five-argument overload
     */
    void initializeFilegroupsAndCommit(MetadataPartitionType partitionType, String relativePartitionPath, Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair, String instantTimeForPartition) throws IOException {
        List<String> noColumnsToIndex = Collections.emptyList();
        this.initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition, noColumnsToIndex);
    }

    /**
     * Initializes one metadata table partition: clears any existing files, creates the file
     * groups, bulk-commits the initial records, marks the partition as available in the data
     * table config and re-initializes the metadata reader.
     *
     * @param partitionType metadata partition type being initialized
     * @param relativePartitionPath path of the partition relative to the metadata table base path
     * @param fileGroupCountAndRecordsPair pair of (file group count, initial records); the count must be positive
     * @param instantTimeForPartition instant time used for file group creation and the bulk commit
     * @param columnsToIndex columns recorded via {@code updateColumnsToIndexWithColStats}; only used for COLUMN_STATS
     * @throws IOException if clearing the partition or creating file groups fails
     */
    void initializeFilegroupsAndCommit(MetadataPartitionType partitionType, String relativePartitionPath, Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair, String instantTimeForPartition, List<String> columnsToIndex) throws IOException {
        String indexName = partitionType.name();
        LOG.info("Initializing {} index with {} mappings", indexName, fileGroupCountAndRecordsPair.getKey());
        HoodieTimer timer = HoodieTimer.start();
        int numFileGroups = fileGroupCountAndRecordsPair.getKey();
        ValidationUtils.checkArgument(numFileGroups > 0, "FileGroup count for MDT partition " + indexName + " should be > 0");

        // Start from a clean partition directory, then lay out the file groups before writing records.
        this.clearExistingMetadataPartition(relativePartitionPath);
        this.initializeFileGroups(this.dataMetaClient, partitionType, instantTimeForPartition, numFileGroups, relativePartitionPath, Option.empty());

        HoodieData<HoodieRecord> initialRecords = fileGroupCountAndRecordsPair.getValue();
        this.bulkCommit(instantTimeForPartition, relativePartitionPath, initialRecords, new DefaultMetadataTableFileGroupIndexParser(numFileGroups));
        if (MetadataPartitionType.COLUMN_STATS == partitionType) {
            this.updateColumnsToIndexWithColStats(columnsToIndex);
        }

        // Only after the commit is the partition flagged as available and the reader refreshed.
        this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, relativePartitionPath, true);
        this.initMetadataReader();
        long elapsedMs = timer.endTimer();
        LOG.info("Initializing {} index in metadata table took {} in ms", indexName, elapsedMs);
    }

    /**
     * Creates the record index definition and initializes the RECORD_INDEX partition, either in
     * its partitioned form or as a single index over all file slices, then unpersists the records
     * that were written.
     *
     * @param commitTimeForPartition instant time used for the initial commit
     * @param lazyLatestMergedPartitionFileSliceList lazily computed (partition, file slice) pairs to index
     * @param isPartitionedRLI whether to build the partitioned record index
     * @throws IOException if partition initialization fails
     */
    private void initializeFilegroupsAndCommitToRecordIndexPartition(String commitTimeForPartition, Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, boolean isPartitionedRLI) throws IOException {
        HoodieTableMetadataUtil.createRecordIndexDefinition(this.dataMetaClient, Collections.singletonMap("isPartitioned", String.valueOf(isPartitionedRLI)));

        HoodieData<HoodieRecord> writtenRecords;
        if (!isPartitionedRLI) {
            int maxParallelism = this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism();
            Pair<Integer, HoodieData<HoodieRecord>> countAndRecords = this.initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(), maxParallelism);
            writtenRecords = countAndRecords.getRight();
            this.initializeFilegroupsAndCommit(MetadataPartitionType.RECORD_INDEX, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), countAndRecords, commitTimeForPartition);
        } else {
            writtenRecords = this.initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
        }
        // The records may have been persisted while counting them for file group estimation.
        writtenRecords.unpersist();
    }

    /**
     * Initializes the RECORD_INDEX partition in partitioned mode: record index records and file
     * group counts are computed per data partition, file groups are created per data partition,
     * and the union of all records is bulk-committed in one commit.
     *
     * @param commitTimeForPartition instant time used for file group creation and the bulk commit
     * @param lazyLatestMergedPartitionFileSliceList lazily computed (partition, file slice) pairs to index
     * @return the union of the record index records of all data partitions
     * @throws IOException if clearing the partition or creating file groups fails
     */
    private HoodieData<HoodieRecord> initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String commitTimeForPartition, Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList) throws IOException {
        Map<String, List<Pair<String, FileSlice>>> slicesByDataPartition = lazyLatestMergedPartitionFileSliceList.get().stream().collect(Collectors.groupingBy(Pair::getKey));
        HashMap<String, Pair<Integer, HoodieData<HoodieRecord>>> countAndRecordsByDataPartition = new HashMap<>(slicesByDataPartition.size());

        // Split the configured parallelism evenly across data partitions, never dropping below 1.
        int parallelismPerDataPartition = slicesByDataPartition.isEmpty()
            ? 1
            : Math.max(1, this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism() / slicesByDataPartition.size());
        for (Map.Entry<String, List<Pair<String, FileSlice>>> slicesEntry : slicesByDataPartition.entrySet()) {
            String dataPartition = slicesEntry.getKey();
            LOG.info("Initializing partitioned record index from data partition {}", dataPartition);
            countAndRecordsByDataPartition.put(dataPartition, this.initializeRecordIndexPartition(slicesEntry.getValue(), parallelismPerDataPartition));
        }
        if (LOG.isInfoEnabled()) {
            int totalFileGroups = 0;
            for (Pair<Integer, HoodieData<HoodieRecord>> countAndRecords : countAndRecordsByDataPartition.values()) {
                totalFileGroups += countAndRecords.getLeft();
            }
            LOG.info("Initializing partitioned record index with {} mappings", totalFileGroups);
        }

        HoodieTimer timer = HoodieTimer.start();
        HoodieData<HoodieRecord> allRecords = this.engineContext.emptyHoodieData();
        String recordIndexPath = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
        this.clearExistingMetadataPartition(recordIndexPath);

        // Sorted map of data partition -> file group count, handed to the bucketized index parser.
        TreeMap<String, Integer> fileGroupsPerDataPartition = new TreeMap<>();
        for (Map.Entry<String, Pair<Integer, HoodieData<HoodieRecord>>> entry : countAndRecordsByDataPartition.entrySet()) {
            String dataPartition = entry.getKey();
            Pair<Integer, HoodieData<HoodieRecord>> countAndRecords = entry.getValue();
            int numFileGroups = countAndRecords.getKey();
            ValidationUtils.checkArgument(numFileGroups > 0, "FileGroup count for partitioned RLI data partition " + dataPartition + " should be > 0");
            fileGroupsPerDataPartition.put(dataPartition, numFileGroups);
            this.initializeFileGroups(this.dataMetaClient, MetadataPartitionType.RECORD_INDEX, commitTimeForPartition, numFileGroups, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), Option.of(dataPartition));
            allRecords = allRecords.union(countAndRecords.getValue());
        }

        this.bulkCommit(commitTimeForPartition, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), allRecords, new BucketizedMetadataTableFileGroupIndexParser(fileGroupsPerDataPartition));
        this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), true);
        this.initMetadataReader();
        long elapsedMs = timer.endTimer();
        LOG.info("Initializing partitioned record index in metadata table took {} in ms", elapsedMs);
        return allRecords;
    }

    /**
     * Reads record keys from the given file slices and estimates the number of record index
     * file groups needed for them.
     *
     * @param latestMergedPartitionFileSliceList (partition, file slice) pairs to read record keys from
     * @param recordIndexMaxParallelism upper bound on the read parallelism
     * @return pair of (estimated file group count, record index records)
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(List<Pair<String, FileSlice>> latestMergedPartitionFileSliceList, int recordIndexMaxParallelism) {
        int sliceCount = latestMergedPartitionFileSliceList.size();
        LOG.info("Initializing record index from {} file slices", sliceCount);
        String activeModule = this.getClass().getSimpleName();
        HoodieData<HoodieRecord> recordIndexRecords = HoodieBackedTableMetadataWriter.readRecordKeysFromFileSliceSnapshot(
            this.engineContext, latestMergedPartitionFileSliceList, recordIndexMaxParallelism, activeModule, this.dataMetaClient, this.dataWriteConfig);
        int estimatedFileGroups = this.estimateFileGroupCount(recordIndexRecords);
        LOG.info("Initializing record index with {} file groups.", estimatedFileGroups);
        return Pair.of(estimatedFileGroups, recordIndexRecords);
    }

    /**
     * Estimates the number of RECORD_INDEX file groups for the given records.
     *
     * <p>The min/max bounds come from the record-level-index settings when
     * {@code isRecordLevelIndexEnabled()} is true, otherwise from the global record-level-index
     * settings. The record count is supplied lazily; computing it persists the records first.
     *
     * @param records record index records to size the partition for
     * @return file group count returned by {@code HoodieTableMetadataUtil.estimateFileGroupCount}
     */
    private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
        boolean useRecordLevelBounds = this.dataWriteConfig.isRecordLevelIndexEnabled();
        int lowerBound = useRecordLevelBounds
            ? this.dataWriteConfig.getRecordLevelIndexMinFileGroupCount()
            : this.dataWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount();
        int upperBound = useRecordLevelBounds
            ? this.dataWriteConfig.getRecordLevelIndexMaxFileGroupCount()
            : this.dataWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount();

        Supplier<Long> recordCounter = () -> {
            // Persist before counting so the records are not recomputed when they are written later.
            records.persist("MEMORY_AND_DISK_SER");
            long total = records.count();
            LOG.info("Initializing record index with {} mappings", total);
            return total;
        };
        return HoodieTableMetadataUtil.estimateFileGroupCount(
            MetadataPartitionType.RECORD_INDEX,
            recordCounter,
            48,
            lowerBound,
            upperBound,
            this.dataWriteConfig.getRecordIndexGrowthFactor(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
    }

    /**
     * Reads the record keys of the given file slices and turns each one into a record index
     * update record.
     *
     * <p>Returns empty data when there are no file slices or when the commits timeline has no
     * completed instant. Otherwise the file slices are parallelized (parallelism is the smaller of
     * the slice count and {@code recordIndexMaxParallelism}) and each slice is read through a
     * {@code HoodieFileGroupReader} pinned to the last completed commit instant.
     *
     * @param engineContext engine context used to parallelize the read and to obtain the reader context factory
     * @param partitionFileSlicePairs (partition, file slice) pairs to read
     * @param recordIndexMaxParallelism upper bound on the read parallelism
     * @param activeModule module name reported in the job status
     * @param metaClient meta client of the table being read
     * @param dataWriteConfig write config supplying the write schema, internal schema and log scan option
     * @return one record index update per record read from the file slices
     */
    private static <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext, List<Pair<String, FileSlice>> partitionFileSlicePairs, int recordIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, HoodieWriteConfig dataWriteConfig) {
        if (partitionFileSlicePairs.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        // Requested time of the last completed commit; used as the latest commit time for every reader below.
        Option instantTime = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime);
        if (!instantTime.isPresent()) {
            return engineContext.emptyHoodieData();
        }
        engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices");
        int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism);
        ReaderContextFactory readerContextFactory = engineContext.getReaderContextFactory(metaClient);
        return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap((SerializableFunction & Serializable)partitionAndFileSlice -> {
            String partition = (String)partitionAndFileSlice.getKey();
            FileSlice fileSlice = (FileSlice)partitionAndFileSlice.getValue();
            String fileId = fileSlice.getFileId();
            HoodieReaderContext readerContext = readerContextFactory.getContext();
            // Data schema: the write schema with metadata fields added (parsed once per file slice).
            Schema dataSchema = AvroSchemaCache.intern((Schema)HoodieAvroUtils.addMetadataFields((Schema)new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), (boolean)dataWriteConfig.allowOperationMetadataField()));
            // Only the record key is needed: use the record key schema when meta fields are populated,
            // otherwise project the data schema onto the configured record key fields.
            Schema requestedSchema = metaClient.getTableConfig().populateMetaFields() ? HoodieAvroUtils.getRecordKeySchema() : HoodieAvroUtils.projectSchema((Schema)dataSchema, Arrays.asList((Object[])metaClient.getTableConfig().getRecordKeyFields().orElse((Object)new String[0])));
            Option internalSchemaOption = SerDeHelper.fromJson((String)dataWriteConfig.getInternalSchema());
            HoodieFileGroupReader fileGroupReader = HoodieFileGroupReader.newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withFileSlice(fileSlice).withLatestCommitTime((String)instantTime.get()).withDataSchema(dataSchema).withRequestedSchema(requestedSchema).withInternalSchema(internalSchemaOption).withShouldUseRecordPosition(false).withProps(metaClient.getTableConfig().getProps()).withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan()).build();
            String baseFileInstantTime = fileSlice.getBaseInstantTime();
            // Map each record to a record index update carrying its key, partition, file id and base instant time.
            return new CloseableMappingIterator(fileGroupReader.getClosableIterator(), record -> {
                String recordKey = readerContext.getRecordContext().getRecordKey(record, requestedSchema);
                return HoodieMetadataPayload.createRecordIndexUpdate((String)recordKey, (String)partition, (String)fileId, (String)baseFileInstantTime, (int)0);
            });
        });
    }

    /**
     * Builds the initial records for the FILES partition: one record listing all partitions plus
     * one record per partition listing its files and sizes.
     *
     * @param partitionIdToAllFilesMap partition identifier to (file name to file size) map
     * @return pair of (file group count, which is always 1, and the FILES partition records)
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(Map<String, Map<String, Long>> partitionIdToAllFilesMap) {
        final int fileGroupCount = 1;
        Set<String> partitionIds = partitionIdToAllFilesMap.keySet();
        int dataFileCount = 0;
        for (Map<String, Long> filesOfPartition : partitionIdToAllFilesMap.values()) {
            dataFileCount += filesOfPartition.size();
        }
        LOG.info("Committing total {} partitions and {} files to metadata", partitionIds.size(), dataFileCount);

        HoodieRecord partitionListRecord = HoodieMetadataPayload.createPartitionListRecord(partitionIds);
        HoodieData<HoodieRecord> partitionListData = this.engineContext.parallelize(Collections.singletonList(partitionListRecord), 1);
        if (partitionIdToAllFilesMap.isEmpty()) {
            return Pair.of(fileGroupCount, partitionListData);
        }

        this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating records for metadata FILES partition");
        List<Map.Entry<String, Map<String, Long>>> partitionEntries = new ArrayList<>(partitionIdToAllFilesMap.entrySet());
        HoodieData<HoodieRecord> fileListRecords = this.engineContext.parallelize(partitionEntries, partitionIdToAllFilesMap.size())
            .map(partitionInfo -> HoodieMetadataPayload.createPartitionFilesRecord(partitionInfo.getKey(), partitionInfo.getValue(), Collections.emptyList()));
        // Sanity check: exactly one file-listing record per partition.
        ValidationUtils.checkState(fileListRecords.count() == partitionIds.size());
        return Pair.of(fileGroupCount, partitionListData.union(fileListRecords));
    }

    /**
     * Collects the requested times of all instants on the active timeline that are not completed
     * and whose action is not {@code "indexing"}.
     *
     * @param dataMetaClient meta client whose active timeline is inspected
     * @return set of requested times of the pending, non-indexing instants
     */
    private Set<String> getPendingDataInstants(HoodieTableMetaClient dataMetaClient) {
        return dataMetaClient.getActiveTimeline()
            .getInstantsAsStream()
            .filter(instant -> !(instant.isCompleted() || "indexing".equals(instant.getAction())))
            .map(HoodieInstant::requestedTime)
            .collect(Collectors.toSet());
    }

    /**
     * Returns the timeline history path used when creating the metadata table.
     *
     * @return the default value of {@code HoodieTableConfig.TIMELINE_HISTORY_PATH}
     */
    String getTimelineHistoryPath() {
        Object defaultHistoryPath = HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue();
        return (String) defaultHistoryPath;
    }

    /**
     * Creates the metadata table on storage and returns a meta client for it.
     *
     * <p>The table is created as MERGE_ON_READ with HFile base files, named after the data table
     * with a {@code "_metadata"} suffix, at the metadata write config's base path.
     *
     * @return a freshly built meta client pointing at the metadata table base path
     * @throws IOException if table initialization fails
     */
    private HoodieTableMetaClient initializeMetaClient() throws IOException {
        HoodieTableMetaClient.newTableBuilder()
            .setTableType(HoodieTableType.MERGE_ON_READ)
            .setTableName(this.dataWriteConfig.getTableName() + "_metadata")
            .setTableVersion(this.dataWriteConfig.getWriteVersion())
            .setArchiveLogFolder(this.getTimelineHistoryPath())
            .setPayloadClassName(HoodieMetadataPayload.class.getName())
            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
            .setPopulateMetaFields(false)
            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
            .initTable(this.storageConf.newInstance(), this.metadataWriteConfig.getBasePath());

        HoodieTableMetaClient metadataTableMetaClient = HoodieTableMetaClient.builder()
            .setBasePath(this.metadataWriteConfig.getBasePath())
            .setConf(this.storageConf.newInstance())
            .setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig())
            .build();
        return metadataTableMetaClient;
    }

    /**
     * Discovers all Hudi partitions of the data table by walking the file system breadth-first
     * from the table base path.
     *
     * <p>Directories are listed in batches of at most the configured file listing parallelism.
     * A directory whose non-empty relative path fully matches the configured directory filter
     * regex is skipped together with its subtree. Directories identified as Hudi partitions are
     * collected; all other directories contribute their sub-directories to the work queue.
     *
     * @param initializationTime instant time passed to each {@code DirectoryInfo}
     * @param pendingDataInstants pending instants passed to each {@code DirectoryInfo}
     * @return directory info of every discovered partition; empty when the active timeline has no instants
     */
    private List<HoodieTableMetadataUtil.DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime, Set<String> pendingDataInstants) {
        if (this.dataMetaClient.getActiveTimeline().countInstants() == 0) {
            return Collections.emptyList();
        }
        ArrayDeque<StoragePath> pathsToList = new ArrayDeque<StoragePath>();
        pathsToList.add(new StoragePath(this.dataWriteConfig.getBasePath()));
        LinkedList<HoodieTableMetadataUtil.DirectoryInfo> partitionsToBootstrap = new LinkedList<HoodieTableMetadataUtil.DirectoryInfo>();
        int fileListingParallelism = this.metadataWriteConfig.getFileListingParallelism();
        StorageConfiguration<?> storageConf = this.dataMetaClient.getStorageConf();
        String dirFilterRegex = this.dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
        // Compiled on first use and then reused: String.matches would recompile the regex for every
        // listed directory. Lazy compilation keeps an invalid regex failing at the same point as before.
        java.util.regex.Pattern dirFilterPattern = null;
        StoragePath storageBasePath = this.dataMetaClient.getBasePath();
        while (!pathsToList.isEmpty()) {
            // Take the next batch of directories off the queue.
            int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
            List<StoragePath> pathsToProcess = new ArrayList<StoragePath>(numDirsToList);
            for (int i = 0; i < numDirsToList; ++i) {
                pathsToProcess.add(pathsToList.poll());
            }
            this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem");
            List<HoodieTableMetadataUtil.DirectoryInfo> processedDirectories = this.engineContext.map(pathsToProcess, path -> {
                HoodieHadoopStorage storage = new HoodieHadoopStorage(path, storageConf);
                String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path);
                return new HoodieTableMetadataUtil.DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants);
            }, numDirsToList);
            for (HoodieTableMetadataUtil.DirectoryInfo dirInfo : processedDirectories) {
                if (!dirFilterRegex.isEmpty()) {
                    String relativePath = dirInfo.getRelativePath();
                    if (!relativePath.isEmpty()) {
                        if (dirFilterPattern == null) {
                            dirFilterPattern = java.util.regex.Pattern.compile(dirFilterRegex);
                        }
                        if (dirFilterPattern.matcher(relativePath).matches()) {
                            LOG.info("Ignoring directory {} which matches the filter regex {}", relativePath, dirFilterRegex);
                            continue;
                        }
                    }
                }
                if (dirInfo.isHoodiePartition()) {
                    partitionsToBootstrap.add(dirInfo);
                    continue;
                }
                // Not a partition: descend into its sub-directories in a later batch.
                pathsToList.addAll(dirInfo.getSubDirectories());
            }
        }
        return partitionsToBootstrap;
    }

    /**
     * Builds the partition directory infos from the metadata table instead of listing storage.
     *
     * <p>All partition paths known to the metadata reader are turned into absolute paths under
     * the data table base path, their files are fetched in one call, and one
     * {@code DirectoryInfo} is created per returned partition.
     *
     * @param initializationTime instant time passed to each {@code DirectoryInfo}
     * @param pendingDataInstants pending instants passed to each {@code DirectoryInfo}
     * @return directory info for every partition returned by the metadata reader
     * @throws IOException if reading from the metadata table fails
     */
    private List<HoodieTableMetadataUtil.DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
        List<String> absolutePartitionPaths = new ArrayList<String>();
        for (String partitionPath : this.metadata.getAllPartitionPaths()) {
            absolutePartitionPaths.add(this.dataWriteConfig.getBasePath() + '/' + partitionPath);
        }
        Map filesByAbsolutePartition = this.metadata.getAllFilesInPartitions(absolutePartitionPaths);

        List<HoodieTableMetadataUtil.DirectoryInfo> directoryInfos = new ArrayList<HoodieTableMetadataUtil.DirectoryInfo>(filesByAbsolutePartition.size());
        for (Object rawEntry : filesByAbsolutePartition.entrySet()) {
            Map.Entry partitionAndFiles = (Map.Entry) rawEntry;
            StoragePath tableBasePath = new StoragePath(this.dataWriteConfig.getBasePath());
            StoragePath absolutePartitionPath = new StoragePath((String) partitionAndFiles.getKey());
            // DirectoryInfo expects the path relative to the table base path.
            String relativeDirPath = FSUtils.getRelativePartitionPath(tableBasePath, absolutePartitionPath);
            directoryInfos.add(new HoodieTableMetadataUtil.DirectoryInfo(relativeDirPath, (List) partitionAndFiles.getValue(), initializationTime, pendingDataInstants, false));
        }
        return directoryInfos;
    }

    /**
     * Creates the file groups of a metadata partition by writing, for every file group id, a log
     * file that contains a single empty delete block stamped with the given instant time.
     *
     * @param dataMetaClient meta client whose storage is used by the log writers
     * @param metadataPartition metadata partition type the file groups belong to
     * @param instantTime instant time recorded in the block header and used by the log writer
     * @param fileGroupCount number of file groups to create
     * @param relativePartitionPath partition path relative to the metadata table base path
     * @param dataPartitionName optional data partition name passed to file id generation
     * @throws IOException declared for callers; log writer failures surface from the engine call
     * @throws HoodieException if a writer is interrupted while creating a file group
     */
    private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount, String relativePartitionPath, Option<String> dataPartitionName) throws IOException {
        String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime);
        LOG.info(msg);
        List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount)
            .mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i, relativePartitionPath, dataPartitionName))
            .collect(Collectors.toList());
        ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount);
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
        this.engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
            try {
                Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = Collections.singletonMap(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
                // An empty delete block is enough to materialize the file group's first log file.
                HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader);
                try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                        .onParentPath(FSUtils.constructAbsolutePath(this.metadataWriteConfig.getBasePath(), relativePartitionPath))
                        .withFileId(fileGroupFileId)
                        .withInstantTime(instantTime)
                        .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION.intValue())
                        .withFileSize(0L)
                        .withSizeThreshold(this.metadataWriteConfig.getLogFileMaxSize())
                        .withStorage(dataMetaClient.getStorage())
                        .withLogWriteToken("0-0-0")
                        .withTableVersion(this.metadataWriteConfig.getWriteVersion())
                        .withFileExtension(".log")
                        .build()) {
                    writer.appendBlock(block);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePartitionPath), e);
            }
        }, fileGroupFileIds.size());
    }

    /**
     * Deletes the directory of the given metadata table partition when it contains any entries
     * and verifies that the directory is gone afterwards.
     *
     * @param relativePartitionPath partition path relative to the metadata table base path
     * @throws IOException if listing or deleting fails for a reason other than a missing path
     */
    void clearExistingMetadataPartition(String relativePartitionPath) throws IOException {
        StoragePath partitionPath = new StoragePath(this.metadataWriteConfig.getBasePath(), relativePartitionPath);
        HoodieStorage storage = this.metadataMetaClient.getStorage();
        try {
            if (storage.listDirectEntries(partitionPath).isEmpty()) {
                return;
            }
            LOG.info("Deleting all existing files found in MDT partition {}", relativePartitionPath);
            storage.deleteDirectory(partitionPath);
            ValidationUtils.checkState(!storage.exists(partitionPath), "Failed to delete MDT partition " + relativePartitionPath);
        } catch (FileNotFoundException ignored) {
            // Deliberately ignored: presumably the partition directory does not exist yet,
            // in which case there is nothing to clear.
        }
    }

    /**
     * Drops the given metadata table partitions: deletes each partition directory under the
     * metadata table base path, removes matching pending indexing instants from the data
     * timeline, and finally closes this writer's internal resources.
     *
     * @param metadataPartitions partition paths relative to the metadata table base path
     * @throws IOException if deleting a partition directory fails
     */
    @Override
    public void dropMetadataPartitions(List<String> metadataPartitions) throws IOException {
        for (String metadataPartition : metadataPartitions) {
            LOG.info("Deleting Metadata Table partition: {}", metadataPartition);
            HoodieStorage storage = this.dataMetaClient.getStorage();
            StoragePath partitionDirectory = new StoragePath(this.metadataWriteConfig.getBasePath(), metadataPartition);
            storage.deleteDirectory(partitionDirectory);

            LOG.info("Deleting pending indexing instant from the timeline for partition: {}", metadataPartition);
            HoodieBackedTableMetadataWriter.deletePendingIndexingInstant(this.dataMetaClient, metadataPartition);
        }
        this.closeInternal();
    }

    /**
     * Removes pending indexing instants that target the given metadata partition.
     *
     * <p>The active timeline is reloaded and every REQUESTED instant of the pending index
     * timeline is inspected: if its index plan references {@code partitionPath}, both the
     * requested instant file and the corresponding inflight instant file are deleted when
     * present. Failures on individual instants are logged and do not stop the remaining ones.
     *
     * @param metaClient meta client of the table whose timeline is cleaned
     * @param partitionPath metadata partition path to match against the index plans
     */
    private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
        InstantGenerator instantGenerator = metaClient.getInstantGenerator();
        metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstantsAsStream()
            .filter(instant -> HoodieInstant.State.REQUESTED.equals(instant.getState()))
            .forEach(instant -> {
                try {
                    HoodieIndexPlan indexPlan = metaClient.getActiveTimeline().readIndexPlan(instant);
                    boolean targetsPartition = indexPlan.getIndexPartitionInfos().stream()
                        .anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath));
                    if (targetsPartition) {
                        metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
                        metaClient.getActiveTimeline().deleteInstantFileIfExists(instantGenerator.getIndexInflightInstant(instant.requestedTime()));
                    }
                } catch (IOException e) {
                    // Best effort: keep processing other instants, but log the cause so the failure can be diagnosed.
                    LOG.error("Failed to delete the instant file corresponding to {}", instant, e);
                }
            });
    }

    /**
     * Converts metadata into per-partition records and commits those belonging to partitions
     * that should be updated. Does nothing when the writer is not initialized or has no
     * metadata reader.
     *
     * @param instantTime instant time of the commit
     * @param convertMetadataFunction supplier of the partition-to-records map to commit
     */
    void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
        Set<String> partitionsToUpdate = this.getMetadataPartitionsToUpdate();
        if (!this.initialized || this.metadata == null) {
            return;
        }
        // Keep only the records of partitions that are eligible for updates.
        Map<String, HoodieData<HoodieRecord>> recordsByPartition = convertMetadataFunction.convertMetadata()
            .entrySet()
            .stream()
            .filter(partitionRecords -> partitionsToUpdate.contains(partitionRecords.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        this.commit(instantTime, recordsByPartition);
    }

    /**
     * Determines which metadata partitions should receive updates.
     *
     * <p>The result is the table config's metadata partitions plus its inflight metadata
     * partitions; when that is empty, the partition paths of the enabled partition types
     * are used instead.
     *
     * @return partition paths of the metadata partitions to update
     */
    private Set<String> getMetadataPartitionsToUpdate() {
        // NOTE(review): the inflight partitions are added to the set returned by the table config
        // itself; confirm that getMetadataPartitions() hands out a fresh set on every call.
        Set<String> fromTableConfig = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        fromTableConfig.addAll(HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig()));
        if (fromTableConfig.isEmpty()) {
            LOG.debug("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
            return this.getEnabledPartitionTypes().stream()
                .map(MetadataPartitionType::getPartitionPath)
                .collect(Collectors.toSet());
        }
        return fromTableConfig;
    }

    /**
     * Builds the metadata partitions named in an index plan.
     *
     * <p>Every requested partition must belong to an enabled partition type. The partitions are
     * marked inflight in the data table config and then initialized from the file system.
     *
     * @param engineContext engine context (not used by this implementation)
     * @param indexPartitionInfos partitions to index; nothing happens when empty
     * @param instantTime instant time used for the initialization
     * @throws IOException if initialization from the file system fails
     * @throws HoodieIndexException if a requested partition type is not enabled
     */
    @Override
    public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws IOException {
        if (indexPartitionInfos.isEmpty()) {
            LOG.debug("No partition to index in the plan");
            return;
        }
        // The first entry's "index up to" instant is reported for every partition in the plan.
        String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
        List<String> partitionPaths = new ArrayList<>();
        List<MetadataPartitionType> partitionTypes = new ArrayList<>();
        for (HoodieIndexPartitionInfo partitionInfo : indexPartitionInfos) {
            String relativePartitionPath = partitionInfo.getMetadataPartitionPath();
            LOG.info("Creating a new metadata index for partition '{}' under path {} upto instant {}", relativePartitionPath, this.metadataWriteConfig.getBasePath(), indexUptoInstantTime);
            MetadataPartitionType partitionType = MetadataPartitionType.fromPartitionPath(relativePartitionPath);
            if (!this.enabledPartitionTypes.contains(partitionType)) {
                throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType));
            }
            partitionTypes.add(partitionType);
            partitionPaths.add(relativePartitionPath);
        }
        this.dataMetaClient.getTableConfig().setMetadataPartitionsInflight(this.dataMetaClient, partitionPaths);
        this.initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
    }

    /**
     * Starts a delta commit on the metadata table for streaming writes.
     *
     * <p>If the metadata table's commits timeline already contains the instant, it is rolled
     * back first; the commit is then started in either case.
     *
     * @param instantTime instant time of the commit to start
     */
    @Override
    public void startCommit(String instantTime) {
        ValidationUtils.checkState(this.streamingWritesEnabled, "Streaming writes should be enabled for startCommit API");
        boolean alreadyOnTimeline = this.metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime);
        if (alreadyOnTimeline) {
            LOG.error("Rolling back already inflight commit in Metadata table {} ", instantTime);
            this.getWriteClient().rollback(instantTime);
        } else {
            LOG.info("New commit at {} being applied to metadata table", instantTime);
        }
        this.getWriteClient().startCommitForMetadataTable(this.metadataMetaClient, instantTime, "deltacommit");
    }

    /**
     * Writes index records derived from data-table write statuses to the streaming metadata
     * partitions (record index and secondary index).
     *
     * <p>Returns empty data when there are no streaming partition paths to tag or when no index
     * mapper applies. Otherwise each mapper's records are generated from the write statuses,
     * post-processed, unioned, tagged with their metadata table location and written.
     *
     * @param writeStatus write statuses of the data table write
     * @param instantTime instant time of the metadata table commit
     * @return write statuses of the metadata table write
     */
    @Override
    public HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime) {
        Pair<List<MetadataPartitionType>, Set<String>> streamingPartitions = this.getStreamingMetadataPartitionsToUpdate();
        List<MetadataPartitionType> partitionTypesToTag = streamingPartitions.getLeft();
        Set<String> partitionPathsToTag = streamingPartitions.getRight();
        if (partitionPathsToTag.isEmpty()) {
            return this.engineContext.emptyHoodieData();
        }
        this.maybeInitializeNewFileGroupsForPartitionedRLI(writeStatus, instantTime);

        // One mapper per streaming partition type; only record index and secondary index have mappers.
        Map<MetadataPartitionType, MetadataIndexMapper> mappersByType = partitionTypesToTag.stream()
            .filter(type -> type.equals(MetadataPartitionType.RECORD_INDEX) || type.equals(MetadataPartitionType.SECONDARY_INDEX))
            .collect(Collectors.toMap(Function.identity(), type -> {
                if (!MetadataPartitionType.RECORD_INDEX.equals(type)) {
                    return new SecondaryIndexMapper(this.dataWriteConfig);
                }
                return new RecordIndexMapper(this.dataWriteConfig);
            }));
        if (mappersByType.isEmpty()) {
            return this.engineContext.emptyHoodieData();
        }

        HoodieData<HoodieRecord> mappedRecords = mappersByType.values().stream()
            .map(mapper -> mapper.postProcess(writeStatus.flatMap(mapper)))
            .reduce(HoodieData::union)
            .get();
        Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> fileGroupsAndTaggedRecords = this.tagRecordsWithLocationForStreamingWrites(mappedRecords, partitionPathsToTag);
        return this.convertEngineSpecificDataToHoodieData(this.streamWriteToMetadataTable(fileGroupsAndTaggedRecords, instantTime));
    }

    /**
     * Determines the metadata partitions that are written through the streaming path.
     *
     * <p>RECORD_INDEX is included when it is an enabled partition type. SECONDARY_INDEX is
     * included when the table config lists at least one metadata partition whose path starts
     * with {@code "secondary_index_"}; those paths are added to the returned path set.
     *
     * @return pair of (streaming partition types, streaming partition paths)
     */
    private Pair<List<MetadataPartitionType>, Set<String>> getStreamingMetadataPartitionsToUpdate() {
        Set<MetadataPartitionType> streamingTypes = new HashSet<>();
        Set<String> streamingPaths = new HashSet<>();
        if (this.enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) {
            streamingTypes.add(MetadataPartitionType.RECORD_INDEX);
            streamingPaths.add(MetadataPartitionType.RECORD_INDEX.getPartitionPath());
        }

        List<String> secondaryIndexPaths = this.dataMetaClient.getTableConfig().getMetadataPartitions().stream()
            .filter(partitionPath -> partitionPath.startsWith("secondary_index_"))
            .collect(Collectors.toList());
        if (!secondaryIndexPaths.isEmpty()) {
            streamingPaths.addAll(secondaryIndexPaths);
            streamingTypes.add(MetadataPartitionType.SECONDARY_INDEX);
        }
        List<MetadataPartitionType> streamingTypeList = new ArrayList<>(streamingTypes);
        return Pair.of(streamingTypeList, streamingPaths);
    }

    /**
     * Hook for writing tagged records to the metadata table on the streaming path.
     * This base implementation always throws.
     *
     * @param fileGroupIdToTaggedRecords pair of (file groups to update, tagged metadata records)
     * @param instantTime instant time of the metadata table commit
     * @return never returns normally in this implementation
     * @throws UnsupportedOperationException always
     */
    protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
        String reason = "Not yet implemented";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Hook for writing prepared records to the metadata table partitions that are not handled
     * by the streaming path. This base implementation always throws.
     *
     * @param preppedRecords engine-specific prepared records
     * @param instantTime instant time of the metadata table commit
     * @return never returns normally in this implementation
     * @throws UnsupportedOperationException always
     */
    public O secondaryWriteToMetadataTablePartitions(I preppedRecords, String instantTime) {
        String reason = "Not yet implemented";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Completes a streaming commit on the metadata table: writes the non-streaming partitions,
     * combines their write stats with the partial stats from the streaming writes and commits
     * everything as a delta commit.
     *
     * @param instantTime instant time of the metadata table commit
     * @param context engine context (not used by this implementation)
     * @param partialWriteStats write stats already produced by the streaming writes
     * @param metadata commit metadata of the data table commit
     */
    @Override
    public void completeStreamingCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> partialWriteStats, HoodieCommitMetadata metadata) {
        List<HoodieWriteStat> combinedStats = new ArrayList<>(partialWriteStats);
        List<HoodieWriteStat> nonStreamingStats = this.prepareAndWriteToNonStreamingPartitions(metadata, instantTime)
            .map(WriteStatus::getStat)
            .collectAsList();
        combinedStats.addAll(nonStreamingStats);
        this.getWriteClient().commitStats(instantTime, combinedStats, Option.empty(), "deltacommit", Collections.emptyMap(), Option.empty());
    }

    /**
     * Builds records for the metadata partitions that are not handled by the streaming
     * path, tags them with their file-group locations and writes them via
     * {@link #secondaryWriteToMetadataTablePartitions}.
     *
     * @param commitMetadata commit metadata to convert into metadata records
     * @param instantTime instant time of the write
     * @return write statuses produced by the write
     */
    private HoodieData<WriteStatus> prepareAndWriteToNonStreamingPartitions(HoodieCommitMetadata commitMetadata, String instantTime) {
        BatchMetadataConversionFunction converter = new BatchMetadataConversionFunction(instantTime, commitMetadata, this.getNonStreamingMetadataPartitionsToUpdate());
        Map<String, HoodieData<HoodieRecord>> untaggedByPartition = converter.convertMetadata();
        HoodieData<HoodieRecord> tagged = this.tagRecordsWithLocation(untaggedByPartition, false).getKey();
        I engineRecords = this.convertHoodieDataToEngineSpecificData(tagged);
        O written = this.secondaryWriteToMetadataTablePartitions(engineRecords, instantTime);
        return this.convertEngineSpecificDataToHoodieData(written);
    }

    /**
     * Returns the partition paths of every enabled partition type that is not selected
     * by {@link #getStreamingMetadataPartitionsToUpdate()}.
     *
     * @return mutable set of partition paths
     */
    private Set<String> getNonStreamingMetadataPartitionsToUpdate() {
        Set<MetadataPartitionType> streamingTypes = new HashSet<>(this.getStreamingMetadataPartitionsToUpdate().getLeft());
        return this.enabledPartitionTypes.stream()
            .filter(partitionType -> !streamingTypes.contains(partitionType))
            .map(MetadataPartitionType::getPartitionPath)
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Tags each untagged record with a location by applying the tagger of the metadata
     * partition the record belongs to.
     *
     * <p>For every requested partition the latest file slices are looked up. When none
     * exist, the partition must be listed as inflight in the data table config and the
     * lookup is repeated including inflight slices. Every partition must end up with at
     * least one file slice.
     *
     * @param untaggedRecords records to tag; the tagger is chosen by each record's partition path
     * @param enabledMetadataPartitions metadata partition paths to resolve file slices for
     * @return pair of (file group ids of all resolved slices, tagged records)
     */
    protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> tagRecordsWithLocationForStreamingWrites(HoodieData<HoodieRecord> untaggedRecords, Set<String> enabledMetadataPartitions) {
        List<HoodieFileGroupId> touchedFileGroupIds = new ArrayList<>();
        Map<String, List<FileSlice>> slicesByPartition = new HashMap<>();
        try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(this.metadataMetaClient)) {
            for (String partitionName : enabledMetadataPartitions) {
                List<FileSlice> slices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(this.metadataMetaClient, Option.ofNullable(fsView), partitionName);
                if (slices.isEmpty()) {
                    // No committed slices: only acceptable while the partition is still inflight.
                    ValidationUtils.checkState(
                        this.dataMetaClient.getTableConfig().getMetadataPartitionsInflight().contains(partitionName),
                        String.format("Partition %s should be part of inflight metadata partitions here %s", partitionName, this.dataMetaClient.getTableConfig().getMetadataPartitionsInflight()));
                    slices = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(this.metadataMetaClient, Option.ofNullable(fsView), partitionName);
                }
                slicesByPartition.put(partitionName, slices);
                for (FileSlice slice : slices) {
                    touchedFileGroupIds.add(slice.getFileGroupId());
                }
                ValidationUtils.checkArgument(!slices.isEmpty(), String.format("FileGroup count for MDT partition %s should be > 0", partitionName));
            }
        }

        // Taggers are built only after every partition has been validated above.
        Map<String, SerializableFunction<HoodieRecord, HoodieRecord>> taggerByPartition = new HashMap<>();
        for (Map.Entry<String, List<FileSlice>> entry : slicesByPartition.entrySet()) {
            taggerByPartition.put(entry.getKey(), this.getRecordTagger(entry.getKey(), entry.getValue()));
        }
        HoodieData<HoodieRecord> taggedRecords = untaggedRecords.map(rec -> taggerByPartition.get(rec.getPartitionPath()).apply(rec));
        return Pair.of(touchedFileGroupIds, taggedRecords);
    }

    /**
     * Applies a data table commit to the metadata table: refreshes the metadata reader if
     * needed, initializes record-index file groups for newly touched partitions, converts
     * the commit metadata and commits the result, then releases internal resources.
     *
     * @param commitMetadata metadata of the data table commit
     * @param instantTime instant time of the commit
     */
    @Override
    public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
        this.mayBeReinitMetadataReader();
        this.maybeInitializeNewFileGroupsForPartitionedRLI(commitMetadata, instantTime);
        BatchMetadataConversionFunction conversion =
            new BatchMetadataConversionFunction(instantTime, commitMetadata, this.getMetadataPartitionsToUpdate());
        this.processAndCommit(instantTime, conversion);
        this.closeInternal();
    }

    /**
     * When the record level index is enabled in the write config, initializes record-index
     * file groups for the partitions named in the commit metadata's write stats.
     *
     * @param commitMetadata commit metadata whose partition keys are inspected
     * @param instantTime instant time used for file group initialization
     */
    private void maybeInitializeNewFileGroupsForPartitionedRLI(HoodieCommitMetadata commitMetadata, String instantTime) {
        if (!this.dataWriteConfig.isRecordLevelIndexEnabled()) {
            return;
        }
        // Copy the key set: the helper removes entries from the set it is given.
        Set<String> touchedPartitions = new HashSet<>(commitMetadata.getPartitionToWriteStats().keySet());
        this.initializeNewFileGroupsForPartitionedRLIHelper(touchedPartitions, instantTime);
    }

    /**
     * When the record level index is enabled in the write config, initializes record-index
     * file groups for the partitions reported by the given write statuses.
     *
     * @param writeStatus write statuses whose partition paths are collected
     * @param instantTime instant time used for file group initialization
     */
    private void maybeInitializeNewFileGroupsForPartitionedRLI(HoodieData<WriteStatus> writeStatus, String instantTime) {
        if (!this.dataWriteConfig.isRecordLevelIndexEnabled()) {
            return;
        }
        List<String> partitionPaths = writeStatus.map(WriteStatus::getPartitionPath).collectAsList();
        Set<String> touchedPartitions = new HashSet<>(partitionPaths);
        this.initializeNewFileGroupsForPartitionedRLIHelper(touchedPartitions, instantTime);
    }

    /**
     * Initializes record-index file groups for every touched partition that the metadata
     * reader does not yet list, then re-initializes the metadata reader.
     *
     * <p>Note: partitions already known to the metadata reader are removed from
     * {@code partitionsTouchedByInflightCommit} in place.
     *
     * @param partitionsTouchedByInflightCommit mutable set of partitions touched by the commit
     * @param instantTime instant time used for file group initialization
     * @throws HoodieMetadataException if an {@link IOException} occurs
     */
    private void initializeNewFileGroupsForPartitionedRLIHelper(Set<String> partitionsTouchedByInflightCommit, String instantTime) {
        try {
            Set<String> knownPartitions = new HashSet<>(this.metadata.getAllPartitionPaths());
            partitionsTouchedByInflightCommit.removeAll(knownPartitions);
            if (partitionsTouchedByInflightCommit.isEmpty()) {
                return;
            }
            for (String newPartition : partitionsTouchedByInflightCommit) {
                this.initializeFileGroups(
                    this.dataMetaClient,
                    MetadataPartitionType.RECORD_INDEX,
                    instantTime,
                    this.dataWriteConfig.getRecordLevelIndexMinFileGroupCount(),
                    MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
                    Option.of(newPartition));
            }
            this.initMetadataReader();
        } catch (IOException e) {
            LOG.error("Failed to initialize newly added partitions for the partitioned record index", e);
            throw new HoodieMetadataException("Failed to initialize newly added partitions for the partitioned record index", e);
        }
    }

    /**
     * Adds expression index updates to {@code partitionToRecordMap} for every metadata
     * partition whose path starts with {@code expr_index_}. Does nothing when the
     * expression index partition is not available on the data table.
     *
     * @param commitMetadata commit metadata to derive updates from
     * @param instantTime instant time of the commit
     * @param partitionToRecordMap output map, keyed by metadata partition path
     * @throws HoodieMetadataException if computing the updates for a partition fails
     */
    private void updateExpressionIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
        if (!MetadataPartitionType.EXPRESSION_INDEX.isMetadataPartitionAvailable(this.dataMetaClient)) {
            return;
        }
        for (String partition : this.dataMetaClient.getTableConfig().getMetadataPartitions()) {
            if (!partition.startsWith("expr_index_")) {
                continue;
            }
            HoodieData<HoodieRecord> indexRecords;
            try {
                indexRecords = this.getExpressionIndexUpdates(commitMetadata, partition, instantTime);
            } catch (Exception e) {
                throw new HoodieMetadataException(String.format("Failed to get expression index updates for partition %s", partition), e);
            }
            partitionToRecordMap.put(partition, indexRecords);
        }
    }

    /**
     * Computes expression index records for one index partition. This base implementation
     * always throws; per its message, expression index is only supported with the SPARK
     * engine.
     *
     * @param commitMetadata commit metadata to derive updates from
     * @param indexPartition metadata partition path of the expression index
     * @param instantTime instant time of the commit
     * @return never returns normally in this implementation
     * @throws UnsupportedOperationException always, in this implementation
     * @throws Exception declared so that overriding implementations may throw
     */
    protected HoodieData<HoodieRecord> getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
        throw new UnsupportedOperationException("Expression Index only supported with SPARK engine.");
    }

    /**
     * Adds secondary index updates to {@code partitionToRecordMap} for every metadata
     * partition whose path starts with {@code secondary_index_}.
     *
     * <p>Does nothing when the secondary index partition is not available, or when the
     * operation is COMPACT or CLUSTER.
     *
     * @param commitMetadata commit metadata to derive updates from
     * @param partitionToRecordMap output map, keyed by metadata partition path
     * @param instantTime instant time of the commit
     * @throws HoodieIndexException if the operation is an insert-overwrite or delete-partition
     * @throws HoodieMetadataException if computing the updates for a partition fails
     */
    private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap, String instantTime) {
        if (!MetadataPartitionType.SECONDARY_INDEX.isMetadataPartitionAvailable(this.dataMetaClient)) {
            return;
        }
        WriteOperationType operationType = commitMetadata.getOperationType();
        if (operationType.isInsertOverwriteOrDeletePartition()) {
            throw new HoodieIndexException(String.format("Can not perform operation %s on secondary index", operationType));
        }
        boolean isTableService = operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.CLUSTER;
        if (isTableService) {
            return;
        }
        for (String partition : this.dataMetaClient.getTableConfig().getMetadataPartitions()) {
            if (!partition.startsWith("secondary_index_")) {
                continue;
            }
            HoodieData<HoodieRecord> indexRecords;
            try {
                indexRecords = this.getSecondaryIndexUpdates(commitMetadata, partition, instantTime);
            } catch (Exception e) {
                throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
            }
            partitionToRecordMap.put(partition, indexRecords);
        }
    }

    /**
     * Converts the write stats of a commit into secondary index records for one index
     * partition.
     *
     * @param commitMetadata commit metadata supplying the write stats
     * @param indexPartition metadata partition path of the secondary index
     * @param instantTime instant time of the commit
     * @return the generated records, or empty data when there are no write stats or the
     *         operation is a compaction or clustering
     */
    private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) {
        List<HoodieWriteStat> allWriteStats = new ArrayList<>();
        for (List<HoodieWriteStat> partitionStats : commitMetadata.getPartitionToWriteStats().values()) {
            allWriteStats.addAll(partitionStats);
        }
        if (allWriteStats.isEmpty()) {
            return this.engineContext.emptyHoodieData();
        }
        if (WriteOperationType.isCompactionOrClustering(commitMetadata.getOperationType())) {
            return this.engineContext.emptyHoodieData();
        }
        HoodieIndexDefinition indexDefinition = this.getIndexDefinition(indexPartition);
        return SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords(
            allWriteStats,
            instantTime,
            indexDefinition,
            this.dataWriteConfig.getMetadataConfig(),
            this.dataMetaClient,
            this.engineContext,
            this.dataWriteConfig);
    }

    /**
     * Applies a data table clean to the metadata table: refreshes the metadata reader if
     * needed, converts the clean metadata to records and commits them, then releases
     * internal resources.
     *
     * @param cleanMetadata metadata of the clean action
     * @param instantTime instant time of the clean
     */
    @Override
    public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
        this.mayBeReinitMetadataReader();
        this.processAndCommit(instantTime, () -> {
            HoodieMetadataConfig metadataConfig = this.dataWriteConfig.getMetadataConfig();
            int bloomIndexParallelism = this.dataWriteConfig.getBloomIndexParallelism();
            return HoodieTableMetadataUtil.convertMetadataToRecords(
                this.engineContext,
                cleanMetadata,
                instantTime,
                this.dataMetaClient,
                metadataConfig,
                this.enabledPartitionTypes,
                bloomIndexParallelism,
                Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()));
        });
        this.closeInternal();
    }

    /**
     * Applies a data table restore to the metadata table.
     *
     * <p>Steps: read the restore plan of the data table, verify that every file group of
     * the FILES partition has a file slice whose base instant is not later than the
     * restore target, list the data table partitions from the file system, restore the
     * metadata table through the write client, and finally commit a sync delta that
     * reconciles the metadata table with the file system listing (including column stats
     * when that partition exists).
     *
     * @param restoreMetadata metadata of the restore action (not read by this method)
     * @param instantTime instant time of the restore on the data table
     * @throws HoodieIOException if the restore plan cannot be read
     * @throws HoodieMetadataException if the restore target is not reachable or the sync fails with an I/O error
     */
    @Override
    public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
        this.mayBeReinitMetadataReader();
        this.dataMetaClient.reloadActiveTimeline();
        InstantGenerator instantGenerator = this.dataMetaClient.getInstantGenerator();
        HoodieInstant restoreInstant = instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, "restore", instantTime);
        HoodieInstant requestedInstant = instantGenerator.getRestoreRequestedInstant(restoreInstant);
        HoodieRestorePlan restorePlan;
        try {
            restorePlan = this.dataMetaClient.getActiveTimeline().readRestorePlan(requestedInstant);
        } catch (IOException e) {
            throw new HoodieIOException(String.format("Deserialization of restore plan failed whose restore instant time is %s in data table", instantTime), e);
        }
        String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
        LOG.info("Triggering restore to {} in metadata table", restoreToInstantTime);

        // A file group blocks the restore when none of its slices is at or before the target.
        boolean someFileGroupHasNoEligibleSlice = this.metadata.getMetadataFileSystemView()
            .getAllFileGroups(MetadataPartitionType.FILES.getPartitionPath())
            .collect(Collectors.toList())
            .stream()
            .anyMatch(fileGroup -> fileGroup.getAllFileSlices()
                .map(FileSlice::getBaseInstantTime)
                .noneMatch(baseInstant -> InstantComparison.compareTimestamps(baseInstant, InstantComparison.LESSER_THAN_OR_EQUALS, restoreToInstantTime)));
        if (someFileGroupHasNoEligibleSlice) {
            throw new HoodieMetadataException(String.format("Can't restore to %s since there is no base file in MDT lesser than the commit to restore to. Please delete metadata table and retry", restoreToInstantTime));
        }

        List<HoodieTableMetadataUtil.DirectoryInfo> dirInfoList = this.listAllPartitionsFromFilesystem(instantTime, Collections.emptySet());
        Map<String, HoodieTableMetadataUtil.DirectoryInfo> dirInfoMap = dirInfoList.stream()
            .collect(Collectors.toMap(HoodieTableMetadataUtil.DirectoryInfo::getRelativePath, Function.identity()));
        dirInfoList.clear();

        this.getWriteClient().restoreToInstant(restoreToInstantTime, false);
        try {
            this.initMetadataReader();
            HashMap<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
            HashMap<String, List<String>> partitionFilesToDelete = new HashMap<>();
            ArrayList<String> partitionsToDelete = new ArrayList<>();
            this.fetchOutofSyncFilesRecordsFromMetadataTable(dirInfoMap, partitionFilesToAdd, partitionFilesToDelete, partitionsToDelete);
            String syncCommitTime = this.createRestoreInstantTime();
            this.processAndCommit(syncCommitTime, () -> {
                Map<String, HoodieData<HoodieRecord>> recordsByPartition = new HashMap<>(
                    HoodieTableMetadataUtil.convertMissingPartitionRecords(this.engineContext, partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
                boolean hasColumnStats = this.dataMetaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
                if (hasColumnStats) {
                    recordsByPartition.putAll(HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
                        partitionFilesToAdd,
                        partitionFilesToDelete,
                        this.engineContext,
                        this.dataMetaClient,
                        this.dataWriteConfig.getMetadataConfig(),
                        Option.of(this.dataWriteConfig.getRecordMerger().getRecordType()),
                        this.dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
                }
                return recordsByPartition;
            });
            this.closeInternal();
        } catch (IOException e) {
            throw new HoodieMetadataException("IOException during MDT restore sync", e);
        }
    }

    /**
     * Builds column stats records for the given added and deleted files.
     *
     * @param partitionFilesToAdd files to add per partition (file name to a {@code Long} value)
     * @param partitionFilesToDelete files to delete per partition
     * @param engineContext engine context used for record generation
     * @param dataMetaClient meta client of the data table
     * @param metadataConfig metadata config
     * @param recordTypeOpt optional record type forwarded to the columns-to-index lookup
     * @param columnStatsIndexParallelism parallelism for record generation
     * @return single-entry map keyed by the column stats partition path, or an empty map
     *         when there are no files to process or no columns to index
     */
    static Map<String, HoodieData<HoodieRecord>> convertToColumnStatsRecord(Map<String, Map<String, Long>> partitionFilesToAdd, Map<String, List<String>> partitionFilesToDelete, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt, int columnStatsIndexParallelism) {
        boolean nothingToProcess = partitionFilesToDelete.isEmpty() && partitionFilesToAdd.isEmpty();
        if (nothingToProcess) {
            return Collections.emptyMap();
        }
        // The schema is resolved lazily, i.e. only if the columns-to-index lookup asks for it.
        Lazy tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
        HoodieIndexVersion indexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault("column_stats", dataMetaClient);
        List<String> columnsToIndex = new ArrayList<>(
            HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, tableSchema, false, recordTypeOpt, indexVersion).keySet());
        if (columnsToIndex.isEmpty()) {
            LOG.info("Since there are no columns to index, stop to generate ColumnStats records.");
            return Collections.emptyMap();
        }
        HoodieData<HoodieRecord> columnStatsRecords = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
            engineContext,
            partitionFilesToDelete,
            partitionFilesToAdd,
            dataMetaClient,
            metadataConfig,
            columnStatsIndexParallelism,
            metadataConfig.getMaxReaderBufferSize(),
            columnsToIndex);
        return Collections.singletonMap(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), columnStatsRecords);
    }

    /**
     * Asks the write client for a new instant time, passing {@code false} as the only
     * argument. The restore path uses the result as the instant time of its sync commit.
     * NOTE(review): the meaning of the boolean flag is not visible here — confirm against
     * {@code createNewInstantTime}.
     *
     * @return the instant time produced by the write client
     */
    String createRestoreInstantTime() {
        return this.writeClient.createNewInstantTime(false);
    }

    /**
     * Applies a data table rollback to the metadata table.
     *
     * <p>Does nothing unless the writer is initialized and a metadata reader exists.
     * Otherwise the first rolled-back commit is looked up among the metadata table's
     * deltacommits: when found it is rolled back (after validation if it is completed),
     * when not found the rollback is ignored.
     *
     * @param rollbackMetadata metadata of the rollback; only the first rolled-back commit is used
     * @param instantTime instant time of the rollback
     * @throws HoodieMetadataException if the write client reports that the rollback failed
     */
    @Override
    public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        if (!this.initialized || this.metadata == null) {
            return;
        }
        this.mayBeReinitMetadataReader();
        String commitToRollbackInstantTime = rollbackMetadata.getCommitsRollback().get(0);
        Option<HoodieInstant> matchingDeltaCommit = this.metadataMetaClient.getActiveTimeline()
            .getDeltaCommitTimeline()
            .filter(instant -> instant.requestedTime().equals(commitToRollbackInstantTime))
            .firstInstant();
        if (!matchingDeltaCommit.isPresent()) {
            LOG.info("Ignoring rollback of instant {} at {}. The commit to rollback is not found in MDT", commitToRollbackInstantTime, instantTime);
        } else {
            HoodieInstant deltaCommitInstant = matchingDeltaCommit.get();
            if (deltaCommitInstant.isCompleted()) {
                this.validateRollback(deltaCommitInstant);
            }
            LOG.info("Rolling back MDT deltacommit {}", commitToRollbackInstantTime);
            boolean rolledBack = this.getWriteClient().rollback(commitToRollbackInstantTime, instantTime);
            if (!rolledBack) {
                throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s", commitToRollbackInstantTime));
            }
        }
        this.closeInternal();
    }

    /**
     * Rejects the rollback of a commit that is not later than the latest compaction on the
     * metadata table.
     *
     * <p>The check only applies when the instant returned alongside the deltacommits has
     * action {@code commit} and both requested times have the same length; it then
     * compares the completion time of the commit being rolled back with the requested
     * time of that instant.
     *
     * @param commitToRollbackInstant the metadata table instant about to be rolled back
     * @throws HoodieMetadataException if the commit is at or before the latest compaction
     */
    private void validateRollback(HoodieInstant commitToRollbackInstant) {
        Pair<HoodieTimeline, HoodieInstant> sinceLatestCompaction =
            CompactionUtils.getDeltaCommitsSinceLatestCompaction(this.metadataMetaClient.getActiveTimeline()).get();
        HoodieInstant compactionInstant = sinceLatestCompaction.getValue();
        HoodieTimeline deltacommitsSinceCompaction = sinceLatestCompaction.getKey();
        if (!compactionInstant.getAction().equals("commit")) {
            return;
        }
        String compactionInstantTime = compactionInstant.requestedTime();
        String commitToRollbackInstantTime = commitToRollbackInstant.requestedTime();
        if (commitToRollbackInstantTime.length() != compactionInstantTime.length()) {
            return;
        }
        boolean notAfterCompaction = InstantComparison.compareTimestamps(
            commitToRollbackInstant.getCompletionTime(), InstantComparison.LESSER_THAN_OR_EQUALS, compactionInstantTime);
        if (notAfterCompaction) {
            throw new HoodieMetadataException(String.format(
                "Commit being rolled back %s is earlier than the latest compaction %s. There are %d deltacommits after this compaction: %s",
                commitToRollbackInstantTime,
                compactionInstantTime,
                deltacommitsSinceCompaction.countInstants(),
                deltacommitsSinceCompaction.getInstants()));
        }
    }

    /**
     * Closes the metadata reader, the write client and the metadata view.
     *
     * <p>Every resource is attempted even when an earlier one fails to close, so a failure
     * in one does not leak the others. The first failure is rethrown once all resources
     * have been attempted; later failures are attached to it as suppressed exceptions.
     * As before, {@code writeClient} and {@code metadataView} are cleared only after they
     * closed successfully.
     *
     * @throws Exception the first exception raised while closing a resource
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        if (this.metadata != null) {
            try {
                this.metadata.close();
            } catch (Exception e) {
                firstFailure = e;
            }
        }
        if (this.writeClient != null) {
            try {
                this.writeClient.close();
                this.writeClient = null;
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (this.metadataView != null) {
            try {
                this.metadataView.close();
                this.metadataView = null;
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Engine-specific commit of records to the metadata table.
     * NOTE(review): parameter names were lost in decompilation; presumably {@code var1}
     * is the instant time and {@code var2} maps metadata partition paths to their
     * records, mirroring {@code commitInternal} — confirm against the original source.
     */
    protected abstract void commit(String var1, Map<String, HoodieData<HoodieRecord>> var2);

    /**
     * Converts a {@code HoodieData} of records into the engine-specific input type
     * {@code I} that is handed to the write client.
     *
     * @param var1 records to convert
     * @return the engine-specific representation of the records
     */
    protected abstract I convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> var1);

    /**
     * Converts the engine-specific write output type {@code O} into a {@code HoodieData}
     * of write statuses.
     *
     * @param var1 engine-specific write output
     * @return the write statuses as {@code HoodieData}
     */
    protected abstract HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(O var1);

    /**
     * Tags the given records and writes them to the metadata table as a
     * {@code deltacommit} at {@code instantTime}.
     *
     * <p>If the instant already exists on the metadata table's commits timeline it is
     * rolled back first. Records are bulk inserted when {@code isInitializing} is true and
     * upserted otherwise. The timeline is reloaded and size metrics are updated at the end.
     *
     * @param instantTime instant time of the deltacommit
     * @param partitionRecordsMap records to write, keyed by metadata partition path
     * @param isInitializing whether to bulk insert (true) or upsert (false)
     * @param bulkInsertPartitioner optional partitioner forwarded to the bulk insert
     * @throws HoodieMetadataException if rolling back the existing instant fails
     */
    protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        ValidationUtils.checkState(this.metadataMetaClient != null, "Metadata table is not fully initialized yet.");
        HoodieData<HoodieRecord> taggedRecords = this.tagRecordsWithLocation(partitionRecordsMap, isInitializing).getKey();
        I engineRecords = this.convertHoodieDataToEngineSpecificData(taggedRecords);
        BaseHoodieWriteClient<?, I, ?, O> client = this.getWriteClient();
        this.metadataMetaClient = HoodieBackedTableMetadataWriter.rollbackFailedWrites(this.dataWriteConfig, client, this.metadataMetaClient);

        boolean instantOnTimeline = this.metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime);
        if (instantOnTimeline) {
            boolean alreadyCompleted = this.metadataMetaClient.getActiveTimeline()
                .filterCompletedInstants()
                .filter(entry -> entry.requestedTime().equals(instantTime))
                .lastInstant()
                .isPresent();
            LOG.info("{} completed commit at {} being applied to MDT.", alreadyCompleted ? "Already" : "Partially", instantTime);
            // Remove the previous attempt before re-applying the same instant.
            if (!client.rollback(instantTime)) {
                throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
            }
            this.metadataMetaClient.reloadActiveTimeline();
        } else {
            LOG.info("New commit at {} being applied to MDT.", instantTime);
        }

        client.startCommitForMetadataTable(this.metadataMetaClient, instantTime, "deltacommit");
        this.preWrite(instantTime);
        String jobGroup = this.getClass().getSimpleName();
        String tableName = this.metadataWriteConfig.getTableName();
        if (isInitializing) {
            this.engineContext.setJobStatus(jobGroup, String.format("Bulk inserting at %s into metadata table %s", instantTime, tableName));
            this.bulkInsertAndCommit(client, instantTime, engineRecords, bulkInsertPartitioner);
        } else {
            this.engineContext.setJobStatus(jobGroup, String.format("Upserting at %s into metadata table %s", instantTime, tableName));
            this.upsertAndCommit(client, instantTime, engineRecords);
        }
        this.metadataMetaClient.reloadActiveTimeline();
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata, this.dataMetaClient.getTableConfig().getMetadataPartitions()));
    }

    /**
     * Engine-specific bulk insert and commit. {@code commitInternal} calls this when
     * initializing, passing the write client, the instant time, the prepared
     * engine-specific records and the optional bulk insert partitioner, in that order.
     */
    protected abstract void bulkInsertAndCommit(BaseHoodieWriteClient<?, I, ?, O> var1, String var2, I var3, Option<BulkInsertPartitioner> var4);

    /**
     * Engine-specific upsert and commit. {@code commitInternal} calls this when not
     * initializing, passing the write client, the instant time and the prepared
     * engine-specific records, in that order.
     */
    protected abstract void upsertAndCommit(BaseHoodieWriteClient<?, I, ?, O> var1, String var2, I var3);

    /**
     * Engine-specific upsert and commit that additionally receives a list of file group ids.
     * NOTE(review): parameter names were lost in decompilation; presumably the first three
     * match the three-argument overload and {@code var4} lists the file groups being
     * updated — confirm against the original source.
     */
    protected abstract void upsertAndCommit(BaseHoodieWriteClient<?, I, ?, O> var1, String var2, I var3, List<HoodieFileGroupId> var4);

    /**
     * Rolls back failed writes on the metadata table when the data table's failed-writes
     * clean policy is eager.
     *
     * @param dataWriteConfig write config of the data table
     * @param writeClient write client used to roll back failed writes
     * @param metadataMetaClient meta client of the metadata table
     * @return a reloaded meta client when the policy is eager and the write client's
     *         rollback call returned true; otherwise the given meta client
     */
    static <I> HoodieTableMetaClient rollbackFailedWrites(HoodieWriteConfig dataWriteConfig, BaseHoodieWriteClient<?, I, ?, ?> writeClient, HoodieTableMetaClient metadataMetaClient) {
        if (!dataWriteConfig.getFailedWritesCleanPolicy().isEager()) {
            return metadataMetaClient;
        }
        boolean rolledBack = writeClient.rollbackFailedWrites(metadataMetaClient);
        return rolledBack ? HoodieTableMetaClient.reload(metadataMetaClient) : metadataMetaClient;
    }

    /**
     * Hook invoked by {@code commitInternal} after the commit has been started on the
     * metadata table and before records are written. The default implementation does
     * nothing.
     *
     * @param instantTime instant time of the commit being written
     */
    protected void preWrite(String instantTime) {
    }

    /**
     * Engine-specific bulk commit of records to the metadata table.
     * NOTE(review): parameter names were lost in decompilation and no call site is visible
     * here; the roles of the two strings, the records and the file group index parser
     * need to be confirmed against the original source.
     */
    protected abstract void bulkCommit(String var1, String var2, HoodieData<HoodieRecord> var3, MetadataTableFileGroupIndexParser var4);

    /**
     * Tags the records of each metadata partition with a file-group location and unions
     * them into a single data set.
     *
     * <p>For the record index partition with the record level index enabled, file slices
     * are always resolved including inflight ones. For other partitions the inflight
     * lookup is only a fallback when no latest slices exist, and is allowed only while
     * initializing or while the partition is listed as inflight.
     *
     * @param partitionRecordsMap records keyed by metadata partition path
     * @param isInitializing whether the metadata table is being initialized
     * @return pair of (all tagged records, file group ids of the resolved slices)
     */
    protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> tagRecordsWithLocation(Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing) {
        HoodieData<HoodieRecord> taggedUnion = this.engineContext.emptyHoodieData();
        try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(this.metadataMetaClient)) {
            List<HoodieFileGroupId> fileGroupIds = new ArrayList<>();
            for (Map.Entry<String, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
                String partitionPath = entry.getKey();
                boolean isNonGlobalRLI = Objects.equals(partitionPath, MetadataPartitionType.RECORD_INDEX.getPartitionPath())
                    && this.dataWriteConfig.isRecordLevelIndexEnabled();
                List<FileSlice> slices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(this.metadataMetaClient, Option.ofNullable(fsView), partitionPath);
                boolean includeInflight = isNonGlobalRLI;
                if (!isNonGlobalRLI && slices.isEmpty()) {
                    ValidationUtils.checkState(
                        isInitializing || this.dataMetaClient.getTableConfig().getMetadataPartitionsInflight().contains(partitionPath),
                        String.format("Partition %s should be part of inflight metadata partitions here %s", partitionPath, this.dataMetaClient.getTableConfig().getMetadataPartitionsInflight()));
                    includeInflight = true;
                }
                if (includeInflight) {
                    slices = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(this.metadataMetaClient, Option.ofNullable(fsView), partitionPath);
                }
                for (FileSlice slice : slices) {
                    fileGroupIds.add(new HoodieFileGroupId(partitionPath, slice.getFileId()));
                }
                SerializableFunction<HoodieRecord, HoodieRecord> tagger = this.getRecordTagger(partitionPath, slices);
                taggedUnion = taggedUnion.union(entry.getValue().map(tagger));
            }
            return Pair.of(taggedUnion, fileGroupIds);
        }
    }

    /**
     * Creates a function that assigns a record to one of the given file slices and sets
     * the record's current location accordingly.
     *
     * <p>The slice is picked by the partition type's file-group mapping function, applied
     * to the record key and the number of slices. The record index partition with the
     * record level index enabled is delegated to {@link #getRecordTaggerPartitionedRLI}.
     *
     * @param partitionPath metadata partition path the tagger is for
     * @param fileSlices candidate file slices; must be non-empty unless delegated
     * @return serializable tagging function that mutates and returns the record it is given
     */
    private SerializableFunction<HoodieRecord, HoodieRecord> getRecordTagger(String partitionPath, List<FileSlice> fileSlices) {
        HoodieIndexVersion indexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault(partitionPath, this.dataMetaClient);
        MetadataPartitionType partitionType = MetadataPartitionType.fromPartitionPath(partitionPath);
        SerializableBiFunction<String, Integer, Integer> fileGroupMapper = partitionType.getFileGroupMappingFunction(indexVersion);
        boolean usePartitionedRli = partitionType == MetadataPartitionType.RECORD_INDEX && this.dataWriteConfig.isRecordLevelIndexEnabled();
        if (usePartitionedRli) {
            return this.getRecordTaggerPartitionedRLI(fileSlices, fileGroupMapper);
        }
        int fileGroupCount = fileSlices.size();
        ValidationUtils.checkArgument(fileGroupCount > 0, String.format("FileGroup count for MDT partition %s should be > 0", partitionPath));
        // The lambda captures only locals so that it stays serializable.
        return rec -> {
            int sliceIndex = fileGroupMapper.apply(rec.getRecordKey(), fileGroupCount);
            FileSlice target = fileSlices.get(sliceIndex);
            rec.unseal();
            rec.setCurrentLocation(new HoodieRecordLocation(target.getBaseInstantTime(), target.getFileId()));
            rec.seal();
            return rec;
        };
    }

    /**
     * Creates a tagging function for the partitioned record index.
     *
     * <p>File slices are grouped by the data table partition derived from their file id.
     * A record is tagged with one of the locations of its own data partition, picked by
     * {@code mappingFunction} applied to the record key and the number of locations in
     * that partition.
     *
     * @param fileSlices record index file slices to choose from
     * @param mappingFunction maps (record key, location count) to a location index
     * @return serializable tagging function that mutates and returns the record it is given;
     *         it throws {@link HoodieMetadataException} when the record's data partition has
     *         no file slices (previously this surfaced as a bare NullPointerException)
     */
    private SerializableFunction<HoodieRecord, HoodieRecord> getRecordTaggerPartitionedRLI(List<FileSlice> fileSlices, SerializableBiFunction<String, Integer, Integer> mappingFunction) {
        Map<String, List<HoodieRecordLocation>> locationsByDataPartition = new HashMap<>();
        for (FileSlice slice : fileSlices) {
            String dataPartition = HoodieTableMetadataUtil.getDataTablePartitionNameFromFileGroupName(slice.getFileId());
            locationsByDataPartition
                .computeIfAbsent(dataPartition, key -> new ArrayList<>())
                .add(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
        }
        // The lambda captures only locals so that it stays serializable.
        return rec -> {
            Object payload = rec.getData();
            String dataPartition = payload instanceof EmptyHoodieRecordPayloadWithPartition
                ? ((EmptyHoodieRecordPayloadWithPartition) payload).getPartitionPath()
                : ((HoodieMetadataPayload) payload).getDataPartition();
            List<HoodieRecordLocation> locations = locationsByDataPartition.get(dataPartition);
            if (locations == null) {
                // Fail with context instead of an anonymous NPE on the lookup below.
                throw new HoodieMetadataException(String.format(
                    "No record index file groups found for data table partition %s while tagging record key %s",
                    dataPartition, rec.getRecordKey()));
            }
            HoodieRecordLocation location = locations.get(mappingFunction.apply(rec.getRecordKey(), locations.size()));
            rec.unseal();
            rec.setCurrentLocation(location);
            rec.seal();
            return rec;
        };
    }

    /**
     * Runs table services on the metadata table: pending table service operations, then
     * clean, compaction (when scheduling is validated) and archive.
     *
     * <p>Clean, compaction and archive are skipped when the metadata table has no
     * completed deltacommit. The execution duration and a status metric (1 on success or
     * skip, -1 on failure) are always reported. The metric names are prefixed with the
     * metadata table name when it is non-empty.
     *
     * <p>Failures while running the pending table service operations are now logged and
     * reported through the failure status metric as well, before being rethrown.
     *
     * @param inFlightInstantTimestamp optional inflight instant, forwarded to the compaction scheduling validation
     * @param requiresTimelineRefresh whether the metadata timeline is reloaded before running services
     */
    @Override
    public void performTableServices(Option<String> inFlightInstantTimestamp, boolean requiresTimelineRefresh) {
        HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
        boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
        BaseHoodieWriteClient<?, I, ?, O> writeClient = this.getWriteClient();
        try {
            HoodieActiveTimeline activeTimeline = HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(this.metadataMetaClient, writeClient, requiresTimelineRefresh);
            Option<HoodieInstant> lastInstant = activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
            if (!lastInstant.isPresent()) {
                // Nothing has been committed yet; counts as "skipped" for the status metric.
                return;
            }
            String latestDeltacommitTime = lastInstant.get().requestedTime();
            this.cleanIfNecessary(writeClient, latestDeltacommitTime);
            if (this.validateCompactionScheduling(inFlightInstantTimestamp, latestDeltacommitTime)) {
                LOG.info("Latest deltacommit time found is {}, running compaction operations.", latestDeltacommitTime);
                this.compactIfNecessary(writeClient, Option.of(latestDeltacommitTime));
            }
            writeClient.archive();
            LOG.info("All the table services operations on MDT completed successfully");
        } catch (Exception e) {
            LOG.error("Exception in running table services on metadata table", e);
            allTableServicesExecutedSuccessfullyOrSkipped = false;
            throw e;
        } finally {
            String metadataTableName = writeClient.getConfig().getTableName();
            boolean tableNameExists = StringUtils.nonEmpty(metadataTableName);
            String executionDurationMetricName = tableNameExists ? String.format("%s.%s", metadataTableName, "table_service_execution_duration") : "table_service_execution_duration";
            String executionStatusMetricName = tableNameExists ? String.format("%s.%s", metadataTableName, "table_service_execution_status") : "table_service_execution_status";
            long timeSpent = metadataTableServicesTimer.endTimer();
            long executionStatus = allTableServicesExecutedSuccessfullyOrSkipped ? 1L : -1L;
            this.metrics.ifPresent(m -> m.setMetric(executionDurationMetricName, timeSpent));
            this.metrics.ifPresent(m -> m.setMetric(executionStatusMetricName, executionStatus));
        }
    }

    /**
     * Runs whatever compactions and log compactions are still pending on the timeline of
     * {@code metadataMetaClient} and hands back the timeline the caller should continue with.
     *
     * <p>Both "is anything pending?" checks are made against the same timeline object obtained at
     * the start of this call; the log-compaction check is evaluated after pending compactions
     * (if any) have been run.
     *
     * @param metadataMetaClient             meta client whose active timeline is inspected
     * @param writeClient                    client used to run the pending compactions / log compactions
     * @param initialTimelineRequiresRefresh {@code true} to reload the active timeline before inspecting it,
     *                                       {@code false} to use the meta client's current active timeline
     * @return a freshly reloaded active timeline when at least one kind of pending service was run,
     *         otherwise the timeline that was inspected
     */
    static HoodieActiveTimeline runPendingTableServicesOperationsAndRefreshTimeline(HoodieTableMetaClient metadataMetaClient, BaseHoodieWriteClient<?, ?, ?, ?> writeClient, boolean initialTimelineRequiresRefresh) {
        HoodieActiveTimeline timeline;
        if (initialTimelineRequiresRefresh) {
            timeline = metadataMetaClient.reloadActiveTimeline();
        } else {
            timeline = metadataMetaClient.getActiveTimeline();
        }

        boolean compactionsPending = timeline.filterPendingCompactionTimeline().countInstants() > 0;
        if (compactionsPending) {
            writeClient.runAnyPendingCompactions();
        }

        boolean logCompactionsPending = timeline.filterPendingLogCompactionTimeline().countInstants() > 0;
        if (logCompactionsPending) {
            writeClient.runAnyPendingLogCompactions();
        }

        if (!compactionsPending && !logCompactionsPending) {
            // Nothing was executed, so the inspected timeline is returned as is.
            return timeline;
        }
        return metadataMetaClient.reloadActiveTimeline();
    }

    /**
     * Tries to schedule and run a compaction; if no compaction gets scheduled and log compaction
     * is enabled in {@code metadataWriteConfig}, tries a log compaction instead.
     *
     * <p>The compaction instant time is chosen as follows: the data table timeline is reloaded and
     * its inflight/requested instants are filtered down to those whose requested time is already
     * contained in the completed timeline of {@code metadataMetaClient}. If such an instant exists,
     * the time used is the first one's requested time minus one millisecond; otherwise a newly
     * created instant time from the write client is used.
     *
     * @param writeClient              client used to create instant times and to schedule/run the compaction
     * @param latestDeltaCommitTimeOpt not read by this implementation
     */
    void compactIfNecessary(BaseHoodieWriteClient<?, I, ?, O> writeClient, Option<String> latestDeltaCommitTimeOpt) {
        HoodieTimeline completedOnMetadata = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        Option<String> justBeforeFirstPending = this.dataMetaClient.reloadActiveTimeline()
            .filterInflightsAndRequested()
            .filter(pending -> completedOnMetadata.containsInstant(pending.requestedTime()))
            .firstInstant()
            .map(pending -> HoodieInstantTimeGenerator.instantTimeMinusMillis(pending.requestedTime(), 1L));
        // Always invoked, even when a pending instant supplies the time: the call sequence on the
        // write client is deliberately left unchanged.
        String freshInstantTime = writeClient.createNewInstantTime(false);
        String compactionInstantTime = justBeforeFirstPending.orElse(freshInstantTime);

        if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
            LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime);
            return;
        }

        if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
            LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime);
            writeClient.compact(compactionInstantTime, true);
            return;
        }

        // Compaction was not scheduled; fall back to log compaction only when it is enabled.
        if (!this.metadataWriteConfig.isLogCompactionEnabled()) {
            return;
        }
        Option<String> logCompactionInstant = writeClient.scheduleLogCompaction(Option.empty());
        if (logCompactionInstant.isPresent()) {
            LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstant.get());
            writeClient.logCompact(logCompactionInstant.get(), true);
        }
    }

    /**
     * Runs a clean followed by a lazy rollback of failed indexing, unless too few instants have
     * completed since the last completed instant on the commit-and-replace timeline.
     *
     * <p>When such a last completed instant exists and fewer than 3 completed instants are found
     * after its requested time, the method returns without doing anything.
     *
     * @param writeClient client used for cleaning and for rolling back failed indexing
     * @param instantTime instant time forwarded to {@link #executeClean}
     */
    protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
        Option<HoodieInstant> lastCompleted = this.metadataMetaClient.getActiveTimeline()
            .getCommitAndReplaceTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (lastCompleted.isPresent()) {
            int completedSince = this.metadataMetaClient.getActiveTimeline()
                .filterCompletedInstants()
                .findInstantsAfter(lastCompleted.get().requestedTime())
                .countInstants();
            if (completedSince < 3) {
                // Too soon after the last completed commit/replace instant: skip this round.
                return;
            }
        }
        this.executeClean(writeClient, instantTime);
        writeClient.lazyRollbackFailedIndexing();
    }

    /**
     * Performs the actual clean by delegating to {@code writeClient.clean()}.
     *
     * @param writeClient client on which the clean is triggered
     * @param instantTime not used by this implementation; the method is {@code protected}, so an
     *                    overriding subclass may make use of it
     */
    protected void executeClean(BaseHoodieWriteClient writeClient, String instantTime) {
        writeClient.clean();
    }

    /**
     * Decides whether a new compaction / log compaction may be scheduled.
     *
     * <p>The check only applies when log compaction is enabled in {@code metadataWriteConfig}:
     * in that case scheduling is refused if the active timeline of {@code metadataMetaClient}
     * has a pending log compaction or a pending compaction. When log compaction is disabled the
     * method always answers {@code true}.
     *
     * @param inFlightInstantTimestamp             not read by this implementation
     * @param latestDeltaCommitTimeInMetadataTable not read by this implementation
     * @return {@code false} if log compaction is enabled and a pending compaction or log compaction
     *         instant exists, {@code true} otherwise
     */
    boolean validateCompactionScheduling(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
        if (!this.metadataWriteConfig.isLogCompactionEnabled()) {
            return true;
        }

        Option<HoodieInstant> pendingLogCompaction = this.metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
        Option<HoodieInstant> pendingCompaction = this.metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
        if (!pendingLogCompaction.isPresent() && !pendingCompaction.isPresent()) {
            return true;
        }

        LOG.info("Not scheduling compaction or logCompaction, since a pending compaction instant {} or logCompaction {} instant is present", pendingCompaction, pendingLogCompaction);
        return false;
    }

    /**
     * Compares, partition by partition, the files known to {@code this.metadata} with the file
     * listing supplied in {@code dirInfoMap} and records the differences in the output arguments.
     *
     * <p>For every partition returned by {@code this.metadata.fetchAllPartitionPaths()}:
     * <ul>
     *   <li>if {@code dirInfoMap} has no entry for it, its identifier is added to
     *       {@code partitionsToDelete} and, when the metadata lists any files for it, all of those
     *       file names are put into {@code partitionFilesToDelete};</li>
     *   <li>otherwise, files present in {@code dirInfoMap} but not in the metadata are put into
     *       {@code partitionFilesToAdd} (name to size), and files present in the metadata but not
     *       in {@code dirInfoMap} are put into {@code partitionFilesToDelete}.</li>
     * </ul>
     * Output maps are keyed by the identifier obtained from
     * {@code HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition}. Empty results are not
     * recorded.
     *
     * <p>A {@code null} file listing from the metadata is treated as "no files" in both branches.
     *
     * @param dirInfoMap             listing to compare against, keyed by partition path
     * @param partitionFilesToAdd    output: per partition identifier, files (name to size) missing from the metadata
     * @param partitionFilesToDelete output: per partition identifier, file names the metadata lists but {@code dirInfoMap} does not
     * @param partitionsToDelete     output: identifiers of partitions absent from {@code dirInfoMap}
     * @throws IOException if reading partitions or files from the metadata fails
     */
    private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, HoodieTableMetadataUtil.DirectoryInfo> dirInfoMap, Map<String, Map<String, Long>> partitionFilesToAdd, Map<String, List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws IOException {
        for (String partition : this.metadata.fetchAllPartitionPaths()) {
            // An empty partition name on a non-partitioned table refers to the table base path itself.
            boolean isBasePathPartition = StringUtils.isNullOrEmpty(partition) && !this.dataMetaClient.getTableConfig().isTablePartitioned();
            StoragePath partitionPath = isBasePathPartition
                ? new StoragePath(this.dataWriteConfig.getBasePath())
                : new StoragePath(this.dataWriteConfig.getBasePath(), partition);
            String partitionId = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);

            // File names are extracted once; a null listing is normalised to an empty list so both
            // branches below handle it the same way.
            List<String> mdtFileNames = Option.ofNullable(this.metadata.getAllFilesInPartition(partitionPath))
                .map(files -> files.stream().map(f -> f.getPath().getName()).collect(Collectors.toList()))
                .orElse(Collections.emptyList());

            if (!dirInfoMap.containsKey(partition)) {
                partitionsToDelete.add(partitionId);
                if (!mdtFileNames.isEmpty()) {
                    partitionFilesToDelete.put(partitionId, mdtFileNames);
                }
                continue;
            }

            Map<String, Long> fsFiles = dirInfoMap.get(partition).getFileNameToSizeMap();
            // Hash-based membership test instead of a linear List.contains per listed file.
            Set<String> mdtFileNameSet = new HashSet<>(mdtFileNames);

            Map<String, Long> filesToAdd = new HashMap<>();
            fsFiles.forEach((fileName, fileSize) -> {
                if (!mdtFileNameSet.contains(fileName)) {
                    filesToAdd.put(fileName, fileSize);
                }
            });
            if (!filesToAdd.isEmpty()) {
                partitionFilesToAdd.put(partitionId, filesToAdd);
            }

            List<String> filesDeleted = mdtFileNames.stream()
                .filter(fileName -> !fsFiles.containsKey(fileName))
                .collect(Collectors.toList());
            if (!filesDeleted.isEmpty()) {
                partitionFilesToDelete.put(partitionId, filesDeleted);
            }
        }
    }

    /**
     * Reads record keys from the latest base files of every partition mentioned in the replace
     * commit's partition-to-replaced-file-ids map.
     *
     * <p>The (partition, base file) pairs are collected from the view returned by
     * {@code getMetadataView()} and handed to
     * {@code HoodieTableMetadataUtil.readRecordKeysFromBaseFiles} together with settings taken from
     * {@code dataWriteConfig}.
     *
     * @param replaceCommitMetadata replace commit whose partitions are scanned
     * @return the records produced by {@code readRecordKeysFromBaseFiles} for those base files
     */
    private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
        HoodieTableFileSystemView metadataView = this.getMetadataView();
        Set<String> replacedPartitions = replaceCommitMetadata.getPartitionToReplaceFileIds().keySet();
        return HoodieTableMetadataUtil.readRecordKeysFromBaseFiles(
            this.engineContext,
            this.dataWriteConfig,
            replacedPartitions.stream()
                .flatMap(partitionPath -> metadataView.getLatestBaseFiles(partitionPath).map(baseFile -> Pair.of(partitionPath, baseFile)))
                .collect(Collectors.toList()),
            true,
            this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
            this.dataMetaClient.getBasePath(),
            this.storageConf,
            this.getClass().getSimpleName(),
            this.dataWriteConfig.isRecordLevelIndexEnabled());
    }

    /**
     * Produces the extra record-index records a replace-style commit requires on top of the
     * updates derived from write statuses.
     *
     * <ul>
     *   <li>{@code INSERT_OVERWRITE}: records read from the replaced file groups that have no
     *       counterpart in {@code updatesFromWriteStatuses}, matched on {@code getKey()}.</li>
     *   <li>{@code INSERT_OVERWRITE_TABLE}: same, but matched on {@code getRecordKey()}.</li>
     *   <li>{@code DELETE_PARTITION}: all records read from the replaced file groups.</li>
     *   <li>any other operation: empty data.</li>
     * </ul>
     *
     * @param updatesFromWriteStatuses record-index updates already derived for this commit
     * @param commitMetadata           metadata of the commit; cast to {@code HoodieReplaceCommitMetadata}
     *                                 for the three operations listed above
     * @return the additional records described above
     */
    private HoodieData<HoodieRecord> getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
        WriteOperationType operation = commitMetadata.getOperationType();
        boolean matchOnKey = operation == WriteOperationType.INSERT_OVERWRITE;
        boolean matchOnRecordKey = operation == WriteOperationType.INSERT_OVERWRITE_TABLE;
        if (!matchOnKey && !matchOnRecordKey && operation != WriteOperationType.DELETE_PARTITION) {
            return this.engineContext.emptyHoodieData();
        }

        HoodieData<HoodieRecord> replacedRecords = this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata);
        if (matchOnKey) {
            return replacedWithoutCounterpart(replacedRecords, updatesFromWriteStatuses, record -> record.getKey());
        }
        if (matchOnRecordKey) {
            return replacedWithoutCounterpart(replacedRecords, updatesFromWriteStatuses, record -> record.getRecordKey());
        }
        // DELETE_PARTITION: every replaced record is returned.
        return replacedRecords;
    }

    /**
     * Left-outer-joins {@code replacedRecords} with {@code rewrittenRecords} on the key produced by
     * {@code keyOf} and keeps only the replaced records for which the join found no match.
     */
    private static <K> HoodieData<HoodieRecord> replacedWithoutCounterpart(HoodieData<HoodieRecord> replacedRecords, HoodieData<HoodieRecord> rewrittenRecords, SerializableFunction<HoodieRecord, K> keyOf) {
        return replacedRecords
            .mapToPair(record -> Pair.of(keyOf.apply(record), record))
            .leftOuterJoin(rewrittenRecords.mapToPair(record -> Pair.of(keyOf.apply(record), record)))
            .values()
            .filter(joined -> !joined.getRight().isPresent())
            .map(Pair::getLeft);
    }

    /**
     * Calls {@code close()} and rethrows any {@link Exception} it raises wrapped in a
     * {@code HoodieException}, keeping the original exception as the cause.
     */
    protected void closeInternal() {
        try {
            close();
        }
        catch (Exception closeFailure) {
            throw new HoodieException("Failed to close HoodieMetadata writer ", closeFailure);
        }
    }

    /**
     * Reports the current value of this writer's {@code initialized} flag.
     *
     * @return the value of the {@code initialized} field
     */
    @Override
    public boolean isInitialized() {
        return this.initialized;
    }

    /**
     * Returns the write client held by this writer, creating it through
     * {@link #initializeWriteClient()} on first use and caching it in the {@code writeClient} field.
     *
     * <p>No synchronization is performed in this method.
     *
     * @return the cached (possibly just created) write client
     */
    protected BaseHoodieWriteClient<?, I, ?, O> getWriteClient() {
        if (this.writeClient != null) {
            return this.writeClient;
        }
        this.writeClient = this.initializeWriteClient();
        return this.writeClient;
    }

    /**
     * Creates the write client for this writer. {@link #getWriteClient()} calls this when its
     * cached {@code writeClient} field is {@code null} and stores the result.
     *
     * @return the write client to cache and hand out from {@link #getWriteClient()}
     */
    protected abstract BaseHoodieWriteClient<?, I, ?, O> initializeWriteClient();

    private class BatchMetadataConversionFunction
    implements ConvertMetadataFunction {
        private final HoodieCommitMetadata commitMetadata;
        private final String instantTime;
        private final Set<String> partitionsToUpdate;

        public BatchMetadataConversionFunction(String instantTime, HoodieCommitMetadata commitMetadata, Set<String> partitionsToUpdate) {
            this.instantTime = instantTime;
            this.commitMetadata = commitMetadata;
            this.partitionsToUpdate = partitionsToUpdate;
        }

        @Override
        public Map<String, HoodieData<HoodieRecord>> convertMetadata() {
            Map<String, HoodieData<HoodieRecord>> partitionToRecordMap = HoodieMetadataWriteUtils.convertMetadataToRecords(HoodieBackedTableMetadataWriter.this.engineContext, HoodieBackedTableMetadataWriter.this.dataWriteConfig, this.commitMetadata, this.instantTime, HoodieBackedTableMetadataWriter.this.dataMetaClient, (HoodieTableMetadata)HoodieBackedTableMetadataWriter.this.getTableMetadata(), HoodieBackedTableMetadataWriter.this.dataWriteConfig.getMetadataConfig(), this.partitionsToUpdate, HoodieBackedTableMetadataWriter.this.dataWriteConfig.getBloomFilterType(), HoodieBackedTableMetadataWriter.this.dataWriteConfig.getBloomIndexParallelism(), HoodieBackedTableMetadataWriter.this.dataWriteConfig.getWritesFileIdEncoding(), HoodieBackedTableMetadataWriter.this.getEngineType(), (Option<HoodieRecord.HoodieRecordType>)Option.of((Object)HoodieBackedTableMetadataWriter.this.dataWriteConfig.getRecordMerger().getRecordType()), HoodieBackedTableMetadataWriter.this.dataWriteConfig.enableOptimizedLogBlocksScan());
            if (this.partitionsToUpdate.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())) {
                HoodieData additionalUpdates = HoodieBackedTableMetadataWriter.this.getRecordIndexAdditionalUpserts((HoodieData<HoodieRecord>)partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), this.commitMetadata);
                partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), (HoodieData<HoodieRecord>)partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
            }
            if (this.partitionsToUpdate.stream().anyMatch(partition -> partition.startsWith(MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath()))) {
                HoodieBackedTableMetadataWriter.this.updateExpressionIndexIfPresent(this.commitMetadata, this.instantTime, partitionToRecordMap);
            }
            if (this.partitionsToUpdate.stream().anyMatch(partition -> partition.startsWith(MetadataPartitionType.SECONDARY_INDEX.getPartitionPath()))) {
                HoodieBackedTableMetadataWriter.this.updateSecondaryIndexIfPresent(this.commitMetadata, partitionToRecordMap, this.instantTime);
            }
            return partitionToRecordMap;
        }
    }

    /**
     * Callback that yields records grouped by a {@code String} key; the implementation in this
     * file ({@code BatchMetadataConversionFunction}) keys them by metadata partition path.
     */
    interface ConvertMetadataFunction {
        /**
         * @return a map from key to the records produced for it
         */
        Map<String, HoodieData<HoodieRecord>> convertMetadata();
    }
}

