/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.util.SizeEstimator;

/**
 * Write handle that buffers incoming {@link HoodieRecord}s and appends them to a log file
 * as Avro data blocks (for records that carry an insert value) and delete blocks (for
 * records that do not).
 *
 * <p>Records arrive either through the iterator supplied at construction time (drained by
 * {@link #doAppend()}) or one at a time through {@link #write(HoodieRecord, Option)}.
 * Buffered records are flushed whenever the estimated in-memory size of the buffer reaches
 * the configured maximum data block size, and once more in {@link #close()}.
 *
 * <p>Instances keep unsynchronized mutable state; nothing in this class makes a single
 * handle safe for concurrent use.
 *
 * @param <T> payload type of the records handled
 */
public class HoodieAppendHandle<T extends HoodieRecordPayload>
extends HoodieWriteHandle<T> {
    private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
    /** Counter shared by all handles in this JVM; feeds the generated record sequence ids. */
    private static final AtomicLong recordIndex = new AtomicLong(1L);
    private final String fileId;
    /** Records converted to Avro and waiting to be written as the next data block. */
    private final List<IndexedRecord> recordList = new ArrayList<IndexedRecord>();
    /** Keys waiting to be written as the next delete block. */
    private final List<HoodieKey> keysToDelete = new ArrayList<HoodieKey>();
    private String partitionPath;
    /** Source of records for {@link #doAppend()}; null when built via the 4-argument constructor. */
    private Iterator<HoodieRecord<T>> recordItr;
    private long recordsWritten = 0L;
    private long recordsDeleted = 0L;
    private long updatedRecordsWritten = 0L;
    /** Running estimate of a record's in-memory size; seeded in {@link #init(HoodieRecord)}. */
    private long averageRecordSize = 0L;
    private HoodieLogFile currentLogFile;
    private HoodieLogFormat.Writer writer;
    /** True until the first record has been seen and the log writer has been created. */
    private boolean doInit = true;
    private long estimatedNumberOfBytesWritten;
    private long sizeInBytes = 0L;
    /** Number of records buffered since the last size-triggered flush. */
    private int numberOfRecords = 0;
    private final int maxBlockSize = this.config.getLogFileDataBlockMaxSize();
    private final Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
    private long insertRecordsWritten = 0L;

    /**
     * Creates a handle that will append the records produced by {@code recordItr}.
     *
     * @param config      write configuration
     * @param commitTime  instant time passed on to the parent handle
     * @param hoodieTable table being written to
     * @param fileId      id of the file group the log blocks belong to
     * @param recordItr   records to append when {@link #doAppend()} is called; may be null
     */
    public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        super(config, commitTime, fileId, hoodieTable);
        this.writeStatus.setStat(new HoodieDeltaWriteStat());
        this.fileId = fileId;
        this.recordItr = recordItr;
    }

    /**
     * Creates a handle without a record iterator; records are expected to be supplied
     * through {@link #write(HoodieRecord, Option)}.
     */
    public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId) {
        this(config, commitTime, hoodieTable, fileId, null);
    }

    /**
     * Lazily initializes the handle from the first record seen: resolves the partition and
     * base instant, fills the write status, seeds the average record size, saves the
     * partition metadata and opens the log writer. Subsequent calls are no-ops.
     *
     * @param record first record routed to this handle
     * @throws HoodieUpsertException if the partition metadata or the log writer cannot be set up
     */
    private void init(HoodieRecord record) {
        if (!this.doInit) {
            return;
        }
        this.partitionPath = record.getPartitionPath();
        TableFileSystemView.SliceView rtView = this.hoodieTable.getSliceView();
        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(this.partitionPath, this.fileId);
        // Default to the current instant; an existing file slice overrides it below.
        String baseInstantTime = this.instantTime;
        if (fileSlice.isPresent()) {
            baseInstantTime = fileSlice.get().getBaseInstantTime();
        } else {
            // No slice yet for this file id: start a fresh one based on the current instant.
            fileSlice = Option.of(new FileSlice(this.partitionPath, baseInstantTime, this.fileId));
            LOG.info("New InsertHandle for partition :" + this.partitionPath);
        }
        this.writeStatus.getStat().setPrevCommit(baseInstantTime);
        this.writeStatus.setFileId(this.fileId);
        this.writeStatus.setPartitionPath(this.partitionPath);
        this.writeStatus.getStat().setPartitionPath(this.partitionPath);
        this.writeStatus.getStat().setFileId(this.fileId);
        this.averageRecordSize = SizeEstimator.estimate(record);
        try {
            HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.fs, baseInstantTime, new Path(this.config.getBasePath()), FSUtils.getPartitionPath(this.config.getBasePath(), this.partitionPath));
            partitionMetadata.trySave(TaskContext.getPartitionId());
            this.writer = this.createLogWriter(fileSlice, baseInstantTime);
            this.currentLogFile = this.writer.getLogFile();
            HoodieDeltaWriteStat deltaStat = (HoodieDeltaWriteStat) this.writeStatus.getStat();
            deltaStat.setLogVersion(this.currentLogFile.getLogVersion());
            deltaStat.setLogOffset(this.writer.getCurrentSize());
        } catch (Exception e) {
            LOG.error("Error in update task at commit " + this.instantTime, e);
            this.writeStatus.setGlobalError(e);
            throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + this.fileId + " on commit " + this.instantTime + " on HDFS path " + this.hoodieTable.getMetaClient().getBasePath() + this.partitionPath, e);
        }
        // The stat path is relative: just the file name for an empty partition path.
        String logFileName = this.writer.getLogFile().getFileName();
        Path path = this.partitionPath.length() == 0 ? new Path(logFileName) : new Path(this.partitionPath, logFileName);
        this.writeStatus.getStat().setPath(path.toString());
        this.doInit = false;
    }

    /**
     * Converts a record into the Avro record to be logged and updates the write counters and
     * the write status.
     *
     * @param hoodieRecord record to convert; it is deflated after a successful conversion
     * @return the rewritten Avro record with key and commit metadata added, or an empty
     *         option when the payload has no insert value or when the conversion failed
     *         (in which case the record is marked as failed on the write status)
     */
    private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
        Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
        try {
            Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(this.originalSchema);
            if (avroRecord.isPresent()) {
                avroRecord = Option.of(this.rewriteRecord((GenericRecord) avroRecord.get()));
                String seqId = HoodieRecord.generateSequenceId(this.instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
                GenericRecord rewritten = (GenericRecord) avroRecord.get();
                HoodieAvroUtils.addHoodieKeyToRecord(rewritten, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), this.fileId);
                HoodieAvroUtils.addCommitMetadataToRecord(rewritten, this.instantTime, seqId);
                // A record that already has a location counts as an update, otherwise as an insert.
                if (hoodieRecord.getCurrentLocation() != null) {
                    ++this.updatedRecordsWritten;
                } else {
                    ++this.insertRecordsWritten;
                }
                ++this.recordsWritten;
            } else {
                ++this.recordsDeleted;
            }
            this.writeStatus.markSuccess(hoodieRecord, recordMetadata);
            hoodieRecord.deflate();
            return avroRecord;
        } catch (Exception e) {
            LOG.error("Error writing record  " + hoodieRecord, e);
            this.writeStatus.markFailure(hoodieRecord, e, recordMetadata);
            return Option.empty();
        }
    }

    /**
     * Drains the record iterator into the buffers, flushing whenever the block size limit is
     * reached, then flushes whatever is left and adds it to the estimated bytes written.
     *
     * <p>When the handle was created without an iterator nothing is drained and only the
     * current buffers are flushed.
     */
    public void doAppend() {
        // The 4-argument constructor leaves recordItr null; treat that as "no records".
        while (this.recordItr != null && this.recordItr.hasNext()) {
            HoodieRecord<T> record = this.recordItr.next();
            this.init(record);
            this.flushToDiskIfRequired(record);
            this.writeToBuffer(record);
        }
        this.doAppend(this.header);
        // NOTE(review): numberOfRecords is not reset here, so a repeated call would count the
        // same tail again - confirm whether callers ever invoke this more than once.
        this.estimatedNumberOfBytesWritten += this.averageRecordSize * (long) this.numberOfRecords;
    }

    /**
     * Writes the buffered records as a data block and the buffered keys as a delete block,
     * clearing each buffer after its block has been appended. Empty buffers are skipped.
     *
     * @param header block header; the instant time and writer schema entries are (re)set on it
     * @throws HoodieAppendException if appending either block fails
     */
    private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
        try {
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, this.instantTime);
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, this.writerSchema.toString());
            // appendBlock hands back the writer to use from now on (presumably a different
            // one after a log rollover), so the field is re-assigned each time.
            if (!this.recordList.isEmpty()) {
                this.writer = this.writer.appendBlock(new HoodieAvroDataBlock(this.recordList, header));
                this.recordList.clear();
            }
            if (!this.keysToDelete.isEmpty()) {
                HoodieKey[] keys = this.keysToDelete.toArray(new HoodieKey[this.keysToDelete.size()]);
                this.writer = this.writer.appendBlock(new HoodieDeleteBlock(keys, header));
                this.keysToDelete.clear();
            }
        } catch (Exception e) {
            throw new HoodieAppendException("Failed while appending records to " + this.currentLogFile.getPath(), e);
        }
    }

    /**
     * Reports whether the handle still has room, comparing the configured maximum parquet
     * file size with the estimated bytes written scaled by the log-to-parquet compression
     * ratio.
     *
     * @param record ignored
     * @return true while the scaled estimate does not exceed the maximum parquet file size
     */
    @Override
    public boolean canWrite(HoodieRecord record) {
        double projectedSize = (double) this.estimatedNumberOfBytesWritten * this.config.getLogFileToParquetCompressionRatio();
        return (double) this.config.getParquetMaxFileSize() >= projectedSize;
    }

    /**
     * Buffers a single record, initializing the handle on first use and flushing first when
     * the block size limit has been reached. Any failure is recorded against the record on
     * the write status and logged instead of being propagated.
     *
     * @param record      record to append
     * @param insertValue ignored; the value is re-derived from the record payload
     */
    @Override
    public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
        Option<Map<String, String>> recordMetadata = record.getData().getMetadata();
        try {
            this.init(record);
            this.flushToDiskIfRequired(record);
            this.writeToBuffer(record);
        } catch (Throwable t) {
            this.writeStatus.markFailure(record, t, recordMetadata);
            LOG.error("Error writing record " + record, t);
        }
    }

    /**
     * Flushes any buffered records, closes the log writer and finalizes the write statistics.
     *
     * @return the write status populated with the final statistics
     * @throws HoodieAppendException if the final flush fails (the writer is still closed)
     * @throws HoodieUpsertException if reading the final size or closing the writer fails
     */
    @Override
    public WriteStatus close() {
        try {
            this.flushAndCloseWriter();
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setFileId(this.fileId);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            // NOTE(review): records buffered through write() since the last size-triggered
            // flush are not part of this estimate - confirm whether that is intended.
            stat.setTotalWriteBytes(this.estimatedNumberOfBytesWritten);
            stat.setFileSizeInBytes(this.sizeInBytes);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());
            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);
            LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
            return this.writeStatus;
        } catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    /**
     * Flushes the buffers and closes the writer, making sure the writer is closed even when
     * the flush fails so the underlying log file handle is not leaked.
     *
     * @throws IOException if reading the writer size or closing it fails after a successful flush
     */
    private void flushAndCloseWriter() throws IOException {
        try {
            this.doAppend(this.header);
        } catch (RuntimeException flushFailure) {
            if (this.writer != null) {
                try {
                    this.writer.close();
                } catch (Exception closeFailure) {
                    // Keep the flush failure as the primary error; attach the close failure.
                    flushFailure.addSuppressed(closeFailure);
                }
            }
            throw flushFailure;
        }
        // The writer is null when no record ever reached this handle.
        if (this.writer != null) {
            this.sizeInBytes = this.writer.getCurrentSize();
            this.writer.close();
        }
    }

    /** Returns the write status tracked by this handle. */
    @Override
    public WriteStatus getWriteStatus() {
        return this.writeStatus;
    }

    /**
     * Builds the log writer for this file id, continuing from the latest log file of the
     * slice (its version and write token) when one exists and otherwise starting at the base
     * log version with this handle's write token.
     *
     * @param fileSlice      file slice to append to; must be present
     * @param baseCommitTime base commit the log file is written over
     * @return the writer produced by the log format builder
     */
    private HoodieLogFormat.Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime) throws IOException, InterruptedException {
        Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
        return HoodieLogFormat.newWriterBuilder()
            .onParentPath(FSUtils.getPartitionPath(this.hoodieTable.getMetaClient().getBasePath(), this.partitionPath))
            .withFileId(this.fileId)
            .overBaseCommit(baseCommitTime)
            .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
            .withSizeThreshold(this.config.getLogFileMaxSize())
            .withFs(this.fs)
            .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(this.writeToken))
            .withRolloverLogWriteToken(this.writeToken)
            .withFileExtension(".log")
            .build();
    }

    /**
     * Stamps the record with its new location (current instant and file id) and queues it
     * either as data or as a delete, depending on whether an Avro record could be produced.
     *
     * @param record record to buffer
     */
    private void writeToBuffer(HoodieRecord<T> record) {
        record.unseal();
        record.setNewLocation(new HoodieRecordLocation(this.instantTime, this.fileId));
        record.seal();
        Option<IndexedRecord> indexedRecord = this.getIndexedRecord(record);
        if (indexedRecord.isPresent()) {
            this.recordList.add(indexedRecord.get());
        } else {
            // NOTE(review): a record whose conversion failed also arrives here as an empty
            // option and is therefore queued as a delete - confirm this is intended.
            this.keysToDelete.add(record.getKey());
        }
        ++this.numberOfRecords;
    }

    /**
     * Flushes the buffers once the number of buffered records reaches the count that fits in
     * one data block at the current average record size, then refreshes the average with the
     * size estimate of {@code record} and restarts the buffered-record count.
     *
     * @param record record about to be buffered; used to refresh the size estimate
     */
    private void flushToDiskIfRequired(HoodieRecord record) {
        // Clamp the divisor so a zero estimate cannot raise an ArithmeticException.
        long divisor = Math.max(1L, this.averageRecordSize);
        long recordsPerBlock = (long) this.maxBlockSize / divisor;
        if ((long) this.numberOfRecords < recordsPerBlock) {
            return;
        }
        LOG.info("AvgRecordSize => " + this.averageRecordSize);
        // New estimate is the mean of the previous average and this record's estimated size.
        this.averageRecordSize = (this.averageRecordSize + SizeEstimator.estimate(record)) / 2L;
        this.doAppend(this.header);
        this.estimatedNumberOfBytesWritten += this.averageRecordSize * (long) this.numberOfRecords;
        this.numberOfRecords = 0;
    }
}

