/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@NotThreadSafe
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieWriteHandle<T, I, K, O> {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
    protected Map<String, HoodieRecord<T>> keyToNewRecords;
    protected Set<String> writtenRecordKeys;
    protected HoodieFileWriter<IndexedRecord> fileWriter;
    private boolean preserveMetadata = false;
    protected Path newFilePath;
    protected Path oldFilePath;
    protected long recordsWritten = 0L;
    protected long recordsDeleted = 0L;
    protected long updatedRecordsWritten = 0L;
    protected long insertRecordsWritten = 0L;
    protected boolean useWriterSchemaForCompaction;
    protected Option<BaseKeyGenerator> keyGeneratorOpt;
    private HoodieBaseFile baseFileToMerge;

    public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, HoodieMergeHandle.getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
    }

    public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
        this.init(fileId, recordItr);
        this.init(fileId, partitionPath, baseFile);
        this.validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
    }

    public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId, HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
        this.keyToNewRecords = keyToNewRecords;
        this.useWriterSchemaForCompaction = true;
        this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
        this.init(fileId, this.partitionPath, dataFileToBeMerged);
        this.validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
    }

    private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
        ValidationUtils.checkArgument((populateMetaFields == !keyGeneratorOpt.isPresent() ? 1 : 0) != 0);
        this.keyGeneratorOpt = keyGeneratorOpt;
    }

    public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTable, String partitionPath, String fileId) {
        Option baseFileOp = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
        if (!baseFileOp.isPresent()) {
            throw new NoSuchElementException(String.format("FileID %s of partition path %s does not exist.", fileId, partitionPath));
        }
        return (HoodieBaseFile)baseFileOp.get();
    }

    @Override
    public Schema getWriterSchemaWithMetaFields() {
        return this.writeSchemaWithMetaFields;
    }

    public Schema getWriterSchema() {
        return this.writeSchema;
    }

    private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
        LOG.info((Object)("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId));
        this.baseFileToMerge = baseFileToMerge;
        this.writtenRecordKeys = new HashSet<String>();
        this.writeStatus.setStat(new HoodieWriteStat());
        try {
            String latestValidFilePath = baseFileToMerge.getFileName();
            this.writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime((String)latestValidFilePath));
            HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.fs, this.instantTime, new Path(this.config.getBasePath()), FSUtils.getPartitionPath((String)this.config.getBasePath(), (String)partitionPath), this.hoodieTable.getPartitionMetafileFormat());
            partitionMetadata.trySave(this.getPartitionId());
            String newFileName = FSUtils.makeBaseFileName((String)this.instantTime, (String)this.writeToken, (String)fileId, (String)this.hoodieTable.getBaseFileExtension());
            this.makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
            LOG.info((Object)String.format("Merging new data into oldPath %s, as newPath %s", this.oldFilePath.toString(), this.newFilePath.toString()));
            this.writeStatus.setFileId(fileId);
            this.writeStatus.setPartitionPath(partitionPath);
            this.writeStatus.getStat().setPartitionPath(partitionPath);
            this.writeStatus.getStat().setFileId(fileId);
            this.setWriteStatusPath();
            this.createMarkerFile(partitionPath, this.newFilePath.getName());
            this.fileWriter = this.createNewFileWriter(this.instantTime, this.newFilePath, this.hoodieTable, this.config, this.writeSchemaWithMetaFields, this.taskContextSupplier);
        }
        catch (IOException io) {
            LOG.error((Object)("Error in update task at commit " + this.instantTime), (Throwable)io);
            this.writeStatus.setGlobalError(io);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), io);
        }
    }

    protected void setWriteStatusPath() {
        this.writeStatus.getStat().setPath(new Path(this.config.getBasePath()), this.newFilePath);
    }

    protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
        this.oldFilePath = this.makeNewFilePath(partitionPath, oldFileName);
        this.newFilePath = this.makeNewFilePath(partitionPath, newFileName);
    }

    protected void initializeIncomingRecordsMap() {
        try {
            long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(this.taskContextSupplier, this.config);
            LOG.info((Object)("MaxMemoryPerPartitionMerge => " + memoryForMerge));
            this.keyToNewRecords = new ExternalSpillableMap(Long.valueOf(memoryForMerge), this.config.getSpillableMapBasePath(), (SizeEstimator)new DefaultSizeEstimator(), (SizeEstimator)new HoodieRecordSizeEstimator(this.tableSchema), this.config.getCommonConfig().getSpillableDiskMapType(), this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
        }
        catch (IOException io) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
        }
    }

    boolean needsUpdateLocation() {
        return true;
    }

    protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
        this.initializeIncomingRecordsMap();
        while (newRecordsItr.hasNext()) {
            HoodieRecord<T> record = newRecordsItr.next();
            if (this.needsUpdateLocation()) {
                record.unseal();
                record.setNewLocation(new HoodieRecordLocation(this.instantTime, fileId));
                record.seal();
            }
            this.keyToNewRecords.put(record.getRecordKey(), record);
        }
        LOG.info((Object)("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap)this.keyToNewRecords).getInMemoryMapNumEntries() + ", Total size in bytes of MemoryBasedMap => " + ((ExternalSpillableMap)this.keyToNewRecords).getCurrentInMemoryMapSize() + ", Number of entries in BitCaskDiskMap => " + ((ExternalSpillableMap)this.keyToNewRecords).getDiskBasedMapNumEntries() + ", Size of file spilled to disk => " + ((ExternalSpillableMap)this.keyToNewRecords).getSizeOfFileOnDiskInBytes()));
    }

    private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
        boolean isDelete = false;
        if (indexedRecord.isPresent()) {
            ++this.updatedRecordsWritten;
            GenericRecord record = (GenericRecord)indexedRecord.get();
            if (oldRecord != record) {
                isDelete = HoodieOperation.isDelete((HoodieOperation)hoodieRecord.getOperation());
            } else {
                return false;
            }
        }
        return this.writeRecord(hoodieRecord, indexedRecord, isDelete);
    }

    protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
        Schema schema = this.useWriterSchemaForCompaction ? this.tableSchemaWithMetaFields : this.tableSchema;
        Option insertRecord = ((HoodieRecordPayload)hoodieRecord.getData()).getInsertValue(schema, (Properties)this.config.getProps());
        if (insertRecord.isPresent() && ((IndexedRecord)insertRecord.get()).equals(IGNORE_RECORD)) {
            return;
        }
        if (this.writeRecord(hoodieRecord, (Option<IndexedRecord>)insertRecord, HoodieOperation.isDelete((HoodieOperation)hoodieRecord.getOperation()))) {
            ++this.insertRecordsWritten;
        }
    }

    protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
        return this.writeRecord(hoodieRecord, indexedRecord, false);
    }

    protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
        Option recordMetadata = ((HoodieRecordPayload)hoodieRecord.getData()).getMetadata();
        if (!this.partitionPath.equals(hoodieRecord.getPartitionPath())) {
            HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath);
            this.writeStatus.markFailure(hoodieRecord, (Throwable)((Object)failureEx), (Option<Map<String, String>>)recordMetadata);
            return false;
        }
        try {
            if (indexedRecord.isPresent() && !isDelete) {
                this.writeToFile(hoodieRecord.getKey(), (GenericRecord)indexedRecord.get(), this.preserveMetadata && this.useWriterSchemaForCompaction);
                ++this.recordsWritten;
            } else {
                ++this.recordsDeleted;
            }
            this.writeStatus.markSuccess(hoodieRecord, (Option<Map<String, String>>)recordMetadata);
            hoodieRecord.deflate();
            return true;
        }
        catch (Exception e) {
            LOG.error((Object)("Error writing record  " + hoodieRecord), (Throwable)e);
            this.writeStatus.markFailure(hoodieRecord, e, (Option<Map<String, String>>)recordMetadata);
            return false;
        }
    }

    public void write(GenericRecord oldRecord) {
        String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, this.keyGeneratorOpt);
        boolean copyOldRecord = true;
        if (this.keyToNewRecords.containsKey(key)) {
            HoodieRecord hoodieRecord = this.keyToNewRecords.get(key).newInstance();
            try {
                Option combinedAvroRecord = ((HoodieRecordPayload)hoodieRecord.getData()).combineAndGetUpdateValue((IndexedRecord)oldRecord, this.useWriterSchemaForCompaction ? this.tableSchemaWithMetaFields : this.tableSchema, (Properties)this.config.getPayloadConfig().getProps());
                if (combinedAvroRecord.isPresent() && ((IndexedRecord)combinedAvroRecord.get()).equals(IGNORE_RECORD)) {
                    copyOldRecord = true;
                } else if (this.writeUpdateRecord(hoodieRecord, oldRecord, (Option<IndexedRecord>)combinedAvroRecord)) {
                    copyOldRecord = false;
                }
                this.writtenRecordKeys.add(key);
            }
            catch (Exception e) {
                throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {" + this.keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
            }
        }
        if (copyOldRecord) {
            try {
                this.writeToFile(new HoodieKey(key, this.partitionPath), oldRecord, true);
            }
            catch (IOException | RuntimeException e) {
                String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, this.getOldFilePath(), this.newFilePath, this.writeSchemaWithMetaFields.toString(true));
                LOG.debug((Object)("Old record is " + oldRecord));
                throw new HoodieUpsertException(errMsg, e);
            }
            ++this.recordsWritten;
        }
    }

    protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException {
        if (shouldPreserveRecordMetadata) {
            this.fileWriter.writeAvro(key.getRecordKey(), (IndexedRecord)this.rewriteRecordWithMetadata(avroRecord, this.newFilePath.getName()));
        } else {
            this.fileWriter.writeAvroWithMetadata(key, (IndexedRecord)this.rewriteRecord(avroRecord));
        }
    }

    protected void writeIncomingRecords() throws IOException {
        Iterator<HoodieRecord<T>> newRecordsItr;
        Iterator<HoodieRecord<T>> iterator = newRecordsItr = this.keyToNewRecords instanceof ExternalSpillableMap ? ((ExternalSpillableMap)this.keyToNewRecords).iterator() : this.keyToNewRecords.values().iterator();
        while (newRecordsItr.hasNext()) {
            HoodieRecord<T> hoodieRecord = newRecordsItr.next();
            if (this.writtenRecordKeys.contains(hoodieRecord.getRecordKey())) continue;
            this.writeInsertRecord(hoodieRecord);
        }
    }

    @Override
    public List<WriteStatus> close() {
        try {
            this.writeIncomingRecords();
            if (this.keyToNewRecords instanceof ExternalSpillableMap) {
                ((ExternalSpillableMap)this.keyToNewRecords).close();
            } else {
                this.keyToNewRecords.clear();
            }
            this.writtenRecordKeys.clear();
            if (this.fileWriter != null) {
                this.fileWriter.close();
                this.fileWriter = null;
            }
            long fileSizeInBytes = FSUtils.getFileSize((FileSystem)this.fs, (Path)this.newFilePath);
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setTotalWriteBytes(fileSizeInBytes);
            stat.setFileSizeInBytes(fileSizeInBytes);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());
            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);
            this.performMergeDataValidationCheck(this.writeStatus);
            LOG.info((Object)String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
            return Collections.singletonList(this.writeStatus);
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    public void performMergeDataValidationCheck(WriteStatus writeStatus) {
        if (!this.config.isMergeDataValidationCheckEnabled()) {
            return;
        }
        long oldNumWrites = 0L;
        try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader((Configuration)this.hoodieTable.getHadoopConf(), (Path)this.oldFilePath);){
            oldNumWrites = reader.getTotalRecords();
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to check for merge data validation", e);
        }
        if (writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes() < oldNumWrites) {
            throw new HoodieCorruptedDataException(String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)", writeStatus.getFileId(), writeStatus.getPartitionPath(), this.instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(), FSUtils.getCommitTime((String)this.oldFilePath.toString()), oldNumWrites));
        }
    }

    public Path getOldFilePath() {
        return this.oldFilePath;
    }

    @Override
    public IOType getIOType() {
        return IOType.MERGE;
    }

    public HoodieBaseFile baseFileForMerge() {
        return this.baseFileToMerge;
    }
}

