/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieSeekingFileReader;
import org.apache.hudi.metadata.BaseTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.util.Transient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * {@link BaseTableMetadata} implementation that answers key and key-prefix lookups by reading
 * the file slices of the metadata table rooted at {@code metadataBasePath}.
 *
 * <p>Each lookup opens a base-file reader and a log-record reader for the relevant file slice,
 * merges the two sources and closes the readers afterwards. When {@code reuse} is set, readers
 * opened by {@link #getRecordsByKeys(List, String)} are cached per (partition name, file id) and
 * are only closed by {@link #close()} or {@link #reset()}.
 */
public class HoodieBackedTableMetadata extends BaseTableMetadata {
    private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);

    /**
     * All-zero instant timestamp. It is always added to the set of valid instants and is used as
     * the fallback when no completed instant is available.
     */
    private static final String ALL_ZERO_INSTANT_TIMESTAMP = "00000000000000";
    /** Memory budget handed to the log record reader builder (1 GiB). */
    private static final long LOG_READER_MAX_MEMORY_BYTES = 1024L * 1024L * 1024L;
    /** Buffer size handed to the log record reader builder (10 KiB). */
    private static final int LOG_READER_BUFFER_SIZE = 10 * 1024;

    private final String metadataBasePath;
    private HoodieTableMetaClient metadataMetaClient;
    private HoodieTableConfig metadataTableConfig;
    private HoodieTableFileSystemView metadataFileSystemView;
    private final boolean reuse;
    // Cache of open readers keyed by (partition name, file id); only populated when reuse is true.
    private final Transient<Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>> partitionReaders = Transient.lazy(ConcurrentHashMap::new);

    /**
     * Creates an instance that does not cache readers between lookups.
     */
    public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) {
        this(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory, false);
    }

    /**
     * Creates an instance and eagerly tries to initialize the metadata table clients.
     *
     * @param reuse when {@code true}, readers opened for full-key lookups are cached until
     *              {@link #close()} or {@link #reset()} is called
     */
    public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory, boolean reuse) {
        super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory);
        this.reuse = reuse;
        this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(this.dataBasePath.toString());
        initIfNeeded();
    }

    /**
     * Builds the metadata table meta client, file-system view and table config if the metadata
     * table is enabled and they have not been built yet. Any failure disables the metadata table
     * for this instance instead of propagating.
     */
    private void initIfNeeded() {
        if (!isMetadataTableEnabled) {
            if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
                LOG.info("Metadata table is disabled.");
            }
            return;
        }
        if (metadataMetaClient != null) {
            // Already initialized.
            return;
        }
        try {
            metadataMetaClient = HoodieTableMetaClient.builder().setConf(getHadoopConf()).setBasePath(metadataBasePath).build();
            metadataFileSystemView = HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
            metadataTableConfig = metadataMetaClient.getTableConfig();
            isBloomFilterIndexEnabled = metadataConfig.isBloomFilterIndexEnabled();
            isColumnStatsIndexEnabled = metadataConfig.isColumnStatsIndexEnabled();
        } catch (TableNotFoundException e) {
            LOG.warn("Metadata table was not found at path " + metadataBasePath);
            disableMetadataTable();
        } catch (Exception e) {
            LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
            disableMetadataTable();
        }
    }

    /** Marks the metadata table as disabled and drops every handle derived from it. */
    private void disableMetadataTable() {
        isMetadataTableEnabled = false;
        metadataMetaClient = null;
        metadataFileSystemView = null;
        metadataTableConfig = null;
    }

    /**
     * Looks up a single key by delegating to {@link #getRecordsByKeys(List, String)}.
     *
     * @return the record option for the key, or an empty option when the lookup produced no entry
     */
    @Override
    protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName) {
        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> recordsByKeys = getRecordsByKeys(Collections.singletonList(key), partitionName);
        if (recordsByKeys.isEmpty()) {
            return Option.empty();
        }
        return recordsByKeys.get(0).getValue();
    }

    /**
     * Returns every partition path that matches at least one of the given prefixes. A null or
     * empty prefix matches everything; otherwise the path must equal the prefix or start with
     * the prefix followed by {@code "/"}.
     */
    @Override
    public List<String> getPartitionPathWithPathPrefixes(List<String> relativePathPrefixes) throws IOException {
        return getAllPartitionPaths().stream()
            .filter(partitionPath -> relativePathPrefixes.stream().anyMatch(prefix ->
                StringUtils.isNullOrEmpty(prefix) || partitionPath.equals(prefix) || partitionPath.startsWith(prefix + "/")))
            .collect(Collectors.toList());
    }

    /**
     * Reads all records whose key starts with one of {@code keyPrefixes} from every latest merged
     * file slice of {@code partitionName}.
     *
     * @param shouldLoadInMemory when {@code true} the file slices are wrapped in a lazy
     *                           {@link HoodieListData}; otherwise they are parallelized through
     *                           the engine context
     */
    @Override
    public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean shouldLoadInMemory) {
        List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
        Collections.sort(sortedKeyPrefixes);
        List<FileSlice> partitionFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName);
        HoodieData<FileSlice> fileSlices = shouldLoadInMemory
            ? HoodieListData.lazy(partitionFileSlices)
            : engineContext.parallelize(partitionFileSlices);
        return fileSlices.flatMap(fileSlice -> readRecordsByKeyPrefixes(partitionName, fileSlice, sortedKeyPrefixes));
    }

    /**
     * Opens fresh readers for one file slice, performs the prefix lookup and always closes the
     * readers again. The result is fully materialized before the readers are closed.
     */
    private Iterator<HoodieRecord<HoodieMetadataPayload>> readRecordsByKeyPrefixes(String partitionName, FileSlice fileSlice, List<String> sortedKeyPrefixes) {
        Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = openReaders(partitionName, fileSlice);
        try {
            HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
            HoodieMetadataLogRecordReader logRecordReader = readers.getRight();
            if (baseFileReader == null && logRecordReader == null) {
                return Collections.emptyIterator();
            }
            List<Long> timings = new ArrayList<>();
            boolean fullKeys = false;
            Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordReader, sortedKeyPrefixes, fullKeys, timings);
            List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords = readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
            LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", sortedKeyPrefixes.size(), timings));
            // Prefixes that matched nothing are carried as empty options; drop them here.
            return mergedRecords.stream()
                .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
                .filter(Objects::nonNull)
                .iterator();
        } catch (IOException ioe) {
            throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
        } finally {
            closeReader(readers);
        }
    }

    /**
     * Looks up the given full keys in {@code partitionName}. Keys are sorted, grouped by the file
     * slice they map to, and each group is resolved against that slice's base file and log files.
     *
     * @return one pair per looked-up key in each non-skipped file slice; the option is empty when
     *         no record exists for the key
     */
    @Override
    public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys2, String partitionName) {
        List<String> sortedKeys = new ArrayList<>(keys2);
        Collections.sort(sortedKeys);
        Map<Pair<String, FileSlice>, List<String>> keysByFileSlice = getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
        for (Map.Entry<Pair<String, FileSlice>, List<String>> entry : keysByFileSlice.entrySet()) {
            List<String> fileSliceKeys = entry.getValue();
            Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, entry.getKey().getRight());
            try {
                HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
                HoodieMetadataLogRecordReader logRecordReader = readers.getRight();
                if (baseFileReader == null && logRecordReader == null) {
                    // Nothing to read for this slice; the finally block still runs.
                    continue;
                }
                List<Long> timings = new ArrayList<>();
                boolean fullKeys = true;
                Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordReader, fileSliceKeys, fullKeys, timings);
                result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, fullKeys, logRecords, timings, partitionName));
                LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", fileSliceKeys.size(), timings));
            } catch (IOException ioe) {
                throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeys.size() + " key : ", ioe);
            } finally {
                // Cached readers stay open until close()/reset().
                if (!reuse) {
                    closeReader(readers);
                }
            }
        }
        return result;
    }

    /**
     * Reads the records matching {@code keys} from the log record reader.
     *
     * @param fullKey {@code true} to match keys exactly, {@code false} to match them as prefixes
     * @param timings receives the elapsed time of this step
     * @return a map that holds an entry for every requested key (empty option when nothing was
     *         found) plus an entry for every record the reader returned; an empty map when
     *         {@code logRecordReader} is {@code null}
     */
    private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader, List<String> keys, boolean fullKey, List<Long> timings) {
        HoodieTimer timer = HoodieTimer.start();
        if (logRecordReader == null) {
            timings.add(timer.endTimer());
            return Collections.emptyMap();
        }
        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
        // Back-fill every requested key first so the map always has an entry per key ...
        for (String key : keys) {
            logRecords.put(key, Option.empty());
        }
        // ... then overlay whatever the log blocks actually contain.
        (fullKey ? logRecordReader.getRecordsByKeys(keys) : logRecordReader.getRecordsByKeyPrefixes(keys))
            .forEach(record -> logRecords.put(record.getRecordKey(), Option.of(record)));
        timings.add(timer.endTimer());
        return logRecords;
    }

    /**
     * Reads the matching records from the base file (if any) and merges the log records on top.
     *
     * <p>For full-key lookups the result has one pair per requested key, in the order of
     * {@code keys}. For prefix lookups the result has one pair per record found.
     *
     * @param reader  base-file reader, or {@code null} when the slice has no base file
     * @param timings receives the elapsed time of this step
     */
    private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader, List<String> keys, boolean fullKeys, Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords, List<Long> timings, String partitionName) throws IOException {
        HoodieTimer timer = HoodieTimer.start();
        if (reader == null) {
            // No base file: the log records are the complete answer.
            timings.add(timer.endTimer());
            if (fullKeys) {
                return keys.stream()
                    .map(key -> Pair.of(key, logRecords.getOrDefault(key, Option.empty())))
                    .collect(Collectors.toList());
            }
            return logRecords.entrySet().stream()
                .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        }

        HoodieTimer readTimer = HoodieTimer.start();
        Map<String, HoodieRecord<HoodieMetadataPayload>> records = fetchBaseFileRecordsByKeys(reader, keys, fullKeys, partitionName);
        metrics.ifPresent(m -> m.updateMetrics("basefile_read", readTimer.endTimer()));

        for (Option<HoodieRecord<HoodieMetadataPayload>> logRecordOpt : logRecords.values()) {
            if (!logRecordOpt.isPresent()) {
                continue;
            }
            HoodieRecord<HoodieMetadataPayload> logRecord = logRecordOpt.get();
            // When both sources have the key, the log payload is pre-combined with the base payload.
            records.merge(logRecord.getRecordKey(), logRecord, (oldRecord, newRecord) ->
                new HoodieAvroRecord<HoodieMetadataPayload>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData())));
        }
        timings.add(timer.endTimer());

        if (fullKeys) {
            return keys.stream()
                .map(key -> Pair.of(key, Option.ofNullable(records.get(key))))
                .collect(Collectors.toList());
        }
        return records.values().stream()
            .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
            .collect(Collectors.toList());
    }

    /**
     * Reads the records matching {@code keys} (exactly, or as prefixes) from the base file and
     * indexes them by the value of their {@code "key"} field.
     *
     * <p>The record iterator is closed once the records have been collected.
     */
    // The reader parameter is deliberately left raw (as in the original signature); the iterator
    // element type is only used to call getData(), whose result is cast to GenericRecord.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader, List<String> keys, boolean fullKeys, String partitionName) throws IOException {
        try (ClosableIterator<HoodieRecord<?>> records = fullKeys ? reader.getRecordsByKeysIterator(keys) : reader.getRecordsByKeyPrefixIterator(keys)) {
            return CollectionUtils.toStream(records)
                .map(record -> {
                    GenericRecord data = (GenericRecord) record.getData();
                    return Pair.of((String) data.get("key"), composeRecord(data, partitionName));
                })
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        }
    }

    /**
     * Converts an Avro record read from a base file into a {@link HoodieRecord}. When the
     * metadata table does not populate meta fields, the record-key and partition field names
     * from the table config and the partition name are passed along explicitly.
     */
    private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
        String payloadClass = metadataTableConfig.getPayloadClass();
        String preCombineField = metadataTableConfig.getPreCombineField();
        if (metadataTableConfig.populateMetaFields()) {
            return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord, payloadClass, preCombineField, false);
        }
        return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord, payloadClass, preCombineField,
            Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false, Option.of(partitionName));
    }

    /**
     * Groups {@code keys} by the latest merged file slice each key maps to.
     *
     * @return a mutable map from (partition name, file slice) to the keys routed to that slice;
     *         empty when the partition has no file slices
     */
    private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSliceToKeysMapping(String partitionName, List<String> keys) {
        List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName);
        Map<Pair<String, FileSlice>, List<String>> keysByFileSlice = new HashMap<>();
        if (CollectionUtils.isNullOrEmpty(latestFileSlices)) {
            return keysByFileSlice;
        }
        int fileGroupCount = latestFileSlices.size();
        for (String key : keys) {
            FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, fileGroupCount));
            keysByFileSlice.computeIfAbsent(Pair.of(partitionName, slice), ignored -> new ArrayList<>()).add(key);
        }
        return keysByFileSlice;
    }

    /**
     * Returns readers for the slice: cached per (partition name, file id) when {@code reuse} is
     * set, freshly opened otherwise.
     */
    private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
        if (!reuse) {
            return openReaders(partitionName, slice);
        }
        Pair<String, String> cacheKey = Pair.of(partitionName, slice.getFileId());
        return partitionReaders.get().computeIfAbsent(cacheKey, ignored -> openReaders(partitionName, slice));
    }

    /**
     * Opens the base-file reader (may be {@code null}) and the log record reader for the slice
     * and reports the combined open time under the {@code "scan"} metric.
     *
     * @throws HoodieIOException if opening the base file fails
     */
    private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice) {
        try {
            HoodieTimer timer = HoodieTimer.start();
            Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderAndOpenMs = getBaseFileReader(slice, timer);
            HoodieSeekingFileReader<?> baseFileReader = baseFileReaderAndOpenMs.getKey();
            long baseFileOpenMs = baseFileReaderAndOpenMs.getValue();

            List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
            Pair<HoodieMetadataLogRecordReader, Long> logReaderAndOpenMs = getLogRecordScanner(logFiles, partitionName, Option.empty());
            HoodieMetadataLogRecordReader logRecordReader = logReaderAndOpenMs.getKey();
            long logScannerOpenMs = logReaderAndOpenMs.getValue();

            metrics.ifPresent(m -> m.updateMetrics("scan", baseFileOpenMs + logScannerOpenMs));
            return Pair.of(baseFileReader, logRecordReader);
        } catch (IOException e) {
            throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
        }
    }

    /**
     * Opens a reader on the slice's base file, if it has one.
     *
     * @param timer timer started by the caller; it is ended here in both branches
     * @return the reader (or {@code null} without a base file) and the open time in ms
     *         ({@code 0} without a base file)
     */
    private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
        Option<HoodieBaseFile> baseFile = slice.getBaseFile();
        if (!baseFile.isPresent()) {
            timer.endTimer();
            return Pair.of(null, 0L);
        }
        String baseFilePath = baseFile.get().getPath();
        HoodieSeekingFileReader<?> baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(getHadoopConf(), new Path(baseFilePath));
        long baseFileOpenMs = timer.endTimer();
        LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath, baseFile.get().getCommitTime(), baseFileOpenMs));
        return Pair.of(baseFileReader, baseFileOpenMs);
    }

    /**
     * Collects the instant timestamps whose log blocks may be read: completed instants of the
     * dataset timeline, completed indexing commits of the metadata timeline, the commits
     * rolled back by completed rollback/restore instants newer than the earliest of those, and
     * the all-zero timestamp.
     */
    private Set<String> getValidInstantTimestamps() {
        HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
        Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
            .map(HoodieInstant::getTimestamp)
            .collect(Collectors.toSet());

        validInstantTimestamps.addAll(metadataMetaClient.getActiveTimeline()
            .filter(instant -> instant.isCompleted() && HoodieTableMetadataUtil.isIndexingCommit(instant.getTimestamp()))
            .getInstants().stream()
            .map(HoodieInstant::getTimestamp)
            .collect(Collectors.toList()));

        String earliestInstantTime = validInstantTimestamps.isEmpty()
            ? ALL_ZERO_INSTANT_TIMESTAMP
            : Collections.min(validInstantTimestamps);
        datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
            .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime))
            .forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)));

        validInstantTimestamps.add(ALL_ZERO_INSTANT_TIMESTAMP);
        return validInstantTimestamps;
    }

    /**
     * Builds a log record reader over the given log files of a metadata partition.
     *
     * @param allowFullScanOverride when present, decides whether a full scan is enabled;
     *                              otherwise the per-partition default is used
     * @return the reader together with the time it took to build it, in ms
     */
    public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles, String partitionName, Option<Boolean> allowFullScanOverride) {
        HoodieTimer timer = HoodieTimer.start();
        List<String> sortedLogFilePaths = logFiles.stream()
            .sorted(HoodieLogFile.getLogFileComparator())
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());
        Set<String> validInstantTimestamps = getValidInstantTimestamps();
        Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(ALL_ZERO_INSTANT_TIMESTAMP);
        boolean allowFullScan = allowFullScanOverride.orElseGet(() -> isFullScanAllowedForPartition(partitionName));
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
        HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();

        HoodieMetadataLogRecordReader logRecordScanner = HoodieMetadataLogRecordReader.newBuilder()
            .withFileSystem(metadataMetaClient.getFs())
            .withBasePath(metadataBasePath)
            .withLogFilePaths(sortedLogFilePaths)
            .withReaderSchema(readerSchema)
            .withLatestInstantTime(latestMetadataInstantTime)
            .withMaxMemorySizeInBytes(LOG_READER_MAX_MEMORY_BYTES)
            .withBufferSize(LOG_READER_BUFFER_SIZE)
            .withSpillableMapBasePath(spillableMapDirectory)
            .withDiskMapType(commonConfig.getSpillableDiskMapType())
            .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
            .withLogBlockTimestamps(validInstantTimestamps)
            .enableFullScan(allowFullScan)
            .withPartition(partitionName)
            .withEnableOptimizedLogBlocksScan(metadataConfig.doEnableOptimizedLogBlocksScan())
            .build();

        long logScannerOpenMs = timer.endTimer();
        LOG.info(String.format("Opened %d metadata log files (dataset instant=%s, metadata instant=%s) in %d ms", sortedLogFilePaths.size(), getLatestDataInstantTime(), latestMetadataInstantTime, logScannerOpenMs));
        return Pair.of(logRecordScanner, logScannerOpenMs);
    }

    /**
     * Default full-scan policy: the {@code "files"} partition follows the default value of
     * {@link HoodieMetadataConfig#ENABLE_FULL_SCAN_LOG_FILES}; every other partition disallows it.
     */
    private boolean isFullScanAllowedForPartition(String partitionName) {
        switch (partitionName) {
            case "files":
                return HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
            default:
                return false;
        }
    }

    /**
     * Returns the commits rolled back by a rollback or restore instant; an empty list for any
     * other action.
     *
     * @throws HoodieMetadataException if the instant's metadata cannot be read
     */
    private List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
        try {
            if (instant.getAction().equals("rollback")) {
                try {
                    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get());
                    return rollbackMetadata.getCommitsRollback();
                } catch (IOException e) {
                    // The completed rollback file could not be deserialized; fall back to the
                    // rollback plan stored with the requested instant.
                    HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, "rollback", instant.getTimestamp())).get(), HoodieRollbackPlan.class);
                    List<String> commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime());
                    LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString());
                    return commitsToRollback;
                }
            }

            List<String> rollbackedCommits = new ArrayList<>();
            if (instant.getAction().equals("restore")) {
                HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(timeline.getInstantDetails(instant).get());
                restoreMetadata.getHoodieRestoreMetadata().values()
                    .forEach(rms -> rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback())));
            }
            return rollbackedCommits;
        } catch (IOException e) {
            throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
        }
    }

    /** Closes every cached reader. */
    @Override
    public void close() {
        closePartitionReaders();
    }

    /** Removes the cached readers for one (partition name, file id) key and closes them. */
    private synchronized void close(Pair<String, String> partitionFileSlicePair) {
        Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = partitionReaders.get().remove(partitionFileSlicePair);
        closeReader(readers);
    }

    /** Closes and evicts every entry of the reader cache. */
    private void closePartitionReaders() {
        for (Pair<String, String> partitionFileSlicePair : partitionReaders.get().keySet()) {
            close(partitionFileSlicePair);
        }
        partitionReaders.get().clear();
    }

    /**
     * Closes both readers of the pair; a {@code null} pair or {@code null} members are skipped.
     * Closing the second reader is attempted even if closing the first one fails, so one failure
     * cannot leak the other resource.
     *
     * @throws HoodieException wrapping the first close failure (a second failure is attached as
     *                         a suppressed exception)
     */
    private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers) {
        if (readers == null) {
            return;
        }
        Exception failure = null;
        try {
            if (readers.getKey() != null) {
                readers.getKey().close();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (readers.getValue() != null) {
                readers.getValue().close();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw new HoodieException("Error closing resources during metadata table merge", failure);
        }
    }

    /** Returns whether the metadata table is currently considered enabled by this instance. */
    public boolean enabled() {
        return isMetadataTableEnabled;
    }

    /** Returns the metadata table meta client, or {@code null} if it was never initialized. */
    public HoodieTableMetaClient getMetadataMetaClient() {
        return metadataMetaClient;
    }

    /** Returns the stats reported by the metrics object, or an empty map when metrics are absent. */
    public Map<String, String> stats() {
        return metrics.map(m -> m.getStats(true, metadataMetaClient, (HoodieTableMetadata) this)).orElse(new HashMap<>());
    }

    /**
     * Returns the timestamp of the last completed instant on the metadata table's delta-commit
     * timeline, or an empty option if there is none or the table is not initialized.
     */
    @Override
    public Option<String> getSyncedInstantTime() {
        if (metadataMetaClient == null) {
            return Option.empty();
        }
        Option<HoodieInstant> latestInstant = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
        if (latestInstant.isPresent()) {
            return Option.of(latestInstant.get().getTimestamp());
        }
        return Option.empty();
    }

    /**
     * Returns the timestamp of the last completed instant on the metadata table's commit
     * timeline, or an empty option if there is none or the table is not initialized.
     */
    @Override
    public Option<String> getLatestCompactionTime() {
        if (metadataMetaClient == null) {
            return Option.empty();
        }
        Option<HoodieInstant> latestCompaction = metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
        if (latestCompaction.isPresent()) {
            return Option.of(latestCompaction.get().getTimestamp());
        }
        return Option.empty();
    }

    /**
     * Re-runs initialization if needed, reloads the dataset and metadata timelines, rebuilds the
     * metadata file-system view and closes all cached readers.
     */
    @Override
    public void reset() {
        initIfNeeded();
        dataMetaClient.reloadActiveTimeline();
        if (metadataMetaClient != null) {
            metadataMetaClient.reloadActiveTimeline();
            metadataFileSystemView = HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
        }
        closePartitionReaders();
    }
}

