/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format.cdc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CdcInputFormat
extends MergeOnReadInputFormat {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CdcInputFormat.class);

    private CdcInputFormat(Configuration conf, MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, List<ExpressionPredicates.Predicate> predicates, long limit, boolean emitDelete) {
        super(conf, tableState, fieldTypes, defaultPartName, predicates, limit, emitDelete, InternalSchemaManager.DISABLED);
    }

    @Override
    protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException {
        if (split instanceof CdcInputSplit) {
            HoodieCDCSupplementalLoggingMode mode = OptionsResolver.getCDCSupplementalLoggingMode(this.conf);
            ImageManager manager = new ImageManager(this.conf, this.tableState.getRowType(), this::getFileSliceIterator);
            Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc = cdcFileSplit -> this.getRecordIteratorV2(split.getTablePath(), split.getMaxCompactionMemoryInBytes(), (HoodieCDCFileSplit)cdcFileSplit, mode, manager);
            return new CdcFileSplitsIterator((CdcInputSplit)split, manager, recordIteratorFunc);
        }
        return super.initIterator(split);
    }

    public static Builder builder() {
        return new Builder();
    }

    private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit split) {
        if (!split.getLogPaths().isPresent() || split.getLogPaths().get().size() <= 0) {
            return this.getBaseFileIteratorWithMetadata(split.getBasePath().get());
        }
        if (!split.getBasePath().isPresent()) {
            return new MergeOnReadInputFormat.LogFileOnlyIterator(this.getFullLogFileIterator(split));
        }
        Schema tableSchema = new Schema.Parser().parse(this.tableState.getAvroSchema());
        return new MergeOnReadInputFormat.MergeIterator(this.conf, this.hadoopConf, split, this.tableState.getRowType(), this.tableState.getRowType(), tableSchema, InternalSchema.getEmptyInternalSchema(), Option.empty(), Option.empty(), false, this.tableState.getOperationPos(), this.getBaseFileIteratorWithMetadata(split.getBasePath().get()));
    }

    private ClosableIterator<RowData> getRecordIteratorV2(String tablePath, long maxCompactionMemoryInBytes, HoodieCDCFileSplit fileSplit, HoodieCDCSupplementalLoggingMode mode, ImageManager imageManager) {
        try {
            return this.getRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager);
        }
        catch (IOException e) {
            throw new HoodieException("Get record iterator error", e);
        }
    }

    private ClosableIterator<RowData> getRecordIterator(String tablePath, long maxCompactionMemoryInBytes, HoodieCDCFileSplit fileSplit, HoodieCDCSupplementalLoggingMode mode, ImageManager imageManager) throws IOException {
        switch (fileSplit.getCdcInferCase()) {
            case BASE_FILE_INSERT: {
                ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1, "CDC file path should exist and be singleton");
                String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
                return new AddBaseFileIterator(this.getBaseFileIterator(path));
            }
            case BASE_FILE_DELETE: {
                ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(), "Before file slice should exist");
                FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
                MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
                return new RemoveBaseFileIterator(this.tableState, this.getFileSliceIterator(inputSplit));
            }
            case AS_IS: {
                Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(this.tableState.getAvroSchema()));
                Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
                switch (mode) {
                    case DATA_BEFORE_AFTER: {
                        return new BeforeAfterImageIterator(tablePath, this.tableState, this.hadoopConf, cdcSchema, fileSplit);
                    }
                    case DATA_BEFORE: {
                        return new BeforeImageIterator(this.conf, this.hadoopConf, tablePath, this.tableState, cdcSchema, fileSplit, imageManager);
                    }
                    case OP_KEY_ONLY: {
                        return new RecordKeyImageIterator(this.conf, this.hadoopConf, tablePath, this.tableState, cdcSchema, fileSplit, imageManager);
                    }
                }
                throw new AssertionError((Object)("Unexpected mode" + (Object)((Object)mode)));
            }
            case LOG_FILE: {
                ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1, "CDC file path should exist and be singleton");
                String logFilepath = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
                return new DataLogFileIterator(this.conf, this.hadoopConf, this.internalSchemaManager, maxCompactionMemoryInBytes, imageManager, fileSplit, CdcInputFormat.singleLogFile2Split(tablePath, logFilepath, maxCompactionMemoryInBytes), this.tableState);
            }
            case REPLACE_COMMIT: {
                return new ReplaceCommitIterator(this.conf, tablePath, this.tableState, fileSplit, this::getFileSliceIterator);
            }
        }
        throw new AssertionError((Object)("Unexpected cdc file split infer case: " + (Object)((Object)fileSplit.getCdcInferCase())));
    }

    public static MergeOnReadInputSplit fileSlice2Split(String tablePath, FileSlice fileSlice, long maxCompactionMemoryInBytes) {
        Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(logFile -> logFile.getPath().toString()).filter(path -> !path.endsWith(".cdc")).collect(Collectors.toList()));
        String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
        return new MergeOnReadInputSplit(0, basePath, logPaths, fileSlice.getLatestInstantTime(), tablePath, maxCompactionMemoryInBytes, "payload_combine", null, fileSlice.getFileId());
    }

    public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) {
        return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)), FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, maxCompactionMemoryInBytes, "payload_combine", null, FSUtils.getFileIdFromLogPath(new StoragePath(filePath)));
    }

    public static class Builder
    extends MergeOnReadInputFormat.Builder {
        @Override
        public Builder config(Configuration conf) {
            this.conf = conf;
            return this;
        }

        @Override
        public Builder tableState(MergeOnReadTableState tableState) {
            this.tableState = tableState;
            return this;
        }

        @Override
        public Builder fieldTypes(List<DataType> fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        @Override
        public Builder defaultPartName(String defaultPartName) {
            this.defaultPartName = defaultPartName;
            return this;
        }

        @Override
        public Builder predicates(List<ExpressionPredicates.Predicate> predicates) {
            this.predicates = predicates;
            return this;
        }

        @Override
        public Builder limit(long limit) {
            this.limit = limit;
            return this;
        }

        @Override
        public Builder emitDelete(boolean emitDelete) {
            this.emitDelete = emitDelete;
            return this;
        }

        @Override
        public CdcInputFormat build() {
            return new CdcInputFormat(this.conf, this.tableState, this.fieldTypes, this.defaultPartName, this.predicates, this.limit, this.emitDelete);
        }
    }

    private static class ImageManager
    implements AutoCloseable {
        private final HoodieWriteConfig writeConfig;
        private final RowDataSerializer serializer;
        private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc;
        private final Map<String, ExternalSpillableMap<String, byte[]>> cache;

        public ImageManager(Configuration flinkConf, RowType rowType, Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc) {
            this.serializer = new RowDataSerializer(rowType);
            this.splitIteratorFunc = splitIteratorFunc;
            this.cache = new TreeMap<String, ExternalSpillableMap<String, byte[]>>();
            this.writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
        }

        public ExternalSpillableMap<String, byte[]> getOrLoadImages(long maxCompactionMemoryInBytes, FileSlice fileSlice) throws IOException {
            String instant = fileSlice.getBaseInstantTime();
            if (this.cache.containsKey(instant)) {
                return this.cache.get(instant);
            }
            if (this.cache.size() > 1) {
                String instantToClean = this.cache.keySet().iterator().next();
                this.cache.remove(instantToClean).close();
            }
            ExternalSpillableMap<String, byte[]> images = this.loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
            this.cache.put(instant, images);
            return images;
        }

        private ExternalSpillableMap<String, byte[]> loadImageRecords(long maxCompactionMemoryInBytes, FileSlice fileSlice) throws IOException {
            MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(this.writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
            ClosableIterator<RowData> itr = this.splitIteratorFunc.apply(inputSplit);
            ExternalSpillableMap<String, byte[]> imageRecordsMap = FormatUtils.spillableMap(this.writeConfig, maxCompactionMemoryInBytes);
            while (itr.hasNext()) {
                RowData row = (RowData)itr.next();
                String recordKey = row.getString(2).toString();
                ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
                this.serializer.serialize(row, (DataOutputView)new BytesArrayOutputView(baos));
                imageRecordsMap.put(recordKey, baos.toByteArray());
            }
            itr.close();
            return imageRecordsMap;
        }

        public RowData getImageRecord(String recordKey, ExternalSpillableMap<String, byte[]> cache, RowKind rowKind) {
            byte[] bytes = (byte[])cache.get(recordKey);
            ValidationUtils.checkState(bytes != null, "Key " + recordKey + " does not exist in current file group");
            try {
                RowData row = this.serializer.deserialize((DataInputView)new BytesArrayInputView(bytes));
                row.setRowKind(rowKind);
                return row;
            }
            catch (IOException e) {
                throw new HoodieException("Deserialize bytes into row data exception", e);
            }
        }

        public void updateImageRecord(String recordKey, ExternalSpillableMap<String, byte[]> cache, RowData row) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
            try {
                this.serializer.serialize(row, (DataOutputView)new BytesArrayOutputView(baos));
            }
            catch (IOException e) {
                throw new HoodieException("Serialize row data into bytes exception", e);
            }
            cache.put(recordKey, baos.toByteArray());
        }

        public RowData removeImageRecord(String recordKey, ExternalSpillableMap<String, byte[]> cache) {
            byte[] bytes = (byte[])cache.remove(recordKey);
            if (bytes == null) {
                return null;
            }
            try {
                return this.serializer.deserialize((DataInputView)new BytesArrayInputView(bytes));
            }
            catch (IOException e) {
                throw new HoodieException("Deserialize bytes into row data exception", e);
            }
        }

        @Override
        public void close() {
            this.cache.values().forEach(ExternalSpillableMap::close);
            this.cache.clear();
        }
    }

    public static final class BytesArrayOutputView
    extends DataOutputStream
    implements DataOutputView {
        public BytesArrayOutputView(ByteArrayOutputStream baos) {
            super(baos);
        }

        public void skipBytesToWrite(int numBytes) throws IOException {
            for (int i = 0; i < numBytes; ++i) {
                this.write(0);
            }
        }

        public void write(DataInputView source, int numBytes) throws IOException {
            byte[] buffer = new byte[numBytes];
            source.readFully(buffer);
            this.write(buffer);
        }
    }

    public static final class BytesArrayInputView
    extends DataInputStream
    implements DataInputView {
        public BytesArrayInputView(byte[] data) {
            super(new ByteArrayInputStream(data));
        }

        public void skipBytesToRead(int numBytes) throws IOException {
            while (numBytes > 0) {
                int skipped = this.skipBytes(numBytes);
                numBytes -= skipped;
            }
        }
    }

    static class ReplaceCommitIterator
    implements ClosableIterator<RowData> {
        private final ClosableIterator<RowData> itr;
        private final RowDataProjection projection;

        ReplaceCommitIterator(Configuration flinkConf, String tablePath, MergeOnReadTableState tableState, HoodieCDCFileSplit fileSplit, Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc) {
            this.itr = this.initIterator(tablePath, StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit, splitIteratorFunc);
            this.projection = RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
        }

        private ClosableIterator<RowData> initIterator(String tablePath, long maxCompactionMemoryInBytes, HoodieCDCFileSplit fileSplit, Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc) {
            ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(), "Before file slice does not exist for instant: " + fileSplit.getInstant());
            MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(tablePath, fileSplit.getBeforeFileSlice().get(), maxCompactionMemoryInBytes);
            return splitIteratorFunc.apply(inputSplit);
        }

        @Override
        public boolean hasNext() {
            return this.itr.hasNext();
        }

        @Override
        public RowData next() {
            RowData row = (RowData)this.itr.next();
            row.setRowKind(RowKind.DELETE);
            return this.projection.project(row);
        }

        @Override
        public void close() {
            this.itr.close();
        }
    }

    static class RecordKeyImageIterator
    extends BeforeImageIterator {
        protected ExternalSpillableMap<String, byte[]> beforeImages;

        RecordKeyImageIterator(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, String tablePath, MergeOnReadTableState tableState, Schema cdcSchema, HoodieCDCFileSplit fileSplit, ImageManager imageManager) throws IOException {
            super(flinkConf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
        }

        @Override
        protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
            super.initImages(fileSplit);
            ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(), "Before file slice does not exist for instant: " + fileSplit.getInstant());
            this.beforeImages = this.imageManager.getOrLoadImages(this.maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
        }

        @Override
        protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
            String recordKey = cdcRecord.get(1).toString();
            RowData row = this.imageManager.getImageRecord(recordKey, this.beforeImages, rowKind);
            row.setRowKind(rowKind);
            return this.projection.project(row);
        }
    }

    static class BeforeImageIterator
    extends BaseImageIterator {
        protected ExternalSpillableMap<String, byte[]> afterImages;
        protected final long maxCompactionMemoryInBytes;
        protected final RowDataProjection projection;
        protected final ImageManager imageManager;

        BeforeImageIterator(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, String tablePath, MergeOnReadTableState tableState, Schema cdcSchema, HoodieCDCFileSplit fileSplit, ImageManager imageManager) throws IOException {
            super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
            this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf);
            this.projection = RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
            this.imageManager = imageManager;
            this.initImages(fileSplit);
        }

        protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
            ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(), "Current file slice does not exist for instant: " + fileSplit.getInstant());
            this.afterImages = this.imageManager.getOrLoadImages(this.maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
        }

        @Override
        protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
            String recordKey = cdcRecord.get(1).toString();
            RowData row = this.imageManager.getImageRecord(recordKey, this.afterImages, rowKind);
            row.setRowKind(rowKind);
            return this.projection.project(row);
        }

        @Override
        protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
            return this.resolveAvro(rowKind, (GenericRecord)cdcRecord.get(2));
        }
    }

    static class BeforeAfterImageIterator
    extends BaseImageIterator {
        BeforeAfterImageIterator(String tablePath, MergeOnReadTableState tableState, org.apache.hadoop.conf.Configuration hadoopConf, Schema cdcSchema, HoodieCDCFileSplit fileSplit) {
            super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
        }

        @Override
        protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
            return this.resolveAvro(rowKind, (GenericRecord)cdcRecord.get(3));
        }

        @Override
        protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
            return this.resolveAvro(rowKind, (GenericRecord)cdcRecord.get(2));
        }
    }

    static abstract class BaseImageIterator
    implements ClosableIterator<RowData> {
        private final Schema requiredSchema;
        private final int[] requiredPos;
        private final GenericRecordBuilder recordBuilder;
        private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
        private HoodieCDCLogRecordIterator cdcItr;
        private GenericRecord cdcRecord;
        private RowData sideImage;
        private RowData currentImage;

        BaseImageIterator(org.apache.hadoop.conf.Configuration hadoopConf, String tablePath, MergeOnReadTableState tableState, Schema cdcSchema, HoodieCDCFileSplit fileSplit) {
            this.requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
            this.requiredPos = this.getRequiredPos(tableState.getAvroSchema(), this.requiredSchema);
            this.recordBuilder = new GenericRecordBuilder(this.requiredSchema);
            this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
            StoragePath hadoopTablePath = new StoragePath(tablePath);
            HoodieHadoopStorage storage = new HoodieHadoopStorage(tablePath, hadoopConf);
            HoodieLogFile[] cdcLogFiles = (HoodieLogFile[])fileSplit.getCdcFiles().stream().map(cdcFile -> {
                try {
                    return new HoodieLogFile(storage.getPathInfo(new StoragePath(hadoopTablePath, (String)cdcFile)));
                }
                catch (IOException e) {
                    throw new HoodieIOException("Fail to call getFileStatus", e);
                }
            }).toArray(HoodieLogFile[]::new);
            this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
        }

        private int[] getRequiredPos(String tableSchema, Schema required) {
            Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableSchema));
            List fields = dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
            return required.getFields().stream().map(f -> fields.indexOf(f.name())).mapToInt(i -> i).toArray();
        }

        @Override
        public boolean hasNext() {
            if (this.sideImage != null) {
                this.currentImage = this.sideImage;
                this.sideImage = null;
                return true;
            }
            if (this.cdcItr.hasNext()) {
                this.cdcRecord = (GenericRecord)this.cdcItr.next();
                String op = String.valueOf(this.cdcRecord.get(0));
                this.resolveImage(op);
                return true;
            }
            return false;
        }

        protected abstract RowData getAfterImage(RowKind var1, GenericRecord var2);

        protected abstract RowData getBeforeImage(RowKind var1, GenericRecord var2);

        @Override
        public RowData next() {
            return this.currentImage;
        }

        @Override
        public void close() {
            if (this.cdcItr != null) {
                this.cdcItr.close();
                this.cdcItr = null;
            }
        }

        private void resolveImage(String op) {
            switch (op) {
                case "i": {
                    this.currentImage = this.getAfterImage(RowKind.INSERT, this.cdcRecord);
                    break;
                }
                case "u": {
                    this.currentImage = this.getBeforeImage(RowKind.UPDATE_BEFORE, this.cdcRecord);
                    this.sideImage = this.getAfterImage(RowKind.UPDATE_AFTER, this.cdcRecord);
                    break;
                }
                case "d": {
                    this.currentImage = this.getBeforeImage(RowKind.DELETE, this.cdcRecord);
                    break;
                }
                default: {
                    throw new AssertionError((Object)"Unexpected");
                }
            }
        }

        protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
            GenericRecord requiredAvroRecord = FormatUtils.buildAvroRecordBySchema(avroRecord, this.requiredSchema, this.requiredPos, this.recordBuilder);
            RowData resolved = (RowData)this.avroToRowDataConverter.convert(requiredAvroRecord);
            resolved.setRowKind(rowKind);
            return resolved;
        }
    }

    static class DataLogFileIterator
    implements ClosableIterator<RowData> {
        private final Schema tableSchema;
        private final long maxCompactionMemoryInBytes;
        private final ImageManager imageManager;
        private final HoodieMergedLogRecordScanner scanner;
        private final Iterator<String> logRecordsKeyIterator;
        private final RowDataProjection projection;
        private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
        private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
        private final HoodieRecordMerger recordMerger;
        private final TypedProperties payloadProps;
        private ExternalSpillableMap<String, byte[]> beforeImages;
        private RowData currentImage;
        private RowData sideImage;

        DataLogFileIterator(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, InternalSchemaManager schemaManager, long maxCompactionMemoryInBytes, ImageManager imageManager, HoodieCDCFileSplit cdcFileSplit, MergeOnReadInputSplit split, MergeOnReadTableState tableState) throws IOException {
            this.tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
            this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
            this.imageManager = imageManager;
            this.scanner = FormatUtils.logScanner(split, this.tableSchema, schemaManager.getQuerySchema(), flinkConf, hadoopConf);
            this.logRecordsKeyIterator = this.scanner.getRecords().keySet().iterator();
            this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRowType(), flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
            this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter((LogicalType)tableState.getRowType(), flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
            this.projection = tableState.getRequiredRowType().equals((Object)tableState.getRowType()) ? null : RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
            List<String> mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(",")).map(String::trim).distinct().collect(Collectors.toList());
            this.recordMerger = HoodieRecordUtils.createRecordMerger(split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID));
            this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
            this.initImages(cdcFileSplit);
        }

        private void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
            this.beforeImages = fileSplit.getBeforeFileSlice().isPresent() && !fileSplit.getBeforeFileSlice().get().isEmpty() ? this.imageManager.getOrLoadImages(this.maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get()) : FormatUtils.spillableMap(this.imageManager.writeConfig, this.maxCompactionMemoryInBytes);
        }

        @Override
        public boolean hasNext() {
            if (this.sideImage != null) {
                this.currentImage = this.sideImage;
                this.sideImage = null;
                return true;
            }
            while (this.logRecordsKeyIterator.hasNext()) {
                String recordKey = this.logRecordsKeyIterator.next();
                HoodieAvroRecord record = (HoodieAvroRecord)this.scanner.getRecords().get(recordKey);
                Option val = CdcInputFormat.getInsertVal(record, this.tableSchema);
                RowData existed = this.imageManager.removeImageRecord(record.getRecordKey(), this.beforeImages);
                if (val.isEmpty()) {
                    if (existed == null) continue;
                    existed.setRowKind(RowKind.DELETE);
                    this.currentImage = existed;
                    return true;
                }
                IndexedRecord newAvroVal = (IndexedRecord)val.get();
                if (existed == null) {
                    RowData newRow = (RowData)this.avroToRowDataConverter.convert(newAvroVal);
                    newRow.setRowKind(RowKind.INSERT);
                    this.currentImage = newRow;
                    return true;
                }
                GenericRecord historyAvroRecord = (GenericRecord)this.rowDataToAvroConverter.convert(this.tableSchema, existed);
                HoodieRecord<IndexedRecord> merged = this.mergeRowWithLog(historyAvroRecord, record).get();
                if (merged.getData() == historyAvroRecord) continue;
                existed.setRowKind(RowKind.UPDATE_BEFORE);
                this.currentImage = existed;
                RowData mergedRow = (RowData)this.avroToRowDataConverter.convert(merged.getData());
                mergedRow.setRowKind(RowKind.UPDATE_AFTER);
                this.imageManager.updateImageRecord(record.getRecordKey(), this.beforeImages, mergedRow);
                this.sideImage = mergedRow;
                return true;
            }
            return false;
        }

        @Override
        public RowData next() {
            return this.projection != null ? this.projection.project(this.currentImage) : this.currentImage;
        }

        @Override
        public void close() {
            this.scanner.close();
            this.imageManager.close();
        }

        private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(GenericRecord historyAvroRecord, HoodieRecord<?> newRecord) {
            HoodieAvroIndexedRecord historyAvroIndexedRecord = new HoodieAvroIndexedRecord(historyAvroRecord);
            try {
                return this.recordMerger.merge(historyAvroIndexedRecord, this.tableSchema, newRecord, this.tableSchema, this.payloadProps).map(Pair::getLeft);
            }
            catch (IOException e) {
                throw new HoodieIOException("Merge base and delta payloads exception", e);
            }
        }
    }

    static class RemoveBaseFileIterator
    implements ClosableIterator<RowData> {
        private ClosableIterator<RowData> nested;
        private final RowDataProjection projection;

        RemoveBaseFileIterator(MergeOnReadTableState tableState, ClosableIterator<RowData> iterator2) {
            this.nested = iterator2;
            this.projection = RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
        }

        @Override
        public boolean hasNext() {
            return this.nested.hasNext();
        }

        @Override
        public RowData next() {
            RowData row = (RowData)this.nested.next();
            row.setRowKind(RowKind.DELETE);
            return this.projection.project(row);
        }

        @Override
        public void close() {
            if (this.nested != null) {
                this.nested.close();
                this.nested = null;
            }
        }
    }

    static class AddBaseFileIterator
    implements ClosableIterator<RowData> {
        private ClosableIterator<RowData> nested;
        private RowData currentRecord;

        AddBaseFileIterator(ClosableIterator<RowData> nested) {
            this.nested = nested;
        }

        @Override
        public boolean hasNext() {
            if (this.nested.hasNext()) {
                this.currentRecord = (RowData)this.nested.next();
                this.currentRecord.setRowKind(RowKind.INSERT);
                return true;
            }
            return false;
        }

        @Override
        public RowData next() {
            return this.currentRecord;
        }

        @Override
        public void close() {
            if (this.nested != null) {
                this.nested.close();
                this.nested = null;
            }
        }
    }

    static class CdcFileSplitsIterator
    implements ClosableIterator<RowData> {
        private ImageManager imageManager;
        private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
        private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc;
        private ClosableIterator<RowData> recordIterator;

        CdcFileSplitsIterator(CdcInputSplit inputSplit, ImageManager imageManager, Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc) {
            this.fileSplitIterator = Arrays.asList(inputSplit.getChanges()).iterator();
            this.imageManager = imageManager;
            this.recordIteratorFunc = recordIteratorFunc;
        }

        @Override
        public boolean hasNext() {
            if (this.recordIterator != null) {
                if (this.recordIterator.hasNext()) {
                    return true;
                }
                this.recordIterator.close();
                this.recordIterator = null;
            }
            if (this.fileSplitIterator.hasNext()) {
                HoodieCDCFileSplit fileSplit = this.fileSplitIterator.next();
                this.recordIterator = this.recordIteratorFunc.apply(fileSplit);
                return this.recordIterator.hasNext();
            }
            return false;
        }

        @Override
        public RowData next() {
            return (RowData)this.recordIterator.next();
        }

        @Override
        public void close() {
            if (this.recordIterator != null) {
                this.recordIterator.close();
            }
            if (this.imageManager != null) {
                this.imageManager.close();
                this.imageManager = null;
            }
        }
    }
}

