/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake.transactionlog.checkpoint;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MoreCollectors;
import com.google.common.math.LongMath;
import io.airlift.log.Logger;
import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.Column;
import io.trino.parquet.Field;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaHiveTypeTranslator;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeColumnType;
import io.trino.plugin.deltalake.DeltaLakeErrorCode;
import io.trino.plugin.deltalake.DeltaLakeSplitManager;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TransactionEntry;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser;
import io.trino.plugin.deltalake.transactionlog.TransactionLogUtil;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointFieldReader;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeParquetFileStatistics;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveColumnProjectionInfo;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.parquet.ParquetPageSource;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.RowBlock;
import io.trino.spi.block.SqlRow;
import io.trino.spi.block.ValueBlock;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.DateTimeEncoding;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.TypeUtils;
import io.trino.spi.type.VarcharType;
import jakarta.annotation.Nullable;
import java.math.RoundingMode;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;
import org.joda.time.DateTimeZone;

public class CheckpointEntryIterator
extends AbstractIterator<DeltaLakeTransactionLogEntry> {
    private static final Logger log = Logger.get(CheckpointEntryIterator.class);
    // String form of the checkpoint file location; used in the channel-count mismatch error message.
    private final String checkpointPath;
    private final ConnectorSession session;
    // Parquet reader over the checkpoint file; closed when iteration finishes or close() is called.
    private final ParquetPageSource pageSource;
    // map(varchar, varchar) and array(varchar) types, resolved once from the TypeManager in the constructor.
    private final MapType stringMap;
    private final ArrayType stringList;
    // Entries decoded from already-visited page positions that have not been handed out yet.
    private final Queue<DeltaLakeTransactionLogEntry> nextEntries;
    // One extractor per requested entry type, in the iteration order of the requested field set.
    private final List<CheckpointFieldExtractor> extractors;
    // Gates whether row-typed min/max values and null counts are included in parsed statistics.
    private final boolean checkpointRowStatisticsWritingEnabled;
    // Partition domains added to the ADD column's tuple domain when building the reader predicate.
    private final TupleDomain<DeltaLakeColumnHandle> partitionConstraint;
    // Row types as reported by the Parquet page source; empty when the entry type was not requested
    // (or when no matching column/field was found).
    private final Optional<RowType> txnType;
    private final Optional<RowType> addType;
    private final Optional<RowType> addPartitionValuesType;
    private final Optional<RowType> addDeletionVectorType;
    private final Optional<RowType> addParsedStatsFieldType;
    private final Optional<RowType> removeType;
    private final Optional<RowType> metadataType;
    private final Optional<RowType> protocolType;
    private final Optional<RowType> commitType;
    // The following five fields are only assigned by the constructor when ADD entries are requested.
    private MetadataEntry metadataEntry;
    private ProtocolEntry protocolEntry;
    private boolean deletionVectorsEnabled;
    private List<DeltaLakeColumnMetadata> schema;
    private List<DeltaLakeColumnMetadata> columnsWithMinMaxStats;
    // Page currently being decoded and the next row position to decode within it.
    private Page page;
    private int pagePosition;

    /**
     * Opens the given checkpoint Parquet file and prepares extraction of the requested entry types.
     *
     * <p>When {@code fields} contains {@link EntryType#ADD}, {@code metadataEntry}, {@code protocolEntry}
     * and {@code addStatsMinMaxColumnFilter} must all be present; they are used to derive the table schema
     * and the set of columns for which min/max statistics are read.
     *
     * <p>The file is read from offset 0 for {@code fileSize} bytes. For every requested entry type one
     * column and one tuple domain are passed to the Parquet reader; ADD entries additionally request a
     * second "add" column typed with the partition-values row type.
     *
     * @throws NullPointerException if {@code session}, {@code partitionConstraint} or
     *         {@code addStatsMinMaxColumnFilter} is null
     * @throws IllegalArgumentException if {@code fields} is empty or ADD prerequisites are missing
     * @throws TrinoException with {@code GENERIC_INTERNAL_ERROR} if initialization fails after the page
     *         source has been opened; the page source is closed and the original failure is attached
     *         as the cause
     */
    public CheckpointEntryIterator(TrinoInputFile checkpoint, ConnectorSession session, long fileSize, CheckpointSchemaManager checkpointSchemaManager, TypeManager typeManager, Set<EntryType> fields, Optional<MetadataEntry> metadataEntry, Optional<ProtocolEntry> protocolEntry, FileFormatDataSourceStats stats, ParquetReaderOptions parquetReaderOptions, boolean checkpointRowStatisticsWritingEnabled, int domainCompactionThreshold, TupleDomain<DeltaLakeColumnHandle> partitionConstraint, Optional<Predicate<String>> addStatsMinMaxColumnFilter) {
        this.checkpointPath = checkpoint.location().toString();
        this.session = Objects.requireNonNull(session, "session is null");
        this.stringList = (ArrayType) typeManager.getType(TypeSignature.arrayType(VarcharType.VARCHAR.getTypeSignature()));
        this.stringMap = (MapType) typeManager.getType(TypeSignature.mapType(VarcharType.VARCHAR.getTypeSignature(), VarcharType.VARCHAR.getTypeSignature()));
        this.checkpointRowStatisticsWritingEnabled = checkpointRowStatisticsWritingEnabled;
        this.partitionConstraint = Objects.requireNonNull(partitionConstraint, "partitionConstraint is null");
        Objects.requireNonNull(addStatsMinMaxColumnFilter, "addStatsMinMaxColumnFilter is null");
        Preconditions.checkArgument(!fields.isEmpty(), "fields is empty");
        if (fields.contains(EntryType.ADD)) {
            Preconditions.checkArgument(metadataEntry.isPresent(), "Metadata entry must be provided when reading ADD entries from Checkpoint files");
            this.metadataEntry = metadataEntry.get();
            Preconditions.checkArgument(protocolEntry.isPresent(), "Protocol entry must be provided when reading ADD entries from Checkpoint files");
            this.protocolEntry = protocolEntry.get();
            this.deletionVectorsEnabled = DeltaLakeSchemaSupport.isDeletionVectorEnabled(this.metadataEntry, this.protocolEntry);
            Preconditions.checkArgument(addStatsMinMaxColumnFilter.isPresent(), "addStatsMinMaxColumnFilter must be provided when reading ADD entries from Checkpoint files");
            this.schema = DeltaLakeSchemaSupport.extractSchema(this.metadataEntry, this.protocolEntry, typeManager);
            Predicate<String> columnStatsFilterFunction = addStatsMinMaxColumnFilter.orElseThrow();
            // Only columns accepted by the caller-supplied filter keep their min/max statistics.
            this.columnsWithMinMaxStats = TransactionLogAccess.columnsWithStats(this.schema, this.metadataEntry.getOriginalPartitionColumns()).stream()
                    .filter(column -> columnStatsFilterFunction.test(column.getName()))
                    .collect(ImmutableList.toImmutableList());
        }

        ImmutableList.Builder<HiveColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(fields.size());
        ImmutableList.Builder<TupleDomain<HiveColumnHandle>> disjunctDomainsBuilder = ImmutableList.builderWithExpectedSize(fields.size());
        for (EntryType field : fields) {
            HiveColumnHandle hiveColumn = buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry, addStatsMinMaxColumnFilter).toHiveColumnHandle();
            columnsBuilder.add(hiveColumn);
            disjunctDomainsBuilder.add(buildTupleDomainColumnHandle(field, hiveColumn));
            if (field == EntryType.ADD) {
                // ADD entries occupy a second channel: the same "add" column read with the partition-values row type.
                RowType addEntryPartitionValuesType = checkpointSchemaManager.getAddEntryPartitionValuesType();
                columnsBuilder.add(new DeltaLakeColumnHandle("add", addEntryPartitionValuesType, OptionalInt.empty(), "add", addEntryPartitionValuesType, DeltaLakeColumnType.REGULAR, Optional.empty()).toHiveColumnHandle());
            }
        }

        ReaderPageSource readerPageSource = ParquetPageSourceFactory.createPageSource(checkpoint, 0L, fileSize, columnsBuilder.build(), disjunctDomainsBuilder.build(), true, DateTimeZone.UTC, stats, parquetReaderOptions, Optional.empty(), domainCompactionThreshold, OptionalLong.empty());
        this.pageSource = (ParquetPageSource) readerPageSource.get();
        try {
            Verify.verify(readerPageSource.getReaderColumns().isEmpty(), "All columns expected to be base columns");
            this.nextEntries = new ArrayDeque<>();
            this.extractors = fields.stream().map(this::createCheckpointFieldExtractor).collect(ImmutableList.toImmutableList());
            this.txnType = getParquetType(fields, EntryType.TRANSACTION);
            this.addType = getAddParquetTypeContainingField(fields, "path");
            this.addPartitionValuesType = getAddParquetTypeContainingField(fields, "partitionValues");
            this.addDeletionVectorType = this.addType.flatMap(type -> getOptionalFieldType(type, "deletionVector"));
            this.addParsedStatsFieldType = this.addType.flatMap(type -> getOptionalFieldType(type, "stats_parsed"));
            this.removeType = getParquetType(fields, EntryType.REMOVE);
            this.metadataType = getParquetType(fields, EntryType.METADATA);
            this.protocolType = getParquetType(fields, EntryType.PROTOCOL);
            this.commitType = getParquetType(fields, EntryType.COMMIT);
        }
        catch (Exception e) {
            // Keep the root cause: without it the failure is impossible to diagnose from the rethrown exception.
            TrinoException failure = new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Error while initilizing the checkpoint entry iterator for the file %s".formatted(checkpoint.location()), e);
            try {
                this.pageSource.close();
            }
            catch (Exception closeFailure) {
                // Closing is best-effort here; record the problem instead of dropping it silently.
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Looks up the field called {@code fieldName} in {@code type} and returns its type cast to
     * {@link RowType}, or empty when the row has no such field.
     *
     * <p>Fails if a field has no name, if more than one field matches, or if the matching
     * field's type is not a {@link RowType}.
     */
    private static Optional<RowType> getOptionalFieldType(RowType type, String fieldName) {
        Optional<RowType.Field> matchingField = type.getFields().stream()
                .filter(candidate -> candidate.getName().orElseThrow().equals(fieldName))
                .collect(MoreCollectors.toOptional());
        return matchingField
                .map(RowType.Field::getType)
                .map(RowType.class::cast);
    }

    /**
     * Finds the page-source column named after {@link EntryType#ADD} whose row type contains a
     * field called {@code fieldName}, and returns that row type.
     *
     * <p>Returns empty when ADD was not requested or no such column exists; fails if more than
     * one column matches.
     */
    private Optional<RowType> getAddParquetTypeContainingField(Set<EntryType> fields, String fieldName) {
        if (!fields.contains(EntryType.ADD)) {
            return Optional.empty();
        }
        return pageSource.getColumnFields().stream()
                .filter(column -> column.name().equals(EntryType.ADD.getColumnName())
                        && column.field().getType() instanceof RowType rowType
                        && rowType.getFields().stream()
                                .map(RowType.Field::getName)
                                .flatMap(Optional::stream)
                                .anyMatch(fieldName::equals))
                .collect(MoreCollectors.toOptional())
                .map(Column::field)
                .map(Field::getType)
                .map(RowType.class::cast);
    }

    /**
     * Returns the row type the page source reports for {@code field}'s column, or empty when
     * {@code field} is not among the requested {@code fields} or no such column is found.
     */
    private Optional<RowType> getParquetType(Set<EntryType> fields, EntryType field) {
        if (!fields.contains(field)) {
            return Optional.empty();
        }
        Optional<Type> columnType = getParquetType(field.getColumnName());
        return columnType.map(RowType.class::cast);
    }

    /**
     * Returns the type of the page-source column named {@code columnName}, or empty when there
     * is none. Fails if more than one column carries that name.
     */
    private Optional<Type> getParquetType(String columnName) {
        Optional<Column> matchingColumn = pageSource.getColumnFields().stream()
                .filter(candidate -> candidate.name().equals(columnName))
                .collect(MoreCollectors.toOptional());
        return matchingColumn
                .map(Column::field)
                .map(Field::getType);
    }

    /**
     * Creates the extractor that turns one page position into a transaction log entry of the
     * given type.
     *
     * <p>Every type except ADD reads a single block (the first one handed to the extractor);
     * ADD uses the dedicated {@code AddFileEntryExtractor}.
     *
     * <p>The switch is over the enum constants themselves rather than their ordinals, so
     * reordering {@link EntryType} cannot silently re-route entry types, and adding a constant
     * without handling it here becomes a compile error.
     */
    private CheckpointFieldExtractor createCheckpointFieldExtractor(EntryType entryType) {
        return switch (entryType) {
            case TRANSACTION -> (session, pagePosition, blocks) -> buildTxnEntry(session, pagePosition, blocks[0]);
            case ADD -> new AddFileEntryExtractor();
            case REMOVE -> (session, pagePosition, blocks) -> buildRemoveEntry(session, pagePosition, blocks[0]);
            case METADATA -> (session, pagePosition, blocks) -> buildMetadataEntry(session, pagePosition, blocks[0]);
            case PROTOCOL -> (session, pagePosition, blocks) -> buildProtocolEntry(session, pagePosition, blocks[0]);
            case COMMIT -> (session, pagePosition, blocks) -> buildCommitInfoEntry(session, pagePosition, blocks[0]);
        };
    }

    /**
     * Builds the regular column handle used to read {@code entryType}'s column from the
     * checkpoint, typed with the row type supplied by {@code schemaManager}.
     *
     * <p>{@code metadataEntry}, {@code protocolEntry} and {@code addStatsMinMaxColumnFilter} are
     * only consulted for {@link EntryType#ADD}; the filter must be present in that case.
     *
     * <p>Dispatches on the enum constants rather than on {@code ordinal()}, so the mapping does
     * not depend on declaration order.
     */
    private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, Optional<Predicate<String>> addStatsMinMaxColumnFilter) {
        RowType type = switch (entryType) {
            case TRANSACTION -> schemaManager.getTxnEntryType();
            case ADD -> schemaManager.getAddEntryType(metadataEntry, protocolEntry, addStatsMinMaxColumnFilter.orElseThrow(), true, true, false);
            case REMOVE -> schemaManager.getRemoveEntryType();
            case METADATA -> schemaManager.getMetadataEntryType();
            case PROTOCOL -> schemaManager.getProtocolEntryType(true, true);
            case COMMIT -> schemaManager.getCommitInfoEntryType();
        };
        String columnName = entryType.getColumnName();
        return new DeltaLakeColumnHandle(columnName, type, OptionalInt.empty(), columnName, type, DeltaLakeColumnType.REGULAR, Optional.empty());
    }

    /**
     * Builds the tuple domain handed to the Parquet reader for {@code entryType}.
     *
     * <p>The domain requires one representative sub-field of the entry's row to be non-null
     * ({@code version} for TRANSACTION/COMMIT, {@code path} for ADD/REMOVE, {@code id} for
     * METADATA, {@code minReaderVersion} for PROTOCOL). For ADD entries every domain of the
     * partition constraint is added as well, addressed through the {@code partitionvalues_parsed}
     * sub-field of the "add" column.
     *
     * @throws IllegalArgumentException for an entry type that has no representative field
     */
    private TupleDomain<HiveColumnHandle> buildTupleDomainColumnHandle(EntryType entryType, HiveColumnHandle column) {
        String field;
        // Declared as Type: the representative field is a bigint for some entries and a varchar for others.
        Type type;
        switch (entryType) {
            case TRANSACTION, COMMIT -> {
                field = "version";
                type = BigintType.BIGINT;
            }
            case ADD, REMOVE -> {
                field = "path";
                type = VarcharType.VARCHAR;
            }
            case METADATA -> {
                field = "id";
                type = VarcharType.VARCHAR;
            }
            case PROTOCOL -> {
                field = "minReaderVersion";
                type = BigintType.BIGINT;
            }
            default -> throw new IllegalArgumentException("Unsupported Delta Lake checkpoint entry type: " + entryType);
        }
        // The dereference index 0 is passed unchanged from the original code; only the field name varies per entry type.
        HiveColumnProjectionInfo projection = new HiveColumnProjectionInfo(ImmutableList.of(0), ImmutableList.of(field), HiveType.toHiveType(type), type);
        HiveColumnHandle handle = new HiveColumnHandle(column.getBaseColumnName(), column.getBaseHiveColumnIndex(), column.getBaseHiveType(), column.getBaseType(), Optional.of(projection), HiveColumnHandle.ColumnType.REGULAR, column.getComment());
        ImmutableMap.Builder<HiveColumnHandle, Domain> domains = ImmutableMap.<HiveColumnHandle, Domain>builder()
                .put(handle, Domain.notNull(handle.getType()));
        if (entryType == EntryType.ADD) {
            this.partitionConstraint.getDomains().orElseThrow()
                    .forEach((key, value) -> domains.put(toPartitionValuesParsedField(column, key), value));
        }
        return TupleDomain.withColumnDomains(domains.buildOrThrow());
    }

    /**
     * Derives, from the "add" column handle, a projected handle that addresses
     * {@code partitionvalues_parsed.<partition column name>} and carries the partition column's type.
     */
    private static HiveColumnHandle toPartitionValuesParsedField(HiveColumnHandle addColumn, DeltaLakeColumnHandle partitionColumn) {
        HiveColumnProjectionInfo parsedPartitionValue = new HiveColumnProjectionInfo(
                ImmutableList.of(0, 0),
                ImmutableList.of("partitionvalues_parsed", partitionColumn.getColumnName()),
                DeltaHiveTypeTranslator.toHiveType(partitionColumn.getType()),
                partitionColumn.getType());
        return new HiveColumnHandle(
                addColumn.getBaseColumnName(),
                addColumn.getBaseHiveColumnIndex(),
                addColumn.getBaseHiveType(),
                addColumn.getBaseType(),
                Optional.of(parsedPartitionValue),
                HiveColumnHandle.ColumnType.REGULAR,
                addColumn.getComment());
    }

    /**
     * Decodes the commitInfo row at {@code pagePosition} of {@code block}.
     *
     * @return the wrapped {@link CommitInfoEntry}, or {@code null} when the position is null
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the commitInfo row does
     *         not have 12 fields, its {@code job} row does not have 5, or its {@code notebook}
     *         row does not have 1
     */
    private DeltaLakeTransactionLogEntry buildCommitInfoEntry(ConnectorSession session, int pagePosition, Block block) {
        log.debug("Building commitInfo entry from %s pagePosition %d", block, pagePosition);
        if (block.isNull(pagePosition)) {
            return null;
        }
        RowType commitInfoType = commitType.orElseThrow();
        final int expectedCommitInfoFields = 12;
        final int expectedJobFields = 5;
        final int expectedNotebookFields = 1;

        SqlRow commitInfoRow = getRow(block, pagePosition);
        CheckpointFieldReader commitInfo = new CheckpointFieldReader(session, commitInfoRow, commitInfoType);
        log.debug("Block %s has %s fields", block, commitInfoRow.getFieldCount());
        if (commitInfoRow.getFieldCount() != expectedCommitInfoFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", block, expectedCommitInfoFields, commitInfoRow.getFieldCount()));
        }

        // Nested "job" row, read with the type of the field of the same name.
        SqlRow jobRow = commitInfo.getRow("job");
        if (jobRow.getFieldCount() != expectedJobFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", jobRow, expectedJobFields, jobRow.getFieldCount()));
        }
        RowType.Field jobField = commitInfoType.getFields().stream()
                .filter(candidate -> candidate.getName().orElseThrow().equals("job"))
                .collect(MoreCollectors.onlyElement());
        CheckpointFieldReader job = new CheckpointFieldReader(session, jobRow, (RowType) jobField.getType());

        // Nested "notebook" row, handled the same way.
        SqlRow notebookRow = commitInfo.getRow("notebook");
        if (notebookRow.getFieldCount() != expectedNotebookFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", notebookRow, expectedNotebookFields, notebookRow.getFieldCount()));
        }
        RowType.Field notebookField = commitInfoType.getFields().stream()
                .filter(candidate -> candidate.getName().orElseThrow().equals("notebook"))
                .collect(MoreCollectors.onlyElement());
        CheckpointFieldReader notebook = new CheckpointFieldReader(session, notebookRow, (RowType) notebookField.getType());

        CommitInfoEntry result = new CommitInfoEntry(
                commitInfo.getLong("version"),
                commitInfo.getLong("timestamp"),
                commitInfo.getString("userId"),
                commitInfo.getString("userName"),
                commitInfo.getString("operation"),
                commitInfo.getMap(stringMap, "operationParameters"),
                new CommitInfoEntry.Job(
                        job.getString("jobId"),
                        job.getString("jobName"),
                        job.getString("runId"),
                        job.getString("jobOwnerId"),
                        job.getString("triggerType")),
                new CommitInfoEntry.Notebook(notebook.getString("notebookId")),
                commitInfo.getString("clusterId"),
                commitInfo.getInt("readVersion"),
                commitInfo.getString("isolationLevel"),
                Optional.of(commitInfo.getBoolean("isBlindAppend")));
        log.debug("Result: %s", result);
        return DeltaLakeTransactionLogEntry.commitInfoEntry(result);
    }

    /**
     * Decodes the protocol row at {@code pagePosition} of {@code block}.
     *
     * @return the wrapped {@link ProtocolEntry}, or {@code null} when the position is null
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the row has fewer than
     *         2 or more than 4 fields
     */
    private DeltaLakeTransactionLogEntry buildProtocolEntry(ConnectorSession session, int pagePosition, Block block) {
        log.debug("Building protocol entry from %s pagePosition %d", block, pagePosition);
        if (block.isNull(pagePosition)) {
            return null;
        }
        RowType protocolRowType = protocolType.orElseThrow();
        final int fewestFields = 2;
        final int mostFields = 4;

        SqlRow protocolRow = getRow(block, pagePosition);
        int actualFields = protocolRow.getFieldCount();
        log.debug("Block %s has %s fields", block, actualFields);
        boolean withinBounds = actualFields >= fewestFields && actualFields <= mostFields;
        if (!withinBounds) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have between %d and %d children, but found %s", block, fewestFields, mostFields, actualFields));
        }

        CheckpointFieldReader protocol = new CheckpointFieldReader(session, protocolRow, protocolRowType);
        ProtocolEntry result = new ProtocolEntry(
                protocol.getInt("minReaderVersion"),
                protocol.getInt("minWriterVersion"),
                protocol.getOptionalSet(stringList, "readerFeatures"),
                protocol.getOptionalSet(stringList, "writerFeatures"));
        log.debug("Result: %s", result);
        return DeltaLakeTransactionLogEntry.protocolEntry(result);
    }

    /**
     * Decodes the metadata row at {@code pagePosition} of {@code block}.
     *
     * @return the wrapped {@link MetadataEntry}, or {@code null} when the position is null
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the metadata row does
     *         not have 8 fields or its {@code format} row does not have 2
     */
    private DeltaLakeTransactionLogEntry buildMetadataEntry(ConnectorSession session, int pagePosition, Block block) {
        log.debug("Building metadata entry from %s pagePosition %d", block, pagePosition);
        if (block.isNull(pagePosition)) {
            return null;
        }
        RowType metadataRowType = metadataType.orElseThrow();
        final int expectedMetadataFields = 8;
        final int expectedFormatFields = 2;

        SqlRow metadataRow = getRow(block, pagePosition);
        CheckpointFieldReader metadata = new CheckpointFieldReader(session, metadataRow, metadataRowType);
        log.debug("Block %s has %s fields", block, metadataRow.getFieldCount());
        if (metadataRow.getFieldCount() != expectedMetadataFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", block, expectedMetadataFields, metadataRow.getFieldCount()));
        }

        // Nested "format" row, read with the type of the field of the same name.
        SqlRow formatRow = metadata.getRow("format");
        if (formatRow.getFieldCount() != expectedFormatFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", formatRow, expectedFormatFields, formatRow.getFieldCount()));
        }
        RowType.Field formatField = metadataRowType.getFields().stream()
                .filter(candidate -> candidate.getName().orElseThrow().equals("format"))
                .collect(MoreCollectors.onlyElement());
        CheckpointFieldReader format = new CheckpointFieldReader(session, formatRow, (RowType) formatField.getType());

        MetadataEntry result = new MetadataEntry(
                metadata.getString("id"),
                metadata.getString("name"),
                metadata.getString("description"),
                new MetadataEntry.Format(format.getString("provider"), format.getMap(stringMap, "options")),
                metadata.getString("schemaString"),
                metadata.getList(stringList, "partitionColumns"),
                metadata.getMap(stringMap, "configuration"),
                metadata.getLong("createdTime"));
        log.debug("Result: %s", result);
        return DeltaLakeTransactionLogEntry.metadataEntry(result);
    }

    /**
     * Decodes the remove row at {@code pagePosition} of {@code block}.
     *
     * @return the wrapped {@link RemoveFileEntry}, or {@code null} when the position is null
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the row does not have 4 fields
     */
    private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session, int pagePosition, Block block) {
        log.debug("Building remove entry from %s pagePosition %d", block, pagePosition);
        if (block.isNull(pagePosition)) {
            return null;
        }
        RowType removeRowType = removeType.orElseThrow();
        final int expectedFields = 4;

        SqlRow removeRow = getRow(block, pagePosition);
        log.debug("Block %s has %s fields", block, removeRow.getFieldCount());
        if (removeRow.getFieldCount() != expectedFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", block, expectedFields, removeRow.getFieldCount()));
        }

        CheckpointFieldReader remove = new CheckpointFieldReader(session, removeRow, removeRowType);
        RemoveFileEntry result = new RemoveFileEntry(
                remove.getString("path"),
                remove.getMap(stringMap, "partitionValues"),
                remove.getLong("deletionTimestamp"),
                remove.getBoolean("dataChange"));
        log.debug("Result: %s", result);
        return DeltaLakeTransactionLogEntry.removeFileEntry(result);
    }

    /**
     * Reads a deletion vector descriptor out of {@code row}, interpreting it with {@code type}.
     *
     * @throws IllegalArgumentException when the row does not have exactly 5 fields
     */
    private DeletionVectorEntry parseDeletionVectorFromParquet(ConnectorSession session, SqlRow row, RowType type) {
        Preconditions.checkArgument(row.getFieldCount() == 5, "Deletion vector entry must have 5 fields");
        CheckpointFieldReader reader = new CheckpointFieldReader(session, row, type);
        // Constructor arguments are evaluated left to right, i.e. in the same order the fields were read before.
        return new DeletionVectorEntry(
                reader.getString("storageType"),
                reader.getString("pathOrInlineDv"),
                reader.getOptionalInt("offset"),
                reader.getInt("sizeInBytes"),
                reader.getLong("cardinality"));
    }

    /**
     * Builds file statistics from a parsed-stats row.
     *
     * <p>{@code numRecords} and the null counts are always read; min/max values are read only
     * when at least one column is eligible for min/max statistics, and are empty otherwise.
     */
    private DeltaLakeParquetFileStatistics parseStatisticsFromParquet(ConnectorSession session, SqlRow statsRow, RowType type) {
        CheckpointFieldReader stats = new CheckpointFieldReader(session, statsRow, type);
        long numRecords = stats.getLong("numRecords");

        boolean readMinMax = !columnsWithMinMaxStats.isEmpty();
        Optional<Map<String, Object>> minValues = readMinMax
                ? Optional.of(parseMinMax(stats.getRow("minValues"), columnsWithMinMaxStats))
                : Optional.empty();
        Optional<Map<String, Object>> maxValues = readMinMax
                ? Optional.of(parseMinMax(stats.getRow("maxValues"), columnsWithMinMaxStats))
                : Optional.empty();

        Optional<Map<String, Object>> nullCount = Optional.of(parseNullCount(stats.getRow("nullCount"), schema));
        return new DeltaLakeParquetFileStatistics(Optional.of(numRecords), minValues, maxValues, nullCount);
    }

    /**
     * Converts a minValues/maxValues row into a map keyed by physical column name.
     *
     * <p>Field {@code i} of {@code row} is read for {@code eligibleColumns.get(i)}. Null fields
     * are skipped. Row-typed values are included only when row statistics are enabled.
     * Timestamp-with-time-zone values are read as millisecond timestamps, divided exactly by
     * 1000, dropped when they fall before {@code START_OF_MODERN_ERA_EPOCH_DAY}, and otherwise
     * packed with the UTC zone key.
     *
     * @return an empty map when {@code row} is {@code null}
     */
    private ImmutableMap<String, Object> parseMinMax(@Nullable SqlRow row, List<DeltaLakeColumnMetadata> eligibleColumns) {
        if (row == null) {
            return ImmutableMap.of();
        }
        ImmutableMap.Builder<String, Object> values = ImmutableMap.builder();
        for (int column = 0; column < eligibleColumns.size(); column++) {
            DeltaLakeColumnMetadata metadata = eligibleColumns.get(column);
            String name = metadata.getPhysicalName();
            Type type = metadata.getPhysicalColumnType();
            ValueBlock fieldBlock = row.getUnderlyingFieldBlock(column);
            int fieldIndex = row.getUnderlyingFieldPosition(column);
            if (fieldBlock.isNull(fieldIndex)) {
                continue;
            }

            if (type instanceof RowType rowType) {
                if (checkpointRowStatisticsWritingEnabled) {
                    values.put(name, rowType.getObject(fieldBlock, fieldIndex));
                }
            }
            else if (type instanceof TimestampWithTimeZoneType) {
                long epochMillis = LongMath.divide((long) TypeUtils.readNativeValue(TimestampType.TIMESTAMP_MILLIS, fieldBlock, fieldIndex), 1000L, RoundingMode.UNNECESSARY);
                if (Math.floorDiv(epochMillis, 86400000) >= TransactionLogParser.START_OF_MODERN_ERA_EPOCH_DAY) {
                    values.put(name, DateTimeEncoding.packDateTimeWithZone(epochMillis, TimeZoneKey.UTC_KEY));
                }
            }
            else {
                values.put(name, TypeUtils.readNativeValue(type, fieldBlock, fieldIndex));
            }
        }
        return values.buildOrThrow();
    }

    /**
     * Converts a nullCount row into a map keyed by physical column name.
     *
     * <p>Field {@code i} of {@code row} is read for {@code columns.get(i)}. Null fields are
     * skipped. For row-typed columns the nested row is stored, and only when row statistics are
     * enabled; for all other columns the field is read as a long.
     *
     * @return an empty map when {@code row} is {@code null}
     */
    private Map<String, Object> parseNullCount(SqlRow row, List<DeltaLakeColumnMetadata> columns) {
        if (row == null) {
            return ImmutableMap.of();
        }
        ImmutableMap.Builder<String, Object> values = ImmutableMap.builder();
        for (int column = 0; column < columns.size(); column++) {
            DeltaLakeColumnMetadata metadata = columns.get(column);
            ValueBlock fieldBlock = row.getUnderlyingFieldBlock(column);
            int fieldIndex = row.getUnderlyingFieldPosition(column);
            if (fieldBlock.isNull(fieldIndex)) {
                continue;
            }

            if (!(metadata.getType() instanceof RowType)) {
                values.put(metadata.getPhysicalName(), getLongField(row, column));
            }
            else if (checkpointRowStatisticsWritingEnabled) {
                values.put(metadata.getPhysicalName(), getRow(fieldBlock, fieldIndex));
            }
        }
        return values.buildOrThrow();
    }

    /**
     * Decodes the txn row at {@code pagePosition} of {@code block}.
     *
     * @return the wrapped {@link TransactionEntry}, or {@code null} when the position is null
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the row does not have 3 fields
     */
    private DeltaLakeTransactionLogEntry buildTxnEntry(ConnectorSession session, int pagePosition, Block block) {
        log.debug("Building txn entry from %s pagePosition %d", block, pagePosition);
        if (block.isNull(pagePosition)) {
            return null;
        }
        RowType txnRowType = txnType.orElseThrow();
        final int expectedFields = 3;

        SqlRow txnRow = getRow(block, pagePosition);
        log.debug("Block %s has %s fields", block, txnRow.getFieldCount());
        if (txnRow.getFieldCount() != expectedFields) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected block %s to have %d children, but found %s", block, expectedFields, txnRow.getFieldCount()));
        }

        CheckpointFieldReader txn = new CheckpointFieldReader(session, txnRow, txnRowType);
        TransactionEntry result = new TransactionEntry(
                txn.getString("appId"),
                txn.getLong("version"),
                txn.getLong("lastUpdated"));
        log.debug("Result: %s", result);
        return DeltaLakeTransactionLogEntry.transactionEntry(result);
    }

    /**
     * Reads field number {@code field} of {@code row} as a long. The field's underlying block
     * must be a {@link LongArrayBlock}; otherwise the cast fails.
     */
    private static long getLongField(SqlRow row, int field) {
        LongArrayBlock longs = (LongArrayBlock) row.getUnderlyingFieldBlock(field);
        int position = row.getUnderlyingFieldPosition(field);
        return longs.getLong(position);
    }

    /**
     * Supplies the next entry to the iterator, refilling the pending queue when it is empty.
     * Once no further entries can be produced, the page source is closed and the end of data
     * is signalled.
     */
    @Override
    protected DeltaLakeTransactionLogEntry computeNext() {
        if (nextEntries.isEmpty()) {
            fillNextEntries();
        }
        if (nextEntries.isEmpty()) {
            // Refilling produced nothing: the checkpoint is exhausted.
            pageSource.close();
            return endOfData();
        }
        return nextEntries.remove();
    }

    /**
     * Moves on to the next page of the page source and resets the page position to 0.
     *
     * <p>On the first page obtained, the channel count is checked against the total number of
     * channels the extractors require.
     *
     * @return {@code true} if a page was obtained; {@code false} if the source is finished
     *         (it is then closed) or returned no page
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} on a channel count mismatch
     */
    private boolean tryAdvancePage() {
        if (pageSource.isFinished()) {
            pageSource.close();
            return false;
        }

        boolean noPageSeenYet = page == null;
        page = pageSource.getNextPage();
        if (page == null) {
            return false;
        }

        if (noPageSeenYet) {
            int expectedChannels = 0;
            for (CheckpointFieldExtractor extractor : extractors) {
                expectedChannels += extractor.getRequiredChannels();
            }
            int actualChannels = page.getChannelCount();
            if (actualChannels != expectedChannels) {
                throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, String.format("Expected page in %s to contain %d channels, but found %d", checkpointPath, expectedChannels, actualChannels));
            }
        }

        pagePosition = 0;
        return true;
    }

    /**
     * Closes the underlying page source.
     * <p>
     * NOTE(review): {@code computeNext()} and {@code tryAdvancePage()} also close the page source
     * once it is drained, so this call may hit an already-closed source. Presumably closing it
     * repeatedly is harmless; confirm against the page source implementation.
     */
    public void close() {
        this.pageSource.close();
    }

    /**
     * Buffers entries into {@code nextEntries} by running every extractor over successive page
     * positions until at least one entry is produced or the page source has no more pages.
     * <p>
     * Each extractor consumes {@code getRequiredChannels()} consecutive channels of the page,
     * in the order the extractors are listed.
     */
    private void fillNextEntries() {
        while (this.nextEntries.isEmpty()) {
            // Skip forward until the current page has an unread position; give up when no page is left.
            while (this.page == null || this.pagePosition == this.page.getPositionCount()) {
                if (!this.tryAdvancePage()) {
                    return;
                }
            }
            int channel = 0;
            for (CheckpointFieldExtractor extractor : this.extractors) {
                Block firstBlock = this.page.getBlock(channel);
                DeltaLakeTransactionLogEntry extracted;
                if (extractor instanceof AddFileEntryExtractor) {
                    // The add block is passed on without getLoadedBlock(); the add extractor loads
                    // it itself, after it has checked the partition constraint.
                    Block partitionValuesBlock = this.page.getBlock(channel + 1).getLoadedBlock();
                    extracted = extractor.getEntry(this.session, this.pagePosition, firstBlock, partitionValuesBlock);
                } else {
                    extracted = extractor.getEntry(this.session, this.pagePosition, firstBlock.getLoadedBlock());
                }
                // Extractors return null when the position yields no entry for them.
                if (extracted != null) {
                    this.nextEntries.add(extracted);
                }
                channel += extractor.getRequiredChannels();
            }
            this.pagePosition++;
        }
    }

    /**
     * Exposes the completed-positions value reported by the underlying page source.
     * Package-private and intended for tests only.
     *
     * @return whatever {@code pageSource.getCompletedPositions()} reports
     */
    @VisibleForTesting
    OptionalLong getCompletedPositions() {
        return this.pageSource.getCompletedPositions();
    }

    /**
     * Exposes the completed-bytes value reported by the underlying page source.
     * Package-private and intended for tests only.
     *
     * @return whatever {@code pageSource.getCompletedBytes()} reports
     */
    @VisibleForTesting
    long getCompletedBytes() {
        return this.pageSource.getCompletedBytes();
    }

    /**
     * Resolves the row stored at a position of a block.
     *
     * @param block block whose underlying value block is cast to {@link RowBlock}; any other
     *        underlying block type fails with a {@link ClassCastException}
     * @param position position within {@code block}
     * @return the row found at the corresponding position of the underlying row block
     */
    private static SqlRow getRow(Block block, int position) {
        RowBlock rowBlock = (RowBlock)block.getUnderlyingValueBlock();
        int rowPosition = block.getUnderlyingValuePosition(position);
        return rowBlock.getRow(rowPosition);
    }

    /**
     * Kinds of entries handled by this iterator, each paired with a column name.
     */
    public enum EntryType {
        TRANSACTION("txn"),
        ADD("add"),
        REMOVE("remove"),
        METADATA("metadata"),
        PROTOCOL("protocol"),
        COMMIT("commitinfo");

        private final String columnName;

        EntryType(String name) {
            columnName = name;
        }

        /**
         * @return the column name paired with this entry type
         */
        public String getColumnName() {
            return columnName;
        }
    }

    @FunctionalInterface
    private static interface CheckpointFieldExtractor {
        @Nullable
        public DeltaLakeTransactionLogEntry getEntry(ConnectorSession var1, int var2, Block ... var3);

        default public int getRequiredChannels() {
            return 1;
        }
    }

    /**
     * Extractor for add-file entries. It consumes two channels: the add block itself and a
     * separate block carrying the entry's partition values.
     * <p>
     * The partition values are read first so that entries rejected by the partition constraint
     * are dropped before the add block is loaded.
     */
    private class AddFileEntryExtractor
    implements CheckpointFieldExtractor {
        private AddFileEntryExtractor() {
        }

        /**
         * Builds the add-file entry at {@code pagePosition}.
         *
         * @param session the connector session handed to the field readers
         * @param pagePosition position within both blocks
         * @param blocks exactly two blocks: the add block, then the partition-values block
         * @return the add-file entry, or {@code null} when the add block is null at this position
         *         or the entry's partition values do not match the partition constraint
         * @throws IllegalStateException when the number of blocks is wrong, or the partition-values
         *         block is null where the add block is not
         */
        @Override
        @Nullable
        public DeltaLakeTransactionLogEntry getEntry(ConnectorSession session, int pagePosition, Block ... blocks) {
            Preconditions.checkState(blocks.length == this.getRequiredChannels(), "Unexpected amount of blocks: %s", blocks.length);
            Block lazyAddBlock = blocks[0];
            Block partitionValuesBlock = blocks[1];
            log.debug("Building add entry from %s pagePosition %d", lazyAddBlock, pagePosition);
            if (lazyAddBlock.isNull(pagePosition)) {
                return null;
            }
            Preconditions.checkState(!partitionValuesBlock.isNull(pagePosition), "Inconsistent blocks provided while building the add file entry");

            // Read only the partition values first and apply the partition constraint to them.
            SqlRow partitionValuesRow = CheckpointEntryIterator.getRow(partitionValuesBlock, pagePosition);
            CheckpointFieldReader partitionValuesReader = new CheckpointFieldReader(session, partitionValuesRow, CheckpointEntryIterator.this.addPartitionValuesType.orElseThrow());
            Map<String, String> partitionValues = partitionValuesReader.getMap(CheckpointEntryIterator.this.stringMap, "partitionValues");
            Map<String, Optional<String>> canonicalPartitionValues = TransactionLogUtil.canonicalizePartitionValues(partitionValues);
            boolean constrained = !CheckpointEntryIterator.this.partitionConstraint.isAll();
            if (constrained && !DeltaLakeSplitManager.partitionMatchesPredicate(canonicalPartitionValues, (Map)CheckpointEntryIterator.this.partitionConstraint.getDomains().orElseThrow())) {
                return null;
            }

            // The entry survived pruning, so the add block is loaded and decoded now.
            Block loadedAddBlock = lazyAddBlock.getLoadedBlock();
            SqlRow addRow = CheckpointEntryIterator.getRow(loadedAddBlock, pagePosition);
            log.debug("Block %s has %s fields", loadedAddBlock, addRow.getFieldCount());
            CheckpointFieldReader addReader = new CheckpointFieldReader(session, addRow, CheckpointEntryIterator.this.addType.orElseThrow());
            String path = addReader.getString("path");
            long size = addReader.getLong("size");
            long modificationTime = addReader.getLong("modificationTime");
            boolean dataChange = addReader.getBoolean("dataChange");
            // The deletion vector field is only read when deletion vectors are enabled.
            Optional<DeletionVectorEntry> deletionVector = CheckpointEntryIterator.this.deletionVectorsEnabled
                    ? Optional.ofNullable(addReader.getRow("deletionVector")).map(row -> CheckpointEntryIterator.this.parseDeletionVectorFromParquet(session, (SqlRow)row, CheckpointEntryIterator.this.addDeletionVectorType.orElseThrow()))
                    : Optional.empty();
            Optional<DeltaLakeParquetFileStatistics> parsedStats = Optional.ofNullable(addReader.getRow("stats_parsed")).map(row -> CheckpointEntryIterator.this.parseStatisticsFromParquet(session, (SqlRow)row, CheckpointEntryIterator.this.addParsedStatsFieldType.orElseThrow()));
            // The "stats" string is only consulted when no parsed statistics are present.
            Optional<String> stats = parsedStats.isEmpty()
                    ? Optional.ofNullable(addReader.getString("stats"))
                    : Optional.empty();
            Map<String, String> tags = addReader.getMap(CheckpointEntryIterator.this.stringMap, "tags");

            AddFileEntry addFileEntry = new AddFileEntry(path, partitionValues, canonicalPartitionValues, size, modificationTime, dataChange, stats, parsedStats, tags, deletionVector);
            log.debug("Result: %s", addFileEntry);
            return DeltaLakeTransactionLogEntry.addFileEntry(addFileEntry);
        }

        /**
         * @return two: the add block plus the partition-values block
         */
        @Override
        public int getRequiredChannels() {
            return 2;
        }
    }
}

