/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecords;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieAbstractMergeHandle;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.SecondaryIndexStreamingTracker;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merge handle that produces a new file by combining the records of an existing
 * base file with a set of incoming records.
 *
 * <p>Old records are fed in one at a time through {@link #write(HoodieRecord)}; each one is
 * either merged with the incoming record that has the same key or copied unchanged.
 * Incoming records whose key was never seen by {@link #write(HoodieRecord)} are written as
 * inserts when the handle is closed (see {@link #close()}).
 *
 * <p>The class is annotated {@code @NotThreadSafe}: it keeps unsynchronized mutable state
 * (counters, the incoming-records map, the file writer).
 */
@NotThreadSafe
public class HoodieWriteMergeHandle<T, I, K, O>
extends HoodieAbstractMergeHandle<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteMergeHandle.class);
    /** Ordering fields resolved from the write config; handed to {@code BufferedRecords.fromHoodieRecord} when merging. */
    private final String[] orderingFields;
    /** Incoming records indexed by record key. Set to {@code null} once the handle has been closed. */
    protected Map<String, HoodieRecord<T>> keyToNewRecords;
    /** Keys of incoming records already matched against an old record in {@link #write(HoodieRecord)}. */
    protected Set<String> writtenRecordKeys;
    /** Writer for the new file. Set to {@code null} once the handle has been closed. */
    protected HoodieFileWriter fileWriter;
    /** Supplies the record context used to build, merge and convert buffered records. */
    protected HoodieReaderContext<T> readerContext;
    /** Number of records written to the new file (merged, inserted and copied-over old records). */
    protected long recordsWritten = 0L;
    /** Number of records handled as deletes (nothing is written to the file for them). */
    protected long recordsDeleted = 0L;
    /** Number of non-delete update attempts made through {@link #writeUpdateRecord}. */
    protected long updatedRecordsWritten = 0L;
    /** Number of records from the insert path for which the internal write step reported success. */
    protected long insertRecordsWritten = 0L;

    /**
     * Creates a handle that merges into the latest base file of the given file group.
     * The base file is resolved with {@code getLatestBaseFile(hoodieTable, partitionPath, fileId)}
     * and everything else is delegated to the constructor taking an explicit base file.
     */
    public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, HoodieWriteMergeHandle.getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
    }

    /**
     * Creates a handle from an iterator of incoming records and an explicit base file.
     * The iterator is drained into {@link #keyToNewRecords}, then the marker file and the
     * file writer for the new file are created.
     *
     * <p>NOTE(review): the trailing {@code false} passed to the super constructor is presumably
     * the "preserve metadata" flag - confirm against {@code HoodieAbstractMergeHandle}.
     */
    public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, baseFile, keyGeneratorOpt, false);
        this.populateIncomingRecordsMap(recordItr);
        this.initMarkerFileAndFileWriter(fileId, partitionPath);
        this.readerContext = hoodieTable.getReaderContextFactoryForWrite().getContext();
        this.orderingFields = ConfigUtils.getOrderingFields((Properties)config.getProps());
    }

    /**
     * Creates a handle from an already-built map of incoming records. The map is used as-is
     * (it is not copied and record locations are not updated here).
     *
     * <p>The last super-constructor argument is {@code true} unless the config's base path
     * denotes a metadata table.
     */
    public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId, HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, dataFileToBeMerged, keyGeneratorOpt, !HoodieTableMetadata.isMetadataTable((String)config.getBasePath()));
        this.keyToNewRecords = keyToNewRecords;
        this.initMarkerFileAndFileWriter(fileId, this.partitionPath);
        this.readerContext = hoodieTable.getReaderContextFactoryForWrite().getContext();
        this.orderingFields = ConfigUtils.getOrderingFields((Properties)config.getProps());
    }

    /**
     * Minimal constructor: only the super constructor is invoked and the ordering fields are
     * resolved. {@link #keyToNewRecords}, {@link #writtenRecordKeys}, {@link #fileWriter} and
     * {@link #readerContext} are NOT initialized here; callers (presumably subclasses) must set
     * up whatever they use before invoking methods that rely on those fields.
     */
    public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
        super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true);
        this.orderingFields = ConfigUtils.getOrderingFields((Properties)config.getProps());
    }

    /**
     * Runs the merge by delegating to {@code HoodieMergeHelper.runMerge} with this handle.
     *
     * @throws IOException if the merge helper fails with an I/O error
     */
    @Override
    public void doMerge() throws IOException {
        HoodieMergeHelper.newInstance().runMerge(this.hoodieTable, this);
    }

    /**
     * Resets {@link #writtenRecordKeys}, creates the marker file for the new file and opens
     * the file writer on {@code newFilePath}.
     *
     * @throws HoodieUpsertException if marker or writer creation fails with an {@link IOException};
     *                               the error is also recorded as the global error of the write status
     */
    private void initMarkerFileAndFileWriter(String fileId, String partitionPath) {
        this.writtenRecordKeys = new HashSet<String>();
        try {
            this.createMarkerFile(partitionPath, this.newFilePath.getName());
            this.fileWriter = HoodieFileWriterFactory.getFileWriter((String)this.instantTime, (StoragePath)this.newFilePath, (HoodieStorage)this.hoodieTable.getStorage(), (HoodieConfig)this.config, (Schema)this.writeSchemaWithMetaFields, (TaskContextSupplier)this.taskContextSupplier, (HoodieRecord.HoodieRecordType)this.getRecordType());
        }
        catch (IOException io) {
            LOG.error("Error in update task at commit {}", (Object)this.instantTime, (Object)io);
            this.writeStatus.setGlobalError(io);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), io);
        }
    }

    /** Returns the record type reported by the configured record merger. */
    protected HoodieRecord.HoodieRecordType getRecordType() {
        return this.recordMerger.getRecordType();
    }

    /**
     * Allocates {@link #keyToNewRecords} as an {@link ExternalSpillableMap} whose memory budget
     * comes from {@code IOUtils.getMaxMemoryPerPartitionMerge}.
     *
     * @throws HoodieIOException if the spillable map cannot be created
     */
    protected void initIncomingRecordsMap() {
        try {
            long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(this.taskContextSupplier, this.config);
            LOG.info("MaxMemoryPerPartitionMerge => {}", (Object)memoryForMerge);
            this.keyToNewRecords = new ExternalSpillableMap(memoryForMerge, this.config.getSpillableMapBasePath(), (SizeEstimator)new DefaultSizeEstimator(), (SizeEstimator)new HoodieRecordSizeEstimator(this.writeSchema), this.config.getCommonConfig().getSpillableDiskMapType(), (CustomSerializer)new DefaultSerializer(), this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(), this.getClass().getSimpleName());
        }
        catch (IOException io) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
        }
    }

    /**
     * Whether {@link #populateIncomingRecordsMap} should stamp the new record location onto
     * each incoming record. This implementation always answers {@code true}.
     */
    boolean needsUpdateLocation() {
        return true;
    }

    /**
     * Drains the iterator into {@link #keyToNewRecords}, keyed by record key. When
     * {@link #needsUpdateLocation()} is true each record is unsealed, given
     * {@code newRecordLocation} and sealed again before being stored. A later record with the
     * same key replaces an earlier one (plain {@code Map.put}).
     */
    protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> newRecordsItr) {
        this.initIncomingRecordsMap();
        while (newRecordsItr.hasNext()) {
            HoodieRecord<T> record = newRecordsItr.next();
            if (this.needsUpdateLocation()) {
                record.unseal();
                record.setNewLocation(this.newRecordLocation);
                record.seal();
            }
            this.keyToNewRecords.put(record.getRecordKey(), record);
        }
        if (this.keyToNewRecords instanceof ExternalSpillableMap) {
            ExternalSpillableMap spillableMap = (ExternalSpillableMap)this.keyToNewRecords;
            LOG.info("Number of entries in MemoryBasedMap => {}, Total size in bytes of MemoryBasedMap => {}, Number of entries in BitCaskDiskMap => {}, Size of file spilled to disk => {}", new Object[]{spillableMap.getInMemoryMapNumEntries(), spillableMap.getCurrentInMemoryMapSize(), spillableMap.getDiskBasedMapNumEntries(), spillableMap.getSizeOfFileOnDiskInBytes()});
        }
    }

    /**
     * Tells whether the incoming-records map is empty. Dereferences {@link #keyToNewRecords}
     * directly, so it must not be called after {@link #close()} has released the map.
     */
    public boolean isEmptyNewRecords() {
        return this.keyToNewRecords.isEmpty();
    }

    /**
     * Writes the outcome of merging an incoming record with an old record.
     *
     * <p>If the combined record carries the very same data object as the old record
     * (reference comparison), nothing is written and {@code false} is returned so the caller
     * copies the old record instead. Otherwise the combined record is classified as delete or
     * update from its operation, {@link #updatedRecordsWritten} is bumped for non-deletes, and
     * the record is passed to the internal write step using the payload-config properties.
     *
     * @return {@code true} if the internal write step reported success, {@code false} otherwise
     * @throws IOException declared for overriding implementations
     */
    protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, HoodieRecord combineRecord, Schema writerSchema) throws IOException {
        boolean isDelete = false;
        if (oldRecord.getData() != combineRecord.getData()) {
            isDelete = this.isDeleteRecord(combineRecord);
            if (!isDelete) {
                ++this.updatedRecordsWritten;
            }
        } else {
            // Merge kept the old data untouched: let the caller copy the old record over.
            return false;
        }
        return this.writeRecord(newRecord, oldRecord, combineRecord, writerSchema, (Properties)this.config.getPayloadConfig().getProps(), isDelete);
    }

    /**
     * Writes an incoming record that had no counterpart in the old file, unless the record
     * reports that it should be ignored for the schema returned by {@code getNewSchema()}.
     *
     * @throws IOException declared for overriding implementations
     */
    protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
        Schema schema = this.getNewSchema();
        if (newRecord.shouldIgnore(schema, (Properties)this.config.getProps())) {
            return;
        }
        this.writeInsertRecord(newRecord, schema, (Properties)this.config.getProps());
    }

    /**
     * Passes the record to the internal write step (with no old record) and increments
     * {@link #insertRecordsWritten} when that step reports success. Note that the write step
     * also reports success for records it handles as deletes.
     *
     * @throws IOException declared for overriding implementations
     */
    protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop) throws IOException {
        if (this.writeRecord(newRecord, null, newRecord, schema, prop, this.isDeleteRecord(newRecord))) {
            ++this.insertRecordsWritten;
        }
    }

    /**
     * Convenience overload of the internal write step: no old record, and the record is not
     * pre-classified as a delete.
     *
     * @return {@code true} if the record was processed successfully
     * @throws IOException declared for overriding implementations
     */
    protected boolean writeRecord(HoodieRecord<T> newRecord, HoodieRecord combineRecord, Schema schema, Properties prop) throws IOException {
        return this.writeRecord(newRecord, null, combineRecord, schema, prop, false);
    }

    /** Returns true when the record's operation is a delete or an update-before according to {@link HoodieOperation}. */
    private boolean isDeleteRecord(HoodieRecord<T> record) {
        HoodieOperation operation = record.getOperation();
        return HoodieOperation.isDelete((HoodieOperation)operation) || HoodieOperation.isUpdateBefore((HoodieOperation)operation);
    }

    /**
     * Core write step shared by the update and insert paths.
     *
     * <ul>
     *   <li>A record whose partition path differs from this handle's partition is marked as a
     *       failure on the write status and {@code false} is returned.</li>
     *   <li>A non-delete record is written to the file and counted in {@link #recordsWritten}.</li>
     *   <li>A delete (either {@code isDelete} or reported by {@code combineRecord.isDelete})
     *       writes nothing, is counted in {@link #recordsDeleted}, and the new location of
     *       {@code newRecord} is cleared.</li>
     * </ul>
     * Secondary-index stats are tracked in both branches when streaming writes for them are
     * enabled. Any exception is logged, recorded as a failure on the write status and
     * converted into a {@code false} return value instead of being propagated.
     *
     * @param oldRecord the previous version of the record, or {@code null} on the insert path
     * @return {@code true} if the record was written or deleted successfully
     */
    private boolean writeRecord(HoodieRecord<T> newRecord, @Nullable HoodieRecord<T> oldRecord, HoodieRecord combineRecord, Schema schema, Properties props, boolean isDelete) {
        Option<Map<String, String>> recordMetadata = this.getRecordMetadata(newRecord, schema, props);
        if (!this.partitionPath.equals(newRecord.getPartitionPath())) {
            HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " + newRecord.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath);
            this.writeStatus.markFailure(newRecord, (Throwable)((Object)failureEx), recordMetadata);
            return false;
        }
        try {
            if (!isDelete && !combineRecord.isDelete(this.deleteContext, (Properties)this.config.getProps())) {
                HoodieKey hoodieKey = newRecord.getKey();
                if (this.isSecondaryIndexStatsStreamingWritesEnabled) {
                    SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, combineRecord, oldRecord, false, this.writeStatus, this.writeSchemaWithMetaFields, this::getNewSchema, this.secondaryIndexDefns, (Option<BaseKeyGenerator>)this.keyGeneratorOpt, this.config);
                }
                this.writeToFile(hoodieKey, combineRecord, schema, props, this.preserveMetadata);
                ++this.recordsWritten;
            } else {
                if (this.isSecondaryIndexStatsStreamingWritesEnabled) {
                    SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), combineRecord, oldRecord, true, this.writeStatus, this.writeSchemaWithMetaFields, this::getNewSchema, this.secondaryIndexDefns, (Option<BaseKeyGenerator>)this.keyGeneratorOpt, this.config);
                }
                ++this.recordsDeleted;
                // A deleted record does not live in the new file, so drop its new location.
                newRecord.unseal();
                newRecord.clearNewLocation();
                newRecord.seal();
            }
            this.writeStatus.markSuccess(newRecord, recordMetadata);
            newRecord.deflate();
            return true;
        }
        catch (Exception e) {
            LOG.error("Error writing record {}", newRecord, (Object)e);
            this.writeStatus.markFailure(newRecord, e, recordMetadata);
            return false;
        }
    }

    /**
     * Processes one record read from the old file.
     *
     * <p>If an incoming record with the same key exists, both are converted to buffered
     * records and merged with the configured record merger. The old record is dropped only
     * when the merged record is not flagged "ignore" and {@link #writeUpdateRecord} reports
     * success; in every other case the old record is copied to the new file unchanged. The key
     * is remembered in {@link #writtenRecordKeys} whenever the merge itself completes.
     *
     * @throws HoodieUpsertException if merging fails or the old record cannot be copied
     */
    public void write(HoodieRecord<T> oldRecord) {
        Schema oldSchema = this.writeSchemaWithMetaFields;
        Schema newSchema = this.getNewSchema();
        boolean copyOldRecord = true;
        String key = oldRecord.getRecordKey(oldSchema, this.keyGeneratorOpt);
        TypedProperties props = this.config.getPayloadConfig().getProps();
        // Single lookup: with a spillable map every get() of a spilled entry may hit disk, so
        // avoid the containsKey()+get() pair and the extra get() for the error message.
        HoodieRecord<T> incomingRecord = this.keyToNewRecords.get(key);
        if (incomingRecord != null) {
            HoodieRecord newRecord = incomingRecord.newInstance();
            try {
                BufferedRecord oldBufferedRecord = BufferedRecords.fromHoodieRecord(oldRecord, (Schema)oldSchema, (RecordContext)this.readerContext.getRecordContext(), (Properties)props, (String[])this.orderingFields, (boolean)false);
                BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord((HoodieRecord)newRecord, (Schema)newSchema, (RecordContext)this.readerContext.getRecordContext(), (Properties)props, (String[])this.orderingFields, (DeleteContext)this.deleteContext);
                BufferedRecord mergeResult = this.recordMerger.merge(oldBufferedRecord, newBufferedRecord, this.readerContext.getRecordContext(), props);
                Schema combineRecordSchema = this.readerContext.getRecordContext().getSchemaFromBufferRecord(mergeResult);
                HoodieRecord combinedRecord = this.readerContext.getRecordContext().constructHoodieRecord(mergeResult);
                if (combinedRecord.shouldIgnore(combineRecordSchema, (Properties)props)) {
                    // Merged record asks to be ignored: keep the old record as it is.
                    copyOldRecord = true;
                } else if (this.writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
                    // The merged value made it to the new file, so the old one is superseded.
                    copyOldRecord = false;
                }
                this.writtenRecordKeys.add(key);
            }
            catch (Exception e) {
                throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {" + incomingRecord + "}, old value {" + oldRecord + "}", e);
            }
        }
        if (copyOldRecord) {
            try {
                this.writeToFile(new HoodieKey(key, this.partitionPath), oldRecord, oldSchema, (Properties)props, true);
            }
            catch (IOException | RuntimeException e) {
                String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s", key, this.getOldFilePath(), this.newFilePath, this.writeSchemaWithMetaFields.toString(true));
                LOG.debug("Old record is {}", oldRecord);
                throw new HoodieUpsertException(errMsg, e);
            }
            ++this.recordsWritten;
        }
    }

    /**
     * Writes a record through the file writer.
     *
     * <p>With {@code shouldPreserveRecordMetadata} the record keeps its meta fields, except
     * that the file-name meta field is set to the new file's name, and is written with
     * {@code fileWriter.write}. Otherwise meta fields are prepended (with an empty
     * {@link MetadataValues}) and the record is written with {@code fileWriter.writeWithMetadata}.
     *
     * @throws IOException if the file writer fails
     */
    protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties props, boolean shouldPreserveRecordMetadata) throws IOException {
        if (shouldPreserveRecordMetadata) {
            HoodieRecord populatedRecord = record.updateMetaField(schema, HoodieRecord.FILENAME_META_FIELD_ORD, this.newFilePath.getName());
            this.fileWriter.write(key.getRecordKey(), populatedRecord, this.writeSchemaWithMetaFields, props);
        } else {
            record = record.prependMetaFields(schema, this.writeSchemaWithMetaFields, new MetadataValues(), (Properties)this.config.getProps());
            this.fileWriter.writeWithMetadata(key, record, this.writeSchemaWithMetaFields, props);
        }
    }

    /**
     * Writes, as inserts, every incoming record whose key is not in {@link #writtenRecordKeys}.
     * A spillable map is iterated with its own key-filtering iterator; any other map is
     * filtered through a stream over its entries.
     *
     * @throws IOException if writing an insert fails with an I/O error
     */
    protected void writeIncomingRecords() throws IOException {
        Iterator newRecordsItr = this.keyToNewRecords instanceof ExternalSpillableMap ? ((ExternalSpillableMap)this.keyToNewRecords).iterator(key -> !this.writtenRecordKeys.contains(key)) : this.keyToNewRecords.entrySet().stream().filter(e -> !this.writtenRecordKeys.contains(e.getKey())).map(Map.Entry::getValue).iterator();
        while (newRecordsItr.hasNext()) {
            HoodieRecord hoodieRecord = (HoodieRecord)newRecordsItr.next();
            this.writeInsertRecord(hoodieRecord);
        }
    }

    /** Schema assumed for incoming records: the write schema with meta fields when metadata is preserved, the plain write schema otherwise. */
    private Schema getNewSchema() {
        return this.preserveMetadata ? this.writeSchemaWithMetaFields : this.writeSchema;
    }

    /**
     * Finishes the merge: writes the remaining incoming records as inserts, closes the
     * incoming-records map (if it is {@link Closeable}) and the file writer, fills in the
     * write statistics and runs the optional merge validation check.
     *
     * <p>Calling this on an already closed handle only returns the write status again.
     *
     * <p>The handle is flagged closed before any work happens, so a failed close cannot be
     * retried. For that reason any resource still attached to this handle when an exception
     * escapes is closed before the exception is rethrown; failures of that cleanup are attached
     * to the primary exception as suppressed exceptions.
     *
     * @return a singleton list holding this handle's write status
     * @throws HoodieUpsertException if an {@link IOException} occurs while closing
     */
    @Override
    public List<WriteStatus> close() {
        try {
            if (this.isClosed()) {
                return Collections.singletonList(this.writeStatus);
            }
            this.markClosed();
            this.writeIncomingRecords();
            // Detach each resource from its field BEFORE closing it, so that a close() which
            // throws is never attempted a second time by the failure cleanup below.
            Map<String, HoodieRecord<T>> incomingRecords = this.keyToNewRecords;
            this.keyToNewRecords = null;
            this.writtenRecordKeys = null;
            if (incomingRecords instanceof Closeable) {
                ((Closeable)((Object)incomingRecords)).close();
            }
            HoodieFileWriter writer = this.fileWriter;
            this.fileWriter = null;
            writer.close();
            long fileSizeInBytes = this.storage.getPathInfo(this.newFilePath).getLength();
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setTotalWriteBytes(fileSizeInBytes);
            stat.setFileSizeInBytes(fileSizeInBytes);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());
            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);
            this.performMergeDataValidationCheck(this.writeStatus);
            LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", new Object[]{stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()});
            return Collections.singletonList(this.writeStatus);
        }
        catch (IOException e) {
            HoodieUpsertException failure = new HoodieUpsertException("Failed to close UpdateHandle", e);
            this.releaseResourcesAfterFailedClose(failure);
            throw failure;
        }
        catch (RuntimeException e) {
            this.releaseResourcesAfterFailedClose(e);
            throw e;
        }
    }

    /**
     * Best-effort cleanup used when {@link #close()} fails part-way: closes the
     * incoming-records map and the file writer if they are still attached to this handle and
     * clears the corresponding fields. Exceptions raised by the cleanup never replace the
     * primary failure; they are added to it as suppressed exceptions.
     *
     * @param failure the exception that is about to be thrown from {@link #close()}
     */
    private void releaseResourcesAfterFailedClose(Throwable failure) {
        Map<String, HoodieRecord<T>> incomingRecords = this.keyToNewRecords;
        this.keyToNewRecords = null;
        this.writtenRecordKeys = null;
        if (incomingRecords instanceof Closeable) {
            try {
                ((Closeable)((Object)incomingRecords)).close();
            }
            catch (Exception suppressed) {
                // Throwable.addSuppressed rejects self-suppression, hence the identity guard.
                if (suppressed != failure) {
                    failure.addSuppressed(suppressed);
                }
            }
        }
        HoodieFileWriter writer = this.fileWriter;
        this.fileWriter = null;
        if (writer != null) {
            try {
                writer.close();
            }
            catch (Exception suppressed) {
                if (suppressed != failure) {
                    failure.addSuppressed(suppressed);
                }
            }
        }
    }

    /**
     * Optional sanity check run at the end of {@link #close()}: the number of records written
     * plus the number deleted must not be smaller than the record count of the old file.
     * Skipped when the check is disabled in the config or when there is no base file to merge.
     *
     * @throws HoodieUpsertException        if the old file's record count cannot be read
     * @throws HoodieCorruptedDataException if fewer records are accounted for than the old file held
     */
    public void performMergeDataValidationCheck(WriteStatus writeStatus) {
        if (!this.config.isMergeDataValidationCheckEnabled() || this.baseFileToMerge == null) {
            return;
        }
        long oldNumWrites = 0L;
        try (HoodieFileReader reader = HoodieIOFactory.getIOFactory((HoodieStorage)this.hoodieTable.getStorage()).getReaderFactory(this.recordMerger.getRecordType()).getFileReader((HoodieConfig)this.config, this.oldFilePath);){
            oldNumWrites = reader.getTotalRecords();
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to check for merge data validation", e);
        }
        if (writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes() < oldNumWrites) {
            throw new HoodieCorruptedDataException(String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)", writeStatus.getFileId(), writeStatus.getPartitionPath(), this.instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(), this.baseFileToMerge.getCommitTime(), oldNumWrites));
        }
    }

    /** Replaces the reader context used for building and merging buffered records. */
    public void setReaderContext(HoodieReaderContext<T> readerContext) {
        this.readerContext = readerContext;
    }
}

