/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.format.parquet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.ParquetInputFile;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.format.parquet.newreader.VectorizedParquetRecordReader;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
import org.apache.paimon.format.parquet.reader.ParquetReadState;
import org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.shade.org.apache.parquet.ParquetReadOptions;
import org.apache.paimon.shade.org.apache.parquet.column.ColumnDescriptor;
import org.apache.paimon.shade.org.apache.parquet.column.page.PageReadStore;
import org.apache.paimon.shade.org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.paimon.shade.org.apache.parquet.io.ColumnIOFactory;
import org.apache.paimon.shade.org.apache.parquet.io.MessageColumnIO;
import org.apache.paimon.shade.org.apache.parquet.schema.ConversionPatterns;
import org.apache.paimon.shade.org.apache.parquet.schema.GroupType;
import org.apache.paimon.shade.org.apache.parquet.schema.MessageType;
import org.apache.paimon.shade.org.apache.parquet.schema.Type;
import org.apache.paimon.shade.org.apache.parquet.schema.Types;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetReaderFactory
implements FormatReaderFactory {
    /** Logger; used to warn when a projected column is absent from a file schema. */
    private static final Logger LOG = LoggerFactory.getLogger(ParquetReaderFactory.class);
    /** Option key whose integer value is handed to {@code withMaxAllocationInBytes}. */
    private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
    /** Options consulted for reader selection and for the Parquet read-option switches. */
    private final Options conf;
    /** Row type describing the columns this factory reads. */
    private final RowType projectedType;
    /** Field names of {@link #projectedType}, in projection order. */
    private final String[] projectedColumnNames;
    /** Fields of {@link #projectedType}, in projection order (parallel to the names array). */
    private final DataField[] projectedFields;
    /** Size used when allocating writable vectors and as the upper bound of rows per batch. */
    private final int batchSize;
    /** Record filter installed on the read options via {@code withRecordFilter}. */
    private final FilterCompat.Filter filter;
    /**
     * Indices of projected columns that {@code clipParquetSchema} found missing from a file
     * schema. Entries are only ever added to this set, never removed.
     */
    private final Set<Integer> unknownFieldsIndices = new HashSet<Integer>();

    public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
        this.conf = conf;
        this.projectedType = projectedType;
        this.projectedColumnNames = projectedType.getFieldNames().toArray(new String[0]);
        this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
        this.batchSize = batchSize;
        this.filter = filter;
    }

    /**
     * Creates a reader for the given file using the batch-pool based {@code ParquetReader}.
     *
     * <p>The file schema is clipped to the projection, validated with {@code checkSchema}, and
     * a single-entry batch pool is prepared before the reader is returned.
     *
     * @param context supplies the file IO, path, size and file index of the file to read
     * @return a reader that owns the opened Parquet file reader
     * @throws IOException if the file cannot be opened or fails schema validation
     */
    public FileRecordReader<InternalRow> createReaderOld(FormatReaderFactory.Context context) throws IOException {
        ParquetReadOptions.Builder builder = ParquetReadOptions.builder().withRange(0L, context.fileSize());
        this.setReadOptions(builder);
        ParquetFileReader reader = new ParquetFileReader(ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build(), context.fileIndex());
        try {
            MessageType fileSchema = reader.getFileMetaData().getSchema();
            MessageType requestedSchema = this.clipParquetSchema(fileSchema);
            reader.setRequestedSchema(requestedSchema);
            this.checkSchema(fileSchema, requestedSchema);
            Pool<ParquetReaderBatch> poolOfBatches = this.createPoolOfBatches(context.filePath(), requestedSchema);
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
            List<ParquetField> fields = ParquetSplitReaderUtil.buildFieldsList(this.projectedType.getFields(), this.projectedType.getFieldNames(), columnIO);
            return new ParquetReader(reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
        } catch (Throwable failure) {
            // The caller never receives the file reader when setup fails, so nobody else can
            // close it; release it here and keep the original failure as the primary error.
            try {
                reader.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Creates a reader for the given file.
     *
     * <p>When the option {@code parquet.use-old-reader} parses as {@code true} (it defaults to
     * {@code "false"}), this delegates to {@link #createReaderOld}. Otherwise a
     * {@code VectorizedParquetRecordReader} is created over the clipped schema.
     *
     * @param context supplies the file IO, path, size and file index of the file to read
     * @return a reader that owns the opened Parquet file reader
     * @throws IOException if the file cannot be opened
     */
    @Override
    public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context context) throws IOException {
        if (Boolean.parseBoolean(this.conf.getString("parquet.use-old-reader", "false"))) {
            return this.createReaderOld(context);
        }
        ParquetReadOptions.Builder builder = ParquetReadOptions.builder().withRange(0L, context.fileSize());
        this.setReadOptions(builder);
        ParquetFileReader reader = new ParquetFileReader(ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build(), context.fileIndex());
        try {
            MessageType fileSchema = reader.getFileMetaData().getSchema();
            MessageType requestedSchema = this.clipParquetSchema(fileSchema);
            reader.setRequestedSchema(requestedSchema);
            WritableColumnVector[] writableVectors = this.createWritableVectors(requestedSchema);
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
            List<ParquetField> fields = ParquetSplitReaderUtil.buildFieldsList(this.projectedType.getFields(), this.projectedType.getFieldNames(), columnIO);
            return new VectorizedParquetRecordReader(context.filePath(), reader, fileSchema, fields, writableVectors, this.batchSize);
        } catch (Throwable failure) {
            // Setup failed before ownership of the file reader passed to the record reader,
            // so close it here; the original failure stays the primary error.
            try {
                reader.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Copies the Parquet-related switches from the factory options onto the given builder and
     * installs the record filter.
     *
     * @param builder read-options builder to configure in place
     */
    private void setReadOptions(ParquetReadOptions.Builder builder) {
        final Options options = this.conf;
        final String badRecordKey = "parquet.read.bad.record.threshold";
        // Fallback for ALLOCATION_SIZE when the option is absent: 8 MiB.
        final int defaultMaxAllocation = 8 * 1024 * 1024;

        builder.useSignedStringMinMax(options.getBoolean("parquet.strings.signed-min-max.enabled", false));
        builder.useDictionaryFilter(options.getBoolean("parquet.filter.dictionary.enabled", true));
        builder.useStatsFilter(options.getBoolean("parquet.filter.stats.enabled", true));
        builder.useRecordFilter(options.getBoolean("parquet.filter.record-level.enabled", true));
        builder.useColumnIndexFilter(options.getBoolean("parquet.filter.columnindex.enabled", true));
        builder.usePageChecksumVerification(options.getBoolean("parquet.page.verify-checksum.enabled", false));
        builder.useBloomFilter(options.getBoolean("parquet.filter.bloom.enabled", true));
        builder.withMaxAllocationInBytes(options.getInteger(ALLOCATION_SIZE, defaultMaxAllocation));

        // The threshold is forwarded only when it was explicitly configured.
        String threshold = options.getString(badRecordKey, null);
        if (threshold != null) {
            builder.set(badRecordKey, threshold);
        }

        builder.withRecordFilter(this.filter);
    }

    /**
     * Builds the requested schema: one Parquet type per projected column, in projection order.
     *
     * <p>Columns present in {@code parquetSchema} are clipped with {@link #clipParquetType}.
     * Columns that are absent get a type converted from the projected field, a warning is
     * logged, and their index is recorded in {@code unknownFieldsIndices}.
     *
     * @param parquetSchema schema of the file being read
     * @return a message type named {@code paimon-parquet} holding the per-column types
     */
    private MessageType clipParquetSchema(GroupType parquetSchema) {
        final int columnCount = this.projectedColumnNames.length;
        Type[] clipped = new Type[columnCount];
        for (int idx = 0; idx < columnCount; idx++) {
            String column = this.projectedColumnNames[idx];
            if (parquetSchema.containsField(column)) {
                Type fileType = parquetSchema.getType(column);
                clipped[idx] = this.clipParquetType(this.projectedFields[idx].type(), fileType);
            } else {
                LOG.warn("{} does not exist in {}, will fill the field with null.", column, parquetSchema);
                clipped[idx] = ParquetSchemaConverter.convertToParquetType(column, this.projectedFields[idx]);
                this.unknownFieldsIndices.add(idx);
            }
        }
        return Types.buildMessage().addFields(clipped).named("paimon-parquet");
    }

    /**
     * Recursively narrows a file's Parquet type to the parts the read type asks for.
     *
     * <p>ROW, MAP and ARRAY read types are rebuilt from their clipped children; every other
     * type root returns {@code parquetType} unchanged.
     *
     * @param readType type requested by the projection
     * @param parquetType corresponding type found in the file schema
     * @return the clipped Parquet type
     * @throws RuntimeException if a field of a ROW read type is absent from the Parquet group
     */
    private Type clipParquetType(DataType readType, Type parquetType) {
        switch (readType.getTypeRoot()) {
            case ROW:
                return this.clipRowGroup((RowType) readType, (GroupType) parquetType);
            case MAP:
                return this.clipMapGroup((MapType) readType, (GroupType) parquetType);
            case ARRAY:
                return this.clipListGroup((ArrayType) readType, (GroupType) parquetType);
            default:
                return parquetType;
        }
    }

    /** Keeps only the requested row fields, in read order; a missing field is an error. */
    private Type clipRowGroup(RowType rowType, GroupType rowGroup) {
        List<Type> kept = new ArrayList<Type>();
        for (DataField child : rowType.getFields()) {
            String childName = child.name();
            if (!rowGroup.containsField(childName)) {
                throw new RuntimeException("field " + childName + " is missing");
            }
            kept.add(this.clipParquetType(child.type(), rowGroup.getType(childName)));
        }
        return rowGroup.withNewFields(kept);
    }

    /** Rebuilds a map group from its clipped key and, when the file has one, clipped value. */
    private Type clipMapGroup(MapType mapType, GroupType mapGroup) {
        GroupType entries = mapGroup.getType("key_value").asGroupType();
        Type.Repetition repetition = mapGroup.getRepetition();
        String groupName = mapGroup.getName();
        Type clippedKey = this.clipParquetType(mapType.getKeyType(), entries.getType("key"));
        Type clippedValue = null;
        if (entries.containsField("value")) {
            clippedValue = this.clipParquetType(mapType.getValueType(), entries.getType("value"));
        }
        return ConversionPatterns.mapType(repetition, groupName, "key_value", clippedKey, clippedValue);
    }

    /** Rebuilds a list group around its clipped {@code list.element} type. */
    private Type clipListGroup(ArrayType arrayType, GroupType arrayGroup) {
        GroupType repeated = arrayGroup.getType("list").asGroupType();
        Type.Repetition repetition = arrayGroup.getRepetition();
        String groupName = arrayGroup.getName();
        Type clippedElement = this.clipParquetType(arrayType.getElementType(), repeated.getType("element"));
        return ConversionPatterns.listOfElements(repetition, groupName, clippedElement);
    }

    /**
     * Validates the requested schema against the file schema.
     *
     * <p>The number of top-level requested fields must equal the number of projected columns.
     * Every leaf column of the requested schema is then checked: if its path exists in the
     * file, the file's descriptor must equal the requested one; if it does not exist, the
     * column must have a non-zero maximum definition level.
     *
     * @param fileSchema schema stored in the file
     * @param requestedSchema schema produced by clipping the file schema to the projection
     * @throws IOException if a leaf column with maximum definition level 0 is missing from the
     *     file
     * @throws UnsupportedOperationException if a column present in both schemas has a
     *     different descriptor
     */
    private void checkSchema(MessageType fileSchema, MessageType requestedSchema) throws IOException, UnsupportedOperationException {
        if (this.projectedColumnNames.length != requestedSchema.getFieldCount()) {
            throw new RuntimeException("The quality of field type is incompatible with the request schema!");
        }
        // getColumns() lists leaf columns, which outnumber top-level fields as soon as a
        // nested type is projected. Iterate over all of them (not getFieldCount()) so trailing
        // leaf columns are validated too, and fetch the list once instead of per iteration.
        List<ColumnDescriptor> requestedColumns = requestedSchema.getColumns();
        for (ColumnDescriptor requested : requestedColumns) {
            String[] colPath = requested.getPath();
            if (fileSchema.containsPath(colPath)) {
                ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
                if (!fd.equals(requested)) {
                    throw new UnsupportedOperationException("Schema evolution not supported.");
                }
            } else if (requested.getMaxDefinitionLevel() == 0) {
                // Definition level 0 means the column cannot hold nulls, so it cannot be
                // substituted with nulls when absent from the file.
                throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath));
            }
        }
    }

    /**
     * Creates a pool of capacity one and seeds it with a single reader batch wired to the
     * pool's recycler.
     *
     * @param filePath path of the file being read, forwarded to the batch
     * @param requestedSchema clipped schema used to allocate the batch's vectors
     * @return the pool containing the one batch
     */
    private Pool<ParquetReaderBatch> createPoolOfBatches(Path filePath, MessageType requestedSchema) {
        Pool<ParquetReaderBatch> batches = new Pool<>(1);
        Pool.Recycler<ParquetReaderBatch> recycler = batches.recycler();
        ParquetReaderBatch onlyBatch = this.createReaderBatch(filePath, requestedSchema, recycler);
        batches.add(onlyBatch);
        return batches;
    }

    /**
     * Allocates writable vectors for the requested schema, wraps them in a column batch and
     * bundles both into a reader batch.
     *
     * @param filePath path of the file being read
     * @param requestedSchema clipped schema used to allocate the vectors
     * @param recycler callback target used by the batch to return itself
     * @return the new reader batch
     */
    private ParquetReaderBatch createReaderBatch(Path filePath, MessageType requestedSchema, Pool.Recycler<ParquetReaderBatch> recycler) {
        WritableColumnVector[] vectors = this.createWritableVectors(requestedSchema);
        VectorizedColumnBatch batchView = this.createVectorizedColumnBatch(vectors);
        return this.createReaderBatch(filePath, vectors, batchView, recycler);
    }

    /**
     * Allocates one writable column vector per projected field.
     *
     * @param requestedSchema clipped schema; its i-th top-level type is paired with the i-th
     *     projected field
     * @return the vectors, indexed by projection position
     */
    private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
        WritableColumnVector[] columns = new WritableColumnVector[this.projectedFields.length];
        List<Type> types = requestedSchema.getFields();
        // The descriptor list does not depend on the loop index; fetch it once rather than
        // asking the schema for it again for every field.
        List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
        for (int i = 0; i < this.projectedFields.length; ++i) {
            columns[i] = ParquetSplitReaderUtil.createWritableColumnVector(this.batchSize, this.projectedFields[i].type(), types.get(i), descriptors, 0);
        }
        return columns;
    }

    /**
     * Wraps the writable vectors into the read-side vectors of a column batch.
     *
     * <p>DECIMAL columns are wrapped in a {@code ParquetDecimalVector}, both TIMESTAMP type
     * roots in a {@code ParquetTimestampVector}; every other column is used as is.
     *
     * @param writableVectors vectors indexed by projection position
     * @return a batch over the (possibly wrapped) vectors
     */
    private VectorizedColumnBatch createVectorizedColumnBatch(WritableColumnVector[] writableVectors) {
        final int count = writableVectors.length;
        ColumnVector[] wrapped = new ColumnVector[count];
        for (int idx = 0; idx < count; idx++) {
            WritableColumnVector raw = writableVectors[idx];
            switch (this.projectedFields[idx].type().getTypeRoot()) {
                case DECIMAL:
                    wrapped[idx] = new ParquetDecimalVector(raw);
                    break;
                case TIMESTAMP_WITHOUT_TIME_ZONE:
                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                    wrapped[idx] = new ParquetTimestampVector(raw);
                    break;
                default:
                    wrapped[idx] = raw;
                    break;
            }
        }
        return new VectorizedColumnBatch(wrapped);
    }

    /**
     * Bundles already-allocated vectors and their column batch into a reader batch.
     *
     * @param filePath path of the file being read
     * @param writableVectors vectors the column readers write into
     * @param columnarBatch read-side view over the same vectors
     * @param recycler callback target used by the batch to return itself
     * @return the new reader batch
     */
    private ParquetReaderBatch createReaderBatch(Path filePath, WritableColumnVector[] writableVectors, VectorizedColumnBatch columnarBatch, Pool.Recycler<ParquetReaderBatch> recycler) {
        ParquetReaderBatch readerBatch = new ParquetReaderBatch(filePath, writableVectors, columnarBatch, recycler);
        return readerBatch;
    }

    /**
     * A reusable unit of read state: the writable vectors, the column batch over them, and the
     * single iterator instance that is reset and handed out for each batch.
     */
    private static class ParquetReaderBatch {
        private final WritableColumnVector[] writableVectors;
        private final boolean containsNestedColumn;
        private final VectorizedColumnBatch columnarBatch;
        private final Pool.Recycler<ParquetReaderBatch> recycler;
        private final ColumnarRowIterator result;

        /**
         * @param filePath path of the file being read, forwarded to the iterator
         * @param writableVectors vectors the column readers write into
         * @param columnarBatch read-side view over the same vectors
         * @param recycler receives this batch when {@link #recycle()} is called
         */
        protected ParquetReaderBatch(Path filePath, WritableColumnVector[] writableVectors, VectorizedColumnBatch columnarBatch, Pool.Recycler<ParquetReaderBatch> recycler) {
            this.writableVectors = writableVectors;
            this.columnarBatch = columnarBatch;
            this.recycler = recycler;
            this.containsNestedColumn = hasNestedVector(writableVectors);

            // A plain ColumnarRowIterator is used once any map/row/array vector is present;
            // otherwise the VectorizedRowIterator variant is used.
            ColumnarRow row = new ColumnarRow(columnarBatch);
            if (this.containsNestedColumn) {
                this.result = new ColumnarRowIterator(filePath, row, this::recycle);
            } else {
                this.result = new VectorizedRowIterator(filePath, row, this::recycle);
            }
        }

        /** Returns whether any vector is a map, row or array column vector. */
        private static boolean hasNestedVector(WritableColumnVector[] vectors) {
            for (WritableColumnVector candidate : vectors) {
                if (candidate instanceof MapColumnVector
                        || candidate instanceof RowColumnVector
                        || candidate instanceof ArrayColumnVector) {
                    return true;
                }
            }
            return false;
        }

        /** Hands this batch back to the recycler it was created with. */
        public void recycle() {
            this.recycler.recycle(this);
        }

        /**
         * Resets the shared iterator to the given row number and returns it.
         *
         * @param rowNumber value passed to the iterator's {@code reset}
         * @return the iterator owned by this batch
         */
        public ColumnarRowIterator convertAndGetIterator(long rowNumber) {
            ColumnarRowIterator iterator = this.result;
            iterator.reset(rowNumber);
            return iterator;
        }
    }

    private class ParquetReader
    implements FileRecordReader<InternalRow> {
        /** Underlying file reader; set to {@code null} once {@code close()} has closed it. */
        private ParquetFileReader reader;
        /** Clipped schema whose top-level fields and columns drive column-reader creation. */
        private final MessageType requestedSchema;
        /** Total number of rows this reader is expected to return. */
        private final long totalRowCount;
        /** Pool from which each {@code readBatch()} call obtains its batch. */
        private final Pool<ParquetReaderBatch> pool;
        /** Rows returned so far; advanced by the batch size after every batch. */
        private long rowsReturned;
        /** Sum of the row counts of all row groups loaded so far. */
        private long totalCountLoadedSoFar;
        /** Row position passed to the iterator of the batch currently being returned. */
        private long currentRowPosition;
        /** Row position computed for the batch that follows the current one. */
        private long nextRowPosition;
        /** Read state built from the current row group's row indexes. */
        private ParquetReadState currentRowGroupReadState;
        /** Row index of the first row of the current row group. */
        private long currentRowGroupFirstRowIndex;
        /** One reader per requested top-level field; a {@code null} entry is filled with nulls. */
        private ColumnReader[] columnReaders;
        /** Parquet field descriptions, indexed like the requested top-level fields. */
        private final List<ParquetField> fields;

        /**
         * @param reader opened file reader; closed by {@code close()}
         * @param requestedSchema clipped schema to read
         * @param totalRowCount number of rows the reader is expected to return
         * @param pool pool supplying reusable batches
         * @param fields Parquet field descriptions, one per requested top-level field
         */
        private ParquetReader(ParquetFileReader reader, MessageType requestedSchema, long totalRowCount, Pool<ParquetReaderBatch> pool, List<ParquetField> fields) {
            // Collaborators supplied by the factory.
            this.reader = reader;
            this.requestedSchema = requestedSchema;
            this.pool = pool;
            this.fields = fields;
            this.totalRowCount = totalRowCount;

            // All progress counters and positions start at zero.
            this.rowsReturned = 0L;
            this.totalCountLoadedSoFar = 0L;
            this.currentRowGroupFirstRowIndex = 0L;
            this.currentRowPosition = 0L;
            this.nextRowPosition = 0L;
        }

        /**
         * Reads the next batch of rows.
         *
         * @return an iterator positioned at the batch's first row, or {@code null} when all
         *     rows have been returned
         * @throws IOException if obtaining a batch or reading the data fails
         */
        @Override
        @Nullable
        public ColumnarRowIterator readBatch() throws IOException {
            ParquetReaderBatch batch = this.getCachedEntry();
            boolean filled = this.nextBatch(batch);
            if (filled) {
                return batch.convertAndGetIterator(this.currentRowPosition);
            }
            // Nothing left to read: give the unused batch back before signalling the end.
            batch.recycle();
            return null;
        }

        /**
         * Fills the given batch with the next run of rows.
         *
         * @param batch batch whose vectors are reset and then written
         * @return {@code false} if every expected row has already been returned, otherwise
         *     {@code true} after the batch has been filled
         * @throws IOException if loading a row group or sizing the batch fails
         */
        private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
            if (this.rowsReturned >= this.totalRowCount) {
                return false;
            }
            // Clear whatever the previous use of this batch left behind.
            for (WritableColumnVector v : batch.writableVectors) {
                v.reset();
            }
            batch.columnarBatch.setNumRows(0);
            // Either everything loaded so far has been returned and a new row group is needed
            // (which sets currentRowPosition itself), or we continue inside the current row
            // group at the position computed after the previous batch.
            if (this.rowsReturned == this.totalCountLoadedSoFar) {
                this.readNextRowGroup();
            } else {
                this.currentRowPosition = this.nextRowPosition;
            }
            // Must run after the position update above: the size depends on currentRowPosition.
            int num = this.getBachSize();
            for (int i = 0; i < this.columnReaders.length; ++i) {
                // A null reader marks a column that readNextRowGroup did not create a reader
                // for; it is filled with nulls instead of being read.
                if (this.columnReaders[i] == null) {
                    batch.writableVectors[i].fillWithNulls();
                    continue;
                }
                this.columnReaders[i].readToVector(num, batch.writableVectors[i]);
            }
            this.rowsReturned += (long)num;
            // Computed from the still-current position; may advance the read state's range.
            this.nextRowPosition = this.getNextRowPosition(num);
            batch.columnarBatch.setNumRows(num);
            return true;
        }

        /**
         * Loads the next filtered row group, creates its column readers and positions the
         * reader at the first row to return from it.
         *
         * <p>No column reader is created for a projected column that is absent from this
         * file's schema; its slot stays {@code null}.
         *
         * @throws IOException if no further row group is available
         * @throws RuntimeException if row groups were filtered but the row group does not
         *     report its row index offset
         */
        private void readNextRowGroup() throws IOException {
            PageReadStore rowGroup = this.reader.readNextFilteredRowGroup();
            if (rowGroup == null) {
                throw new IOException("expecting more rows but reached last block. Read " + this.rowsReturned + " out of " + this.totalRowCount);
            }
            this.currentRowGroupReadState = new ParquetReadState(rowGroup.getRowIndexes().orElse(null));
            List<Type> types = this.requestedSchema.getFields();
            // Loop-invariant: fetch the descriptor list once instead of once per field.
            List<ColumnDescriptor> descriptors = this.requestedSchema.getColumns();
            // Decide "missing column" from this reader's own file schema. The factory-wide
            // index set is shared by every reader the factory creates and is never cleared,
            // so it can carry entries that belong to a different file.
            MessageType fileSchema = this.reader.getFileMetaData().getSchema();
            this.columnReaders = new ColumnReader[types.size()];
            for (int i = 0; i < types.size(); ++i) {
                if (!fileSchema.containsField(ParquetReaderFactory.this.projectedColumnNames[i])) {
                    continue;
                }
                this.columnReaders[i] = ParquetSplitReaderUtil.createColumnReader(ParquetReaderFactory.this.projectedFields[i].type(), types.get(i), descriptors, rowGroup, this.fields.get(i), 0);
            }
            this.totalCountLoadedSoFar += rowGroup.getRowCount();
            if (rowGroup.getRowIndexOffset().isPresent()) {
                this.currentRowGroupFirstRowIndex = rowGroup.getRowIndexOffset().get();
                // Unless the read state reports a max range, start at its first range rather
                // than at the first row of the row group.
                long pageIndex = 0L;
                if (!this.currentRowGroupReadState.isMaxRange()) {
                    pageIndex = this.currentRowGroupReadState.currentRangeStart();
                }
                this.currentRowPosition = this.currentRowGroupFirstRowIndex + pageIndex;
            } else {
                if (this.reader.rowGroupsFiltered()) {
                    throw new RuntimeException("There is a bug, rowIndexOffset must be present when row groups are filtered.");
                }
                // Without an offset, continue from the position computed after the last batch.
                this.currentRowGroupFirstRowIndex = this.nextRowPosition;
                this.currentRowPosition = this.nextRowPosition;
            }
        }

        /**
         * Computes how many rows the next batch may contain: the smallest of the configured
         * batch size, the rows loaded but not yet returned, and, unless the read state
         * reports a max range, the rows left up to the end of the current range.
         *
         * @return number of rows to read into the next batch
         * @throws IOException if the current row group's read state is already finished
         */
        private int getBachSize() throws IOException {
            ParquetReadState state = this.currentRowGroupReadState;
            if (state.isFinished()) {
                throw new IOException("expecting more rows but reached last page block. Read " + this.rowsReturned + " out of " + this.totalRowCount);
            }

            long leftInRange = Long.MAX_VALUE;
            if (!state.isMaxRange()) {
                long offsetInRowGroup = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
                // Range end is treated as inclusive, hence the + 1.
                leftInRange = state.currentRangeEnd() - offsetInRowGroup + 1L;
            }

            long loadedButUnreturned = this.totalCountLoadedSoFar - this.rowsReturned;
            long limit = Math.min(leftInRange, loadedButUnreturned);
            return (int) Math.min((long) ParquetReaderFactory.this.batchSize, limit);
        }

        /**
         * Computes the row position at which the batch after the current one starts.
         *
         * <p>When the advanced offset passes the end of the current range, the read state is
         * moved to its next range and that range's start is used instead.
         *
         * @param num number of rows in the batch just read
         * @return the row position of the following batch
         */
        private long getNextRowPosition(int num) {
            ParquetReadState state = this.currentRowGroupReadState;
            if (state.isMaxRange()) {
                return this.currentRowPosition + (long) num;
            }

            long rowGroupStart = this.currentRowGroupFirstRowIndex;
            long offsetAfterBatch = (this.currentRowPosition - rowGroupStart) + (long) num;
            if (offsetAfterBatch > state.currentRangeEnd()) {
                state.nextRange();
                offsetAfterBatch = state.currentRangeStart();
            }
            return rowGroupStart + offsetAfterBatch;
        }

        /**
         * Takes a batch from the pool.
         *
         * @return the polled batch
         * @throws IOException if the thread is interrupted while polling; the interrupt flag
         *     is restored and the {@link InterruptedException} is attached as the cause
         */
        private ParquetReaderBatch getCachedEntry() throws IOException {
            try {
                return this.pool.pollEntry();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Keep the original exception as the cause so its stack trace is not lost.
                throw new IOException("Interrupted", e);
            }
        }

        /**
         * Closes the underlying file reader if it is still open; later calls are no-ops.
         *
         * @throws IOException if closing the file reader fails
         */
        @Override
        public void close() throws IOException {
            ParquetFileReader open = this.reader;
            if (open == null) {
                return;
            }
            open.close();
            // Cleared only after a successful close, so a failed close can be retried.
            this.reader = null;
        }
    }
}

