/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Log record scanner that keeps at most one record per record key in an
 * {@link ExternalSpillableMap}: records sharing a key are combined through the configured
 * record merger, and delete records replace the stored entry with an empty record.
 *
 * <p>The merged records can be read through {@link #iterator()} or {@link #getRecords()}.
 * The backing map is released by {@link #close()}.
 */
@NotThreadSafe
public class HoodieMergedLogRecordScanner
extends AbstractHoodieLogRecordScanner
implements Iterable<HoodieRecord>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
    // Timer started and stopped around the full scan in performScan().
    public final HoodieTimer timer = HoodieTimer.create();
    // Merged records indexed by record key.
    private final ExternalSpillableMap<String, HoodieRecord> records;
    // Key prefixes that scanByKeyPrefixes() has already scanned for.
    private final Set<String> scannedPrefixes;
    // Size of the records map captured at the end of performScan(); not updated by the lazy scan methods.
    private long numMergedRecordsInLog;
    // Memory budget handed to the ExternalSpillableMap constructor; also reported in the scan log line.
    private final long maxMemorySizeInBytes;
    // Value returned by timer.endTimer() in performScan() (unit is whatever HoodieTimer reports).
    private long totalTimeTakenToReadAndMergeBlocks;

    /**
     * Creates the scanner and, when {@code forceFullScan} is {@code true}, immediately scans all
     * log files.
     *
     * <p>Arguments not listed below are forwarded unchanged to the superclass constructor.
     *
     * @param readerSchema forwarded to the superclass and also used to build the record size estimator
     * @param maxMemorySizeInBytes memory budget passed to the {@link ExternalSpillableMap}; must not be {@code null}
     * @param spillableMapBasePath base path passed to the {@link ExternalSpillableMap}
     * @param diskMapType disk map type passed to the {@link ExternalSpillableMap}
     * @param isBitCaskDiskMapCompressionEnabled compression flag passed to the {@link ExternalSpillableMap}
     * @param forceFullScan when {@code true} the full scan runs as part of construction
     * @param allowInflightInstants stored in the inherited {@code allowInflightInstants} field
     * @throws HoodieIOException if creating the spillable map fails with an {@link IOException}
     */
    protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, boolean reverseReader, int bufferSize, String spillableMapBasePath, Option<InstantRange> instantRange, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, boolean withOperationField, boolean forceFullScan, Option<String> partitionName, InternalSchema internalSchema, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option<HoodieTableMetaClient> hoodieTableMetaClientOption, boolean allowInflightInstants) {
        super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption);
        try {
            this.maxMemorySizeInBytes = maxMemorySizeInBytes;
            this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema), diskMapType, new DefaultSerializer(), isBitCaskDiskMapCompressionEnabled, this.getClass().getSimpleName());
            this.scannedPrefixes = new HashSet<>();
            this.allowInflightInstants = allowInflightInstants;
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
        }
        if (forceFullScan) {
            try {
                this.performScan();
            }
            catch (RuntimeException | Error scanFailure) {
                // The caller never receives a reference to this instance when the constructor
                // throws, so nobody else could close the spillable map: release it here.
                try {
                    this.records.close();
                }
                catch (RuntimeException closeFailure) {
                    // Keep the scan failure as the primary error.
                    scanFailure.addSuppressed(closeFailure);
                }
                throw scanFailure;
            }
        }
    }

    /**
     * Scans the log files without skipping block processing; shorthand for {@code scan(false)}.
     */
    public final void scan() {
        scan(false);
    }

    /**
     * Scans the log files with no key filter. Does nothing when the scanner was created with
     * {@code forceFullScan}, because in that mode the scan already ran in the constructor.
     *
     * @param skipProcessingBlocks passed through to {@code scanInternal}
     */
    public final void scan(boolean skipProcessingBlocks) {
        if (!forceFullScan) {
            scanInternal(Option.empty(), skipProcessingBlocks);
        }
    }

    /**
     * Scans the log files for the given record keys, restricted to the keys that are not yet
     * present in the merged records map. Does nothing in {@code forceFullScan} mode or when
     * every requested key is already present.
     *
     * @param keys2 full record keys to look up
     */
    public void scanByFullKeys(List<String> keys2) {
        if (!forceFullScan) {
            List<String> keysToFetch = keys2.stream()
                    .filter(candidate -> !records.containsKey(candidate))
                    .collect(Collectors.toList());
            if (!keysToFetch.isEmpty()) {
                scanInternal(Option.of(AbstractHoodieLogRecordScanner.KeySpec.fullKeySpec(keysToFetch)), false);
            }
        }
    }

    /**
     * Scans the log files for records whose keys match the given prefixes. A requested prefix is
     * skipped when it starts with a prefix that was already scanned. The prefixes actually
     * scanned are remembered once the scan has returned. Does nothing in {@code forceFullScan}
     * mode.
     *
     * @param keyPrefixes record key prefixes to look up
     */
    public void scanByKeyPrefixes(List<String> keyPrefixes) {
        if (!forceFullScan) {
            List<String> uncoveredPrefixes = keyPrefixes.stream()
                    .filter(candidate -> scannedPrefixes.stream().noneMatch(candidate::startsWith))
                    .collect(Collectors.toList());
            if (!uncoveredPrefixes.isEmpty()) {
                scanInternal(Option.of(AbstractHoodieLogRecordScanner.KeySpec.prefixKeySpec(uncoveredPrefixes)), false);
                scannedPrefixes.addAll(uncoveredPrefixes);
            }
        }
    }

    /**
     * Runs the unfiltered scan, records how long it took and how many merged records it
     * produced, and logs the in-memory / on-disk statistics of the records map at INFO level.
     */
    private void performScan() {
        timer.startTimer();
        scanInternal(Option.empty(), false);
        totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
        numMergedRecordsInLog = records.size();
        if (!LOG.isInfoEnabled()) {
            return;
        }
        LOG.info("Scanned {} log files with stats: MaxMemoryInBytes => {}, MemoryBasedMap => {} entries, {} total bytes, DiskBasedMap => {} entries, {} total bytes",
                logFilePaths.size(),
                maxMemorySizeInBytes,
                records.getInMemoryMapNumEntries(),
                records.getCurrentInMemoryMapSize(),
                records.getDiskBasedMapNumEntries(),
                records.getSizeOfFileOnDiskInBytes());
    }

    /**
     * @return the iterator supplied by the merged records map
     */
    @Override
    public Iterator<HoodieRecord> iterator() {
        return records.iterator();
    }

    /**
     * @return the merged records map itself (not a copy), keyed by record key
     */
    public Map<String, HoodieRecord> getRecords() {
        return records;
    }

    /**
     * @return the record type reported by the configured record merger
     */
    public HoodieRecord.HoodieRecordType getRecordType() {
        return recordMerger.getRecordType();
    }

    /**
     * @return the number of merged records captured at the end of the full scan
     */
    public long getNumMergedRecordsInLog() {
        return numMergedRecordsInLog;
    }

    /**
     * @return a fresh {@link Builder} for this scanner
     */
    public static Builder newBuilder() {
        Builder builder = new Builder();
        return builder;
    }

    /**
     * Folds one log record into the merged records map.
     *
     * <p>If no record is stored for the key, a copy of {@code newRecord} is stored. Otherwise
     * the stored record and {@code newRecord} are combined by the record merger; the map is
     * only updated when the merged data is a different object than the stored record's data.
     *
     * @param newRecord record read from the log
     * @throws IOException declared by the overridden method; may be thrown by the merger
     */
    @Override
    protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
        String key = newRecord.getRecordKey();
        HoodieRecord existing = records.get(key);
        if (existing == null) {
            // First occurrence of this key: store a copy of the incoming record.
            records.put(key, newRecord.copy());
            return;
        }
        HoodieRecord merged = recordMerger.merge(existing, readerSchema, newRecord, readerSchema, getPayloadProps()).get().getLeft();
        if (merged.getData() == existing.getData()) {
            // The merger kept the stored record's data (same reference): nothing to update.
            return;
        }
        HoodieRecord latest = HoodieMergedLogRecordScanner.getLatestHoodieRecord(newRecord, merged, key);
        records.put(key, latest.copy());
    }

    /**
     * Applies a delete record to the merged records map.
     *
     * <p>When a record is already stored for the key, the delete is ignored if all of the
     * following hold: the delete's ordering value is not equal to the {@code Integer} 0, both
     * ordering values have the same class, and the stored record's ordering value is strictly
     * greater than the delete's. In every other case the entry for the key is replaced with an
     * empty record carrying the delete's partition path and ordering value.
     *
     * @param deleteRecord delete record read from the log
     */
    @Override
    protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
        String key = deleteRecord.getRecordKey();
        HoodieRecord oldRecord = this.records.get(key);
        if (oldRecord != null) {
            // Raw Comparable on purpose: with Comparable<?> on both sides the compareTo call
            // below cannot type-check because of wildcard capture. The isSameClass guard makes
            // sure compareTo only runs on values of the same runtime class.
            @SuppressWarnings("rawtypes")
            Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
            @SuppressWarnings("rawtypes")
            Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
            @SuppressWarnings("unchecked")
            boolean choosePrev = !deleteOrderingVal.equals(0)
                    && ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal)
                    && curOrderingVal.compareTo(deleteOrderingVal) > 0;
            if (choosePrev) {
                // The stored record has the greater ordering value: keep it, drop the delete.
                return;
            }
        }
        if (this.recordType == HoodieRecord.HoodieRecordType.AVRO) {
            this.records.put(key, (HoodieRecord)SpillableMapUtils.generateEmptyPayload(key, deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), this.getPayloadClassFQN()));
        } else {
            HoodieEmptyRecord record = new HoodieEmptyRecord(new HoodieKey(key, deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), this.recordType);
            this.records.put(key, record);
        }
    }

    /**
     * @return the timer reading captured when the full scan finished
     */
    public long getTotalTimeTakenToReadAndMergeBlocks() {
        return totalTimeTakenToReadAndMergeBlocks;
    }

    /**
     * Closes the merged records map, if there is one.
     */
    @Override
    public void close() {
        ExternalSpillableMap<String, HoodieRecord> mergedRecords = records;
        if (mergedRecords == null) {
            return;
        }
        mergedRecords.close();
    }

    /**
     * Builds the record to store after a merge: a new instance of {@code combinedRecord} that
     * takes its key's partition path, its operation and its current location from
     * {@code newRecord}.
     *
     * @param newRecord record just read from the log
     * @param combinedRecord result of merging the stored record with {@code newRecord}
     * @param key record key of both records
     * @return the re-keyed record, sealed again after its location was set
     */
    private static <T> HoodieRecord getLatestHoodieRecord(HoodieRecord<T> newRecord, HoodieRecord<T> combinedRecord, String key) {
        HoodieKey latestKey = new HoodieKey(key, newRecord.getPartitionPath());
        HoodieRecord<T> result = combinedRecord.newInstance(latestKey, newRecord.getOperation());
        // The location can only be changed while the record is unsealed.
        result.unseal();
        result.setCurrentLocation(newRecord.getCurrentLocation());
        result.seal();
        return result;
    }

    /**
     * Fluent builder for {@link HoodieMergedLogRecordScanner}. Every {@code with...} method
     * stores its argument and returns this builder; {@link #build()} creates the scanner.
     */
    public static class Builder
    extends AbstractHoodieLogRecordScanner.Builder {
        private HoodieStorage storage;
        private String basePath;
        private List<String> logFilePaths;
        private Schema readerSchema;
        private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
        private String latestInstantTime;
        private boolean reverseReader;
        private int bufferSize;
        private Long maxMemorySizeInBytes;
        private String spillableMapBasePath;
        private ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
        private boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
        private Option<InstantRange> instantRange = Option.empty();
        // Derived from the first log file path in build() when left unset.
        private String partitionName;
        private boolean withOperationField = false;
        private String keyFieldOverride;
        // Full scan on construction is the default.
        private boolean forceFullScan = true;
        private boolean enableOptimizedLogBlocksScan = false;
        protected boolean allowInflightInstants = false;
        private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
        protected HoodieTableMetaClient hoodieTableMetaClient;

        /** Sets the storage handed to the scanner. */
        @Override
        public Builder withStorage(HoodieStorage storage) {
            this.storage = storage;
            return this;
        }

        /** Sets the table base path from a string. */
        @Override
        public Builder withBasePath(String basePath) {
            this.basePath = basePath;
            return this;
        }

        /** Sets the table base path from a {@link StoragePath}, stored as its string form. */
        @Override
        public Builder withBasePath(StoragePath basePath) {
            this.basePath = basePath.toString();
            return this;
        }

        /** Sets the log files to scan; paths ending in {@code .cdc} are dropped. */
        @Override
        public Builder withLogFilePaths(List<String> logFilePaths) {
            this.logFilePaths = logFilePaths.stream()
                    .filter(path -> !path.endsWith(".cdc"))
                    .collect(Collectors.toList());
            return this;
        }

        /** Sets the reader schema. */
        @Override
        public Builder withReaderSchema(Schema schema) {
            this.readerSchema = schema;
            return this;
        }

        /** Sets the latest instant time handed to the scanner. */
        @Override
        public Builder withLatestInstantTime(String latestInstantTime) {
            this.latestInstantTime = latestInstantTime;
            return this;
        }

        /** Sets the reverse-reader flag. */
        @Override
        public Builder withReverseReader(boolean reverseReader) {
            this.reverseReader = reverseReader;
            return this;
        }

        /** Sets the read buffer size. */
        @Override
        public Builder withBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /** Sets the optional instant range. */
        @Override
        public Builder withInstantRange(Option<InstantRange> instantRange) {
            this.instantRange = instantRange;
            return this;
        }

        /** Sets the memory budget of the spillable records map. */
        @Override
        public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
            this.maxMemorySizeInBytes = maxMemorySizeInBytes;
            return this;
        }

        /** Sets the base path used by the spillable records map. */
        @Override
        public Builder withSpillableMapBasePath(String spillableMapBasePath) {
            this.spillableMapBasePath = spillableMapBasePath;
            return this;
        }

        /** Sets the disk map type of the spillable records map. */
        @Override
        public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
            this.diskMapType = diskMapType;
            return this;
        }

        /** Sets the BitCask disk map compression flag. */
        @Override
        public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
            this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
            return this;
        }

        /** Sets the internal schema. */
        @Override
        public Builder withInternalSchema(InternalSchema internalSchema) {
            this.internalSchema = internalSchema;
            return this;
        }

        /** Sets the operation-field flag. */
        @Override
        public Builder withOperationField(boolean withOperationField) {
            this.withOperationField = withOperationField;
            return this;
        }

        /** Sets the partition name explicitly, so {@link #build()} does not derive it. */
        @Override
        public Builder withPartition(String partitionName) {
            this.partitionName = partitionName;
            return this;
        }

        /** Sets the optimized log blocks scan flag. */
        @Override
        public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
            this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
            return this;
        }

        /** Sets the record merger, converted through {@code HoodieRecordUtils.mergerToPreCombineMode}. */
        @Override
        public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
            this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
            return this;
        }

        /** Sets the key field override; a {@code null} argument is rejected with a NullPointerException. */
        @Override
        public Builder withKeyFieldOverride(String keyFieldOverride) {
            this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
            return this;
        }

        /** Sets whether the scanner performs the full scan during construction. */
        @Override
        public Builder withForceFullScan(boolean forceFullScan) {
            this.forceFullScan = forceFullScan;
            return this;
        }

        /** Sets the table meta client handed to the scanner. */
        @Override
        public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient) {
            this.hoodieTableMetaClient = hoodieTableMetaClient;
            return this;
        }

        /** Sets the allow-inflight-instants flag. */
        public Builder withAllowInflightInstants(boolean allowInflightInstants) {
            this.allowInflightInstants = allowInflightInstants;
            return this;
        }

        /**
         * Creates the scanner from the collected settings.
         *
         * <p>When no partition name was set and at least one log file path is present, the
         * partition name is computed as the path of the first log file's parent directory
         * relative to the base path. A record merger must be present.
         *
         * @return the configured scanner
         */
        @Override
        public HoodieMergedLogRecordScanner build() {
            if (partitionName == null && CollectionUtils.nonEmpty(logFilePaths)) {
                StoragePath tableRoot = new StoragePath(basePath);
                StoragePath firstLogFileDir = new StoragePath(logFilePaths.get(0)).getParent();
                partitionName = FSUtils.getRelativePartitionPath(tableRoot, firstLogFileDir);
            }
            ValidationUtils.checkArgument(recordMerger != null);
            return new HoodieMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, reverseReader, bufferSize, spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan, Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient), allowInflightInstants);
        }
    }
}

