/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.slice.Slices;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.FileMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.deltalake.DeltaHiveTypeTranslator;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeColumnProjectionInfo;
import io.trino.plugin.deltalake.DeltaLakeColumnType;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeErrorCode;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeSplit;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.delete.DeletionVectors;
import io.trino.plugin.deltalake.delete.PageFilter;
import io.trino.plugin.deltalake.delete.PositionDeleteFilter;
import io.trino.plugin.deltalake.delete.RoaringBitmapArray;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser;
import io.trino.plugin.deltalake.util.DeltaLakeDomains;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveColumnProjectionInfo;
import io.trino.plugin.hive.TransformConnectorPageSource;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.RowBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.DateTimeEncoding;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.VarcharType;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.joda.time.DateTimeZone;

/**
 * Creates {@link ConnectorPageSource}s for Delta Lake splits.
 *
 * <p>A page source reads the split's Parquet data file, unless the request can be answered from the
 * split's file row count alone. It drops rows marked in the split's deletion vector. It also fills in
 * partition values and the {@code $path}, {@code $file_size}, {@code $file_modified_time} and
 * {@code $row_id} columns.
 */
public class DeltaLakePageSourceProvider
        implements ConnectorPageSourceProvider {
    // Serializes the ordered partition values embedded in every $row_id value
    private static final JsonCodec<List<String>> PARTITIONS_CODEC = new JsonCodecFactory().listJsonCodec(String.class);
    // Maximum number of positions per page produced by generatePages (no data file read)
    private static final int MAX_ROW_ID_POSITIONS = 100000;
    private static final String ROW_ID_COLUMN_NAME = "$row_id";

    private final TrinoFileSystemFactory fileSystemFactory;
    private final FileFormatDataSourceStats fileFormatDataSourceStats;
    private final ParquetReaderOptions parquetReaderOptions;
    private final int domainCompactionThreshold;
    private final DateTimeZone parquetDateTimeZone;
    private final TypeManager typeManager;

    @Inject
    public DeltaLakePageSourceProvider(TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats fileFormatDataSourceStats, ParquetReaderConfig parquetReaderConfig, DeltaLakeConfig deltaLakeConfig, TypeManager typeManager) {
        this.fileSystemFactory = Objects.requireNonNull(fileSystemFactory, "fileSystemFactory is null");
        this.fileFormatDataSourceStats = Objects.requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
        // Bloom filters are always disabled for Delta Lake reads
        this.parquetReaderOptions = ParquetReaderOptions.builder(parquetReaderConfig.toParquetReaderOptions())
                .withBloomFilter(false)
                .build();
        this.domainCompactionThreshold = deltaLakeConfig.getDomainCompactionThreshold();
        this.parquetDateTimeZone = deltaLakeConfig.getParquetDateTimeZone();
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
    }

    /**
     * Creates a page source for one Delta Lake split.
     *
     * <p>Returns an {@link EmptyPageSource} when the intersection of the table's non-partition
     * constraint, the split's statistics predicate and the current dynamic filter is none, or when
     * the split's partition values do not satisfy it.
     */
    @Override
    public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit connectorSplit, ConnectorTableHandle connectorTable, List<ColumnHandle> columns, DynamicFilter dynamicFilter) {
        DeltaLakeSplit split = (DeltaLakeSplit) connectorSplit;
        DeltaLakeTableHandle table = (DeltaLakeTableHandle) connectorTable;

        List<DeltaLakeColumnHandle> deltaLakeColumns = columns.stream()
                .map(DeltaLakeColumnHandle.class::cast)
                .collect(ImmutableList.toImmutableList());
        // Columns that need a channel from the Parquet reader; $row_id is backed by the Parquet row index
        List<DeltaLakeColumnHandle> regularColumns = deltaLakeColumns.stream()
                .filter(column -> column.columnType() == DeltaLakeColumnType.REGULAR || column.baseColumnName().equals(ROW_ID_COLUMN_NAME))
                .collect(ImmutableList.toImmutableList());

        Map<String, Optional<String>> partitionKeys = split.getPartitionKeys();
        DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = DeltaLakeSchemaSupport.getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry());

        // Partition values are only needed to build $row_id
        Optional<List<String>> partitionValues = Optional.empty();
        if (deltaLakeColumns.stream().anyMatch(column -> column.baseColumnName().equals(ROW_ID_COLUMN_NAME))) {
            partitionValues = Optional.of(orderedPartitionValues(table, partitionKeys, columnMappingMode));
        }

        TupleDomain<DeltaLakeColumnHandle> filteredSplitPredicate = TupleDomain.intersect(ImmutableList.of(
                table.getNonPartitionConstraint(),
                split.getStatisticsPredicate(),
                dynamicFilter.getCurrentPredicate().transformKeys(DeltaLakeColumnHandle.class::cast)));
        if (filteredSplitPredicate.isNone()) {
            return new EmptyPageSource();
        }

        Map<DeltaLakeColumnHandle, Domain> partitionColumnDomains = filteredSplitPredicate.getDomains().orElseThrow().entrySet().stream()
                .filter(entry -> entry.getKey().columnType() == DeltaLakeColumnType.PARTITION_KEY)
                .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
        if (!DeltaLakeDomains.partitionMatchesPredicate(partitionKeys, partitionColumnDomains)) {
            return new EmptyPageSource();
        }

        // Whole file, no predicate, no deletion vector and no data columns needed:
        // synthesize pages from the split's file row count instead of opening the file
        if (filteredSplitPredicate.isAll()
                && split.getStart() == 0L
                && split.getLength() == split.getFileSize()
                && split.getFileRowCount().isPresent()
                && split.getDeletionVector().isEmpty()
                && (regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) {
            return projectColumns(
                    deltaLakeColumns,
                    ImmutableSet.of(),
                    partitionKeys,
                    partitionValues,
                    generatePages(split.getFileRowCount().get(), onlyRowIdColumn(regularColumns)),
                    split.getPath(),
                    split.getFileSize(),
                    split.getFileModifiedTime());
        }

        Location location = Location.of(split.getPath());
        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
        TrinoInputFile inputFile = fileSystem.newInputFile(location, split.getFileSize());
        ParquetReaderOptions options = ParquetReaderOptions.builder(parquetReaderOptions)
                .withMaxReadBlockSize(DeltaLakeSessionProperties.getParquetMaxReadBlockSize(session))
                .withMaxReadBlockRowCount(DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount(session))
                .withSmallFileThreshold(DeltaLakeSessionProperties.getParquetSmallFileThreshold(session))
                // Column indexes are not used when a deletion vector has to be applied
                .withUseColumnIndex(split.getDeletionVector().isEmpty() && DeltaLakeSessionProperties.isParquetUseColumnIndex(session))
                .withIgnoreStatistics(DeltaLakeSessionProperties.isParquetIgnoreStatistics(session))
                .withVectorizedDecodingEnabled(DeltaLakeSessionProperties.isParquetVectorizedDecodingEnabled(session))
                .build();

        Map<Integer, String> parquetFieldIdToName = columnMappingMode == DeltaLakeSchemaSupport.ColumnMappingMode.ID
                ? loadParquetIdAndNameMapping(inputFile, options)
                : ImmutableMap.of();

        ImmutableSet.Builder<String> missingColumnNamesBuilder = ImmutableSet.builder();
        ImmutableList.Builder<HiveColumnHandle> hiveColumnHandlesBuilder = ImmutableList.builder();
        // Regular columns that actually get a channel in the Parquet page, in channel order
        ImmutableList.Builder<DeltaLakeColumnHandle> readColumnsBuilder = ImmutableList.builder();
        for (DeltaLakeColumnHandle column : regularColumns) {
            if (column.baseColumnName().equals(ROW_ID_COLUMN_NAME)) {
                hiveColumnHandlesBuilder.add(ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN);
                readColumnsBuilder.add(column);
                continue;
            }
            Optional<HiveColumnHandle> hiveColumnHandle = toHiveColumnHandle(column, columnMappingMode, parquetFieldIdToName);
            if (hiveColumnHandle.isPresent()) {
                hiveColumnHandlesBuilder.add(hiveColumnHandle.get());
                readColumnsBuilder.add(column);
            }
            else {
                // Field id absent from the file (ID mapping): projectColumns emits nulls for it
                missingColumnNamesBuilder.add(column.baseColumnName());
            }
        }
        // The deletion vector filter needs the row position as an extra trailing channel
        if (split.getDeletionVector().isPresent() && !regularColumns.contains(DeltaLakeColumnHandle.rowPositionColumnHandle())) {
            hiveColumnHandlesBuilder.add(ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN);
        }
        List<HiveColumnHandle> hiveColumnHandles = hiveColumnHandlesBuilder.build();
        List<DeltaLakeColumnHandle> readColumns = readColumnsBuilder.build();
        Set<String> missingColumnNames = missingColumnNamesBuilder.build();

        TupleDomain<HiveColumnHandle> parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName);

        ConnectorPageSource delegate = ParquetPageSourceFactory.createPageSource(
                inputFile,
                split.getStart(),
                split.getLength(),
                hiveColumnHandles,
                ImmutableList.of(parquetPredicate),
                true,
                parquetDateTimeZone,
                fileFormatDataSourceStats,
                options,
                Optional.empty(),
                domainCompactionThreshold,
                OptionalLong.of(split.getFileSize()));

        if (split.getDeletionVector().isPresent()) {
            // Describe the delegate's channel layout: the columns read from the file followed by the
            // row position. Columns missing from the file have no channel and must not be listed here.
            List<DeltaLakeColumnHandle> requiredColumns = ImmutableList.<DeltaLakeColumnHandle>builderWithExpectedSize(readColumns.size() + 1)
                    .addAll(readColumns)
                    .add(DeltaLakeColumnHandle.rowPositionColumnHandle())
                    .build();
            // Deletion vectors are read lazily, on the first page
            Supplier<PageFilter> pageFilterSupplier = Suppliers.memoize(() ->
                    readDeletes(fileSystem, Location.of(table.location()), split.getDeletionVector().get())
                            .createPredicate(requiredColumns));
            // Drop the trailing row-position channel after filtering
            int[] retainedColumns = IntStream.range(0, readColumns.size()).toArray();
            delegate = TransformConnectorPageSource.create(delegate, page -> SourcePage.create(pageFilterSupplier.get().apply(page).getColumns(retainedColumns)));
        }

        return projectColumns(deltaLakeColumns, missingColumnNames, partitionKeys, partitionValues, delegate, split.getPath(), split.getFileSize(), split.getFileModifiedTime());
    }

    /**
     * Returns the split's partition values ordered like the table's original partition columns.
     * Entries are {@code null} when the value is absent, which is why a mutable {@link ArrayList} is used.
     *
     * @throws IllegalStateException if a partition column is not in the table schema or the column
     *         mapping mode is not NONE, ID or NAME
     */
    private List<String> orderedPartitionValues(DeltaLakeTableHandle table, Map<String, Optional<String>> partitionKeys, DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode) {
        Map<String, DeltaLakeColumnMetadata> columnsMetadataByName = DeltaLakeSchemaSupport.extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager).stream()
                .collect(ImmutableMap.toImmutableMap(DeltaLakeColumnMetadata::name, Function.identity()));
        List<String> values = new ArrayList<>();
        for (String partitionColumnName : table.getMetadataEntry().getOriginalPartitionColumns()) {
            DeltaLakeColumnMetadata partitionColumn = columnsMetadataByName.get(partitionColumnName);
            Preconditions.checkState(partitionColumn != null, "Partition column %s not found", partitionColumnName);
            // Partition keys are stored under the logical name (NONE) or the physical name (ID/NAME)
            Optional<String> value = switch (columnMappingMode) {
                case NONE -> partitionKeys.get(partitionColumn.name());
                case ID, NAME -> partitionKeys.get(partitionColumn.physicalName());
                default -> throw new IllegalStateException("Unknown column mapping mode");
            };
            values.add(value.orElse(null));
        }
        return values;
    }

    /**
     * Maps {@code delegate}'s channels onto {@code deltaLakeColumns}.
     *
     * <p>Partition columns, {@code $path}, {@code $file_size}, {@code $file_modified_time} and columns
     * named in {@code missingColumnNames} become constant (or null) blocks. {@code $row_id} is built
     * from the next delegate channel (the row index). Every other column consumes the next delegate
     * channel as-is.
     *
     * @param partitionValues must be present when a {@code $row_id} column is requested
     * @throws IllegalStateException if {@code $row_id} is requested without partition values
     */
    public static ConnectorPageSource projectColumns(List<DeltaLakeColumnHandle> deltaLakeColumns, Set<String> missingColumnNames, Map<String, Optional<String>> partitionKeys, Optional<List<String>> partitionValues, ConnectorPageSource delegate, String path, long fileSize, long fileModifiedTime) {
        int delegateIndex = 0;
        TransformConnectorPageSource.Builder transform = TransformConnectorPageSource.builder();
        for (DeltaLakeColumnHandle column : deltaLakeColumns) {
            if (column.isBaseColumn() && partitionKeys.containsKey(column.basePhysicalColumnName())) {
                Object prefilledValue = TransactionLogParser.deserializePartitionValue(column, partitionKeys.get(column.basePhysicalColumnName()));
                transform.constantValue(Utils.nativeValueToBlock(column.baseType(), prefilledValue));
                continue;
            }
            if (column.baseColumnName().equals("$path")) {
                transform.constantValue(Utils.nativeValueToBlock(DeltaLakeColumnHandle.PATH_TYPE, Slices.utf8Slice(path)));
                continue;
            }
            if (column.baseColumnName().equals("$file_size")) {
                transform.constantValue(Utils.nativeValueToBlock(DeltaLakeColumnHandle.FILE_SIZE_TYPE, fileSize));
                continue;
            }
            if (column.baseColumnName().equals("$file_modified_time")) {
                long packedTimestamp = DateTimeEncoding.packDateTimeWithZone(fileModifiedTime, TimeZoneKey.UTC_KEY);
                transform.constantValue(Utils.nativeValueToBlock(DeltaLakeColumnHandle.FILE_MODIFIED_TIME_TYPE, packedTimestamp));
                continue;
            }
            if (column.baseColumnName().equals(ROW_ID_COLUMN_NAME)) {
                Block pathBlock = Utils.nativeValueToBlock(VarcharType.VARCHAR, Slices.utf8Slice(path));
                byte[] partitionsJson = PARTITIONS_CODEC.toJsonBytes(partitionValues.orElseThrow(() -> new IllegalStateException("partitionValues not provided")));
                Block partitionsBlock = Utils.nativeValueToBlock(VarcharType.VARCHAR, Slices.wrappedBuffer(partitionsJson));
                transform.transform(delegateIndex, new CreateRowIdBlock(pathBlock, partitionsBlock));
                delegateIndex++;
                continue;
            }
            if (missingColumnNames.contains(column.baseColumnName())) {
                transform.constantValue(column.type().createNullBlock());
                continue;
            }
            transform.column(delegateIndex);
            delegateIndex++;
        }
        return transform.build(delegate);
    }

    /**
     * Builds a {@code $row_id} row block of (path, row index, partitions JSON). The single-value
     * path and partitions blocks are run-length encoded to the row index block's position count.
     */
    private static Block createRowIdBlock(Block pathValue, Block rowIndexBlock, Block partitionsValue) {
        int positionCount = rowIndexBlock.getPositionCount();
        return RowBlock.fromFieldBlocks(positionCount, new Block[] {
                RunLengthEncodedBlock.create(pathValue, positionCount),
                rowIndexBlock,
                RunLengthEncodedBlock.create(partitionsValue, positionCount)});
    }

    /**
     * Reads the deletion vector for a data file and wraps it in a {@link PositionDeleteFilter}.
     *
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} if reading fails with an {@link IOException}
     */
    private static PositionDeleteFilter readDeletes(TrinoFileSystem fileSystem, Location tableLocation, DeletionVectorEntry deletionVector) {
        try {
            RoaringBitmapArray deletedRows = DeletionVectors.readDeletionVectors(fileSystem, tableLocation, deletionVector);
            return new PositionDeleteFilter(deletedRows);
        }
        catch (IOException e) {
            throw new TrinoException((ErrorCodeSupplier) DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e);
        }
    }

    /**
     * Reads the Parquet footer and maps each top-level field id to its field name.
     * Fields without an id are skipped.
     *
     * @throws UncheckedIOException if reading the footer or closing the data source fails
     */
    private Map<Integer, String> loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options) {
        // try-with-resources also wraps a failing close() on the success path
        try (TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats)) {
            ParquetMetadata parquetMetadata = MetadataReader.readFooter((ParquetDataSource) dataSource, Optional.empty());
            FileMetadata fileMetaData = parquetMetadata.getFileMetaData();
            MessageType fileSchema = fileMetaData.getSchema();
            return fileSchema.getFields().stream()
                    .filter(field -> field.getId() != null)
                    .collect(ImmutableMap.toImmutableMap(field -> field.getId().intValue(), Type::getName));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Converts a Delta Lake predicate into one usable by the Parquet reader.
     *
     * <p>Domains on columns whose base type is {@code map}, {@code array} or {@code row} are dropped,
     * as are domains on columns that {@link #toHiveColumnHandle} cannot resolve in the file.
     */
    public static TupleDomain<HiveColumnHandle> getParquetTupleDomain(TupleDomain<DeltaLakeColumnHandle> effectivePredicate, DeltaLakeSchemaSupport.ColumnMappingMode columnMapping, Map<Integer, String> fieldIdToName) {
        if (effectivePredicate.isNone()) {
            return TupleDomain.none();
        }
        ImmutableMap.Builder<HiveColumnHandle, Domain> predicate = ImmutableMap.builder();
        effectivePredicate.getDomains().orElseThrow().forEach((columnHandle, domain) -> {
            String baseType = columnHandle.baseType().getTypeSignature().getBase();
            if (baseType.equals("map") || baseType.equals("array") || baseType.equals("row")) {
                return;
            }
            toHiveColumnHandle(columnHandle, columnMapping, fieldIdToName)
                    .ifPresent(column -> predicate.put(column, domain));
        });
        return TupleDomain.withColumnDomains(predicate.buildOrThrow());
    }

    /**
     * Resolves a Delta Lake column to the Hive column handle used by the Parquet reader.
     *
     * <p>In ID mode the Parquet field name is looked up by the column's base field id. The result is
     * empty when the file has no field with that id. In NONE and NAME modes the column's own Hive
     * handle is returned, and {@code fieldIdToName} must be empty.
     *
     * @throws IllegalArgumentException if the ID-mode column has no field id, if
     *         {@code fieldIdToName} is non-empty in NONE/NAME mode, or for any other mapping mode
     */
    public static Optional<HiveColumnHandle> toHiveColumnHandle(DeltaLakeColumnHandle deltaLakeColumnHandle, DeltaLakeSchemaSupport.ColumnMappingMode columnMapping, Map<Integer, String> fieldIdToName) {
        switch (columnMapping) {
            case ID -> {
                Integer fieldId = deltaLakeColumnHandle.baseFieldId().orElseThrow(() -> new IllegalArgumentException("Field ID must exist"));
                if (!fieldIdToName.containsKey(fieldId)) {
                    return Optional.empty();
                }
                String fieldName = fieldIdToName.get(fieldId);
                Optional<HiveColumnProjectionInfo> hiveColumnProjectionInfo = deltaLakeColumnHandle.projectionInfo().map(DeltaLakeColumnProjectionInfo::toHiveColumnProjectionInfo);
                return Optional.of(new HiveColumnHandle(
                        fieldName,
                        0,
                        DeltaHiveTypeTranslator.toHiveType(deltaLakeColumnHandle.basePhysicalType()),
                        deltaLakeColumnHandle.basePhysicalType(),
                        hiveColumnProjectionInfo,
                        deltaLakeColumnHandle.columnType().toHiveColumnType(),
                        Optional.empty()));
            }
            case NONE, NAME -> {
                Preconditions.checkArgument(fieldIdToName.isEmpty(), "Mapping between field id and name must be empty: %s", fieldIdToName);
                return Optional.of(deltaLakeColumnHandle.toHiveColumnHandle());
            }
            default -> {
                // fall through to the exception below
            }
        }
        throw new IllegalArgumentException("Unsupported column mapping: " + columnMapping);
    }

    /** Returns whether {@code columns} consists of exactly one {@code $row_id} column. */
    private static boolean onlyRowIdColumn(List<DeltaLakeColumnHandle> columns) {
        return columns.size() == 1 && Iterables.getOnlyElement(columns).baseColumnName().equals(ROW_ID_COLUMN_NAME);
    }

    /**
     * Produces {@code totalRowCount} positions in pages of at most {@link #MAX_ROW_ID_POSITIONS}
     * positions each, without reading any file. When {@code projectRowNumber} is set, each page has
     * one BIGINT-backed channel holding consecutive row indexes starting at 0. Otherwise pages have
     * no channels.
     */
    private static ConnectorPageSource generatePages(final long totalRowCount, final boolean projectRowNumber) {
        return new FixedPageSource(new AbstractIterator<Page>() {
            private long rowIndex;

            @Override
            protected Page computeNext() {
                if (rowIndex == totalRowCount) {
                    return endOfData();
                }
                int pageSize = Math.toIntExact(Math.min(MAX_ROW_ID_POSITIONS, totalRowCount - rowIndex));
                Page page = projectRowNumber
                        ? new Page(pageSize, createRowNumberBlock(rowIndex, pageSize))
                        : new Page(pageSize);
                rowIndex += pageSize;
                return page;
            }
        }, 0L);
    }

    /** Returns a non-null long block containing {@code baseIndex, baseIndex + 1, ..., baseIndex + size - 1}. */
    private static Block createRowNumberBlock(long baseIndex, int size) {
        long[] rowIndices = new long[size];
        for (int position = 0; position < size; position++) {
            rowIndices[position] = baseIndex + position;
        }
        return new LongArrayBlock(size, Optional.empty(), rowIndices);
    }

    /** Turns a row-index block into a {@code $row_id} block for a fixed path and partitions value. */
    private record CreateRowIdBlock(Block pathBlock, Block partitionsBlock)
            implements Function<Block, Block> {
        @Override
        public Block apply(Block block) {
            return createRowIdBlock(pathBlock, block, partitionsBlock);
        }
    }
}

