/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterator over records of a single merge-on-read file slice (one base file plus its log files).
 *
 * <p>On construction the reader scans the log files into a key-to-record map, then walks the record
 * keys of the base file. For every base-file key that also has a log record, that log record is
 * converted to an indexed record and a copy of it is stored in a spillable map; the iterator exposed
 * by this class walks the values of that map. Keys that appear only in the base file or only in the
 * log files are not part of the iteration.
 *
 * <p>The reader owns the base-file reader, the log record scanner and the spillable map; all three are
 * released by {@link #close()}.
 */
public class HoodieMergeOnReadSnapshotReader
extends AbstractRealtimeRecordReader
implements Iterator<HoodieRecord>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);

    // Job configuration keys (and fallbacks) that this reader looks up by name.
    private static final String SPILLABLE_MAP_BASE_PATH_KEY = "hoodie.memory.spillable.map.path";
    private static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
    private static final String MAX_DFS_BUFFER_SIZE_KEY = "hoodie.memory.dfs.buffer.max.size";
    private static final int DEFAULT_MAX_DFS_BUFFER_SIZE = 0x100000;
    private static final String OPTIMIZED_LOG_BLOCKS_SCAN_KEY = "hoodie.optimized.log.blocks.scan.enable";

    private final String tableBasePath;
    private final List<HoodieLogFile> logFilePaths;
    private final String latestInstantTime;
    private final Schema readerSchema;
    private final JobConf jobConf;
    private final HoodieMergedLogRecordScanner logRecordScanner;
    private final HoodieFileReader baseFileReader;
    private final Map<String, HoodieRecord> logRecordsByKey;
    private final Iterator<HoodieRecord> recordsIterator;
    private final ExternalSpillableMap<String, HoodieRecord> mergedRecordsByKey;

    /**
     * Scans the log files, opens the base file and eagerly builds the map of records to iterate.
     *
     * @param tableBasePath     base path of the table, passed to the realtime split and the log scanner
     * @param baseFilePath      path of the base file of the file slice
     * @param logFilePaths      log files of the file slice
     * @param latestInstantTime instant time passed to the realtime split and the log scanner
     * @param readerSchema      schema used by the log scanner and, unless a custom payload is in use,
     *                          for converting log records to indexed records
     * @param jobConf           job configuration supplying memory, spill and scan settings
     * @param start2            start offset passed to the realtime split
     * @param length            length passed to the realtime split
     * @throws IOException if opening the base file, reading its keys or converting a record fails;
     *                     anything opened so far is closed before the exception propagates
     */
    public HoodieMergeOnReadSnapshotReader(String tableBasePath, String baseFilePath, List<HoodieLogFile> logFilePaths, String latestInstantTime, Schema readerSchema, JobConf jobConf, long start2, long length) throws IOException {
        super(HoodieMergeOnReadSnapshotReader.getRealtimeSplit(tableBasePath, baseFilePath, logFilePaths, latestInstantTime, start2, length, new String[0]), jobConf);
        this.tableBasePath = tableBasePath;
        this.logFilePaths = logFilePaths;
        this.latestInstantTime = latestInstantTime;
        this.readerSchema = readerSchema;
        this.jobConf = jobConf;

        // Resources are built in locals first so that a failure part-way through can release whatever
        // was already opened; the caller never receives an instance to close in that case.
        HoodieMergedLogRecordScanner scanner = null;
        HoodieFileReader fileReader = null;
        Map<String, HoodieRecord> logRecords = null;
        ExternalSpillableMap<String, HoodieRecord> merged = null;
        Iterator<HoodieRecord> mergedIterator = null;
        try {
            HoodieTimer timer = new HoodieTimer().startTimer();
            scanner = this.getMergedLogRecordScanner();
            LOG.debug("Time taken to scan log records: {}", timer.endTimer());
            fileReader = HoodieRealtimeRecordReaderUtils.getBaseFileReader(new Path(baseFilePath), jobConf);
            logRecords = scanner.getRecords();
            merged = HoodieMergeOnReadSnapshotReader.newMergedRecordStore(jobConf, readerSchema);
            this.collectLogRecordsForBaseFileKeys(fileReader, logRecords, merged);
            mergedIterator = merged.values().iterator();
        } catch (IOException | RuntimeException e) {
            HoodieMergeOnReadSnapshotReader.releaseAfterFailure(e, fileReader, scanner, merged);
            throw e;
        }
        this.logRecordScanner = scanner;
        this.baseFileReader = fileReader;
        this.logRecordsByKey = logRecords;
        this.mergedRecordsByKey = merged;
        this.recordsIterator = mergedIterator;
    }

    /** Returns whether the merged-record iterator has more records. */
    @Override
    public boolean hasNext() {
        return this.recordsIterator.hasNext();
    }

    /** Returns the next record from the merged-record iterator. */
    @Override
    public HoodieRecord next() {
        return this.recordsIterator.next();
    }

    /** Returns the live (not copied) map of records built at construction time, keyed by record key. */
    public Map<String, HoodieRecord> getRecordsByKey() {
        return this.mergedRecordsByKey;
    }

    /** Returns the same iterator instance that {@link #hasNext()} and {@link #next()} advance. */
    public Iterator<HoodieRecord> getRecordsIterator() {
        return this.recordsIterator;
    }

    /** Returns the record map obtained from the log record scanner, keyed by record key. */
    public Map<String, HoodieRecord> getLogRecordsByKey() {
        return this.logRecordsByKey;
    }

    /**
     * Releases the base-file reader, the log record scanner and the spillable map of merged records.
     * Each resource is closed even when closing an earlier one throws.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.baseFileReader != null) {
                this.baseFileReader.close();
            }
        } finally {
            try {
                if (this.logRecordScanner != null) {
                    this.logRecordScanner.close();
                }
            } finally {
                // The spillable map was previously never closed, leaving its spill state behind.
                if (this.mergedRecordsByKey != null) {
                    this.mergedRecordsByKey.close();
                }
            }
        }
    }

    /** Builds the realtime split handed to the superclass constructor. */
    private static HoodieRealtimeFileSplit getRealtimeSplit(String tableBasePath, String baseFilePath, List<HoodieLogFile> logFilePaths, String latestInstantTime, long start, long length, String[] hosts) {
        HoodieRealtimePath realtimePath = new HoodieRealtimePath(new Path(baseFilePath).getParent(), baseFilePath, tableBasePath, logFilePaths, latestInstantTime, false, Option.empty());
        return HoodieInputFormatUtils.createRealtimeFileSplit(realtimePath, start, length, hosts);
    }

    /** Creates the empty spillable map that receives the records to iterate, configured from the job conf. */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the size estimators are passed as raw types, as in the original call
    private static ExternalSpillableMap<String, HoodieRecord> newMergedRecordStore(JobConf conf, Schema schema) throws IOException {
        return new ExternalSpillableMap<>(
                HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(conf),
                HoodieMergeOnReadSnapshotReader.spillableMapBasePath(conf),
                new DefaultSizeEstimator(),
                new HoodieRecordSizeEstimator(schema),
                HoodieMergeOnReadSnapshotReader.diskMapType(conf),
                HoodieMergeOnReadSnapshotReader.bitCaskCompressionEnabled(conf));
    }

    /**
     * Walks the base file's record keys and, for each key that also has a log record, stores a copy of
     * that log record (converted to an indexed record) in {@code target}. A key is handled at most once
     * even if the base file yields it repeatedly; keys whose conversion yields no record are skipped.
     */
    private void collectLogRecordsForBaseFileKeys(HoodieFileReader fileReader, Map<String, HoodieRecord> logRecords, Map<String, HoodieRecord> target) throws IOException {
        HashSet<String> pendingLogKeys = new HashSet<>(logRecords.keySet());
        HoodieTimer timer = new HoodieTimer();
        try (ClosableIterator<String> baseFileKeys = fileReader.getRecordKeyIterator()) {
            timer.startTimer();
            while (baseFileKeys.hasNext()) {
                String key = baseFileKeys.next();
                // remove() doubles as the membership test: false means no (remaining) log record for the key.
                if (!pendingLogKeys.remove(key)) {
                    continue;
                }
                Option<HoodieAvroIndexedRecord> converted = this.buildGenericRecordWithCustomPayload(logRecords.get(key));
                if (converted.isPresent()) {
                    target.put(key, converted.get().copy());
                }
            }
        }
        LOG.debug("Time taken to merge base file and log file records: {}", timer.endTimer());
    }

    /**
     * Best-effort cleanup used when construction fails: closes whatever was opened and attaches any
     * secondary close failure to {@code failure} as a suppressed exception instead of masking it.
     */
    private static void releaseAfterFailure(Throwable failure, HoodieFileReader fileReader, HoodieMergedLogRecordScanner scanner, ExternalSpillableMap<String, HoodieRecord> merged) {
        try {
            if (fileReader != null) {
                fileReader.close();
            }
        } catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        try {
            if (scanner != null) {
                scanner.close();
            }
        } catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        try {
            if (merged != null) {
                merged.close();
            }
        } catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /** Builds the scanner over this slice's log files using the memory, spill and scan settings of the job conf. */
    private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
        List<String> logFiles = this.logFilePaths.stream()
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        // The down-cast mirrors the compiled call chain, where the first builder calls resolve to a less specific builder type.
        HoodieMergedLogRecordScanner.Builder builder = (HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(HoodieStorageUtils.getStorage(this.split.getPath().toString(), HadoopFSUtils.getStorageConf((Configuration)this.jobConf)))
                .withBasePath(this.tableBasePath)
                .withLogFilePaths(logFiles);
        return builder
                .withReaderSchema(this.readerSchema)
                .withLatestInstantTime(this.latestInstantTime)
                .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf))
                .withReverseReader(false)
                .withBufferSize(this.jobConf.getInt(MAX_DFS_BUFFER_SIZE_KEY, DEFAULT_MAX_DFS_BUFFER_SIZE))
                .withSpillableMapBasePath(HoodieMergeOnReadSnapshotReader.spillableMapBasePath(this.jobConf))
                .withDiskMapType(HoodieMergeOnReadSnapshotReader.diskMapType(this.jobConf))
                .withBitCaskDiskMapCompressionEnabled(HoodieMergeOnReadSnapshotReader.bitCaskCompressionEnabled(this.jobConf))
                .withOptimizedLogBlocksScan(this.jobConf.getBoolean(OPTIMIZED_LOG_BLOCKS_SCAN_KEY, false))
                .withInternalSchema(this.schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
                .build();
    }

    /**
     * Converts a log record to an indexed record, using the writer schema when a custom payload is in
     * use and the reader schema otherwise.
     */
    private Option<HoodieAvroIndexedRecord> buildGenericRecordWithCustomPayload(HoodieRecord record) throws IOException {
        Schema targetSchema = this.usesCustomPayload ? this.getWriterSchema() : this.readerSchema;
        return record.toIndexedRecord(targetSchema, this.payloadProps);
    }

    /** Directory for spilled map data, read from the job conf with {@code /tmp/} as the fallback. */
    private static String spillableMapBasePath(JobConf conf) {
        return conf.get(SPILLABLE_MAP_BASE_PATH_KEY, DEFAULT_SPILLABLE_MAP_BASE_PATH);
    }

    /** Disk map type for spilled data, read from the job conf with the common-config default as the fallback. */
    private static ExternalSpillableMap.DiskMapType diskMapType(JobConf conf) {
        return conf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue());
    }

    /** BitCask disk-map compression flag, read from the job conf with the common-config default as the fallback. */
    private static boolean bitCaskCompressionEnabled(JobConf conf) {
        return conf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue());
    }
}

