/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.hudi;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.slice.Slices;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.ParquetTypeUtils;
import io.trino.parquet.predicate.PredicateUtils;
import io.trino.parquet.predicate.TupleDomainParquetPredicate;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.ParquetReader;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HivePageSourceProvider;
import io.trino.plugin.hive.HivePartitionKey;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.plugin.hudi.HudiErrorCode;
import io.trino.plugin.hudi.HudiPageSource;
import io.trino.plugin.hudi.HudiSessionProperties;
import io.trino.plugin.hudi.HudiSplit;
import io.trino.plugin.hudi.HudiUtil;
import io.trino.plugin.hudi.model.HudiFileFormat;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignature;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;
import org.joda.time.DateTimeZone;

public class HudiPageSourceProvider
implements ConnectorPageSourceProvider {
    private final TrinoFileSystemFactory fileSystemFactory;
    private final FileFormatDataSourceStats dataSourceStats;
    private final ParquetReaderOptions options;
    private final DateTimeZone timeZone;
    private static final int DOMAIN_COMPACTION_THRESHOLD = 1000;

    @Inject
    public HudiPageSourceProvider(TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats dataSourceStats, ParquetReaderConfig parquetReaderConfig) {
        this.fileSystemFactory = Objects.requireNonNull(fileSystemFactory, "fileSystemFactory is null");
        this.dataSourceStats = Objects.requireNonNull(dataSourceStats, "dataSourceStats is null");
        this.options = Objects.requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
        this.timeZone = DateTimeZone.forID((String)TimeZone.getDefault().getID());
    }

    public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit connectorSplit, ConnectorTableHandle connectorTable, List<ColumnHandle> columns, DynamicFilter dynamicFilter) {
        HudiSplit split = (HudiSplit)connectorSplit;
        String path = split.getLocation();
        HudiFileFormat hudiFileFormat = HudiUtil.getHudiFileFormat(path);
        if (!HudiFileFormat.PARQUET.equals((Object)hudiFileFormat)) {
            throw new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT, String.format("File format %s not supported", new Object[]{hudiFileFormat}));
        }
        List<HiveColumnHandle> hiveColumns = columns.stream().map(HiveColumnHandle.class::cast).collect(Collectors.toList());
        List<HiveColumnHandle> regularColumns = hiveColumns.stream().filter(columnHandle -> !columnHandle.isPartitionKey() && !columnHandle.isHidden()).collect(Collectors.toList());
        TrinoFileSystem fileSystem = this.fileSystemFactory.create(session);
        TrinoInputFile inputFile = fileSystem.newInputFile(Location.of((String)path), split.getFileSize());
        ConnectorPageSource dataPageSource = HudiPageSourceProvider.createPageSource(session, regularColumns, split, inputFile, this.dataSourceStats, this.options, this.timeZone);
        return new HudiPageSource(HudiPageSourceProvider.toPartitionName(split.getPartitionKeys()), hiveColumns, this.convertPartitionValues(hiveColumns, split.getPartitionKeys()), dataPageSource, path, split.getFileSize(), split.getFileModifiedTime());
    }

    private static ConnectorPageSource createPageSource(ConnectorSession session, List<HiveColumnHandle> columns, HudiSplit hudiSplit, TrinoInputFile inputFile, FileFormatDataSourceStats dataSourceStats, ParquetReaderOptions options, DateTimeZone timeZone) {
        TrinoParquetDataSource dataSource = null;
        boolean useColumnNames = HudiSessionProperties.shouldUseParquetColumnNames(session);
        String path = hudiSplit.getLocation();
        long start = hudiSplit.getStart();
        long length = hudiSplit.getLength();
        try {
            dataSource = new TrinoParquetDataSource(inputFile, options, dataSourceStats);
            ParquetMetadata parquetMetadata = MetadataReader.readFooter((ParquetDataSource)dataSource, Optional.empty());
            FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
            MessageType fileSchema = fileMetaData.getSchema();
            Optional message = ParquetPageSourceFactory.getParquetMessageType(columns, (boolean)useColumnNames, (MessageType)fileSchema);
            MessageType requestedSchema = message.orElse(new MessageType(fileSchema.getName(), (List)ImmutableList.of()));
            MessageColumnIO messageColumn = ParquetTypeUtils.getColumnIO((MessageType)fileSchema, (MessageType)requestedSchema);
            Map descriptorsByPath = ParquetTypeUtils.getDescriptors((MessageType)fileSchema, (MessageType)requestedSchema);
            TupleDomain parquetTupleDomain = options.isIgnoreStatistics() ? TupleDomain.all() : ParquetPageSourceFactory.getParquetTupleDomain((Map)descriptorsByPath, hudiSplit.getPredicate(), (MessageType)fileSchema, (boolean)useColumnNames);
            TupleDomainParquetPredicate parquetPredicate = PredicateUtils.buildPredicate((MessageType)requestedSchema, (TupleDomain)parquetTupleDomain, (Map)descriptorsByPath, (DateTimeZone)timeZone);
            long nextStart = 0L;
            ImmutableList.Builder blocks = ImmutableList.builder();
            ImmutableList.Builder blockStarts = ImmutableList.builder();
            ImmutableList.Builder columnIndexes = ImmutableList.builder();
            for (BlockMetaData block : parquetMetadata.getBlocks()) {
                long firstDataPage = ((ColumnChunkMetaData)block.getColumns().get(0)).getFirstDataPageOffset();
                Optional columnIndex = ParquetPageSourceFactory.getColumnIndexStore((ParquetDataSource)dataSource, (BlockMetaData)block, (Map)descriptorsByPath, (TupleDomain)parquetTupleDomain, (ParquetReaderOptions)options);
                if (start <= firstDataPage && firstDataPage < start + length && PredicateUtils.predicateMatches((TupleDomainParquetPredicate)parquetPredicate, (BlockMetaData)block, (ParquetDataSource)dataSource, (Map)descriptorsByPath, (TupleDomain)parquetTupleDomain, (Optional)columnIndex, Optional.empty(), (DateTimeZone)timeZone, (int)1000)) {
                    blocks.add((Object)block);
                    blockStarts.add((Object)nextStart);
                    columnIndexes.add((Object)columnIndex);
                }
                nextStart += block.getRowCount();
            }
            Optional readerProjections = HivePageSourceProvider.projectBaseColumns(columns);
            List<HiveColumnHandle> baseColumns = readerProjections.map(projection -> projection.get().stream().map(HiveColumnHandle.class::cast).collect(Collectors.toUnmodifiableList())).orElse(columns);
            ParquetDataSourceId dataSourceId = dataSource.getId();
            TrinoParquetDataSource finalDataSource = dataSource;
            ParquetPageSourceFactory.ParquetReaderProvider parquetReaderProvider = arg_0 -> HudiPageSourceProvider.lambda$createPageSource$3(fileMetaData, blocks, blockStarts, (ParquetDataSource)finalDataSource, timeZone, options, dataSourceId, parquetPredicate, columnIndexes, arg_0);
            return ParquetPageSourceFactory.createParquetPageSource(baseColumns, (MessageType)fileSchema, (MessageColumnIO)messageColumn, (boolean)useColumnNames, (ParquetPageSourceFactory.ParquetReaderProvider)parquetReaderProvider);
        }
        catch (IOException | RuntimeException e) {
            try {
                if (dataSource != null) {
                    dataSource.close();
                }
            }
            catch (IOException fileMetaData) {
                // empty catch block
            }
            if (e instanceof TrinoException) {
                throw (TrinoException)e;
            }
            if (e instanceof ParquetCorruptionException) {
                throw new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_BAD_DATA, (Throwable)e);
            }
            String message = "Error opening Hudi split %s (offset=%s, length=%s): %s".formatted(path, start, length, e.getMessage());
            throw new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT, message, (Throwable)e);
        }
    }

    private static TrinoException handleException(ParquetDataSourceId dataSourceId, Exception exception) {
        if (exception instanceof TrinoException) {
            return (TrinoException)exception;
        }
        if (exception instanceof ParquetCorruptionException) {
            return new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_BAD_DATA, (Throwable)exception);
        }
        return new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_CURSOR_ERROR, String.format("Failed to read Parquet file: %s", dataSourceId), (Throwable)exception);
    }

    private Map<String, Block> convertPartitionValues(List<HiveColumnHandle> allColumns, List<HivePartitionKey> partitionKeys) {
        return allColumns.stream().filter(HiveColumnHandle::isPartitionKey).collect(Collectors.toMap(HiveColumnHandle::getName, columnHandle -> Utils.nativeValueToBlock((Type)columnHandle.getType(), HudiPageSourceProvider.partitionToNativeValue(columnHandle.getName(), partitionKeys, columnHandle.getType().getTypeSignature()).orElse(null))));
    }

    private static Optional<Object> partitionToNativeValue(String partitionColumnName, List<HivePartitionKey> partitionKeys, TypeSignature partitionDataType) {
        HivePartitionKey partitionKey = partitionKeys.stream().filter(key -> key.getName().equalsIgnoreCase(partitionColumnName)).findFirst().orElse(null);
        if (Objects.isNull(partitionKey)) {
            return Optional.empty();
        }
        String partitionValue = partitionKey.getValue();
        String baseType = partitionDataType.getBase();
        try {
            switch (baseType) {
                case "tinyint": 
                case "smallint": 
                case "integer": 
                case "bigint": {
                    return Optional.of(Long.parseLong(partitionValue));
                }
                case "real": {
                    return Optional.of(Long.valueOf(Float.floatToRawIntBits(Float.parseFloat(partitionValue))));
                }
                case "double": {
                    return Optional.of(Double.parseDouble(partitionValue));
                }
                case "varchar": 
                case "varbinary": {
                    return Optional.of(Slices.utf8Slice((String)partitionValue));
                }
                case "date": {
                    return Optional.of(LocalDate.parse(partitionValue, DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay());
                }
                case "timestamp": {
                    return Optional.of(Timestamp.valueOf(partitionValue).toLocalDateTime().toEpochSecond(ZoneOffset.UTC) * 1000L);
                }
                case "boolean": {
                    Preconditions.checkArgument((partitionValue.equalsIgnoreCase("true") || partitionValue.equalsIgnoreCase("false") ? 1 : 0) != 0);
                    return Optional.of(Boolean.valueOf(partitionValue));
                }
                case "decimal": {
                    return Optional.of(Decimals.parse((String)partitionValue).getObject());
                }
            }
            throw new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_INVALID_PARTITION_VALUE, String.format("Unsupported data type '%s' for partition column %s", partitionDataType, partitionColumnName));
        }
        catch (IllegalArgumentException | DateTimeParseException e) {
            throw new TrinoException((ErrorCodeSupplier)HudiErrorCode.HUDI_INVALID_PARTITION_VALUE, String.format("Can not parse partition value '%s' of type '%s' for partition column '%s'", partitionValue, partitionDataType, partitionColumnName), (Throwable)e);
        }
    }

    private static String toPartitionName(List<HivePartitionKey> partitions) {
        ImmutableList.Builder partitionNames = ImmutableList.builderWithExpectedSize((int)partitions.size());
        ImmutableList.Builder partitionValues = ImmutableList.builderWithExpectedSize((int)partitions.size());
        for (HivePartitionKey partition : partitions) {
            partitionNames.add((Object)partition.getName());
            partitionValues.add((Object)partition.getValue());
        }
        return HiveUtil.makePartName((List)partitionNames.build(), (List)partitionValues.build());
    }

    private static /* synthetic */ ParquetReader lambda$createPageSource$3(FileMetaData fileMetaData, ImmutableList.Builder blocks, ImmutableList.Builder blockStarts, ParquetDataSource finalDataSource, DateTimeZone timeZone, ParquetReaderOptions options, ParquetDataSourceId dataSourceId, TupleDomainParquetPredicate parquetPredicate, ImmutableList.Builder columnIndexes, List fields) throws IOException {
        return new ParquetReader(Optional.ofNullable(fileMetaData.getCreatedBy()), fields, (List)blocks.build(), (List)blockStarts.build(), finalDataSource, timeZone, AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), options, exception -> HudiPageSourceProvider.handleException(dataSourceId, exception), Optional.of(parquetPredicate), (List)columnIndexes.build(), Optional.empty());
    }
}

