/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.SecondaryIndexStats;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieCDCLogger;
import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.SecondaryIndexStreamingTracker;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class FileGroupReaderBasedMergeHandle<T, I, K, O>
extends HoodieWriteMergeHandle<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(FileGroupReaderBasedMergeHandle.class);
    private final Option<CompactionOperation> compactionOperation;
    private final String maxInstantTime;
    private HoodieReadStats readStats;
    private HoodieRecord.HoodieRecordType recordType;
    private Option<HoodieCDCLogger> cdcLogger;
    private final TypedProperties props;
    private final Iterator<HoodieRecord<T>> incomingRecordsItr;

    public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, FileGroupReaderBasedMergeHandle.getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
    }

    public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, baseFile, keyGeneratorOpt);
        this.compactionOperation = Option.empty();
        TypedProperties properties = config.getProps();
        properties.putAll((Map)hoodieTable.getMetaClient().getTableConfig().getProps());
        this.maxInstantTime = instantTime;
        this.initRecordTypeAndCdcLogger(hoodieTable.getConfig().getRecordMerger().getRecordType());
        this.props = TypedProperties.copy((Properties)config.getProps());
        this.incomingRecordsItr = recordItr;
    }

    public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, CompactionOperation compactionOperation, TaskContextSupplier taskContextSupplier, HoodieReaderContext<T> readerContext, String maxInstantTime, HoodieRecord.HoodieRecordType enginRecordType) {
        super(config, instantTime, compactionOperation.getPartitionPath(), compactionOperation.getFileId(), hoodieTable, taskContextSupplier);
        this.maxInstantTime = maxInstantTime;
        this.keyToNewRecords = Collections.emptyMap();
        this.readerContext = readerContext;
        this.compactionOperation = Option.of((Object)compactionOperation);
        this.initRecordTypeAndCdcLogger(enginRecordType);
        this.init(compactionOperation, this.partitionPath);
        this.props = TypedProperties.copy((Properties)config.getProps());
        this.incomingRecordsItr = null;
    }

    private void initRecordTypeAndCdcLogger(HoodieRecord.HoodieRecordType enginRecordType) {
        this.recordType = this.hoodieTable.isMetadataTable() || HoodieFileFormat.HFILE.getFileExtension().equals(this.hoodieTable.getBaseFileExtension()) ? HoodieRecord.HoodieRecordType.AVRO : enginRecordType;
        this.cdcLogger = this.hoodieTable.getMetaClient().getTableConfig().isCDCEnabled() ? Option.of((Object)new HoodieCDCLogger(this.instantTime, this.config, this.hoodieTable.getMetaClient().getTableConfig(), this.partitionPath, this.storage, this.getWriterSchema(), this.createLogWriter(this.instantTime, ".cdc", (Option<FileSlice>)Option.empty()), IOUtils.getMaxMemoryPerPartitionMerge(this.taskContextSupplier, this.config))) : Option.empty();
    }

    private void init(CompactionOperation operation, String partitionPath) {
        LOG.info("partitionPath:{}, fileId to be merged:{}", (Object)partitionPath, (Object)this.fileId);
        this.baseFileToMerge = (HoodieBaseFile)operation.getBaseFile(this.config.getBasePath(), operation.getPartitionPath()).orElse(null);
        this.writtenRecordKeys = new HashSet();
        this.writeStatus.setStat(new HoodieWriteStat());
        this.writeStatus.getStat().setTotalLogSizeCompacted(((Double)operation.getMetrics().get("TOTAL_LOG_FILES_SIZE")).longValue());
        try {
            Option latestValidFilePath = Option.empty();
            if (this.baseFileToMerge != null) {
                latestValidFilePath = Option.of((Object)this.baseFileToMerge.getFileName());
                this.writeStatus.getStat().setPrevCommit(this.baseFileToMerge.getCommitTime());
                this.writeStatus.getStat().setPrevBaseFile((String)latestValidFilePath.get());
            } else {
                this.writeStatus.getStat().setPrevCommit("null");
            }
            HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.storage, this.instantTime, new StoragePath(this.config.getBasePath()), FSUtils.constructAbsolutePath((String)this.config.getBasePath(), (String)partitionPath), this.hoodieTable.getPartitionMetafileFormat());
            partitionMetadata.trySave();
            String oldFileName = latestValidFilePath.isPresent() ? (String)latestValidFilePath.get() : null;
            String newFileName = this.createNewFileName(oldFileName);
            this.oldFilePath = this.makeNewFilePath(partitionPath, oldFileName);
            this.newFilePath = this.makeNewFilePath(partitionPath, newFileName);
            LOG.info("Merging data from file group {}, to a new base file {}", (Object)this.fileId, (Object)this.newFilePath);
            this.writeStatus.setFileId(this.fileId);
            this.writeStatus.setPartitionPath(partitionPath);
            this.writeStatus.getStat().setPartitionPath(partitionPath);
            this.writeStatus.getStat().setFileId(this.fileId);
            this.setWriteStatusPath();
            this.createMarkerFile(partitionPath, this.newFilePath.getName());
            this.fileWriter = HoodieFileWriterFactory.getFileWriter((String)this.instantTime, (StoragePath)this.newFilePath, (HoodieStorage)this.hoodieTable.getStorage(), (HoodieConfig)this.config, (Schema)this.writeSchemaWithMetaFields, (TaskContextSupplier)this.taskContextSupplier, (HoodieRecord.HoodieRecordType)this.recordType);
        }
        catch (IOException io) {
            this.writeStatus.setGlobalError(io);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + this.fileId + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), io);
        }
    }

    @Override
    protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> newRecordsItr) {
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void doMerge() {
        if (this.compactionOperation.isEmpty()) {
            this.readerContext.initRecordMergerForIngestion(this.config.getProps());
        }
        boolean usePosition = this.config.getBooleanOrDefault(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS);
        Option internalSchemaOption = SerDeHelper.fromJson((String)this.config.getInternalSchema()).map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema((Schema)this.writeSchemaWithMetaFields, (InternalSchema)internalSchema, (boolean)this.config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
        long maxMemoryPerCompaction = this.getMaxMemoryForMerge();
        this.props.put((Object)HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), (Object)String.valueOf(maxMemoryPerCompaction));
        Option logFilesStreamOpt = this.compactionOperation.map(op -> op.getDeltaFileNames().stream().map(logFileName -> new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath((String)this.config.getBasePath(), (String)op.getPartitionPath()), logFileName))));
        try (HoodieFileGroupReader<T> fileGroupReader = this.getFileGroupReader(usePosition, (Option<InternalSchema>)internalSchemaOption, this.props, (Option<Stream<HoodieLogFile>>)logFilesStreamOpt, this.incomingRecordsItr);
             ClosableIterator recordIterator = fileGroupReader.getClosableHoodieRecordIterator();){
            while (recordIterator.hasNext()) {
                HoodieRecord record = (HoodieRecord)recordIterator.next();
                Option<Map<String, String>> recordMetadata = this.compactionOperation.isEmpty() ? this.getRecordMetadata(record, this.writeSchema, (Properties)this.props) : Option.empty();
                record.setCurrentLocation(this.newRecordLocation);
                record.setNewLocation(this.newRecordLocation);
                if (!this.partitionPath.equals(record.getPartitionPath())) {
                    HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " + record.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath);
                    this.writeStatus.markFailure(record, (Throwable)((Object)failureEx), recordMetadata);
                    continue;
                }
                try {
                    boolean shouldPreserveRecordMetadata = this.preserveMetadata || record.getOperation() == null;
                    Schema recordSchema = shouldPreserveRecordMetadata ? this.writeSchemaWithMetaFields : this.writeSchema;
                    this.writeToFile(record.getKey(), record, recordSchema, (Properties)this.config.getPayloadConfig().getProps(), shouldPreserveRecordMetadata);
                    this.writeStatus.markSuccess(record, recordMetadata);
                    ++this.recordsWritten;
                }
                catch (Exception e) {
                    LOG.error("Error writing record {}", (Object)record, (Object)e);
                    this.writeStatus.markFailure(record, e, recordMetadata);
                    fileGroupReader.onWriteFailure(record.getRecordKey());
                }
            }
            this.readStats = fileGroupReader.getStats();
            this.insertRecordsWritten = this.readStats.getNumInserts();
            this.updatedRecordsWritten = this.readStats.getNumUpdates();
            this.recordsDeleted = this.readStats.getNumDeletes();
            return;
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to compact file group: " + this.fileId, e);
        }
    }

    protected long getMaxMemoryForMerge() {
        return this.compactionOperation.isPresent() ? IOUtils.getMaxMemoryPerCompaction(this.taskContextSupplier, this.config) : IOUtils.getMaxMemoryPerPartitionMerge(this.taskContextSupplier, this.config);
    }

    private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<InternalSchema> internalSchemaOption, TypedProperties props, Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> incomingRecordsItr) {
        HoodieFileGroupReader.Builder fileGroupBuilder = HoodieFileGroupReader.newBuilder().withReaderContext(this.readerContext).withHoodieTableMetaClient(this.hoodieTable.getMetaClient()).withLatestCommitTime(this.maxInstantTime).withPartitionPath(this.partitionPath).withBaseFileOption(Option.ofNullable((Object)this.baseFileToMerge)).withDataSchema(this.writeSchemaWithMetaFields).withRequestedSchema(this.writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(usePosition).withSortOutput(this.hoodieTable.requireSortedRecords()).withFileGroupUpdateCallback(this.createCallback());
        if (logFileStreamOpt.isPresent()) {
            fileGroupBuilder.withLogFiles((Stream)logFileStreamOpt.get());
        } else {
            fileGroupBuilder.withRecordIterator(incomingRecordsItr);
        }
        return fileGroupBuilder.build();
    }

    @Override
    protected void writeIncomingRecords() {
    }

    @Override
    public List<WriteStatus> close() {
        try {
            super.close();
            this.cdcLogger.ifPresent(logger -> {
                logger.close();
                this.writeStatus.getStat().setCdcStats(logger.getCDCWriteStats());
            });
            this.writeStatus.getStat().setTotalLogReadTimeMs(this.readStats.getTotalLogReadTimeMs());
            this.writeStatus.getStat().setTotalUpdatedRecordsCompacted(this.readStats.getTotalUpdatedRecordsCompacted());
            this.writeStatus.getStat().setTotalLogFilesCompacted(this.readStats.getTotalLogFilesCompacted());
            this.writeStatus.getStat().setTotalLogRecords(this.readStats.getTotalLogRecords());
            this.writeStatus.getStat().setTotalLogBlocks(this.readStats.getTotalLogBlocks());
            this.writeStatus.getStat().setTotalCorruptLogBlock(this.readStats.getTotalCorruptLogBlock());
            this.writeStatus.getStat().setTotalRollbackBlocks(this.readStats.getTotalRollbackBlocks());
            if (this.compactionOperation.isPresent()) {
                this.writeStatus.getStat().setTotalLogSizeCompacted(((Double)((CompactionOperation)this.compactionOperation.get()).getMetrics().get("TOTAL_LOG_FILES_SIZE")).longValue());
            }
            if (this.writeStatus.getStat().getRuntimeStats() != null) {
                this.writeStatus.getStat().getRuntimeStats().setTotalScanTime(this.readStats.getTotalLogReadTimeMs());
            }
            return Collections.singletonList(this.writeStatus);
        }
        catch (Exception e) {
            throw new HoodieUpsertException("Failed to close " + this.getClass().getSimpleName(), e);
        }
    }

    private Option<BaseFileUpdateCallback<T>> createCallback() {
        ArrayList callbacks = new ArrayList();
        if (this.cdcLogger.isPresent()) {
            callbacks.add(new CDCCallback((HoodieCDCLogger)this.cdcLogger.get(), this.readerContext));
        }
        if (this.compactionOperation.isEmpty()) {
            if (this.writeStatus.isTrackingSuccessfulWrites()) {
                this.writeStatus.manuallyTrackSuccess();
                RecordLevelIndexCallback recordLevelIndexCallback = new RecordLevelIndexCallback(this.writeStatus, this.newRecordLocation, this.partitionPath);
                callbacks.add(recordLevelIndexCallback);
            }
            if (this.isSecondaryIndexStatsStreamingWritesEnabled) {
                SecondaryIndexCallback secondaryIndexCallback = new SecondaryIndexCallback(this.partitionPath, this.readerContext, this.writeStatus, this.secondaryIndexDefns);
                callbacks.add(secondaryIndexCallback);
            }
        }
        return callbacks.isEmpty() ? Option.empty() : Option.of(CompositeCallback.of(callbacks));
    }

    private static class CompositeCallback<T>
    implements BaseFileUpdateCallback<T> {
        private final List<BaseFileUpdateCallback<T>> callbacks;

        static <T> BaseFileUpdateCallback<T> of(List<BaseFileUpdateCallback<T>> callbacks) {
            if (callbacks.size() == 1) {
                return callbacks.get(0);
            }
            return new CompositeCallback<T>(callbacks);
        }

        private CompositeCallback(List<BaseFileUpdateCallback<T>> callbacks) {
            this.callbacks = callbacks;
        }

        public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
            this.callbacks.forEach(callback -> callback.onUpdate(recordKey, previousRecord, mergedRecord));
        }

        public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
            this.callbacks.forEach(callback -> callback.onInsert(recordKey, newRecord));
        }

        public void onDelete(String recordKey, BufferedRecord<T> previousRecord, HoodieOperation hoodieOperation) {
            this.callbacks.forEach(callback -> callback.onDelete(recordKey, previousRecord, hoodieOperation));
        }

        public void onFailure(String recordKey) {
            this.callbacks.forEach(callback -> callback.onFailure(recordKey));
        }
    }

    private static class SecondaryIndexCallback<T>
    implements BaseFileUpdateCallback<T> {
        private final String partitionPath;
        private final HoodieReaderContext<T> readerContext;
        private final WriteStatus writeStatus;
        private final List<HoodieIndexDefinition> secondaryIndexDefns;

        public SecondaryIndexCallback(String partitionPath, HoodieReaderContext<T> readerContext, WriteStatus writeStatus, List<HoodieIndexDefinition> secondaryIndexDefns) {
            this.partitionPath = partitionPath;
            this.readerContext = readerContext;
            this.secondaryIndexDefns = secondaryIndexDefns;
            this.writeStatus = writeStatus;
        }

        public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
            HoodieKey hoodieKey = new HoodieKey(recordKey, this.partitionPath);
            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, Option.of(mergedRecord), previousRecord, false, this.writeStatus, this.secondaryIndexDefns, this.readerContext.getRecordContext());
        }

        public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
            HoodieKey hoodieKey = new HoodieKey(recordKey, this.partitionPath);
            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, Option.of(newRecord), null, false, this.writeStatus, this.secondaryIndexDefns, this.readerContext.getRecordContext());
        }

        public void onDelete(String recordKey, BufferedRecord<T> previousRecord, HoodieOperation hoodieOperation) {
            HoodieKey hoodieKey = new HoodieKey(recordKey, this.partitionPath);
            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, Option.empty(), previousRecord, true, this.writeStatus, this.secondaryIndexDefns, this.readerContext.getRecordContext());
        }

        public void onFailure(String recordKey) {
            this.writeStatus.getIndexStats().getSecondaryIndexStats().forEach((partition, indexStats) -> {
                ArrayList<Integer> indicesToRemove = new ArrayList<Integer>();
                for (int i = indexStats.size() - 1; i >= 0 && ((SecondaryIndexStats)indexStats.get(i)).getRecordKey().equals(recordKey); --i) {
                    indicesToRemove.add(i);
                }
                indicesToRemove.forEach(index -> {
                    SecondaryIndexStats cfr_ignored_0 = (SecondaryIndexStats)indexStats.remove((int)index);
                });
            });
        }
    }

    private static class RecordLevelIndexCallback<T>
    implements BaseFileUpdateCallback<T> {
        private final WriteStatus writeStatus;
        private final HoodieRecordLocation fileRecordLocation;
        private final String partitionPath;

        public RecordLevelIndexCallback(WriteStatus writeStatus, HoodieRecordLocation fileRecordLocation, String partitionPath) {
            this.writeStatus = writeStatus;
            this.fileRecordLocation = fileRecordLocation;
            this.partitionPath = partitionPath;
        }

        public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
            this.writeStatus.addRecordDelegate(HoodieRecordDelegate.create((String)recordKey, (String)this.partitionPath, (HoodieRecordLocation)this.fileRecordLocation, (HoodieRecordLocation)this.fileRecordLocation));
        }

        public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
            this.writeStatus.addRecordDelegate(HoodieRecordDelegate.create((String)recordKey, (String)this.partitionPath, null, (HoodieRecordLocation)this.fileRecordLocation));
        }

        public void onDelete(String recordKey, BufferedRecord<T> previousRecord, HoodieOperation hoodieOperation) {
            if (hoodieOperation != HoodieOperation.UPDATE_BEFORE) {
                this.writeStatus.addRecordDelegate(HoodieRecordDelegate.create((String)recordKey, (String)this.partitionPath, (HoodieRecordLocation)this.fileRecordLocation, null));
            }
        }

        public void onFailure(String recordKey) {
            int lastIndex = this.writeStatus.getWrittenRecordDelegates().size() - 1;
            if (lastIndex >= 0 && this.writeStatus.getWrittenRecordDelegates().get(lastIndex).getRecordKey().equals(recordKey)) {
                this.writeStatus.getWrittenRecordDelegates().remove(lastIndex);
            }
        }
    }

    private static class CDCCallback<T>
    implements BaseFileUpdateCallback<T> {
        private final HoodieCDCLogger cdcLogger;
        private final RecordContext<T> recordContext;

        CDCCallback(HoodieCDCLogger cdcLogger, HoodieReaderContext<T> readerContext) {
            this.cdcLogger = cdcLogger;
            this.recordContext = readerContext.getRecordContext();
        }

        public void onUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
            this.cdcLogger.put(recordKey, this.convertOutput(previousRecord), (Option<IndexedRecord>)Option.of((Object)this.convertOutput(mergedRecord)));
        }

        public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
            this.cdcLogger.put(recordKey, null, (Option<IndexedRecord>)Option.of((Object)this.convertOutput(newRecord)));
        }

        public void onDelete(String recordKey, BufferedRecord<T> previousRecord, HoodieOperation hoodieOperation) {
            if (previousRecord == null) {
                return;
            }
            this.cdcLogger.put(recordKey, this.convertOutput(previousRecord), (Option<IndexedRecord>)Option.empty());
        }

        public void onFailure(String recordKey) {
            this.cdcLogger.remove(recordKey);
        }

        private GenericRecord convertOutput(BufferedRecord<T> record) {
            if (record == null || record.getRecord() == null) {
                return null;
            }
            return this.recordContext.convertToAvroRecord(record.getRecord(), this.recordContext.getSchemaFromBufferRecord(record));
        }
    }
}

