/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.storage.HoodieStorageWriter;
import org.apache.hudi.io.storage.HoodieStorageWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;

/**
 * Write handle that merges a set of incoming records into an existing base file,
 * producing a new base file for the same file id.
 *
 * <p>Usage as visible in this class: the constructor indexes the incoming records by
 * record key and opens a storage writer on the new file; {@link #write(GenericRecord)}
 * is invoked with each record of the old file and either writes the combined value
 * or copies the old record over; {@link #close()} writes every incoming record whose
 * key was never matched, closes the writer and fills in the write statistics.
 *
 * @param <T> payload type carried by the records being merged
 */
public class HoodieMergeHandle<T extends HoodieRecordPayload>
extends HoodieWriteHandle<T> {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
    /** Incoming records indexed by record key. */
    private Map<String, HoodieRecord<T>> keyToNewRecords;
    /** Keys of incoming records that were already matched against a record of the old file. */
    private Set<String> writtenRecordKeys;
    private HoodieStorageWriter<IndexedRecord> storageWriter;
    private Path newFilePath;
    private Path oldFilePath;
    private long recordsWritten = 0L;
    private long recordsDeleted = 0L;
    private long updatedRecordsWritten = 0L;
    private long insertRecordsWritten = 0L;
    /** When true, payloads are resolved against the writer schema instead of the original schema. */
    private boolean useWriterSchema;

    /**
     * Creates a handle from an iterator of incoming records. The records are loaded into a
     * spillable map and merged into the latest base file of {@code fileId}.
     *
     * @param config       write configuration
     * @param commitTime   instant time of this write
     * @param hoodieTable  table being written to
     * @param recordItr    incoming records; the partition path is taken from the last record
     * @param fileId       id of the file group to merge into
     */
    public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String fileId) {
        super(config, commitTime, fileId, hoodieTable);
        String partitionPath = this.init(fileId, recordItr);
        this.init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
    }

    /**
     * Creates a handle from an already-built key-to-record map and an explicit base file.
     * Handles created this way resolve payloads against the writer schema.
     *
     * @param config              write configuration
     * @param commitTime          instant time of this write
     * @param hoodieTable         table being written to
     * @param keyToNewRecords     incoming records keyed by record key; must not be empty because
     *                            the partition path is read from its first entry
     * @param fileId              id of the file group to merge into
     * @param dataFileToBeMerged  existing base file to merge the records into
     */
    public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieBaseFile dataFileToBeMerged) {
        super(config, commitTime, fileId, hoodieTable);
        this.keyToNewRecords = keyToNewRecords;
        this.useWriterSchema = true;
        this.init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath(), dataFileToBeMerged);
    }

    /**
     * Returns {@code originalSchema} extended with the Hoodie metadata fields.
     */
    public static Schema createHoodieWriteSchema(Schema originalSchema) {
        return HoodieAvroUtils.addMetadataFields(originalSchema);
    }

    /**
     * Ensures the partition directory exists and returns the path of a new data file in it,
     * named from this handle's instant time, write token and file id.
     *
     * @throws HoodieIOException if the partition directory cannot be created
     */
    @Override
    public Path makeNewPath(String partitionPath) {
        Path path = FSUtils.getPartitionPath(this.config.getBasePath(), partitionPath);
        try {
            this.fs.mkdirs(path);
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to make dir " + path, e);
        }
        return new Path(path.toString(), FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId));
    }

    @Override
    public Schema getWriterSchema() {
        return this.writerSchema;
    }

    /** Always {@code false} for this handle. */
    @Override
    public boolean canWrite(HoodieRecord record) {
        return false;
    }

    /** Intentionally a no-op for this handle. */
    @Override
    public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
    }

    /**
     * Marks {@code record} as failed when {@code exception} is present; otherwise delegates to
     * {@link #write(HoodieRecord, Option)}.
     */
    @Override
    public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
        Option<Map<String, String>> recordMetadata = record.getData().getMetadata();
        // A present Option<Exception> always holds a Throwable, so no instanceof check is needed.
        if (exception.isPresent()) {
            this.writeStatus.markFailure(record, exception.get(), recordMetadata);
            LOG.error("Error writing record " + record, exception.get());
        } else {
            this.write(record, avroRecord);
        }
    }

    /** Rewrites {@code record} into the writer schema. */
    @Override
    protected GenericRecord rewriteRecord(GenericRecord record) {
        return HoodieAvroUtils.rewriteRecord(record, this.writerSchema);
    }

    /**
     * Prepares the handle for merging into {@code dataFileToBeMerged}: records the previous
     * commit, saves partition metadata, computes the old and new file paths, initialises the
     * write status, creates the marker file and opens the storage writer on the new file.
     *
     * @throws HoodieUpsertException if any of the above fails with an {@link IOException}
     */
    private void init(String fileId, String partitionPath, HoodieBaseFile dataFileToBeMerged) {
        LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
        this.writtenRecordKeys = new HashSet<>();
        this.writeStatus.setStat(new HoodieWriteStat());
        try {
            String latestValidFilePath = dataFileToBeMerged.getFileName();
            this.writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
            HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.fs, this.instantTime, new Path(this.config.getBasePath()), FSUtils.getPartitionPath(this.config.getBasePath(), partitionPath));
            partitionMetadata.trySave(TaskContext.getPartitionId());
            this.oldFilePath = new Path(this.config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
            // An empty partition path means the file sits directly under the base path.
            String partitionPrefix = partitionPath.isEmpty() ? "" : partitionPath + "/";
            String relativePath = new Path(partitionPrefix + FSUtils.makeDataFileName(this.instantTime, this.writeToken, fileId)).toString();
            this.newFilePath = new Path(this.config.getBasePath(), relativePath);
            LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", this.oldFilePath.toString(), this.newFilePath.toString()));
            this.writeStatus.setFileId(fileId);
            this.writeStatus.setPartitionPath(partitionPath);
            this.writeStatus.getStat().setPartitionPath(partitionPath);
            this.writeStatus.getStat().setFileId(fileId);
            this.writeStatus.getStat().setPath(new Path(this.config.getBasePath()), this.newFilePath);
            this.createMarkerFile(partitionPath);
            this.storageWriter = HoodieStorageWriterFactory.getStorageWriter(this.instantTime, this.newFilePath, this.hoodieTable, this.config, this.writerSchema);
        }
        catch (IOException io) {
            LOG.error("Error in update task at commit " + this.instantTime, io);
            this.writeStatus.setGlobalError(io);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), io);
        }
    }

    /**
     * Loads the incoming records into a spillable map keyed by record key, stamping each
     * record with its new location (this instant time and {@code fileId}).
     *
     * @return the partition path of the last record read, or {@code null} if the iterator
     *         was empty
     * @throws HoodieIOException if the spillable map cannot be created
     */
    private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
        ExternalSpillableMap<String, HoodieRecord<T>> spillableMap;
        try {
            long memoryForMerge = this.config.getMaxMemoryPerPartitionMerge();
            LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
            spillableMap = new ExternalSpillableMap<>(memoryForMerge, this.config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(this.originalSchema));
        }
        catch (IOException io) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
        }
        this.keyToNewRecords = spillableMap;
        String partitionPath = null;
        while (newRecordsItr.hasNext()) {
            HoodieRecord<T> record = newRecordsItr.next();
            partitionPath = record.getPartitionPath();
            // The location can only be changed while the record is unsealed.
            record.unseal();
            record.setNewLocation(new HoodieRecordLocation(this.instantTime, fileId));
            record.seal();
            spillableMap.put(record.getRecordKey(), record);
        }
        LOG.info("Number of entries in MemoryBasedMap => " + spillableMap.getInMemoryMapNumEntries()
            + ", Total size in bytes of MemoryBasedMap => " + spillableMap.getCurrentInMemoryMapSize()
            + ", Number of entries in DiskBasedMap => " + spillableMap.getDiskBasedMapNumEntries()
            + ", Size of file spilled to disk => " + spillableMap.getSizeOfFileOnDiskInBytes());
        return partitionPath;
    }

    /**
     * Writes the result of combining an incoming record with an old one, counting it as an
     * update when a value is present.
     *
     * @return {@code true} if the record was handled without error
     */
    private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
        if (indexedRecord.isPresent()) {
            ++this.updatedRecordsWritten;
        }
        return this.writeRecord(hoodieRecord, indexedRecord);
    }

    /**
     * Writes {@code indexedRecord} to the new file if present, otherwise counts a delete.
     * Success or failure is recorded on the write status; failures are logged, not thrown.
     *
     * @return {@code true} on success, {@code false} if any exception occurred
     */
    private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
        Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
        try {
            if (indexedRecord.isPresent()) {
                GenericRecord recordWithMetadataInSchema = this.rewriteRecord((GenericRecord)indexedRecord.get());
                this.storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
                ++this.recordsWritten;
            } else {
                ++this.recordsDeleted;
            }
            this.writeStatus.markSuccess(hoodieRecord, recordMetadata);
            hoodieRecord.deflate();
            return true;
        }
        catch (Exception e) {
            LOG.error("Error writing record  " + hoodieRecord, e);
            this.writeStatus.markFailure(hoodieRecord, e, recordMetadata);
            return false;
        }
    }

    /**
     * Processes one record of the old file. If an incoming record has the same key, the two
     * are combined and the result is written; if that write fails, or no incoming record
     * matches, the old record is copied to the new file unchanged.
     *
     * @param oldRecord record read from the old file; must carry the record-key metadata field
     * @throws HoodieUpsertException if combining the records or copying the old record fails
     */
    public void write(GenericRecord oldRecord) {
        String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        boolean copyOldRecord = true;
        if (this.keyToNewRecords.containsKey(key)) {
            // Work on a copy so the record held in the map is not mutated by the write.
            HoodieRecord<T> hoodieRecord = new HoodieRecord<T>(this.keyToNewRecords.get(key));
            try {
                Schema schema = this.useWriterSchema ? this.writerSchema : this.originalSchema;
                Option<IndexedRecord> combinedAvroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema);
                if (this.writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
                    copyOldRecord = false;
                }
                this.writtenRecordKeys.add(key);
            }
            catch (Exception e) {
                throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {" + this.keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
            }
        }
        if (copyOldRecord) {
            String errMsg = "Failed to merge old record into new file for key " + key + " from old file " + this.getOldFilePath() + " to new file " + this.newFilePath;
            try {
                this.storageWriter.writeAvro(key, oldRecord);
            }
            catch (ClassCastException e) {
                LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + this.getOldFilePath() + " to file " + this.newFilePath + " with writerSchema " + this.writerSchema.toString(true));
                throw new HoodieUpsertException(errMsg, e);
            }
            catch (IOException e) {
                LOG.error(errMsg, e);
                throw new HoodieUpsertException(errMsg, e);
            }
            ++this.recordsWritten;
        }
    }

    /**
     * Writes every incoming record that was not matched by {@link #write(GenericRecord)},
     * closes the storage writer and populates the write statistics.
     *
     * @return the write status of this handle, with its stat filled in
     * @throws HoodieUpsertException if an {@link IOException} occurs while finishing the file
     */
    @Override
    public WriteStatus close() {
        try {
            Iterator<HoodieRecord<T>> newRecordsItr = this.keyToNewRecords instanceof ExternalSpillableMap
                ? ((ExternalSpillableMap<String, HoodieRecord<T>>)this.keyToNewRecords).iterator()
                : this.keyToNewRecords.values().iterator();
            Schema insertSchema = this.useWriterSchema ? this.writerSchema : this.originalSchema;
            while (newRecordsItr.hasNext()) {
                HoodieRecord<T> hoodieRecord = newRecordsItr.next();
                if (this.writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
                    continue;
                }
                this.writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(insertSchema));
                // NOTE(review): counted for every unmatched record, even when the write failed
                // or the payload resolved to a delete - confirm this is the intended statistic.
                ++this.insertRecordsWritten;
            }
            this.keyToNewRecords.clear();
            this.writtenRecordKeys.clear();
            if (this.storageWriter != null) {
                this.storageWriter.close();
            }
            long fileSizeInBytes = FSUtils.getFileSize(this.fs, this.newFilePath);
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setTotalWriteBytes(fileSizeInBytes);
            stat.setFileSizeInBytes(fileSizeInBytes);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());
            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);
            LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
            return this.writeStatus;
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    /** Returns the path of the existing base file being merged. */
    public Path getOldFilePath() {
        return this.oldFilePath;
    }

    @Override
    public WriteStatus getWriteStatus() {
        return this.writeStatus;
    }
}

