/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format.mor;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;

public class MergeOnReadInputFormat
extends RichInputFormat<RowData, MergeOnReadInputSplit> {
    private static final long serialVersionUID = 1L;
    private final Configuration conf;
    private transient org.apache.hadoop.conf.Configuration hadoopConf;
    private final MergeOnReadTableState tableState;
    private transient RecordIterator iterator;
    private final List<String> fieldNames;
    private final List<DataType> fieldTypes;
    private final String defaultPartName;
    private final int[] requiredPos;
    private final long limit;
    private long currentReadCount = 0L;
    private boolean emitDelete;
    private boolean closed = true;

    private MergeOnReadInputFormat(Configuration conf, MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, long limit, boolean emitDelete) {
        this.conf = conf;
        this.tableState = tableState;
        this.fieldNames = tableState.getRowType().getFieldNames();
        this.fieldTypes = fieldTypes;
        this.defaultPartName = defaultPartName;
        this.requiredPos = tableState.getRequiredPositions();
        this.limit = limit;
        this.emitDelete = emitDelete;
    }

    public static Builder builder() {
        return new Builder();
    }

    public void open(MergeOnReadInputSplit split) throws IOException {
        this.currentReadCount = 0L;
        this.closed = false;
        this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        if (!split.getLogPaths().isPresent() || split.getLogPaths().get().size() <= 0) {
            this.iterator = split.getInstantRange() != null ? new BaseFileOnlyFilteringIterator(split.getInstantRange(), this.tableState.getRequiredRowType(), this.getReader(split.getBasePath().get(), MergeOnReadInputFormat.getRequiredPosWithCommitTime(this.requiredPos))) : new BaseFileOnlyIterator(this.getRequiredSchemaReader(split.getBasePath().get()));
        } else if (!split.getBasePath().isPresent()) {
            this.iterator = OptionsResolver.emitChangelog(this.conf) ? new LogFileOnlyIterator(this.getUnMergedLogFileIterator(split)) : new LogFileOnlyIterator(this.getLogFileIterator(split));
        } else if (split.getMergeType().equals("skip_merge")) {
            this.iterator = new SkipMergeIterator(this.getRequiredSchemaReader(split.getBasePath().get()), this.getLogFileIterator(split));
        } else if (split.getMergeType().equals("payload_combine")) {
            this.iterator = new MergeIterator(this.conf, this.hadoopConf, split, this.tableState.getRowType(), this.tableState.getRequiredRowType(), new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), this.getFullSchemaReader(split.getBasePath().get()));
        } else {
            throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for file path: " + split.getBasePath() + "log paths: " + split.getLogPaths() + "hoodie table path: " + split.getTablePath() + "spark partition Index: " + split.getSplitNumber() + "merge type: " + split.getMergeType());
        }
        this.mayShiftInputSplit(split);
    }

    public void configure(Configuration configuration) {
    }

    public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
        return null;
    }

    public MergeOnReadInputSplit[] createInputSplits(int minNumSplits) {
        return this.tableState.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
    }

    public InputSplitAssigner getInputSplitAssigner(MergeOnReadInputSplit[] mergeOnReadInputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])mergeOnReadInputSplits);
    }

    public boolean reachedEnd() throws IOException {
        if (this.limit > 0L && this.currentReadCount >= this.limit) {
            return true;
        }
        return this.iterator.reachedEnd();
    }

    public RowData nextRecord(RowData o) {
        ++this.currentReadCount;
        return this.iterator.nextRecord();
    }

    public void close() throws IOException {
        if (this.iterator != null) {
            this.iterator.close();
        }
        this.iterator = null;
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    private void mayShiftInputSplit(MergeOnReadInputSplit split) throws IOException {
        if (split.isConsumed()) {
            for (long i = 0L; i < split.getConsumed() && !this.reachedEnd(); ++i) {
                this.nextRecord(null);
            }
        }
    }

    private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException {
        return this.getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray());
    }

    private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) throws IOException {
        return this.getReader(path, this.requiredPos);
    }

    private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException {
        LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(new org.apache.hadoop.fs.Path(path).getParent(), this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), FilePathUtils.extractPartitionKeys(this.conf));
        LinkedHashMap<String, Object> partObjects = new LinkedHashMap<String, Object>();
        partSpec.forEach((k, v) -> {
            int idx = this.fieldNames.indexOf(k);
            if (idx == -1) {
                return;
            }
            DataType fieldType = this.fieldTypes.get(idx);
            if (!DataTypeUtils.isDatetimeType(fieldType)) {
                partObjects.put((String)k, DataTypeUtils.resolvePartition(this.defaultPartName.equals(v) ? null : v, fieldType));
            }
        });
        return ParquetSplitReaderUtil.genPartColumnarRowReader(this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, HadoopConfigurations.getParquetConf(this.conf, this.hadoopConf), this.fieldNames.toArray(new String[0]), this.fieldTypes.toArray(new DataType[0]), partObjects, requiredPos, 2048, new Path(path), 0L, Long.MAX_VALUE);
    }

    private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
        final Schema tableSchema = new Schema.Parser().parse(this.tableState.getAvroSchema());
        final Schema requiredSchema = new Schema.Parser().parse(this.tableState.getRequiredAvroSchema());
        final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
        final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(this.tableState.getRequiredRowType());
        final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, this.conf, this.hadoopConf);
        final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
        final int[] pkOffset = this.tableState.getPkOffsetsInRequired();
        final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
        LogicalType[] pkTypes = pkSemanticLost ? null : this.tableState.getPkTypes(pkOffset);
        final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
        return new ClosableIterator<RowData>(){
            private RowData currentRecord;

            @Override
            public boolean hasNext() {
                while (logRecordsKeyIterator.hasNext()) {
                    String curAvroKey = (String)logRecordsKeyIterator.next();
                    Option<IndexedRecord> curAvroRecord = null;
                    HoodieAvroRecord hoodieRecord = (HoodieAvroRecord)scanner.getRecords().get(curAvroKey);
                    try {
                        curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
                    }
                    catch (IOException e) {
                        throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
                    }
                    if (!curAvroRecord.isPresent()) {
                        if (!MergeOnReadInputFormat.this.emitDelete || pkSemanticLost) continue;
                        GenericRowData delete = new GenericRowData(MergeOnReadInputFormat.this.tableState.getRequiredRowType().getFieldCount());
                        String recordKey = hoodieRecord.getRecordKey();
                        String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
                        Object[] converted = converter.convert(pkFields);
                        for (int i = 0; i < pkOffset.length; ++i) {
                            delete.setField(pkOffset[i], converted[i]);
                        }
                        delete.setRowKind(RowKind.DELETE);
                        this.currentRecord = delete;
                        return true;
                    }
                    IndexedRecord avroRecord = curAvroRecord.get();
                    RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, MergeOnReadInputFormat.this.tableState.getOperationPos());
                    if (rowKind == RowKind.DELETE && !MergeOnReadInputFormat.this.emitDelete) continue;
                    GenericRecord requiredAvroRecord = FormatUtils.buildAvroRecordBySchema(avroRecord, requiredSchema, MergeOnReadInputFormat.this.requiredPos, recordBuilder);
                    this.currentRecord = (RowData)avroToRowDataConverter.convert(requiredAvroRecord);
                    this.currentRecord.setRowKind(rowKind);
                    return true;
                }
                return false;
            }

            @Override
            public RowData next() {
                return this.currentRecord;
            }

            @Override
            public void close() {
                scanner.close();
            }
        };
    }

    private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
        final Schema tableSchema = new Schema.Parser().parse(this.tableState.getAvroSchema());
        final Schema requiredSchema = new Schema.Parser().parse(this.tableState.getRequiredAvroSchema());
        final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
        final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(this.tableState.getRequiredRowType());
        final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, this.hadoopConf, this.conf);
        final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
        return new ClosableIterator<RowData>(){
            private RowData currentRecord;

            @Override
            public boolean hasNext() {
                while (recordsIterator.hasNext()) {
                    Option<IndexedRecord> curAvroRecord = null;
                    HoodieAvroRecord hoodieRecord = (HoodieAvroRecord)recordsIterator.next();
                    try {
                        curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
                    }
                    catch (IOException e) {
                        throw new HoodieException("Get avro insert value error for key: " + hoodieRecord.getRecordKey(), e);
                    }
                    if (!curAvroRecord.isPresent()) continue;
                    IndexedRecord avroRecord = curAvroRecord.get();
                    GenericRecord requiredAvroRecord = FormatUtils.buildAvroRecordBySchema(avroRecord, requiredSchema, MergeOnReadInputFormat.this.requiredPos, recordBuilder);
                    this.currentRecord = (RowData)avroToRowDataConverter.convert(requiredAvroRecord);
                    FormatUtils.setRowKind(this.currentRecord, avroRecord, MergeOnReadInputFormat.this.tableState.getOperationPos());
                    return true;
                }
                return false;
            }

            @Override
            public RowData next() {
                return this.currentRecord;
            }

            @Override
            public void close() {
                records.close();
            }
        };
    }

    private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
        int[] requiredPos2 = new int[requiredPos.length + 1];
        requiredPos2[0] = 0;
        System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
        return requiredPos2;
    }

    @VisibleForTesting
    public void isEmitDelete(boolean emitDelete) {
        this.emitDelete = emitDelete;
    }

    public static class Builder {
        private Configuration conf;
        private MergeOnReadTableState tableState;
        private List<DataType> fieldTypes;
        private String defaultPartName;
        private long limit = -1L;
        private boolean emitDelete = false;

        public Builder config(Configuration conf) {
            this.conf = conf;
            return this;
        }

        public Builder tableState(MergeOnReadTableState tableState) {
            this.tableState = tableState;
            return this;
        }

        public Builder fieldTypes(List<DataType> fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        public Builder defaultPartName(String defaultPartName) {
            this.defaultPartName = defaultPartName;
            return this;
        }

        public Builder limit(long limit) {
            this.limit = limit;
            return this;
        }

        public Builder emitDelete(boolean emitDelete) {
            this.emitDelete = emitDelete;
            return this;
        }

        public MergeOnReadInputFormat build() {
            return new MergeOnReadInputFormat(this.conf, this.tableState, this.fieldTypes, this.defaultPartName, this.limit, this.emitDelete);
        }
    }

    static class MergeIterator
    implements RecordIterator {
        private final ParquetColumnarRowSplitReader reader;
        private final Iterator<String> logKeysIterator;
        private final HoodieMergedLogRecordScanner scanner;
        private final Schema tableSchema;
        private final Schema requiredSchema;
        private final int[] requiredPos;
        private final boolean emitDelete;
        private final int operationPos;
        private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
        private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
        private final GenericRecordBuilder recordBuilder;
        private final RowDataProjection projection;
        private final InstantRange instantRange;
        private boolean readLogs = false;
        private final Set<String> keyToSkip = new HashSet<String>();
        private final Properties payloadProps;
        private RowData currentRecord;

        MergeIterator(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, MergeOnReadInputSplit split, RowType tableRowType, RowType requiredRowType, Schema tableSchema, Schema requiredSchema, int[] requiredPos, boolean emitDelete, int operationPos, ParquetColumnarRowSplitReader reader) {
            this.tableSchema = tableSchema;
            this.reader = reader;
            this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
            this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
            this.logKeysIterator = this.scanner.getRecords().keySet().iterator();
            this.requiredSchema = requiredSchema;
            this.requiredPos = requiredPos;
            this.emitDelete = emitDelete;
            this.operationPos = operationPos;
            this.recordBuilder = new GenericRecordBuilder(requiredSchema);
            this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter((LogicalType)tableRowType);
            this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
            this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
            this.instantRange = split.getInstantRange().orElse(null);
        }

        @Override
        public boolean reachedEnd() throws IOException {
            while (!this.readLogs && !this.reader.reachedEnd()) {
                boolean isInRange;
                this.currentRecord = this.reader.nextRecord();
                if (this.instantRange != null && !(isInRange = this.instantRange.isInRange(this.currentRecord.getString(0).toString()))) continue;
                String curKey = this.currentRecord.getString(2).toString();
                if (this.scanner.getRecords().containsKey(curKey)) {
                    this.keyToSkip.add(curKey);
                    Option<IndexedRecord> mergedAvroRecord = this.mergeRowWithLog(this.currentRecord, curKey);
                    if (!mergedAvroRecord.isPresent()) continue;
                    RowKind rowKind = FormatUtils.getRowKindSafely(mergedAvroRecord.get(), this.operationPos);
                    if (!this.emitDelete && rowKind == RowKind.DELETE) continue;
                    GenericRecord avroRecord = FormatUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), this.requiredSchema, this.requiredPos, this.recordBuilder);
                    this.currentRecord = (RowData)this.avroToRowDataConverter.convert(avroRecord);
                    this.currentRecord.setRowKind(rowKind);
                    return false;
                }
                this.currentRecord = this.projection.project(this.currentRecord);
                return false;
            }
            this.readLogs = true;
            while (this.logKeysIterator.hasNext()) {
                Option<IndexedRecord> insertAvroRecord;
                String curKey = this.logKeysIterator.next();
                if (this.keyToSkip.contains(curKey) || !(insertAvroRecord = this.getInsertValue(curKey)).isPresent()) continue;
                GenericRecord avroRecord = FormatUtils.buildAvroRecordBySchema(insertAvroRecord.get(), this.requiredSchema, this.requiredPos, this.recordBuilder);
                this.currentRecord = (RowData)this.avroToRowDataConverter.convert(avroRecord);
                FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos);
                return false;
            }
            return true;
        }

        private Option<IndexedRecord> getInsertValue(String curKey) throws IOException {
            HoodieAvroRecord record = (HoodieAvroRecord)this.scanner.getRecords().get(curKey);
            if (!this.emitDelete && HoodieOperation.isDelete(record.getOperation())) {
                return Option.empty();
            }
            return record.getData().getInsertValue(this.tableSchema);
        }

        @Override
        public RowData nextRecord() {
            return this.currentRecord;
        }

        @Override
        public void close() throws IOException {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.scanner != null) {
                this.scanner.close();
            }
        }

        private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String curKey) throws IOException {
            HoodieAvroRecord record = (HoodieAvroRecord)this.scanner.getRecords().get(curKey);
            GenericRecord historyAvroRecord = (GenericRecord)this.rowDataToAvroConverter.convert(this.tableSchema, curRow);
            return record.getData().combineAndGetUpdateValue(historyAvroRecord, this.tableSchema, this.payloadProps);
        }
    }

    static class SkipMergeIterator
    implements RecordIterator {
        private final ParquetColumnarRowSplitReader reader;
        private final ClosableIterator<RowData> iterator;
        private boolean readLogs = false;
        private RowData currentRecord;

        SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator2) {
            this.reader = reader;
            this.iterator = iterator2;
        }

        @Override
        public boolean reachedEnd() throws IOException {
            if (!this.readLogs && !this.reader.reachedEnd()) {
                this.currentRecord = this.reader.nextRecord();
                return false;
            }
            this.readLogs = true;
            if (this.iterator.hasNext()) {
                this.currentRecord = (RowData)this.iterator.next();
                return false;
            }
            return true;
        }

        @Override
        public RowData nextRecord() {
            return this.currentRecord;
        }

        @Override
        public void close() throws IOException {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.iterator != null) {
                this.iterator.close();
            }
        }
    }

    static class LogFileOnlyIterator
    implements RecordIterator {
        private final ClosableIterator<RowData> iterator;

        LogFileOnlyIterator(ClosableIterator<RowData> iterator2) {
            this.iterator = iterator2;
        }

        @Override
        public boolean reachedEnd() {
            return !this.iterator.hasNext();
        }

        @Override
        public RowData nextRecord() {
            return (RowData)this.iterator.next();
        }

        @Override
        public void close() {
            if (this.iterator != null) {
                this.iterator.close();
            }
        }
    }

    static class BaseFileOnlyFilteringIterator
    implements RecordIterator {
        private final ParquetColumnarRowSplitReader reader;
        private final InstantRange instantRange;
        private final RowDataProjection projection;
        private RowData currentRecord;

        BaseFileOnlyFilteringIterator(Option<InstantRange> instantRange, RowType requiredRowType, ParquetColumnarRowSplitReader reader) {
            this.reader = reader;
            this.instantRange = instantRange.orElse(null);
            int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray();
            this.projection = RowDataProjection.instance(requiredRowType, positions);
        }

        @Override
        public boolean reachedEnd() throws IOException {
            while (!this.reader.reachedEnd()) {
                this.currentRecord = this.reader.nextRecord();
                if (this.instantRange != null) {
                    boolean isInRange = this.instantRange.isInRange(this.currentRecord.getString(0).toString());
                    if (!isInRange) continue;
                    return false;
                }
                return false;
            }
            return true;
        }

        @Override
        public RowData nextRecord() {
            return this.projection.project(this.currentRecord);
        }

        @Override
        public void close() throws IOException {
            if (this.reader != null) {
                this.reader.close();
            }
        }
    }

    static class BaseFileOnlyIterator
    implements RecordIterator {
        private final ParquetColumnarRowSplitReader reader;

        BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
            this.reader = reader;
        }

        @Override
        public boolean reachedEnd() throws IOException {
            return this.reader.reachedEnd();
        }

        @Override
        public RowData nextRecord() {
            return this.reader.nextRecord();
        }

        @Override
        public void close() throws IOException {
            if (this.reader != null) {
                this.reader.close();
            }
        }
    }

    private static interface RecordIterator {
        public boolean reachedEnd() throws IOException;

        public RowData nextRecord();

        public void close() throws IOException;
    }
}

