/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.MetadataRecordsGenerationParams;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMetadataWriter {
    // Class-wide log4j logger.
    private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
    // Record key field name written into the metadata table's write properties and table properties.
    private static final String RECORD_KEY_FIELD_NAME = "key";
    // Write config derived for the metadata table; only built when the metadata table is enabled.
    protected HoodieWriteConfig metadataWriteConfig;
    // Write config of the data table, as handed to the constructor.
    protected HoodieWriteConfig dataWriteConfig;
    // Data table name with the "_metadata" suffix; only set when the metadata table is enabled.
    protected String tableName;
    // Metadata table handle; (re)created by initTableMetadata().
    protected HoodieBackedTableMetadata metadata;
    // Meta client of the metadata table, obtained from `metadata` in initTableMetadata().
    protected HoodieTableMetaClient metadataMetaClient;
    // Meta client of the data table, built from the data write config's base path.
    protected HoodieTableMetaClient dataMetaClient;
    // Optional metrics sink; the constructor starts it out empty.
    protected Option<HoodieMetadataMetrics> metrics;
    // True only when the data write config has the metadata table enabled.
    protected boolean enabled;
    // Serializable wrapper around the Hadoop configuration handed to the constructor.
    protected SerializableConfiguration hadoopConf;
    // Engine context used to run distributed work (e.g. directory listing); transient.
    protected final transient HoodieEngineContext engineContext;
    // Metadata partition types registered by enablePartition().
    protected final List<MetadataPartitionType> enabledPartitionTypes;

    protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option<T> actionMetadata, Option<String> inflightInstantTimestamp) {
        this.dataWriteConfig = writeConfig;
        this.engineContext = engineContext;
        this.hadoopConf = new SerializableConfiguration(hadoopConf);
        this.metrics = Option.empty();
        this.enabledPartitionTypes = new ArrayList<MetadataPartitionType>();
        if (writeConfig.isMetadataTableEnabled()) {
            this.tableName = writeConfig.getTableName() + "_metadata";
            this.metadataWriteConfig = this.createMetadataWriteConfig(writeConfig);
            this.enabled = true;
            ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
            ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
            ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
            ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(), "File listing cannot be used for Metadata Table");
            this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(this.dataWriteConfig.getBasePath()).build();
            this.enablePartitions();
            this.initRegistry();
            this.initialize(engineContext, actionMetadata, inflightInstantTimestamp);
            this.initTableMetadata();
        } else {
            this.enabled = false;
        }
    }

    /**
     * Convenience constructor that passes empty action metadata and an empty inflight
     * instant timestamp to the full constructor.
     *
     * @param hadoopConf    Hadoop configuration used to build meta clients
     * @param writeConfig   write config of the data table
     * @param engineContext engine context used for distributed operations
     */
    public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
        this(
            hadoopConf,
            writeConfig,
            engineContext,
            Option.empty(),
            Option.empty());
    }

    /**
     * Registers the metadata partition types that should be maintained: FILES always, plus
     * BLOOM_FILTERS and COLUMN_STATS when enabled in the data table's metadata config.
     *
     * <p>The metadata table counts as bootstrapped when its {@code .hoodie} folder exists; only
     * then are a meta client and file-system view built and handed to {@link #enablePartition}.
     *
     * @throws HoodieException if checking for the metadata table's {@code .hoodie} folder fails
     */
    private void enablePartitions() {
        HoodieMetadataConfig metadataConfig = this.dataWriteConfig.getMetadataConfig();
        boolean isBootstrapCompleted;
        Option<HoodieTableMetaClient> metaClient = Option.empty();
        try {
            isBootstrapCompleted = this.dataMetaClient.getFs().exists(new Path(this.metadataWriteConfig.getBasePath(), ".hoodie"));
            if (isBootstrapCompleted) {
                metaClient = Option.of(HoodieTableMetaClient.builder().setConf(this.hadoopConf.get()).setBasePath(this.metadataWriteConfig.getBasePath()).build());
            }
        }
        catch (IOException e) {
            throw new HoodieException("Failed to enable metadata partitions!", e);
        }
        // Typed as Option<HoodieTableFileSystemView> so it matches enablePartition's parameter;
        // empty when the metadata table has not been bootstrapped yet.
        Option<HoodieTableFileSystemView> fsView = Option.ofNullable(metaClient.isPresent() ? HoodieTableMetadataUtil.getFileSystemView(metaClient.get()) : null);
        this.enablePartition(MetadataPartitionType.FILES, metadataConfig, metaClient, fsView, isBootstrapCompleted);
        if (metadataConfig.isBloomFilterIndexEnabled()) {
            this.enablePartition(MetadataPartitionType.BLOOM_FILTERS, metadataConfig, metaClient, fsView, isBootstrapCompleted);
        }
        if (metadataConfig.isColumnStatsIndexEnabled()) {
            this.enablePartition(MetadataPartitionType.COLUMN_STATS, metadataConfig, metaClient, fsView, isBootstrapCompleted);
        }
    }

    /**
     * Sets the file group count on {@code partitionType} (as computed by
     * {@code HoodieTableMetadataUtil.getPartitionFileGroupCount}) and records the type as enabled.
     *
     * @param partitionType        partition type to enable
     * @param metadataConfig       metadata config of the data table
     * @param metaClient           meta client of the metadata table, if available
     * @param fsView               file-system view of the metadata table, if available
     * @param isBootstrapCompleted whether the metadata table's {@code .hoodie} folder already exists
     */
    private void enablePartition(MetadataPartitionType partitionType, HoodieMetadataConfig metadataConfig, Option<HoodieTableMetaClient> metaClient, Option<HoodieTableFileSystemView> fsView, boolean isBootstrapCompleted) {
        partitionType.setFileGroupCount(
            HoodieTableMetadataUtil.getPartitionFileGroupCount(partitionType, metaClient, fsView, metadataConfig, isBootstrapCompleted));
        this.enabledPartitionTypes.add(partitionType);
    }

    /**
     * Engine-specific hook called from the constructor after partitions are enabled and before
     * {@code initialize}. NOTE(review): presumably sets up the metrics registry - confirm against
     * the concrete subclasses.
     */
    protected abstract void initRegistry();

    /**
     * Derives the write config used for the metadata table from the data table's write config.
     *
     * <p>The metadata table is configured as a single-writer table with auto-commit on, its own
     * metadata table disabled, and auto clean, auto archive and inline compaction turned off.
     * When metrics are on for the data table, the matching metrics reporter settings are copied.
     *
     * @param writeConfig write config of the data table
     * @return write config for the metadata table
     * @throws HoodieMetadataException if the configured metrics reporter type is not handled
     */
    private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
        int parallelism = writeConfig.getMetadataInsertParallelism();
        // Keep at least as many commits as the data table does.
        int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
        int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());

        ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
            .build();

        // The metadata table must not have a metadata table of its own.
        HoodieMetadataConfig nestedMetadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(false)
            .withFileListingParallelism(writeConfig.getFileListingParallelism())
            .build();

        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withAsyncClean(writeConfig.isMetadataAsyncClean())
            .withAutoClean(false)
            .withCleanerParallelism(parallelism)
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
            .build();

        HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
            .withAutoArchive(false)
            .build();

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(false)
            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
            .withPreserveCommitMetadata(false)
            .build();

        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
            .withConsistencyGuardConfig(consistencyGuardConfig)
            .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
            .withMetadataConfig(nestedMetadataConfig)
            .withAutoCommit(true)
            .withAvroSchemaValidate(true)
            .withEmbeddedTimelineServerEnabled(false)
            .withMarkersType(MarkerType.DIRECT.name())
            .withRollbackUsingMarkers(false)
            .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
            .withSchema(HoodieMetadataRecord.getClassSchema().toString())
            .forTable(this.tableName)
            .withCleanConfig(cleanConfig)
            .withArchivalConfig(archivalConfig)
            .withCompactionConfig(compactionConfig)
            .withParallelism(parallelism, parallelism)
            .withDeleteParallelism(parallelism)
            .withRollbackParallelism(parallelism)
            .withFinalizeWriteParallelism(parallelism)
            .withAllowMultiWriteOnSameInstant(true)
            .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
            .withPopulateMetaFields(this.dataWriteConfig.getMetadataConfig().populateMetaFields());

        // Record key field is fixed for the metadata table.
        Properties keyProperties = new Properties();
        keyProperties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD_NAME);
        keyProperties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD_NAME);
        builder.withProperties(keyProperties);

        if (!writeConfig.isMetricsOn()) {
            return builder.build();
        }

        Properties metricsProperties = new Properties();
        metricsProperties.put(HoodieWriteConfig.TBL_NAME.key(), this.tableName);
        builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
            .fromProperties(metricsProperties)
            .withReporterType(writeConfig.getMetricsReporterType().toString())
            .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
            .on(true)
            .build());
        switch (writeConfig.getMetricsReporterType()) {
            case GRAPHITE:
                builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
                    .onGraphitePort(writeConfig.getGraphiteServerPort())
                    .toGraphiteHost(writeConfig.getGraphiteServerHost())
                    .usePrefix(writeConfig.getGraphiteMetricPrefix())
                    .build());
                break;
            case JMX:
                builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
                    .onJmxPort(writeConfig.getJmxPort())
                    .toJmxHost(writeConfig.getJmxHost())
                    .build());
                break;
            case DATADOG:
            case PROMETHEUS:
            case PROMETHEUS_PUSHGATEWAY:
            case CONSOLE:
            case INMEMORY:
            case CLOUDWATCH:
                // No reporter-specific settings are copied for these types.
                break;
            default:
                throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
        }
        return builder.build();
    }

    /**
     * @return the write config derived for the metadata table (not the data table's config);
     *         {@code null} if the metadata table is disabled, since it is never built in that case
     */
    public HoodieWriteConfig getWriteConfig() {
        return metadataWriteConfig;
    }

    /**
     * @return the metadata table handle most recently created by {@link #initTableMetadata()}
     */
    public HoodieBackedTableMetadata getTableMetadata() {
        return metadata;
    }

    /**
     * @return the live (not copied) list of metadata partition types enabled for this writer
     */
    public List<MetadataPartitionType> getEnabledPartitionTypes() {
        return enabledPartitionTypes;
    }

    /**
     * Engine-specific initialization hook, called once from the constructor when the metadata
     * table is enabled.
     *
     * @param var1 engine context (the constructor passes its {@code engineContext})
     * @param var2 optional metadata of the action in progress (the constructor passes its {@code actionMetadata})
     * @param var3 optional timestamp of the instant in progress (the constructor passes its {@code inflightInstantTimestamp})
     * @param <T>  Avro type of the action metadata
     */
    protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext var1, Option<T> var2, Option<String> var3);

    /**
     * (Re)creates the metadata table handle, closing the previous one if there was one, and
     * refreshes {@code metadataMetaClient} from the new handle.
     *
     * @throws HoodieException wrapping any failure while closing or creating the handle
     */
    public void initTableMetadata() {
        try {
            HoodieBackedTableMetadata previous = this.metadata;
            if (previous != null) {
                previous.close();
            }
            this.metadata = new HoodieBackedTableMetadata(
                this.engineContext,
                this.dataWriteConfig.getMetadataConfig(),
                this.dataWriteConfig.getBasePath(),
                this.dataWriteConfig.getSpillableMapBasePath());
            this.metadataMetaClient = this.metadata.getMetadataMetaClient();
        } catch (Exception e) {
            throw new HoodieException("Error initializing metadata table for reads", e);
        }
    }

    /**
     * Initializes the metadata table if it does not exist yet; otherwise, when async indexing is
     * disabled, initializes any enabled partition (other than FILES) that is neither inflight nor
     * completed according to the data table's config.
     *
     * <p>Nothing is initialized in this round if there is nothing left to initialize or if other
     * operations are still pending on the data table.
     *
     * @param dataMetaClient           meta client of the data table
     * @param actionMetadata           optional metadata of the action in progress
     * @param inflightInstantTimestamp optional timestamp of the instant in progress, excluded from the pending check
     * @param <T>                      Avro type of the action metadata
     * @throws IOException on file-system failures
     */
    protected <T extends SpecificRecordBase> void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option<T> actionMetadata, Option<String> inflightInstantTimestamp) throws IOException {
        HoodieTimer timer = new HoodieTimer().startTimer();
        boolean exists = this.metadataTableExists(dataMetaClient, actionMetadata);
        if (!exists) {
            // First-time initialization (or re-initialization after the table was deleted).
            if (this.initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
                this.metrics.ifPresent(m -> m.updateMetrics("initialize", timer.endTimer()));
            }
            return;
        }
        // Only taken when async indexing is OFF; the enclosing condition is the negation.
        if (!this.dataWriteConfig.isMetadataAsyncIndex()) {
            Set<String> inflightAndCompletedPartitions = HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
            // Message corrected: it used to claim async indexing was "enabled" in this branch.
            LOG.info((Object)("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions));
            List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream().filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)).collect(Collectors.toList());
            if (partitionsToInit.isEmpty() || this.anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
                return;
            }
            String createInstantTime = this.getInitialCommitInstantTime(dataMetaClient);
            this.initTableMetadata();
            this.initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit);
            this.initialCommit(createInstantTime, partitionsToInit);
            this.updateInitializedPartitionsInTableConfig(partitionsToInit);
        }
    }

    /**
     * Checks whether a usable metadata table exists.
     *
     * <p>If the table's {@code .hoodie} folder exists but {@link #isBootstrapNeeded} reports that
     * it must be re-bootstrapped, the whole metadata table directory is deleted and {@code false}
     * is returned. The table properties are re-initialized first if the populate-meta-fields
     * setting differs from what the existing table was created with.
     *
     * @param dataMetaClient meta client of the data table (its file system is used)
     * @param actionMetadata optional metadata of the action in progress
     * @param <T>            Avro type of the action metadata
     * @return {@code true} if the metadata table exists and can be used as is
     * @throws IOException on file-system failures
     */
    private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient, Option<T> actionMetadata) throws IOException {
        Path metadataHoodieFolder = new Path(this.metadataWriteConfig.getBasePath(), ".hoodie");
        if (!dataMetaClient.getFs().exists(metadataHoodieFolder)) {
            return false;
        }

        HoodieTableMetaClient existingMetaClient = HoodieTableMetaClient.builder()
            .setConf(this.hadoopConf.get())
            .setBasePath(this.metadataWriteConfig.getBasePath())
            .build();
        boolean configuredPopulate = this.dataWriteConfig.getMetadataConfig().populateMetaFields();
        if (configuredPopulate != existingMetaClient.getTableConfig().populateMetaFields()) {
            LOG.info((Object)"Re-initiating metadata table properties since populate meta fields have changed");
            existingMetaClient = this.initializeMetaClient(this.dataWriteConfig.getMetadataConfig().populateMetaFields());
        }

        Option<HoodieInstant> latestCompleted = existingMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        if (!this.isBootstrapNeeded(latestCompleted, actionMetadata)) {
            return true;
        }

        // Table is stale: wipe it so the caller re-initializes from scratch.
        this.metrics.ifPresent(m -> m.updateMetrics("rebootstrap", 1L));
        LOG.info((Object)"Deleting Metadata Table directory so that it can be re-initialized");
        dataMetaClient.getFs().delete(new Path(this.metadataWriteConfig.getBasePath()), true);
        return false;
    }

    /**
     * Decides whether the existing metadata table must be rebuilt.
     *
     * <p>A rebuild is needed when the metadata table has no completed instant, or when its latest
     * completed instant lies before the start of the data table's commits timeline and is not
     * being reverted by the in-flight rollback/restore action. A latest instant equal to
     * {@code "00000000000000"} never triggers a rebuild.
     *
     * @param latestMetadataInstant latest completed instant of the metadata table, if any
     * @param actionMetadata        optional metadata of the action in progress
     * @param <T>                   Avro type of the action metadata
     * @return {@code true} if the metadata table has to be re-bootstrapped
     */
    private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant, Option<T> actionMetadata) {
        if (!latestMetadataInstant.isPresent()) {
            LOG.warn((Object)"Metadata Table will need to be re-initialized as no instants were found");
            return true;
        }

        HoodieInstant latest = latestMetadataInstant.get();
        String latestTimestamp = latest.getTimestamp();
        if (latestTimestamp.equals("00000000000000")) {
            return false;
        }

        boolean precedesDataTimeline = this.dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latest.getTimestamp());
        if (!precedesDataTimeline || this.isCommitRevertedByInFlightAction(actionMetadata, latestTimestamp)) {
            return false;
        }

        // Note: the value logged as "latestDataInstant" is the FIRST instant of the data table's active timeline.
        LOG.error((Object)("Metadata Table will need to be re-initialized as un-synced instants have been archived. latestMetadataInstant=" + latest.getTimestamp() + ", latestDataInstant=" + this.dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp()));
        return true;
    }

    /**
     * Tells whether the given metadata instant is among the instants touched by the in-flight
     * rollback or restore action.
     *
     * @param actionMetadata                 optional metadata of the action in progress
     * @param latestMetadataInstantTimestamp timestamp to look for among the affected instants
     * @param <T>                            Avro type of the action metadata
     * @return {@code true} only if the action is a rollback or restore whose instant list contains
     *         the timestamp; {@code false} for an absent or any other kind of action
     */
    private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(Option<T> actionMetadata, String latestMetadataInstantTimestamp) {
        if (!actionMetadata.isPresent()) {
            return false;
        }

        T action = actionMetadata.get();
        if (action instanceof HoodieRollbackMetadata) {
            List<HoodieInstantInfo> rolledBack = ((HoodieRollbackMetadata) action).getInstantsRollback();
            return rolledBack.stream()
                .map(HoodieInstantInfo::getCommitTime)
                .collect(Collectors.toList())
                .contains(latestMetadataInstantTimestamp);
        }
        if (action instanceof HoodieRestoreMetadata) {
            List<HoodieInstantInfo> restored = ((HoodieRestoreMetadata) action).getRestoreInstantInfo();
            return restored.stream()
                .map(HoodieInstantInfo::getCommitTime)
                .collect(Collectors.toList())
                .contains(latestMetadataInstantTimestamp);
        }
        return false;
    }

    /**
     * Creates the metadata table from scratch: initializes its meta client, creates the file
     * groups of the partitions to bootstrap, performs the initial commit and records the
     * initialized partitions in the data table's config.
     *
     * <p>With async indexing enabled only the FILES partition is bootstrapped here; otherwise all
     * enabled partition types are.
     *
     * @param dataMetaClient           meta client of the data table
     * @param inflightInstantTimestamp optional timestamp of the instant in progress, excluded from the pending check
     * @return {@code false} if initialization was skipped because other operations are pending
     *         on the data table, {@code true} once the table has been initialized
     * @throws IOException on file-system failures
     */
    private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) throws IOException {
        if (this.anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
            return false;
        }
        String createInstantTime = this.getInitialCommitInstantTime(dataMetaClient);
        this.initializeMetaClient(this.dataWriteConfig.getMetadataConfig().populateMetaFields());
        this.initTableMetadata();
        // Declared as List (not ArrayList) so the List-typed field can be assigned to it,
        // and named differently from the field so it no longer shadows it.
        List<MetadataPartitionType> partitionsToInit;
        if (this.dataWriteConfig.isMetadataAsyncIndex()) {
            partitionsToInit = new ArrayList<MetadataPartitionType>();
            partitionsToInit.add(MetadataPartitionType.FILES);
        } else {
            partitionsToInit = this.enabledPartitionTypes;
        }
        this.initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit);
        this.initialCommit(createInstantTime, partitionsToInit);
        this.updateInitializedPartitionsInTableConfig(partitionsToInit);
        return true;
    }

    /**
     * Picks the instant time at which the metadata table is created: the timestamp of the first
     * instant of the data table's completed timeline in reverse order, or
     * {@code "00000000000000"} when there is no completed instant.
     *
     * @param dataMetaClient meta client of the data table
     * @return instant time to use for the initial commit
     */
    private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) {
        String instantTime = dataMetaClient.getActiveTimeline()
            .filterCompletedInstants()
            .getReverseOrderedInstants()
            .findFirst()
            .map(HoodieInstant::getTimestamp)
            .orElse("00000000000000");
        String metadataBasePath = this.metadataWriteConfig.getBasePath();
        LOG.info((Object)("Creating a new metadata table in " + metadataBasePath + " at instant " + instantTime));
        return instantTime;
    }

    /**
     * Checks for not-yet-completed instants on the data table that block metadata initialization.
     * The instant currently in progress (if given) and instants whose action is
     * {@code "indexing"} are ignored.
     *
     * @param dataMetaClient           meta client of the data table
     * @param inflightInstantTimestamp optional timestamp of the instant in progress, excluded from the check
     * @return {@code true} if at least one blocking pending instant was found
     * @throws IllegalStateException presumably, via {@code ValidationUtils.checkState}, when the
     *         metadata table is not enabled - confirm against that helper
     */
    private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
        ValidationUtils.checkState(this.enabled, "Metadata table cannot be initialized as it is not enabled");
        List<HoodieInstant> blocking = dataMetaClient.getActiveTimeline().getInstants()
            .filter(instant -> !instant.isCompleted()
                && (!inflightInstantTimestamp.isPresent() || !instant.getTimestamp().equals(inflightInstantTimestamp.get()))
                && !"indexing".equals(instant.getAction()))
            .collect(Collectors.toList());
        if (blocking.isEmpty()) {
            return false;
        }
        this.metrics.ifPresent(m -> m.updateMetrics("bootstrap_error", 1L));
        LOG.warn((Object)("Cannot initialize metadata table as operation(s) are in progress on the dataset: " + Arrays.toString(blocking.toArray())));
        return true;
    }

    /**
     * Adds the given partition types to the set of metadata partitions recorded in the data
     * table's config and persists the updated properties under the data table's meta path.
     *
     * @param partitionTypes partition types that have just been initialized
     */
    private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
        Set<String> newlyInitialized = partitionTypes.stream()
            .map(MetadataPartitionType::getPartitionPath)
            .collect(Collectors.toSet());
        Set<String> allCompleted = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        allCompleted.addAll(newlyInitialized);

        String joined = String.join((CharSequence)",", allCompleted);
        this.dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), joined);
        HoodieTableConfig.update(
            this.dataMetaClient.getFs(),
            new Path(this.dataMetaClient.getMetaPath()),
            this.dataMetaClient.getTableConfig().getProps());
    }

    /**
     * Initializes the metadata table on storage as a MERGE_ON_READ table with HFILE base files,
     * {@code HoodieMetadataPayload} as payload class and the metadata key generator.
     *
     * @param populateMetaFields whether meta fields are populated in the metadata table
     * @return meta client of the freshly initialized metadata table
     * @throws IOException if the table cannot be initialized
     */
    private HoodieTableMetaClient initializeMetaClient(boolean populateMetaFields) throws IOException {
        return HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(HoodieTableType.MERGE_ON_READ)
            .setTableName(this.tableName)
            .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
            .setPayloadClassName(HoodieMetadataPayload.class.getName())
            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
            .setPopulateMetaFields(populateMetaFields)
            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
            .initTable(this.hadoopConf.get(), this.metadataWriteConfig.getBasePath());
    }

    /**
     * Walks the data table's directory tree breadth-first, starting at its base path, and
     * collects every directory that is a Hudi partition.
     *
     * <p>Directories are listed in batches of at most the configured file-listing parallelism
     * through the engine context. A non-root directory whose name matches the configured
     * directory filter regex is skipped together with its subtree. Directories that are not
     * partitions contribute their sub-directories to the work queue.
     *
     * @param datasetMetaClient meta client of the data table
     * @return one {@code DirectoryInfo} per discovered partition
     */
    private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
        LinkedList<Path> pending = new LinkedList<Path>();
        pending.add(new Path(this.dataWriteConfig.getBasePath()));
        List<DirectoryInfo> discoveredPartitions = new LinkedList<DirectoryInfo>();

        final int fileListingParallelism = this.metadataWriteConfig.getFileListingParallelism();
        final SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
        final String dirFilterRegex = this.dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
        final String datasetBasePath = datasetMetaClient.getBasePath();

        while (!pending.isEmpty()) {
            // List the next batch of directories in parallel.
            int batchSize = Math.min(fileListingParallelism, pending.size());
            List<DirectoryInfo> listed = this.engineContext.map(pending.subList(0, batchSize), path -> {
                FileSystem fs = path.getFileSystem(conf.get());
                String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
                return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
            }, batchSize);
            pending = new LinkedList<Path>(pending.subList(batchSize, pending.size()));

            for (DirectoryInfo dirInfo : listed) {
                if (!dirFilterRegex.isEmpty()) {
                    String relativePath = dirInfo.getRelativePath();
                    // The root (empty relative path) is never filtered.
                    if (!relativePath.isEmpty()) {
                        Path partitionPath = new Path(datasetBasePath, relativePath);
                        if (partitionPath.getName().matches(dirFilterRegex)) {
                            LOG.info((Object)("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex));
                            continue;
                        }
                    }
                }
                if (dirInfo.isHoodiePartition()) {
                    discoveredPartitions.add(dirInfo);
                } else {
                    pending.addAll(dirInfo.getSubDirectories());
                }
            }
        }
        return discoveredPartitions;
    }

    /**
     * Creates the file groups of every given partition type, using each type's own
     * file group count.
     *
     * @param dataMetaClient    meta client of the data table
     * @param createInstantTime instant time the file groups are created at
     * @param partitionTypes    partition types to create file groups for
     * @throws IOException if a file group cannot be created
     */
    private void initializeEnabledFileGroups(HoodieTableMetaClient dataMetaClient, String createInstantTime, List<MetadataPartitionType> partitionTypes) throws IOException {
        for (MetadataPartitionType type : partitionTypes) {
            int groupCount = type.getFileGroupCount();
            this.initializeFileGroups(dataMetaClient, type, createInstantTime, groupCount);
        }
    }

    /**
     * Creates the file groups of the given metadata partitions at {@code instantTime}, using each
     * partition type's own file group count.
     *
     * @param dataMetaClient     meta client of the data table
     * @param metadataPartitions partition types to create file groups for
     * @param instantTime        instant time the file groups are created at
     * @throws IOException if a file group cannot be created
     */
    @Override
    public void initializeMetadataPartitions(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> metadataPartitions, String instantTime) throws IOException {
        // Same per-partition loop as the private helper; delegate instead of duplicating it.
        this.initializeEnabledFileGroups(dataMetaClient, instantTime, metadataPartitions);
    }

    /**
     * Creates {@code fileGroupCount} file groups for a metadata partition. Each file group is
     * created by writing a log file that contains a single delete block with no records, stamped
     * with {@code instantTime}.
     *
     * <p>File ids are the partition's file id prefix followed by a zero-padded four-digit index.
     *
     * @param dataMetaClient    meta client of the data table (its file system is used for writing)
     * @param metadataPartition partition type to create file groups for
     * @param instantTime       instant time recorded in the block header and used as base commit
     * @param fileGroupCount    number of file groups to create
     * @throws IOException     if a log file cannot be created, written or closed
     * @throws HoodieException if the thread is interrupted while appending the block
     */
    private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount) throws IOException {
        HashMap<HoodieLogBlock.HeaderMetadataType, String> blockHeader = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
        blockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
        // The same empty delete block is appended to every new file group.
        HoodieDeleteBlock block = new HoodieDeleteBlock(new DeleteRecord[0], blockHeader);
        LOG.info((Object)String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", fileGroupCount, metadataPartition.getPartitionPath(), metadataPartition.getFileIdPrefix(), instantTime));
        for (int i = 0; i < fileGroupCount; ++i) {
            String fileGroupFileId = String.format("%s%04d", metadataPartition.getFileIdPrefix(), i);
            // try-with-resources closes the writer even when appendBlock fails.
            try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.getPartitionPath(this.metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath())).withFileId(fileGroupFileId).overBaseCommit(instantTime).withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION).withFileSize(0L).withSizeThreshold(this.metadataWriteConfig.getLogFileMaxSize()).withFs(dataMetaClient.getFs()).withRolloverLogWriteToken("0-0-0").withLogWriteToken("0-0-0").withFileExtension(".log").build()) {
                writer.appendBlock(block);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new HoodieException("Failed to created fileGroup " + fileGroupFileId + " for partition " + metadataPartition.getPartitionPath(), e);
            }
        }
    }

    /**
     * Drops the given metadata partitions. For each partition, in order: removes it from the
     * inflight set (or else the completed set) in the data table's config, persists the config,
     * deletes the partition directory under the metadata table base path, and deletes pending
     * indexing instants that target the partition.
     *
     * @param metadataPartitions partition types to drop
     * @throws IOException on file-system failures
     */
    @Override
    public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
        Set<String> completed = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        Set<String> inflight = HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig());
        for (MetadataPartitionType type : metadataPartitions) {
            String partitionPath = type.getPartitionPath();
            // remove() returns true only if the path was present, which is the original contains-then-remove check.
            if (inflight.remove(partitionPath)) {
                this.dataMetaClient.getTableConfig().setValue(
                    HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join((CharSequence)",", inflight));
            } else if (completed.remove(partitionPath)) {
                this.dataMetaClient.getTableConfig().setValue(
                    HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join((CharSequence)",", completed));
            }
            HoodieTableConfig.update(
                this.dataMetaClient.getFs(),
                new Path(this.dataMetaClient.getMetaPath()),
                this.dataMetaClient.getTableConfig().getProps());

            LOG.warn((Object)("Deleting Metadata Table partitions: " + partitionPath));
            Path partitionDir = new Path(this.metadataWriteConfig.getBasePath(), partitionPath);
            this.dataMetaClient.getFs().delete(partitionDir, true);

            LOG.warn((Object)("Deleting pending indexing instant from the timeline for partition: " + partitionPath));
            HoodieBackedTableMetadataWriter.deletePendingIndexingInstant(this.dataMetaClient, partitionPath);
        }
    }

    /**
     * Deletes the requested and inflight instant files of every pending indexing instant in
     * REQUESTED state whose index plan covers {@code partitionPath}.
     *
     * <p>Best effort: an {@link IOException} on one instant is logged, with its stack trace, and
     * the remaining instants are still processed.
     *
     * @param metaClient    meta client whose active timeline is reloaded and scanned
     * @param partitionPath metadata partition path to match against the index plans
     */
    private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
        metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstants()
            .filter(instant -> HoodieInstant.State.REQUESTED.equals(instant.getState()))
            .forEach(instant -> {
                try {
                    HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(metaClient.getActiveTimeline().readIndexPlanAsBytes(instant).get());
                    boolean coversPartition = indexPlan.getIndexPartitionInfos().stream()
                        .anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath));
                    if (coversPartition) {
                        metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
                        metaClient.getActiveTimeline().deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant(instant.getTimestamp()));
                    }
                }
                catch (IOException e) {
                    // Pass the exception along so the cause is not lost from the log.
                    LOG.error((Object)("Failed to delete the instant file corresponding to " + instant), e);
                }
            });
    }

    /**
     * Bundles the data meta client, the enabled partition types and the bloom-filter /
     * column-stats settings of the data write config into a parameter object.
     *
     * @return parameters for metadata record generation
     */
    private MetadataRecordsGenerationParams getRecordsGenerationParams() {
        HoodieWriteConfig config = this.dataWriteConfig;
        return new MetadataRecordsGenerationParams(
            this.dataMetaClient,
            this.enabledPartitionTypes,
            config.getBloomFilterType(),
            config.getMetadataBloomFilterIndexParallelism(),
            config.isMetadataColumnStatsIndexEnabled(),
            config.getColumnStatsIndexParallelism(),
            config.getColumnsEnabledForColumnStatsIndex(),
            config.getColumnsEnabledForBloomFilterIndex());
    }

    /**
     * Converts an action's metadata into metadata-table records and commits them, keeping only
     * records of the partitions that should currently be updated.
     *
     * <p>Does nothing when the metadata table is disabled in the data write config, or when this
     * writer is not enabled or has no metadata table handle. Table services are not triggered
     * while any partition to update is still inflight.
     *
     * @param instantTime             instant time of the commit
     * @param convertMetadataFunction supplier of the per-partition records
     * @param canTriggerTableService  whether the caller allows table services to be triggered
     */
    private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
        if (!this.dataWriteConfig.isMetadataTableEnabled()) {
            return;
        }
        Set<String> partitionsToUpdate = this.getMetadataPartitionsToUpdate();
        Set<String> inflightIndexes = HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig());
        boolean touchesInflightIndex = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);
        if (!this.enabled || this.metadata == null) {
            return;
        }
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> recordsByPartition = convertMetadataFunction.convertMetadata()
            .entrySet()
            .stream()
            .filter(entry -> partitionsToUpdate.contains(entry.getKey().getPartitionPath()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        boolean triggerTableService = !touchesInflightIndex && canTriggerTableService;
        this.commit(instantTime, recordsByPartition, triggerTableService);
    }

    /**
     * Determines which metadata partitions should receive updates.
     *
     * <p>The result is the metadata partitions from the table config plus the inflight metadata
     * partitions. When that combination is empty, a warning is logged and the partition paths of
     * the enabled partition types are returned instead.
     *
     * @return partition paths to update
     */
    private Set<String> getMetadataPartitionsToUpdate() {
        Set<String> fromTableConfig = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        fromTableConfig.addAll(HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig()));
        if (fromTableConfig.isEmpty()) {
            LOG.warn("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
            return this.getEnabledPartitionTypes()
                .stream()
                .map(MetadataPartitionType::getPartitionPath)
                .collect(Collectors.toSet());
        }
        return fromTableConfig;
    }

    /**
     * Builds the metadata partitions described by the given index partition infos.
     *
     * <p>For every entry the method checks that the partition directory exists under the
     * metadata table base path and that the corresponding partition type is enabled. It then
     * adds the partitions to the inflight list in the table config, persists the table config,
     * and performs the initial commit up to the instant taken from the first entry.
     *
     * @param engineContext       engine context (not read by this implementation)
     * @param indexPartitionInfos partitions to index; an empty list only logs a warning
     * @throws HoodieIndexException if a partition directory is missing, its existence cannot be
     *                              checked (the underlying {@link IOException} is attached as
     *                              the cause), or the partition type is not enabled
     */
    @Override
    public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
        if (indexPartitionInfos.isEmpty()) {
            LOG.warn("No partition to index in the plan");
            return;
        }
        String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
        List<MetadataPartitionType> partitionTypes = new ArrayList<>(indexPartitionInfos.size());
        for (HoodieIndexPartitionInfo indexPartitionInfo : indexPartitionInfos) {
            String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
            LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s", relativePartitionPath, this.metadataWriteConfig.getBasePath(), indexUptoInstantTime));
            try {
                if (!this.dataMetaClient.getFs().exists(new Path(this.metadataWriteConfig.getBasePath(), relativePartitionPath))) {
                    throw new HoodieIndexException(String.format("File group not initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index scheduling failed!", relativePartitionPath, indexUptoInstantTime));
                }
            } catch (IOException e) {
                // Chain the I/O failure so the root cause is visible to whoever handles the index exception.
                throw new HoodieIndexException(String.format("Unable to check whether file group is initialized for metadata partition: %s, indexUptoInstant: %s", relativePartitionPath, indexUptoInstantTime), e);
            }
            MetadataPartitionType partitionType = MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
            if (!this.enabledPartitionTypes.contains(partitionType)) {
                throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType));
            }
            partitionTypes.add(partitionType);
        }
        // Record the partitions as inflight in the table config before writing any records.
        Set<String> inflightIndexes = HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig());
        inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
        this.dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightIndexes));
        HoodieTableConfig.update(this.dataMetaClient.getFs(), new Path(this.dataMetaClient.getMetaPath()), this.dataMetaClient.getTableConfig().getProps());
        this.initialCommit(indexUptoInstantTime, partitionTypes);
    }

    /**
     * Applies a commit's metadata to the metadata table.
     *
     * @param commitMetadata       commit metadata to convert into metadata-table records
     * @param instantTime          instant time of the action
     * @param isTableServiceAction when {@code true}, table services are not allowed to be
     *                             triggered by this update
     */
    @Override
    public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) {
        ConvertMetadataFunction toRecords = () -> HoodieTableMetadataUtil.convertMetadataToRecords(
            this.engineContext, commitMetadata, instantTime, this.getRecordsGenerationParams());
        boolean canTriggerTableService = !isTableServiceAction;
        this.processAndCommit(instantTime, toRecords, canTriggerTableService);
    }

    /**
     * Applies a clean action's metadata to the metadata table; table services are never
     * triggered from this path.
     *
     * @param cleanMetadata clean metadata to convert into metadata-table records
     * @param instantTime   instant time of the action
     */
    @Override
    public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
        ConvertMetadataFunction toRecords = () -> HoodieTableMetadataUtil.convertMetadataToRecords(
            this.engineContext, cleanMetadata, this.getRecordsGenerationParams(), instantTime);
        this.processAndCommit(instantTime, toRecords, false);
    }

    /**
     * Applies a restore action's metadata to the metadata table; table services are never
     * triggered from this path.
     *
     * @param restoreMetadata restore metadata to convert into metadata-table records
     * @param instantTime     instant time of the action
     */
    @Override
    public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
        // The timeline and synced instant time are read inside the lambda, i.e. only when the
        // conversion is actually invoked.
        ConvertMetadataFunction toRecords = () -> HoodieTableMetadataUtil.convertMetadataToRecords(
            this.engineContext,
            this.metadataMetaClient.getActiveTimeline(),
            restoreMetadata,
            this.getRecordsGenerationParams(),
            instantTime,
            this.metadata.getSyncedInstantTime());
        this.processAndCommit(instantTime, toRecords, false);
    }

    /**
     * Applies a rollback action's metadata to the metadata table.
     *
     * <p>Does nothing unless this writer is enabled and has a metadata reader. The first
     * rolled-back commit is treated as already synced when the metadata timeline contains a
     * matching deltacommit instant or, failing that, when its timestamp is not greater than the
     * latest compaction time reported by the metadata reader.
     *
     * @param rollbackMetadata rollback metadata to convert into metadata-table records
     * @param instantTime      instant time of the action
     */
    @Override
    public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        if (!this.enabled || this.metadata == null) {
            return;
        }
        String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
        HoodieInstant rolledBackDeltaCommit = new HoodieInstant(false, "deltacommit", rollbackInstant);
        boolean wasSynced = this.metadataMetaClient.getActiveTimeline().containsInstant(rolledBackDeltaCommit);
        if (!wasSynced) {
            Option<String> latestCompaction = this.metadata.getLatestCompactionTime();
            if (latestCompaction.isPresent()) {
                wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
            }
        }
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> records = HoodieTableMetadataUtil.convertMetadataToRecords(
            this.engineContext,
            this.metadataMetaClient.getActiveTimeline(),
            rollbackMetadata,
            this.getRecordsGenerationParams(),
            instantTime,
            this.metadata.getSyncedInstantTime(),
            wasSynced);
        this.commit(instantTime, records, false);
    }

    /**
     * Closes the metadata reader held by this writer, if there is one.
     *
     * @throws Exception whatever the underlying {@code close()} throws
     */
    @Override
    public void close() throws Exception {
        if (this.metadata == null) {
            return;
        }
        this.metadata.close();
    }

    /**
     * Commits the given per-partition records to the metadata table. Implemented by subclasses.
     *
     * <p>Parameter meanings below are taken from the call sites in this class.
     *
     * @param var1 instant time of the commit
     * @param var2 records to write, keyed by metadata partition type
     * @param var3 whether table services may be triggered as part of this commit (callers in
     *             this class pass {@code false} for initial commits and rollbacks)
     */
    protected abstract void commit(String var1, Map<MetadataPartitionType, HoodieData<HoodieRecord>> var2, boolean var3);

    /**
     * Assigns a file-group location to every record and merges all partitions' records into one
     * data set.
     *
     * <p>For each partition the latest file slices are looked up (falling back to the lookup
     * that includes inflight slices when none are found) and their number must equal the
     * partition type's file group count. Each record's key is mapped to a file group index and
     * the record's current location is set to that slice's base instant time and file id.
     *
     * @param partitionRecordsMap records keyed by metadata partition type
     * @return the union of all tagged records
     */
    protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
        HoodieData<HoodieRecord> combined = this.engineContext.emptyHoodieData();
        HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView(this.metadataMetaClient);
        for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
            MetadataPartitionType partitionType = entry.getKey();
            String partitionName = partitionType.getPartitionPath();
            int fileGroupCount = partitionType.getFileGroupCount();
            HoodieData<HoodieRecord> partitionRecords = entry.getValue();

            List<FileSlice> latestSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(this.metadataMetaClient, Option.ofNullable(fsView), partitionName);
            final List<FileSlice> slices = latestSlices.isEmpty()
                ? HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(this.metadataMetaClient, Option.ofNullable(fsView), partitionName)
                : latestSlices;
            ValidationUtils.checkArgument(
                slices.size() == fileGroupCount,
                String.format("Invalid number of file groups for partition:%s, found=%d, required=%d", partitionName, slices.size(), fileGroupCount));

            HoodieData<HoodieRecord> taggedRecords = partitionRecords.map(record -> {
                int fileGroupIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(record.getRecordKey(), fileGroupCount);
                FileSlice target = slices.get(fileGroupIndex);
                record.setCurrentLocation(new HoodieRecordLocation(target.getBaseInstantTime(), target.getFileId()));
                return record;
            });
            combined = combined.union(taggedRecords);
        }
        return combined;
    }

    /**
     * Runs pending compactions and, if possible, schedules and executes a new compaction on the
     * metadata table.
     *
     * <p>A new compaction is skipped (with an info log) when the data table timeline has any
     * inflight or requested instant before {@code instantTime}. Otherwise the compaction instant
     * time is the latest completed deltacommit time of the metadata table with {@code "001"}
     * appended.
     *
     * @param writeClient write client used to run, schedule and execute compactions
     * @param instantTime upper bound used when looking for pending data-table instants
     */
    protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
        writeClient.runAnyPendingCompactions();

        HoodieInstant lastCompletedDeltaCommit = this.metadataMetaClient.reloadActiveTimeline()
            .getDeltaCommitTimeline()
            .filterCompletedInstants()
            .lastInstant()
            .get();
        String latestDeltaCommitTime = lastCompletedDeltaCommit.getTimestamp();

        List<HoodieInstant> pendingBefore = this.dataMetaClient.reloadActiveTimeline()
            .filterInflightsAndRequested()
            .findInstantsBefore(instantTime)
            .getInstants()
            .collect(Collectors.toList());

        if (pendingBefore.isEmpty()) {
            String compactionInstantTime = latestDeltaCommitTime + "001";
            if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
                writeClient.compact(compactionInstantTime);
            }
            return;
        }
        LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s", pendingBefore.size(), latestDeltaCommitTime, Arrays.toString(pendingBefore.toArray())));
    }

    /**
     * Triggers a clean on the metadata table unless too little has happened since the last
     * completed instant on its commit timeline.
     *
     * <p>When such an instant exists and fewer than 3 completed instants follow it on the active
     * timeline, the clean is skipped. Otherwise the clean is requested at {@code instantTime}
     * with {@code "002"} appended.
     *
     * @param writeClient write client used to run the clean
     * @param instantTime base instant time for the clean action
     */
    protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
        Option<HoodieInstant> lastCompletedCommit = this.metadataMetaClient.reloadActiveTimeline()
            .getCommitTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (lastCompletedCommit.isPresent()) {
            boolean tooFewInstantsSince = this.metadataMetaClient.getActiveTimeline()
                .filterCompletedInstants()
                .findInstantsAfter(lastCompletedCommit.get().getTimestamp())
                .countInstants() < 3;
            if (tooFewInstantsSince) {
                return;
            }
        }
        writeClient.clean(instantTime + "002");
    }

    /**
     * Builds the initial records for the requested metadata partitions from a listing of the
     * data table and commits them.
     *
     * <p>FILES records are always generated when requested, and their count must equal the
     * number of listed partitions plus one (the partition-list record). BLOOM_FILTERS and
     * COLUMN_STATS records are generated only when requested and at least one data file was
     * listed. The commit is issued without allowing table services.
     *
     * @param createInstantTime instant time of the initial commit
     * @param partitionTypes    metadata partition types to initialize
     */
    private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
        LOG.info("Initializing metadata table by using file listings in " + this.dataWriteConfig.getBasePath());
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + this.dataWriteConfig.getTableName());

        // List the data table and index file sizes by partition identifier.
        List<DirectoryInfo> partitionInfoList = this.listAllPartitions(this.dataMetaClient);
        Map<String, Map<String, Long>> filesByPartition = partitionInfoList.stream().collect(Collectors.toMap(
            info -> HoodieTableMetadataUtil.getPartitionIdentifier(info.getRelativePath()),
            DirectoryInfo::getFileNameToSizeMap));
        int totalDataFilesCount = 0;
        for (Map<String, Long> files : filesByPartition.values()) {
            totalDataFilesCount += files.size();
        }
        List<String> partitions = new ArrayList<>(filesByPartition.keySet());

        Map<MetadataPartitionType, HoodieData<HoodieRecord>> recordsByPartitionType = new HashMap<>();
        if (partitionTypes.contains(MetadataPartitionType.FILES)) {
            HoodieRecord<HoodieMetadataPayload> allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
            HoodieData<HoodieRecord> filesRecords = this.getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
            long expectedRecordCount = partitions.size() + 1;
            ValidationUtils.checkState(filesRecords.count() == expectedRecordCount);
            recordsByPartitionType.put(MetadataPartitionType.FILES, filesRecords);
        }
        if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
            HoodieData<HoodieRecord> bloomFilterRecords = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
                this.engineContext, Collections.emptyMap(), filesByPartition, this.getRecordsGenerationParams(), createInstantTime);
            recordsByPartitionType.put(MetadataPartitionType.BLOOM_FILTERS, bloomFilterRecords);
        }
        if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
            HoodieData<HoodieRecord> columnStatsRecords = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
                this.engineContext, Collections.emptyMap(), filesByPartition, this.getRecordsGenerationParams());
            recordsByPartitionType.put(MetadataPartitionType.COLUMN_STATS, columnStatsRecords);
        }

        LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
        this.commit(createInstantTime, recordsByPartitionType, false);
    }

    /**
     * Creates the records for the FILES partition: the given partition-list record plus one
     * record per listed partition.
     *
     * <p>Each per-partition record contains only the files whose commit time (as extracted from
     * the file name) is less than or equal to {@code createInstantTime}. When no partitions were
     * listed, only the partition-list record is returned.
     *
     * @param createInstantTime  upper bound (inclusive) on the commit time of included files
     * @param partitionInfoList  listing of the data table partitions
     * @param allPartitionRecord record describing the list of all partitions
     * @return the combined records
     */
    private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) {
        HoodieData<HoodieRecord> partitionListRecords = this.engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
        if (partitionInfoList.isEmpty()) {
            return partitionListRecords;
        }
        int parallelism = partitionInfoList.size();
        HoodieData<HoodieRecord> perPartitionRecords = this.engineContext.parallelize(partitionInfoList, parallelism).map(info -> {
            Map<String, Long> eligibleFiles = info.getFileNameToSizeMap()
                .entrySet()
                .stream()
                .filter(file -> HoodieTimeline.compareTimestamps(
                    FSUtils.getCommitTime(file.getKey()), HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            String partitionIdentifier = HoodieTableMetadataUtil.getPartitionIdentifier(info.getRelativePath());
            return HoodieMetadataPayload.createPartitionFilesRecord(partitionIdentifier, Option.of(eligibleFiles), Option.empty());
        });
        return partitionListRecords.union(perPartitionRecords);
    }

    /**
     * Summary of one directory listing: its relative path, the sizes of the data files found
     * directly in it, its sub-directories (other than {@code .hoodie}), and whether a file whose
     * name starts with {@code .hoodie_partition_metadata} was present.
     */
    static class DirectoryInfo implements Serializable {
        private final String relativePath;
        private final HashMap<String, Long> filenameToSizeMap;
        private final List<Path> subDirectories = new ArrayList<>();
        private boolean isHoodiePartition = false;

        /**
         * Classifies every entry of the listing.
         *
         * @param relativePath path of the listed directory, stored as given
         * @param fileStatus   entries found in the directory
         */
        public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
            this.relativePath = relativePath;
            this.filenameToSizeMap = new HashMap<>(fileStatus.length);
            for (FileStatus status : fileStatus) {
                if (status.isDirectory()) {
                    // Every directory except the .hoodie folder is kept as a sub-directory.
                    Path directory = status.getPath();
                    if (!directory.getName().equals(".hoodie")) {
                        this.subDirectories.add(directory);
                    }
                } else {
                    Path file = status.getPath();
                    String fileName = file.getName();
                    if (fileName.startsWith(".hoodie_partition_metadata")) {
                        this.isHoodiePartition = true;
                    } else if (FSUtils.isDataFile(file)) {
                        this.filenameToSizeMap.put(fileName, status.getLen());
                    }
                }
            }
        }

        /** @return the relative path given at construction */
        String getRelativePath() {
            return this.relativePath;
        }

        /** @return number of data files recorded for this directory */
        int getTotalFiles() {
            return this.filenameToSizeMap.size();
        }

        /** @return {@code true} if a partition metadata file was seen in the listing */
        boolean isHoodiePartition() {
            return this.isHoodiePartition;
        }

        /** @return the sub-directories collected from the listing (the internal list itself) */
        List<Path> getSubDirectories() {
            return this.subDirectories;
        }

        /** @return data file name to length mapping (the internal map itself) */
        Map<String, Long> getFileNameToSizeMap() {
            return this.filenameToSizeMap;
        }
    }

    private static interface ConvertMetadataFunction {
        public Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadata();
    }
}

