/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.shaded.com.google.common.base.Preconditions;
import org.apache.iceberg.shaded.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.shaded.com.google.common.collect.Iterables;
import org.apache.iceberg.shaded.com.google.common.collect.Iterators;
import org.apache.iceberg.shaded.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.spark.source.Stats;
import org.apache.iceberg.spark.source.StructInternalRow;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

class Reader
implements DataSourceReader,
SupportsPushDownFilters,
SupportsPushDownRequiredColumns,
SupportsReportStatistics {
    private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
    // Shared empty array used as the initial value of pushedFilters.
    private static final Filter[] NO_FILTERS = new Filter[0];
    // Table being scanned; also the source of the schema, FileIO and encryption manager below.
    private final Table table;
    // Parsed from the "snapshot-id" read option; null when the option is absent.
    private final Long snapshotId;
    // Parsed from the "as-of-timestamp" read option; null when absent. Mutually exclusive with snapshotId.
    private final Long asOfTimestamp;
    // Parsed from the "split-size" read option; null when absent.
    private final Long splitSize;
    // Parsed from the "lookback" read option; null when absent.
    private final Integer splitLookback;
    // Parsed from the "file-open-cost" read option; null when absent.
    private final Long splitOpenFileCost;
    // Taken from table.io() and handed to every ReadTask.
    private final FileIO fileIo;
    // Taken from table.encryption() and handed to every ReadTask.
    private final EncryptionManager encryptionManager;
    // Forwarded to the table scan and to every ReadTask.
    private final boolean caseSensitive;
    // Spark-requested projection set by pruneColumns; null until then.
    private StructType requestedSchema = null;
    // Iceberg expressions converted from the Spark filters in pushFilters; null until then.
    private List<Expression> filterExpressions = null;
    // The Spark filters that could be converted to Iceberg expressions.
    private Filter[] pushedFilters = NO_FILTERS;
    // Projected Iceberg schema; reset to null by pushFilters/pruneColumns and recomputed by lazySchema().
    private Schema schema = null;
    // Spark view of the projected schema; reset together with schema and recomputed by lazyType().
    private StructType type = null;
    // Cached result of scan planning; cleared by pushFilters and recomputed by tasks().
    private List<CombinedScanTask> tasks = null;

    Reader(Table table, boolean caseSensitive, DataSourceOptions options) {
        this.table = table;
        this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
        this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
        if (this.snapshotId != null && this.asOfTimestamp != null) {
            throw new IllegalArgumentException("Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
        }
        this.splitSize = options.get("split-size").map(Long::parseLong).orElse(null);
        this.splitLookback = options.get("lookback").map(Integer::parseInt).orElse(null);
        this.splitOpenFileCost = options.get("file-open-cost").map(Long::parseLong).orElse(null);
        this.schema = table.schema();
        this.fileIo = table.io();
        this.encryptionManager = table.encryption();
        this.caseSensitive = caseSensitive;
    }

    /**
     * Returns the Iceberg schema to project, computing and caching it on first use.
     *
     * <p>When a Spark projection has been requested the table schema is pruned to it;
     * otherwise the full table schema is used.
     */
    private Schema lazySchema() {
        if (this.schema != null) {
            return this.schema;
        }
        Schema tableSchema = this.table.schema();
        if (this.requestedSchema == null) {
            this.schema = tableSchema;
        } else {
            this.schema = SparkSchemaUtil.prune(tableSchema, this.requestedSchema);
        }
        return this.schema;
    }

    /**
     * Returns the Spark struct type of the projected schema, converting and caching it on first use.
     */
    private StructType lazyType() {
        if (this.type != null) {
            return this.type;
        }
        this.type = SparkSchemaUtil.convert(this.lazySchema());
        return this.type;
    }

    /**
     * Returns the Spark schema of the rows this reader produces.
     *
     * @return the (cached) Spark type of the projected schema
     */
    public StructType readSchema() {
        StructType sparkType = this.lazyType();
        return sparkType;
    }

    /**
     * Plans the scan and wraps every combined task in a serializable {@link ReadTask}.
     *
     * <p>Schemas are passed to the tasks as JSON strings and re-parsed lazily on the task side.
     *
     * @return one input partition per planned {@link CombinedScanTask}
     */
    public List<InputPartition<InternalRow>> planInputPartitions() {
        final String tableSchemaJson = SchemaParser.toJson(this.table.schema());
        final String expectedSchemaJson = SchemaParser.toJson(this.lazySchema());

        List<InputPartition<InternalRow>> partitions = Lists.newArrayList();
        this.tasks().forEach(combined -> partitions.add(
                new ReadTask(combined, tableSchemaJson, expectedSchemaJson, this.fileIo, this.encryptionManager, this.caseSensitive)));
        return partitions;
    }

    /**
     * Records the Spark filters that can be expressed as Iceberg expressions.
     *
     * <p>Cached tasks and the cached projected schema/type are invalidated. Every input filter
     * is returned unchanged, including those that were converted and pushed.
     *
     * @param filters filters offered by Spark
     * @return the same {@code filters} array
     */
    public Filter[] pushFilters(Filter[] filters) {
        // Any previously planned tasks were planned without these filters.
        this.tasks = null;

        List<Expression> converted = Lists.newArrayListWithExpectedSize(filters.length);
        List<Filter> convertible = Lists.newArrayListWithExpectedSize(filters.length);
        for (int i = 0; i < filters.length; ++i) {
            Filter candidate = filters[i];
            Expression expression = SparkFilters.convert(candidate);
            if (expression != null) {
                converted.add(expression);
                convertible.add(candidate);
            }
        }

        this.filterExpressions = converted;
        this.pushedFilters = convertible.toArray(new Filter[0]);

        // Force the projected schema and its Spark type to be recomputed.
        this.schema = null;
        this.type = null;
        return filters;
    }

    /**
     * Returns the filters accepted by the most recent {@code pushFilters} call.
     *
     * @return the internal array of pushed filters (empty before any push); not copied
     */
    public Filter[] pushedFilters() {
        Filter[] accepted = this.pushedFilters;
        return accepted;
    }

    /**
     * Records the projection requested by Spark and invalidates the cached schema and type.
     *
     * @param newRequestedSchema the columns Spark wants to read
     */
    public void pruneColumns(StructType newRequestedSchema) {
        // NOTE(review): unlike pushFilters, the cached task list is not cleared here — confirm intentional.
        this.type = null;
        this.schema = null;
        this.requestedSchema = newRequestedSchema;
    }

    /**
     * Estimates scan size by summing, over every planned file task, the task length and
     * the record count of its data file.
     *
     * @return statistics holding the total bytes and total rows
     */
    public Statistics estimateStatistics() {
        long totalBytes = 0L;
        long totalRows = 0L;
        List<CombinedScanTask> planned = this.tasks();
        for (CombinedScanTask combined : planned) {
            for (FileScanTask fileTask : combined.files()) {
                totalBytes = totalBytes + fileTask.length();
                totalRows = totalRows + fileTask.file().recordCount();
            }
        }
        return new Stats(totalBytes, totalRows);
    }

    /**
     * Returns the planned scan tasks, planning the scan on first use and caching the result.
     *
     * <p>The scan is configured with the case sensitivity, projected schema, optional
     * snapshot selector, optional split options and any pushed filter expressions.
     *
     * @throws RuntimeIOException if closing the planned-task iterable fails
     */
    private List<CombinedScanTask> tasks() {
        if (this.tasks != null) {
            return this.tasks;
        }

        TableScan scan = this.table.newScan().caseSensitive(this.caseSensitive).project(this.lazySchema());

        // Snapshot selection (the constructor guarantees at most one of these is set).
        if (this.snapshotId != null) {
            scan = scan.useSnapshot(this.snapshotId);
        }
        if (this.asOfTimestamp != null) {
            scan = scan.asOfTime(this.asOfTimestamp);
        }

        // Split-planning overrides.
        if (this.splitSize != null) {
            scan = scan.option("read.split.target-size", this.splitSize.toString());
        }
        if (this.splitLookback != null) {
            scan = scan.option("read.split.planning-lookback", this.splitLookback.toString());
        }
        if (this.splitOpenFileCost != null) {
            scan = scan.option("read.split.open-file-cost", this.splitOpenFileCost.toString());
        }

        if (this.filterExpressions != null) {
            for (Expression pushedExpression : this.filterExpressions) {
                scan = scan.filter(pushedExpression);
            }
        }

        try (CloseableIterable<CombinedScanTask> planned = scan.planTasks()) {
            this.tasks = Lists.newArrayList(planned);
        } catch (IOException e) {
            throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
        }
        return this.tasks;
    }

    /**
     * Describes this scan by table, projected struct type, pushed filter expressions and
     * case sensitivity.
     */
    public String toString() {
        Object projectedStruct = this.lazySchema().asStruct();
        return String.format(
                "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
                this.table,
                projectedStruct,
                this.filterExpressions,
                this.caseSensitive);
    }

    /**
     * Read-only {@link StructLike} view over a Spark {@link InternalRow}.
     *
     * <p>The wrapped row can be swapped with {@link #setRow(InternalRow)}; field types are
     * fixed at construction from the given struct.
     */
    private static class StructLikeInternalRow implements StructLike {
        private final DataType[] types;
        private InternalRow row = null;

        /**
         * @param struct Spark struct whose field data types are used to read values from the row
         */
        StructLikeInternalRow(StructType struct) {
            DataType[] fieldTypes = new DataType[struct.size()];
            int index = 0;
            for (StructField field : struct.fields()) {
                fieldTypes[index] = field.dataType();
                index++;
            }
            this.types = fieldTypes;
        }

        /**
         * Points this view at {@code row}.
         *
         * @return this instance, for chaining
         */
        public StructLikeInternalRow setRow(InternalRow row) {
            this.row = row;
            return this;
        }

        @Override
        public int size() {
            return this.types.length;
        }

        @Override
        public <T> T get(int pos, Class<T> javaClass) {
            Object raw = this.row.get(pos, this.types[pos]);
            return javaClass.cast(raw);
        }

        /** Always throws: this view does not support writes. */
        @Override
        public <T> void set(int pos, T value) {
            throw new UnsupportedOperationException("Not implemented: set");
        }
    }

    /**
     * Converts a partition tuple into a Spark {@link InternalRow} laid out like the given
     * partition schema.
     *
     * <p>For every column of the partition schema, the constructor looks up the identity
     * partition field with the same source id and remembers its position and Java class.
     * The same row instance is reused and overwritten on every {@link #apply(StructLike)} call.
     */
    private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
        private final DataType[] types;
        private final int[] positions;
        private final Class<?>[] javaTypes;
        private final GenericInternalRow reusedRow;

        /**
         * @param partitionSchema schema of the partition columns to produce
         * @param spec partition spec used to locate each column within the partition tuple
         */
        PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
            StructField[] sparkFields = SparkSchemaUtil.convert(partitionSchema).fields();
            int width = sparkFields.length;
            this.types = new DataType[width];
            this.positions = new int[width];
            this.javaTypes = new Class[width];
            this.reusedRow = new GenericInternalRow(width);

            List<PartitionField> partitionFields = spec.fields();
            for (int rowIndex = 0; rowIndex < width; ++rowIndex) {
                this.types[rowIndex] = sparkFields[rowIndex].dataType();
                int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
                int specIndex = PartitionRowConverter.identityFieldIndex(spec, partitionFields, sourceId);
                if (specIndex >= 0) {
                    this.positions[rowIndex] = specIndex;
                    this.javaTypes[rowIndex] = spec.javaClasses()[specIndex];
                }
                // When no identity field matches, position and Java class keep their defaults.
            }
        }

        /**
         * Returns the index of the first identity partition field whose source id is
         * {@code sourceId}, or -1 when there is none.
         */
        private static int identityFieldIndex(PartitionSpec spec, List<PartitionField> partitionFields, int sourceId) {
            for (int specIndex = 0; specIndex < partitionFields.size(); ++specIndex) {
                PartitionField candidate = spec.fields().get(specIndex);
                boolean matches = candidate.sourceId() == sourceId
                        && "identity".equals(candidate.transform().toString());
                if (matches) {
                    return specIndex;
                }
            }
            return -1;
        }

        /**
         * Fills the reused row from {@code tuple} and returns it.
         *
         * @return the shared row instance; its contents change on the next call
         */
        @Override
        public InternalRow apply(StructLike tuple) {
            int width = this.types.length;
            for (int i = 0; i < width; ++i) {
                Object value = tuple.get(this.positions[i], this.javaTypes[i]);
                if (value == null) {
                    this.reusedRow.setNullAt(i);
                } else {
                    this.reusedRow.update(i, PartitionRowConverter.convert(value, this.types[i]));
                }
            }
            return this.reusedRow;
        }

        /**
         * Converts a partition value to the representation used for the given Spark type:
         * strings, binary and decimals are converted; every other value is returned as is.
         */
        private static Object convert(Object value, DataType type) {
            Object converted = value;
            if (type instanceof StringType) {
                converted = UTF8String.fromString((String)value.toString());
            } else if (type instanceof BinaryType) {
                converted = ByteBuffers.toByteArray((ByteBuffer)value);
            } else if (type instanceof DecimalType) {
                converted = Decimal.fromDecimal((Object)value);
            }
            return converted;
        }
    }

    /**
     * Reads the rows of every {@link FileScanTask} in one {@link CombinedScanTask}, one file
     * after another, projecting each row to the expected schema.
     *
     * <p>The first file is opened eagerly in the constructor; subsequent files are opened by
     * {@link #next()} once the current one is exhausted.
     */
    private static class TaskDataReader implements InputPartitionReader<InternalRow> {
        private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply").impl(UnsafeProjection.class, InternalRow.class).build();
        private final Iterator<FileScanTask> tasks;
        private final Schema tableSchema;
        private final Schema expectedSchema;
        private final FileIO fileIo;
        private final Map<String, InputFile> inputFiles;
        private final boolean caseSensitive;
        private Iterator<InternalRow> currentIterator = null;
        private Closeable currentCloseable = null;
        private InternalRow current = null;

        /**
         * @param task the combined task whose files are read
         * @param tableSchema full table schema
         * @param expectedSchema schema of the rows returned by {@link #get()}
         * @param fileIo used to create an input file for each data file path
         * @param encryptionManager used to decrypt the input files
         * @param caseSensitive forwarded to schema pruning and to the file readers
         * @throws java.util.NoSuchElementException if {@code task} contains no files
         */
        TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo, EncryptionManager encryptionManager, boolean caseSensitive) {
            this.fileIo = fileIo;
            this.tasks = task.files().iterator();
            this.tableSchema = tableSchema;
            this.expectedSchema = expectedSchema;
            // Must be assigned before open(): open() reads this.caseSensitive, and a final field
            // read before assignment silently yields the default value (false).
            this.caseSensitive = caseSensitive;

            Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(task.files(), fileScanTask -> EncryptedFiles.encryptedInput(this.fileIo.newInputFile(fileScanTask.file().path().toString()), fileScanTask.file().keyMetadata())));
            ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
            decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
            this.inputFiles = inputFileBuilder.build();

            // Open last: schemas, fileIo, inputFiles and caseSensitive must all be set.
            this.currentIterator = this.open(this.tasks.next());
        }

        /**
         * Advances to the next row, moving on to the next file when the current one is exhausted.
         *
         * @return {@code true} if a row is available through {@link #get()}
         * @throws IOException if closing an exhausted file fails
         */
        public boolean next() throws IOException {
            while (true) {
                if (this.currentIterator.hasNext()) {
                    this.current = this.currentIterator.next();
                    return true;
                }
                if (!this.tasks.hasNext()) {
                    return false;
                }
                // Release the exhausted file before opening the next one.
                this.currentCloseable.close();
                this.currentIterator = this.open(this.tasks.next());
            }
        }

        /** Returns the row most recently loaded by {@link #next()}. */
        public InternalRow get() {
            return this.current;
        }

        /**
         * Closes the currently open file and drains the remaining tasks so that no further
         * file is opened.
         */
        public void close() throws IOException {
            this.currentCloseable.close();
            while (this.tasks.hasNext()) {
                this.tasks.next();
            }
        }

        /**
         * Opens one file task and returns its rows projected to the expected schema.
         *
         * <p>When the spec has identity partition columns, those columns are not read from the
         * file; they are taken from the file's partition tuple and joined to each row.
         */
        private Iterator<InternalRow> open(FileScanTask task) {
            DataFile file = task.file();
            Schema finalSchema = this.expectedSchema;
            PartitionSpec spec = task.spec();
            Set<Integer> idColumns = spec.identitySourceIds();

            // Schema needed to evaluate the residual as well as to produce the expected columns.
            StructType sparkType = SparkSchemaUtil.convert(finalSchema);
            Schema requiredSchema = SparkSchemaUtil.prune(this.tableSchema, sparkType, task.residual(), this.caseSensitive);
            boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
            boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();

            Iterator<InternalRow> iter;
            Schema iterSchema;
            if (hasJoinedPartitionColumns) {
                Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
                Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
                PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
                JoinedRow joined = new JoinedRow();
                InternalRow partition = convertToRow.apply(file.partition());
                joined.withRight(partition);
                iterSchema = TypeUtil.join(readSchema, partitionSchema);
                iter = Iterators.transform(this.open(task, readSchema), joined::withLeft);
            } else if (hasExtraFilterColumns) {
                iterSchema = requiredSchema;
                iter = this.open(task, requiredSchema);
            } else {
                iterSchema = finalSchema;
                iter = this.open(task, finalSchema);
            }

            // Build and bind the projection once per file, not once per row.
            DynMethods.BoundMethod applyProjection = APPLY_PROJECTION.bind(TaskDataReader.projection(finalSchema, iterSchema));
            return Iterators.transform(iter, applyProjection::invoke);
        }

        /**
         * Opens the task with the reader matching its kind or file format and records the
         * resulting iterable as the resource to close.
         *
         * @throws UnsupportedOperationException if the file format is not Parquet, Avro or ORC
         */
        private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
            CloseableIterable<InternalRow> iter;
            if (task.isDataTask()) {
                iter = this.newDataIterable(task.asDataTask(), readSchema);
            } else {
                InputFile location = this.inputFiles.get(task.file().path().toString());
                Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
                switch (task.file().format()) {
                    case PARQUET: {
                        iter = this.newParquetIterable(location, task, readSchema);
                        break;
                    }
                    case AVRO: {
                        iter = this.newAvroIterable(location, task, readSchema);
                        break;
                    }
                    case ORC: {
                        iter = this.newOrcIterable(location, task, readSchema);
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException("Cannot read unknown format: " + task.file().format());
                    }
                }
            }
            this.currentCloseable = iter;
            return iter.iterator();
        }

        /**
         * Builds a projection that maps rows laid out as {@code readSchema} to rows laid out as
         * {@code finalSchema}, matching columns by name.
         */
        private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
            StructType struct = SparkSchemaUtil.convert(readSchema);
            List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
            List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
            List<org.apache.spark.sql.catalyst.expressions.Expression> exprs = Lists.newArrayListWithExpectedSize(struct.fields().length);
            for (AttributeReference ref : refs) {
                attrs.add(ref.toAttribute());
            }
            for (Types.NestedField field : finalSchema.columns()) {
                int indexInReadSchema = struct.fieldIndex(field.name());
                exprs.add(refs.get(indexInReadSchema));
            }
            return UnsafeProjection.create(JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(), JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
        }

        /** Reads the task's split of an Avro file with container reuse enabled. */
        private CloseableIterable<InternalRow> newAvroIterable(InputFile location, FileScanTask task, Schema readSchema) {
            return Avro.read(location).reuseContainers().project(readSchema).split(task.start(), task.length()).createReaderFunc(SparkAvroReader::new).build();
        }

        /** Reads the task's split of a Parquet file, passing the task residual as the filter. */
        private CloseableIterable<InternalRow> newParquetIterable(InputFile location, FileScanTask task, Schema readSchema) {
            return Parquet.read(location).project(readSchema).split(task.start(), task.length()).createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema)).filter(task.residual()).caseSensitive(this.caseSensitive).build();
        }

        /** Reads the task's split of an ORC file. */
        private CloseableIterable<InternalRow> newOrcIterable(InputFile location, FileScanTask task, Schema readSchema) {
            return ORC.read(location).schema(readSchema).split(task.start(), task.length()).createReaderFunc(SparkOrcReader::new).caseSensitive(this.caseSensitive).build();
        }

        /**
         * Exposes the rows of a data task as Spark rows projected from the table schema to
         * {@code readSchema}.
         */
        private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
            StructInternalRow row = new StructInternalRow(this.tableSchema.asStruct());
            CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);
            // Build and bind the projection once, not once per row.
            DynMethods.BoundMethod applyProjection = APPLY_PROJECTION.bind(TaskDataReader.projection(readSchema, this.tableSchema));
            return CloseableIterable.transform(asSparkRows, applyProjection::invoke);
        }
    }

    /**
     * Serializable input partition wrapping one {@link CombinedScanTask}.
     *
     * <p>Schemas are carried as JSON strings; the parsed {@link Schema} objects are transient
     * and rebuilt on demand.
     */
    private static class ReadTask implements InputPartition<InternalRow>, Serializable {
        private final CombinedScanTask task;
        private final String tableSchemaString;
        private final String expectedSchemaString;
        private final FileIO fileIo;
        private final EncryptionManager encryptionManager;
        private final boolean caseSensitive;
        // Parsed lazily from the JSON strings above; not serialized.
        private transient Schema tableSchema = null;
        private transient Schema expectedSchema = null;

        private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo, EncryptionManager encryptionManager, boolean caseSensitive) {
            this.task = task;
            this.fileIo = fileIo;
            this.encryptionManager = encryptionManager;
            this.caseSensitive = caseSensitive;
            this.tableSchemaString = tableSchemaString;
            this.expectedSchemaString = expectedSchemaString;
        }

        /** Creates a reader over this partition's combined task. */
        public InputPartitionReader<InternalRow> createPartitionReader() {
            Schema parsedTableSchema = this.lazyTableSchema();
            Schema parsedExpectedSchema = this.lazyExpectedSchema();
            return new TaskDataReader(this.task, parsedTableSchema, parsedExpectedSchema, this.fileIo, this.encryptionManager, this.caseSensitive);
        }

        /** Returns the table schema, parsing it from JSON on first use. */
        private Schema lazyTableSchema() {
            if (this.tableSchema != null) {
                return this.tableSchema;
            }
            this.tableSchema = SchemaParser.fromJson(this.tableSchemaString);
            return this.tableSchema;
        }

        /** Returns the expected (projected) schema, parsing it from JSON on first use. */
        private Schema lazyExpectedSchema() {
            if (this.expectedSchema != null) {
                return this.expectedSchema;
            }
            this.expectedSchema = SchemaParser.fromJson(this.expectedSchemaString);
            return this.expectedSchema;
        }
    }
}

