/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.MoreCollectors;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import com.google.common.io.Resources;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.local.LocalInputFile;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.ParquetTestUtils;
import io.trino.parquet.metadata.FileMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.ParquetReader;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeQueryRunner;
import io.trino.plugin.deltalake.DeltaTestingConnectorSession;
import io.trino.plugin.deltalake.ResourceTable;
import io.trino.plugin.deltalake.TestingDeltaLakeUtils;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TemporalTimeTravelUtil;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.hive.HiveTestUtils;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.block.Block;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.type.DateTimeEncoding;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingNames;
import io.trino.testing.TestingSession;
import io.trino.testing.sql.TestTable;
import io.trino.type.InternalTypeManager;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.PrimitiveType;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AssertProvider;
import org.assertj.core.api.Assertions;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.SAME_THREAD)
public class TestDeltaLakeBasic
extends AbstractTestQueryFramework {
    // JSON mapper used by the tests to parse schema strings taken from Delta metadata entries.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
    // Resource-backed "person" tables, each given as (table name, classpath resource path).
    // They are registered in registerTables() and iterated by testDescribeTable() and testSimpleQueries().
    private static final List<ResourceTable> PERSON_TABLES = ImmutableList.of((Object)new ResourceTable("person", "databricks73/person"), (Object)new ResourceTable("person_without_last_checkpoint", "databricks73/person_without_last_checkpoint"), (Object)new ResourceTable("person_without_old_jsons", "databricks73/person_without_old_jsons"), (Object)new ResourceTable("person_without_checkpoints", "databricks73/person_without_checkpoints"));
    // Remaining resource-backed tables, also registered in registerTables() and queried by name in individual tests.
    private static final List<ResourceTable> OTHER_TABLES = ImmutableList.of((Object)new ResourceTable("allow_column_defaults", "deltalake/allow_column_defaults"), (Object)new ResourceTable("stats_with_minmax_nulls", "deltalake/stats_with_minmax_nulls"), (Object)new ResourceTable("no_column_stats", "databricks73/no_column_stats"), (Object)new ResourceTable("liquid_clustering", "deltalake/liquid_clustering"), (Object)new ResourceTable("region_91_lts", "databricks91/region"), (Object)new ResourceTable("region_104_lts", "databricks104/region"), (Object)new ResourceTable("timestamp_ntz", "databricks131/timestamp_ntz"), (Object)new ResourceTable("timestamp_ntz_partition", "databricks131/timestamp_ntz_partition"), (Object)new ResourceTable("uniform_hudi", "deltalake/uniform_hudi"), (Object)new ResourceTable("uniform_iceberg_v1", "databricks133/uniform_iceberg_v1"), (Object)new ResourceTable("uniform_iceberg_v2", "databricks143/uniform_iceberg_v2"), (Object)new ResourceTable("unsupported_writer_feature", "deltalake/unsupported_writer_feature"), (Object[])new ResourceTable[]{new ResourceTable("unsupported_writer_version", "deltalake/unsupported_writer_version"), new ResourceTable("variant", "databricks153/variant"), new ResourceTable("variant_types", "databricks153/variant_types"), new ResourceTable("type_widening", "databricks153/type_widening"), new ResourceTable("type_widening_partition", "databricks153/type_widening_partition"), new ResourceTable("type_widening_nested", "databricks153/type_widening_nested"), new ResourceTable("in_commit_timestamp_history_read", "deltalake/in_commit_timestamp_history_read")});
    // Matches "col-" followed by five hyphen-separated hex groups of 8-4-4-4-12 characters;
    // the tests use it to check column-mapping physical column names.
    private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
    // File system created by the HDFS file system factory for the testing connector session.
    // NOTE(review): not referenced in the visible part of this file - presumably used by helpers further down.
    private static final TrinoFileSystem FILE_SYSTEM = new HdfsFileSystemFactory(HiveTestUtils.HDFS_ENVIRONMENT, HiveTestUtils.HDFS_FILE_SYSTEM_STATS).create(DeltaTestingConnectorSession.SESSION);
    // Time zones: the JVM default plus two fixed zones.
    // NOTE(review): no usage is visible in this part of the file - presumably for time-zone dependent tests; confirm.
    private final ZoneId jvmZone = ZoneId.systemDefault();
    private final ZoneId vilnius = ZoneId.of("Europe/Vilnius");
    private final ZoneId kathmandu = ZoneId.of("Asia/Kathmandu");
    // Temporary directory created in createQueryRunner(); configured as the metastore catalog directory
    // and used by tests as the parent directory for copied table data.
    private Path catalogDir;

    /**
     * Creates the Delta Lake query runner used by the tests in this class.
     *
     * <p>A temporary directory is created and stored in {@code catalogDir}; it is handed to the runner as
     * {@code hive.metastore.catalog.dir} and registered with {@code closeAfterClass} for recursive deletion.
     *
     * @return the query runner built with the Delta properties listed below
     * @throws Exception if the temporary directory or the query runner cannot be created
     */
    protected QueryRunner createQueryRunner() throws Exception {
        catalogDir = Files.createTempDirectory("catalog-dir");
        // Delete the temporary catalog directory (and everything below it) when the class is done.
        closeAfterClass(() -> MoreFiles.deleteRecursively(catalogDir, RecursiveDeleteOption.ALLOW_INSECURE));

        String catalogUri = catalogDir.toUri().toString();
        return DeltaLakeQueryRunner.builder()
                .addDeltaProperty("hive.metastore.catalog.dir", catalogUri)
                .addDeltaProperty("delta.register-table-procedure.enabled", "true")
                .addDeltaProperty("delta.enable-non-concurrent-writes", "true")
                .addDeltaProperty("delta.transaction-log.max-cached-file-size", "0B")
                .build();
    }

    /**
     * Registers every table from {@code PERSON_TABLES} and {@code OTHER_TABLES} (in that order) in the
     * current schema by calling {@code system.register_table} with the table's resource location.
     */
    @BeforeAll
    public void registerTables() {
        Stream.concat(PERSON_TABLES.stream(), OTHER_TABLES.stream()).forEach(table -> {
            String dataPath = getResourceLocation(table.resourcePath()).toExternalForm();
            String registerSql = String.format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", table.tableName(), dataPath);
            getQueryRunner().execute(registerSql);
        });
    }

    /**
     * Resolves a classpath resource through this class's class loader.
     *
     * @param resourcePath resource path relative to the classpath root
     * @return the URL of the resource, never {@code null}
     * @throws IllegalArgumentException if no such resource exists on the classpath
     */
    private URL getResourceLocation(String resourcePath) {
        URL location = getClass().getClassLoader().getResource(resourcePath);
        if (location == null) {
            // ClassLoader.getResource reports a missing resource by returning null; fail here with the
            // offending path instead of letting callers hit an anonymous NullPointerException later.
            throw new IllegalArgumentException("Test resource not found on classpath: " + resourcePath);
        }
        return location;
    }

    /**
     * Checks that {@code DESCRIBE} returns the same seven-column layout for every table in
     * {@code PERSON_TABLES}.
     */
    @Test
    public void testDescribeTable() {
        String expectedColumns = "VALUES ('name', 'varchar', '', ''), ('age', 'integer', '', ''), ('married', 'boolean', '', ''), ('gender', 'varchar', '', ''), ('phones', 'array(row(number varchar, label varchar))', '', ''), ('address', 'row(street varchar, city varchar, state varchar, zip varchar)', '', ''), ('income', 'double', '', '')";
        PERSON_TABLES.forEach(table -> assertQuery("DESCRIBE %s".formatted(table.tableName()), expectedColumns));
    }

    /**
     * Runs a fixed set of count, filter, LIKE and DISTINCT queries against every table in
     * {@code PERSON_TABLES} and compares each result with its expected {@code VALUES} clause.
     */
    @Test
    public void testSimpleQueries() {
        for (ResourceTable personTable : PERSON_TABLES) {
            String name = personTable.tableName();
            assertQuery("SELECT COUNT(*) FROM %s".formatted(name), "VALUES 12");
            assertQuery("SELECT income FROM %s WHERE name = 'Bob'".formatted(name), "VALUES 99000.00");
            // "%%" is the format-string escape for a literal '%' in the LIKE pattern.
            assertQuery("SELECT name FROM %s WHERE name LIKE 'B%%'".formatted(name), "VALUES ('Bob'), ('Betty')");
            assertQuery("SELECT DISTINCT gender FROM %s".formatted(name), "VALUES ('M'), ('F'), (null)");
            assertQuery("SELECT DISTINCT age FROM %s".formatted(name), "VALUES (21), (25), (28), (29), (30), (42)");
            assertQuery("SELECT name FROM %s WHERE age = 42".formatted(name), "VALUES ('Alice'), ('Emma')");
        }
    }

    /**
     * Checks that the registered {@code region_91_lts} table matches {@code tpch.tiny.region},
     * with the type check skipped.
     */
    @Test
    void testDatabricks91() {
        String expectedQuery = "SELECT * FROM tpch.tiny.region";
        Assertions.assertThat(query("SELECT * FROM region_91_lts"))
                .skippingTypesCheck()
                .matches(expectedQuery);
    }

    /**
     * Checks that the registered {@code region_104_lts} table matches {@code tpch.tiny.region},
     * with the type check skipped.
     */
    @Test
    void testDatabricks104() {
        String expectedQuery = "SELECT * FROM tpch.tiny.region";
        Assertions.assertThat(query("SELECT * FROM region_104_lts"))
                .skippingTypesCheck()
                .matches(expectedQuery);
    }

    /**
     * Checks that a filtered read of the {@code no_column_stats} table returns the expected row.
     */
    @Test
    public void testNoColumnStats() {
        String filteredRead = "SELECT c_str FROM no_column_stats WHERE c_int = 42";
        assertQuery(filteredRead, "VALUES 'foo'");
    }

    /**
     * Runs the nested ADD COLUMN scenario for column mapping mode {@code id} and then {@code name}.
     *
     * @throws Exception if either scenario fails
     */
    @Test
    public void testAddNestedColumnWithColumnMappingMode() throws Exception {
        for (String mappingMode : List.of("id", "name")) {
            testAddNestedColumnWithColumnMappingMode(mappingMode);
        }
    }

    /**
     * Copies the {@code deltalake/column_mapping_mode_<mode>} resource table into the catalog directory,
     * registers it, adds two nested row columns and inspects the metadata entries written by each
     * ALTER TABLE.
     *
     * <p>After the first ADD COLUMN the test checks the column mapping id and physical name of every
     * (nested) field and that {@code delta.columnMapping.maxColumnId} is 6. After the second it checks that
     * the first two columns are unchanged and that the maximum column id is 11.
     *
     * @param columnMappingMode suffix of the resource directory to copy ({@code "id"} or {@code "name"} in the visible callers)
     * @throws Exception if copying the resource table, parsing the schema JSON or any assertion fails
     */
    private void testAddNestedColumnWithColumnMappingMode(String columnMappingMode) throws Exception {
        final String idKey = "delta.columnMapping.id";
        final String physicalNameKey = "delta.columnMapping.physicalName";

        String tableName = "test_add_column_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceTable = new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceTable, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // The freshly registered table has the single column 'x' and no rows.
        Assertions.assertThat(query("DESCRIBE " + tableName))
                .result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('x', 'integer')");
        assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN second_col row(a array(integer), b map(integer, integer), c row(field integer))");

        // Version 1 of the transaction log carries the metadata written by the first ADD COLUMN.
        MetadataEntry metadata = loadMetadataEntry(1L, tableLocation);
        Assertions.assertThat(metadata.getConfiguration()).containsEntry("delta.columnMapping.maxColumnId", "6");

        List<JsonNode> fields = ImmutableList.copyOf(OBJECT_MAPPER.readTree(metadata.getSchemaString()).get("fields").elements());
        Assertions.assertThat(fields).hasSize(2);
        JsonNode columnX = fields.get(0);
        JsonNode columnY = fields.get(1);

        List<JsonNode> rowFields = ImmutableList.copyOf(columnY.get("type").get("fields").elements());
        Assertions.assertThat(rowFields).hasSize(3);
        JsonNode nestedArray = rowFields.get(0);
        JsonNode nestedMap = rowFields.get(1);
        JsonNode nestedRow = rowFields.get(2);

        Assertions.assertThat(columnX.get("metadata").get(idKey).asInt()).isEqualTo(1);
        Assertions.assertThat(columnX.get("metadata").get(physicalNameKey).asText()).containsPattern(PHYSICAL_COLUMN_NAME_PATTERN);

        Assertions.assertThat(columnY.get("metadata").get(idKey).asInt()).isEqualTo(6);
        Assertions.assertThat(columnY.get("metadata").get(physicalNameKey).asText()).containsPattern(PHYSICAL_COLUMN_NAME_PATTERN);

        Assertions.assertThat(nestedArray.get("metadata").get(idKey).asInt()).isEqualTo(2);
        Assertions.assertThat(nestedArray.get("metadata").get(physicalNameKey).asText()).containsPattern(PHYSICAL_COLUMN_NAME_PATTERN);

        Assertions.assertThat(nestedMap.get("metadata").get(idKey).asInt()).isEqualTo(3);
        Assertions.assertThat(nestedMap.get("metadata").get(physicalNameKey).asText()).containsPattern(PHYSICAL_COLUMN_NAME_PATTERN);

        Assertions.assertThat(nestedRow.get("metadata").get(idKey).asInt()).isEqualTo(5);
        Assertions.assertThat(nestedRow.get("metadata").get(physicalNameKey).asText()).containsPattern(PHYSICAL_COLUMN_NAME_PATTERN);

        // The nested row must contain exactly one field.
        JsonNode nestedRowField = Iterators.getOnlyElement(nestedRow.get("type").get("fields").elements());
        Assertions.assertThat(nestedRowField.get("metadata").get(idKey).asInt()).isEqualTo(4);
        Assertions.assertThat(nestedRowField.get("metadata").get(physicalNameKey).asText()).containsPattern(PHYSICAL_COLUMN_NAME_PATTERN);

        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN third_col row(a array(integer), b map(integer, integer), c row(field integer))");

        // Version 2: the two existing columns must be untouched by the second ADD COLUMN.
        MetadataEntry thirdMetadata = loadMetadataEntry(2L, tableLocation);
        List<JsonNode> latestFields = ImmutableList.copyOf(OBJECT_MAPPER.readTree(thirdMetadata.getSchemaString()).get("fields").elements());
        Assertions.assertThat(latestFields).hasSize(3);
        Assertions.assertThat(latestFields.get(0)).isEqualTo(columnX);
        Assertions.assertThat(latestFields.get(1)).isEqualTo(columnY);

        Assertions.assertThat(thirdMetadata.getConfiguration()).containsEntry("delta.columnMapping.maxColumnId", "11");
        Assertions.assertThat(thirdMetadata.getSchemaString())
                .containsPattern("(delta\\.columnMapping\\.id.*?){11}")
                .containsPattern("(delta\\.columnMapping\\.physicalName.*?){11}");
    }

    /**
     * Runs the {@code partitionValues_parsed} checkpoint scenario for the ID, NAME and NONE column
     * mapping modes, once per partition column type, with two input literals and the two values expected
     * back from the checkpoint.
     *
     * @throws Exception if any scenario fails
     */
    @Test
    void testPartitionValuesParsedCheckpoint() throws Exception {
        // These values do not depend on the mapping mode, so build them once.
        DecimalType decimalType = DecimalType.createDecimalType(3, 2);
        ZonedDateTime epochPlus1Day = LocalDateTime.of(1970, 1, 2, 0, 0, 0).atZone(ZoneOffset.UTC);
        long epochPlus1DayMillis = DateTimeEncoding.packDateTimeWithZone(epochPlus1Day.toInstant().toEpochMilli(), TimeZoneKey.UTC_KEY);

        List<DeltaLakeSchemaSupport.ColumnMappingMode> modes = List.of(
                DeltaLakeSchemaSupport.ColumnMappingMode.ID,
                DeltaLakeSchemaSupport.ColumnMappingMode.NAME,
                DeltaLakeSchemaSupport.ColumnMappingMode.NONE);
        for (DeltaLakeSchemaSupport.ColumnMappingMode mode : modes) {
            testPartitionValuesParsedCheckpoint(mode, "boolean", ImmutableList.of("true", "false"), ImmutableList.of(true, false));
            testPartitionValuesParsedCheckpoint(mode, "tinyint", ImmutableList.of("10", "20"), ImmutableList.of(Byte.valueOf("10"), Byte.valueOf("20")));
            testPartitionValuesParsedCheckpoint(mode, "smallint", ImmutableList.of("100", "200"), ImmutableList.of(100, 200));
            testPartitionValuesParsedCheckpoint(mode, "integer", ImmutableList.of("1000", "2000"), ImmutableList.of(1000, 2000));
            testPartitionValuesParsedCheckpoint(mode, "bigint", ImmutableList.of("10000", "20000"), ImmutableList.of(10000L, 20000L));
            testPartitionValuesParsedCheckpoint(mode, "real", ImmutableList.of("REAL '1.23'", "REAL '4.56'"), ImmutableList.of(1.23f, 4.56f));
            testPartitionValuesParsedCheckpoint(mode, "double", ImmutableList.of("DOUBLE '12.34'", "DOUBLE '56.78'"), ImmutableList.of(12.34, 56.78));
            testPartitionValuesParsedCheckpoint(
                    mode,
                    "decimal(3,2)",
                    ImmutableList.of("0.12", "3.45"),
                    ImmutableList.of(SqlDecimal.decimal("0.12", decimalType), SqlDecimal.decimal("3.45", decimalType)));
            testPartitionValuesParsedCheckpoint(mode, "varchar", ImmutableList.of("'alice'", "'bob'"), ImmutableList.of("alice", "bob"));
            testPartitionValuesParsedCheckpoint(
                    mode,
                    "date",
                    ImmutableList.of("DATE '1970-01-01'", "DATE '9999-12-31'"),
                    ImmutableList.of(new SqlDate((int) LocalDate.of(1970, 1, 1).toEpochDay()), new SqlDate((int) LocalDate.of(9999, 12, 31).toEpochDay())));
            testPartitionValuesParsedCheckpoint(
                    mode,
                    "timestamp",
                    ImmutableList.of("TIMESTAMP '1970-01-01 00:00:00'", "TIMESTAMP '1970-01-02 00:00:00'"),
                    ImmutableList.of(SqlTimestamp.newInstance(3, 0L, 0), SqlTimestamp.newInstance(3, 86400000000L, 0)));
            testPartitionValuesParsedCheckpoint(
                    mode,
                    "timestamp with time zone",
                    ImmutableList.of("TIMESTAMP '1970-01-01 00:00:00 +00:00'", "TIMESTAMP '1970-01-02 00:00:00 +00:00'"),
                    ImmutableList.of(SqlTimestamp.newInstance(3, 0L, 0), SqlTimestamp.newInstance(3, epochPlus1DayMillis, 0)));
        }
    }

    /**
     * Creates a table partitioned on a single column of {@code inputType}, inserts the two input values
     * (which triggers a checkpoint, as {@code checkpoint_interval = 2}), and then reads the checkpoint file
     * directly to verify the {@code partitionValues_parsed} field of its {@code add} entries.
     *
     * @param columnMappingMode column mapping mode the table is created with
     * @param inputType SQL type of the partition column
     * @param inputValues exactly two SQL literals to insert as partition values
     * @param expectedPartitionValuesParsed exactly two values expected in {@code partitionValues_parsed}, in any order
     * @throws IllegalArgumentException if either list does not have exactly two elements
     * @throws Exception if table creation, reading the checkpoint or any assertion fails
     */
    private void testPartitionValuesParsedCheckpoint(DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode, String inputType, List<String> inputValues, List<Object> expectedPartitionValuesParsed) throws Exception {
        Preconditions.checkArgument(inputValues.size() == 2, "inputValues size must be 2");
        Preconditions.checkArgument(expectedPartitionValuesParsed.size() == 2, "expectedPartitionValuesParsed size must be 2");
        try (TestTable table = newTrinoTable("test_partition_values_parsed_checkpoint", "(x int, part " + inputType + ") WITH (checkpoint_interval = 2, column_mapping_mode = '" + columnMappingMode + "', partitioned_by = ARRAY['part'])")) {
            // Use the list position as the value of x; an index loop avoids the repeated indexOf lookup,
            // which would also give duplicate input values the same x.
            for (int row = 0; row < inputValues.size(); row++) {
                assertUpdate("INSERT INTO " + table.getName() + " VALUES (" + row + ", " + inputValues.get(row) + ")", 1L);
            }
            assertQuery("SELECT x FROM " + table.getName() + " WHERE part = " + inputValues.getFirst(), "VALUES 0");

            Path tableLocation = Path.of(getTableLocation(table.getName()).replace("file://", ""));
            Path checkpoint = tableLocation.resolve("_delta_log/00000000000000000002.checkpoint.parquet");
            MetadataEntry metadataEntry = loadMetadataEntry(0L, tableLocation);
            ProtocolEntry protocolEntry = loadProtocolEntry(0L, tableLocation);

            DeltaLakeColumnHandle partitionColumn = DeltaLakeSchemaSupport.extractPartitionColumns(metadataEntry, protocolEntry, InternalTypeManager.TESTING_TYPE_MANAGER).stream()
                    .collect(MoreCollectors.onlyElement());
            String physicalColumnName = partitionColumn.basePhysicalColumnName();
            if (columnMappingMode == DeltaLakeSchemaSupport.ColumnMappingMode.ID || columnMappingMode == DeltaLakeSchemaSupport.ColumnMappingMode.NAME) {
                Assertions.assertThat(physicalColumnName).matches(PHYSICAL_COLUMN_NAME_PATTERN);
            } else {
                Assertions.assertThat(physicalColumnName).isEqualTo("part");
            }

            // Position of partitionValues_parsed within the add entry row type; verified by name just below.
            int partitionValuesParsedFieldPosition = 6;
            RowType addEntryType = new CheckpointSchemaManager(InternalTypeManager.TESTING_TYPE_MANAGER).getAddEntryType(metadataEntry, protocolEntry, column -> true, true, true, true);
            RowType.Field partitionValuesParsedField = addEntryType.getFields().get(partitionValuesParsedFieldPosition);
            Assertions.assertThat(partitionValuesParsedField.getName().orElseThrow()).matches("partitionValues_parsed");
            RowType partitionValuesParsedType = (RowType) partitionValuesParsedField.getType();
            Assertions.assertThat(partitionValuesParsedType.getFields().stream().collect(MoreCollectors.onlyElement()).getName().orElseThrow()).isEqualTo(physicalColumnName);

            // Own the data source explicitly so it is released even if reading the footer or creating the
            // reader throws. Closeable.close() is specified to be idempotent, so an additional close by
            // the reader (if it also closes its source) is harmless.
            try (TrinoParquetDataSource dataSource = new TrinoParquetDataSource(new LocalInputFile(checkpoint.toFile()), ParquetReaderOptions.defaultOptions(), new FileFormatDataSourceStats())) {
                ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
                try (ParquetReader reader = ParquetTestUtils.createParquetReader(dataSource, parquetMetadata, ImmutableList.of(addEntryType), List.of("add"))) {
                    List<Object> actual = new ArrayList<>();
                    for (SourcePage page = reader.nextPage(); page != null; page = reader.nextPage()) {
                        Block block = page.getBlock(0);
                        for (int position = 0; position < block.getPositionCount(); position++) {
                            List<?> add = (List<?>) addEntryType.getObjectValue(DeltaTestingConnectorSession.SESSION, block, position);
                            if (add == null) {
                                // Checkpoint rows that are not add entries have a null add column.
                                continue;
                            }
                            actual.add(((List<?>) add.get(partitionValuesParsedFieldPosition)).stream().collect(MoreCollectors.onlyElement()));
                        }
                    }
                    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedPartitionValuesParsed);
                }
            }
        }
    }

    /**
     * Runs the OPTIMIZE scenario for column mapping mode {@code id} and then {@code name}.
     *
     * @throws Exception if either scenario fails
     */
    @Test
    public void testOptimizeWithColumnMappingMode() throws Exception {
        for (String mappingMode : List.of("id", "name")) {
            testOptimizeWithColumnMappingMode(mappingMode);
        }
    }

    /**
     * Copies the {@code deltalake/column_mapping_mode_<mode>} resource table, inserts three rows, runs
     * {@code OPTIMIZE} and verifies both the resulting transaction log entry and the Parquet file it adds.
     *
     * <p>The test checks that log version 4 consists of a commit info, three removes and one add, that
     * the add entry's statistics are keyed by the physical column name, and that the Parquet column uses
     * the physical name. The Parquet field id must equal the column mapping id in {@code id} mode and be
     * absent otherwise.
     *
     * @param columnMappingMode suffix of the resource directory to copy ({@code "id"} or {@code "name"} in the visible callers)
     * @throws Exception if copying the resource table, reading the log or Parquet footer, or any assertion fails
     */
    private void testOptimizeWithColumnMappingMode(String columnMappingMode) throws Exception {
        String tableName = "test_optimize_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceTable = new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceTable, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // The freshly registered table has the single column 'x' and no rows.
        Assertions.assertThat(query("DESCRIBE " + tableName))
                .result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('x', 'integer')");
        assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

        // Remember the column's physical name and id from the original metadata (log version 0).
        MetadataEntry originalMetadata = loadMetadataEntry(0L, tableLocation);
        List<JsonNode> fields = ImmutableList.copyOf(OBJECT_MAPPER.readTree(originalMetadata.getSchemaString()).get("fields").elements());
        Assertions.assertThat(fields).hasSize(1);
        JsonNode column = fields.get(0);
        String physicalName = column.get("metadata").get("delta.columnMapping.physicalName").asText();
        int id = column.get("metadata").get("delta.columnMapping.id").asInt();

        assertUpdate("INSERT INTO " + tableName + " VALUES 10", 1L);
        assertUpdate("INSERT INTO " + tableName + " VALUES 20", 1L);
        assertUpdate("INSERT INTO " + tableName + " VALUES NULL", 1L);
        Session singleWriterSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty("task_min_writer_count", "1")
                .build();
        assertUpdate(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

        // Log version 4 is the OPTIMIZE commit: commit info, three removes, one add.
        List<DeltaLakeTransactionLogEntry> transactionLog = getEntriesFromJson(4L, tableLocation.resolve("_delta_log").toString());
        Assertions.assertThat(transactionLog).hasSize(5);
        Assertions.assertThat(transactionLog.get(0).getCommitInfo()).isNotNull();
        Assertions.assertThat(transactionLog.get(1).getRemove()).isNotNull();
        Assertions.assertThat(transactionLog.get(2).getRemove()).isNotNull();
        Assertions.assertThat(transactionLog.get(3).getRemove()).isNotNull();
        Assertions.assertThat(transactionLog.get(4).getAdd()).isNotNull();

        AddFileEntry addFileEntry = transactionLog.get(4).getAdd();
        DeltaLakeFileStatistics stats = addFileEntry.getStats().orElseThrow();
        Assertions.assertThat(stats.getMinValues().orElseThrow()).containsEntry(physicalName, 10);
        Assertions.assertThat(stats.getMaxValues().orElseThrow()).containsEntry(physicalName, 20);
        Assertions.assertThat(stats.getNullCount(physicalName).orElseThrow()).isEqualTo(1L);

        // Read the Parquet footer of the optimized file; the data source holds an open file and must be
        // closed, so everything derived from the footer is extracted inside the try-with-resources block.
        LocalInputFile inputFile = new LocalInputFile(tableLocation.resolve(addFileEntry.getPath()).toFile());
        PrimitiveType physicalType;
        try (TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, ParquetReaderOptions.defaultOptions(), new FileFormatDataSourceStats())) {
            FileMetadata fileMetaData = MetadataReader.readFooter(dataSource, Optional.empty()).getFileMetaData();
            physicalType = Iterators.getOnlyElement(fileMetaData.getSchema().getColumns().iterator()).getPrimitiveType();
        }

        Assertions.assertThat(physicalType.getName()).isEqualTo(physicalName);
        if ("id".equals(columnMappingMode)) {
            Assertions.assertThat(physicalType.getId().intValue()).isEqualTo(id);
        } else {
            Assertions.assertThat(physicalType.getId()).isNull();
        }
    }

    /**
     * Runs the DROP COLUMN scenario for column mapping mode {@code id} and then {@code name}.
     *
     * @throws Exception if either scenario fails
     */
    @Test
    public void testDropColumnWithColumnMappingMode() throws Exception {
        for (String mappingMode : List.of("id", "name")) {
            testDropColumnWithColumnMappingMode(mappingMode);
        }
    }

    /**
     * Copies the {@code deltalake/column_mapping_mode_<mode>} resource table, adds a nested row column,
     * drops the original column {@code x} and verifies the metadata entry written by the DROP COLUMN.
     *
     * <p>After the drop, the remaining field must be identical to the nested column as it was before,
     * the table configuration must be unchanged, and the schema string must still contain five column
     * mapping ids and physical names.
     *
     * @param columnMappingMode suffix of the resource directory to copy ({@code "id"} or {@code "name"} in the visible callers)
     * @throws Exception if copying the resource table, parsing the schema JSON or any assertion fails
     */
    private void testDropColumnWithColumnMappingMode(String columnMappingMode) throws Exception {
        // Use a prefix matching this scenario so leftover tables and directories can be attributed to it.
        String tableName = "test_drop_column_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceTable = new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceTable, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // The freshly registered table has the single column 'x' and no rows.
        Assertions.assertThat(query("DESCRIBE " + tableName))
                .result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('x', 'integer')");
        assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN second_col row(a array(integer), b map(integer, integer), c row(field integer))");

        // Log version 1: state after ADD COLUMN, used as the baseline for the drop.
        MetadataEntry metadata = loadMetadataEntry(1L, tableLocation);
        Assertions.assertThat(metadata.getConfiguration()).containsEntry("delta.columnMapping.maxColumnId", "6");
        Assertions.assertThat(metadata.getSchemaString())
                .containsPattern("(delta\\.columnMapping\\.id.*?){6}")
                .containsPattern("(delta\\.columnMapping\\.physicalName.*?){6}");

        List<JsonNode> fields = ImmutableList.copyOf(OBJECT_MAPPER.readTree(metadata.getSchemaString()).get("fields").elements());
        Assertions.assertThat(fields).hasSize(2);
        JsonNode nestedColumn = fields.get(1);
        List<JsonNode> rowFields = ImmutableList.copyOf(nestedColumn.get("type").get("fields").elements());
        Assertions.assertThat(rowFields).hasSize(3);

        assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN x");

        // Log version 2: only the nested column remains, and it is unchanged.
        MetadataEntry droppedMetadata = loadMetadataEntry(2L, tableLocation);
        List<JsonNode> droppedFields = ImmutableList.copyOf(OBJECT_MAPPER.readTree(droppedMetadata.getSchemaString()).get("fields").elements());
        Assertions.assertThat(droppedFields).hasSize(1);
        Assertions.assertThat(droppedFields.get(0)).isEqualTo(nestedColumn);

        // Dropping a column must not alter the table configuration.
        Assertions.assertThat(droppedMetadata.getConfiguration()).isEqualTo(metadata.getConfiguration());
        Assertions.assertThat(droppedMetadata.getSchemaString())
                .containsPattern("(delta\\.columnMapping\\.id.*?){5}")
                .containsPattern("(delta\\.columnMapping\\.physicalName.*?){5}");
    }

    /**
     * Runs the RENAME COLUMN scenario for column mapping mode {@code id} and then {@code name}.
     *
     * @throws Exception if either scenario fails
     */
    @Test
    public void testRenameColumnWithColumnMappingMode() throws Exception {
        for (String mappingMode : List.of("id", "name")) {
            testRenameColumnWithColumnMappingMode(mappingMode);
        }
    }

    /**
     * Registers a copy of the {@code deltalake/column_mapping_mode_<mode>} resource table, adds a nested
     * {@code second_col} column, renames it to {@code renamed_col}, and compares the metadata entries
     * loaded for versions 1 and 2: the first field must be untouched, the second field may differ only
     * in its {@code name}, and the table configuration must be equal.
     *
     * @param columnMappingMode suffix selecting the resource directory to copy
     */
    private void testRenameColumnWithColumnMappingMode(String columnMappingMode) throws Exception {
        String tableName = "test_rename_column_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));
        assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

        // Version 1: schema after the nested column has been added.
        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN second_col row(a array(integer), b map(integer, integer), c row(field integer))");
        MetadataEntry addedMetadata = TestDeltaLakeBasic.loadMetadataEntry(1L, tableLocation);
        Assertions.assertThat(addedMetadata.getConfiguration()).containsEntry("delta.columnMapping.maxColumnId", "6");
        Assertions.assertThat(addedMetadata.getSchemaString())
                .containsPattern("(delta\\.columnMapping\\.id.*?){6}")
                .containsPattern("(delta\\.columnMapping\\.physicalName.*?){6}");
        JsonNode addedSchema = OBJECT_MAPPER.readTree(addedMetadata.getSchemaString());
        List<JsonNode> addedFields = ImmutableList.copyOf(addedSchema.get("fields").elements());
        Assertions.assertThat(addedFields).hasSize(2);
        JsonNode integerColumn = addedFields.get(0);
        JsonNode nestedColumn = addedFields.get(1);
        List<JsonNode> nestedRowFields = ImmutableList.copyOf(nestedColumn.get("type").get("fields").elements());
        Assertions.assertThat(nestedRowFields).hasSize(3);

        // Version 2: schema after the rename.
        assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN second_col TO renamed_col");
        MetadataEntry renamedMetadata = TestDeltaLakeBasic.loadMetadataEntry(2L, tableLocation);
        JsonNode renamedSchema = OBJECT_MAPPER.readTree(renamedMetadata.getSchemaString());
        List<JsonNode> renamedFields = ImmutableList.copyOf(renamedSchema.get("fields").elements());
        Assertions.assertThat(renamedFields).hasSize(2);
        Assertions.assertThat(renamedFields.get(0)).isEqualTo(integerColumn);
        Assertions.assertThat(renamedFields.get(1)).isNotEqualTo(nestedColumn);

        // put() rewrites the "name" of nestedColumn itself, so this has to stay after the isNotEqualTo check above.
        ObjectNode expectedRenamedColumn = ((ObjectNode) nestedColumn).put("name", "renamed_col");
        Assertions.assertThat(renamedFields.get(1)).isEqualTo(expectedRenamedColumn);
        Assertions.assertThat(renamedMetadata.getConfiguration()).isEqualTo(addedMetadata.getConfiguration());
        Assertions.assertThat(renamedMetadata.getSchemaString())
                .containsPattern("(delta\\.columnMapping\\.id.*?){6}")
                .containsPattern("(delta\\.columnMapping\\.physicalName.*?){6}");
    }

    /**
     * Runs the write-after-rename scenario for each column mapping mode, {@code id} first and then {@code name}.
     */
    @Test
    public void testWriterAfterRenameColumnWithColumnMappingMode() throws Exception {
        for (String columnMappingMode : List.of("id", "name")) {
            testWriterAfterRenameColumnWithColumnMappingMode(columnMappingMode);
        }
    }

    /**
     * Registers a copy of the {@code deltalake/column_mapping_mode_<mode>} resource table, inserts a row,
     * renames column {@code x} to {@code new_x}, and then checks the table contents after an UPDATE,
     * a MERGE and a DELETE that go through the renamed column. The table is dropped at the end.
     *
     * @param columnMappingMode suffix selecting the resource directory to copy
     */
    private void testWriterAfterRenameColumnWithColumnMappingMode(String columnMappingMode) throws Exception {
        String tableName = "test_writer_after_rename_column_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        assertQueryReturnsEmptyResult(selectAll);

        assertUpdate("INSERT INTO " + tableName + " VALUES 1", 1L);
        assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN x to new_x");
        assertQuery(selectAll, "VALUES 1");

        assertUpdate("UPDATE " + tableName + " SET new_x = 2", 1L);
        assertQuery(selectAll, "VALUES 2");

        assertUpdate("MERGE INTO " + tableName + " USING (VALUES 42) t(dummy) ON false  WHEN NOT MATCHED THEN INSERT VALUES (3)", 1L);
        assertQuery(selectAll, "VALUES 2, 3");

        assertUpdate("DELETE FROM " + tableName + " WHERE new_x = 2", 1L);
        assertQuery(selectAll, "VALUES 3");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code deltalake/case_sensitive} resource table, inserts two rows, and,
     * with the {@code query_partition_filter_required} catalog session property set to {@code true},
     * checks that a filter on the partition column is accepted when the quoted column name is written
     * in lower, upper and mixed case.
     */
    @Test
    public void testRequiresQueryPartitionFilterWithUppercaseColumnName() throws Exception {
        String tableName = "test_require_partition_filter_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("deltalake/case_sensitive").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        assertQueryReturnsEmptyResult(selectAll);
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 11), (2, 22)", 2L);
        assertQuery(selectAll, "VALUES (1, 11), (2, 22)");

        Session filterRequiredSession = Session.builder(getSession())
                .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "query_partition_filter_required", "true")
                .build();
        String expectedRow = "VALUES (1, 11)";
        assertQuery(filterRequiredSession, "SELECT * FROM %s WHERE \"part\" = 11".formatted(tableName), expectedRow);
        assertQuery(filterRequiredSession, "SELECT * FROM %s WHERE \"PART\" = 11".formatted(tableName), expectedRow);
        assertQuery(filterRequiredSession, "SELECT * FROM %s WHERE \"Part\" = 11".formatted(tableName), expectedRow);

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code deltalake/append_only} resource table and checks that UPDATE,
     * DELETE and TRUNCATE are rejected with the append-only error while the data stays unchanged.
     * Afterwards it issues COMMENT and ADD COLUMN statements and checks that the
     * {@code $properties} table still lists {@code delta.appendOnly = true}.
     */
    @Test
    public void testAppendOnly() throws Exception {
        String tableName = "test_append_only_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("deltalake/append_only").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        String expectedRows = "VALUES (1, 11), (2, 12)";
        String appendOnlyError = "Cannot modify rows from a table with 'delta.appendOnly' set to true";

        assertQuery(selectAll, expectedRows);
        assertQueryFails("UPDATE " + tableName + " SET a = a + 1", appendOnlyError);
        assertQueryFails("DELETE FROM " + tableName + " WHERE a = 1", appendOnlyError);
        assertQueryFails("DELETE FROM " + tableName, appendOnlyError);
        assertQueryFails("TRUNCATE TABLE " + tableName, appendOnlyError);
        assertQuery(selectAll, expectedRows);

        // Statements that do not modify rows are expected to go through.
        assertUpdate("COMMENT ON COLUMN " + tableName + ".a IS 'test column comment'");
        assertUpdate("COMMENT ON TABLE " + tableName + " IS 'test table comment'");
        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN new_col INT");

        Assertions.assertThat(query("SELECT * FROM \"" + tableName + "$properties\""))
                .result()
                .rows()
                .contains(new MaterializedRow(List.of("delta.appendOnly", "true")));
    }

    /**
     * Registers a copy of the {@code deltalake/append_only} resource table and checks that both
     * {@code CREATE OR REPLACE TABLE} forms (column list and AS SELECT) fail with the append-only
     * error, leaving the original rows readable.
     */
    @Test
    public void testCreateOrReplaceTableOnAppendOnlyTableFails() throws Exception {
        String tableName = "test_append_only_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("deltalake/append_only").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        String expectedRows = "VALUES (1, 11), (2, 12)";
        String replaceError = "Cannot replace a table when 'delta.appendOnly' is set to true";

        assertQuery(selectAll, expectedRows);
        assertQueryFails("CREATE OR REPLACE TABLE " + tableName + "(a INT, c INT)", replaceError);
        assertQueryFails("CREATE OR REPLACE TABLE " + tableName + " AS SELECT 1 as e", replaceError);
        assertQuery(selectAll, expectedRows);
    }

    /**
     * Registers a copy of the {@code deltalake/case_sensitive} resource table and checks that the file
     * statistics written to the transaction log by INSERT and UPDATE are keyed by {@code UPPER_CASE},
     * and that {@code SHOW STATS} reports the same values before and after a full-refresh ANALYZE.
     */
    @Test
    public void testStatisticsWithColumnCaseSensitivity() throws Exception {
        String tableName = "test_column_case_sensitivity_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("deltalake/case_sensitive").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));
        assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

        // Statistics recorded by the INSERT commit (version 1).
        assertUpdate("INSERT INTO " + tableName + " VALUES (10, 1), (20, 1), (null, 1)", 3L);
        String logDirectory = tableLocation.resolve("_delta_log").toString();
        List<DeltaLakeTransactionLogEntry> insertLog = TestDeltaLakeBasic.getEntriesFromJson(1L, logDirectory);
        Assertions.assertThat(insertLog).hasSize(2);
        AddFileEntry insertedFile = insertLog.get(1).getAdd();
        DeltaLakeFileStatistics insertStats = insertedFile.getStats().orElseThrow();
        Assertions.assertThat(insertStats.getMinValues().orElseThrow()).containsEntry("UPPER_CASE", 10);
        Assertions.assertThat(insertStats.getMaxValues().orElseThrow()).containsEntry("UPPER_CASE", 20);
        Assertions.assertThat(insertStats.getNullCount("UPPER_CASE").orElseThrow()).isEqualTo(1L);

        // Statistics recorded by the UPDATE commit (version 2).
        assertUpdate("UPDATE " + tableName + " SET upper_case = upper_case + 10", 3L);
        List<DeltaLakeTransactionLogEntry> updateLog = TestDeltaLakeBasic.getEntriesFromJson(2L, logDirectory);
        Assertions.assertThat(updateLog).hasSize(3);
        AddFileEntry updatedFile = updateLog.get(2).getAdd();
        DeltaLakeFileStatistics updateStats = updatedFile.getStats().orElseThrow();
        Assertions.assertThat(updateStats.getMinValues().orElseThrow()).containsEntry("UPPER_CASE", 20);
        Assertions.assertThat(updateStats.getMaxValues().orElseThrow()).containsEntry("UPPER_CASE", 30);
        Assertions.assertThat(updateStats.getNullCount("UPPER_CASE").orElseThrow()).isEqualTo(1L);

        String showStats = "SHOW STATS FOR " + tableName;
        String expectedStats = "VALUES\n('upper_case', null, 2.0, 0.3333333333333333, null, 20, 30),\n('part', null, 1.0, 0.0, null, null, null),\n(null, null, null, null, 3.0, null, null)\n";
        assertQuery(showStats, expectedStats);
        assertUpdate("ANALYZE %s WITH(mode = 'full_refresh')".formatted(tableName), 3L);
        assertQuery(showStats, expectedStats);
    }

    /**
     * Runs the timestamp_ntz read/write scenario once per session time zone, in the order UTC,
     * {@code jvmZone}, {@code vilnius}, {@code kathmandu} and the testing default zone.
     */
    @Test
    public void testDeltaTimestampNtz() throws Exception {
        ZoneId[] sessionZones = {ZoneOffset.UTC, jvmZone, vilnius, kathmandu, TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()};
        for (ZoneId sessionZone : sessionZones) {
            testDeltaTimestampNtz(sessionZone);
        }
    }

    /**
     * Registers a copy of the {@code databricks131/timestamp_ntz} resource table and, using a session
     * in the given time zone, checks the described column type, the stored values, and the
     * {@code SHOW STATS} output before and after inserting one more timestamp. The table is dropped
     * at the end.
     *
     * @param sessionZone time zone applied to the session used for the SELECT and INSERT statements
     */
    private void testDeltaTimestampNtz(ZoneId sessionZone) throws Exception {
        String tableName = "timestamp_ntz" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("databricks131/timestamp_ntz").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        Session zonedSession = Session.builder(getSession())
                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
                .build();
        String showStats = "SHOW STATS FOR " + tableName;

        assertQuery("DESCRIBE " + tableName, "VALUES ('x', 'timestamp(6)', '', '')");
        Assertions.assertThat(query(zonedSession, "SELECT * FROM " + tableName))
                .matches("VALUES\nNULL,\nTIMESTAMP '-9999-12-31 23:59:59.999999',\nTIMESTAMP '-0001-01-01 00:00:00',\nTIMESTAMP '0000-01-01 00:00:00',\nTIMESTAMP '1582-10-05 00:00:00',\nTIMESTAMP '1582-10-14 23:59:59.999999',\nTIMESTAMP '2020-12-31 01:02:03.123456',\nTIMESTAMP '9999-12-31 23:59:59.999999'\n");
        assertQuery(showStats, "VALUES\n('x', null, null, 0.125, null, null, null),\n(null, null, null, null, 8.0, null, null)\n");

        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES TIMESTAMP '2023-01-02 03:04:05.123456'", 1L);
        assertQuery(zonedSession, "SELECT true FROM " + tableName + " WHERE x = TIMESTAMP '2023-01-02 03:04:05.123456'", "VALUES true");
        assertQuery(showStats, "VALUES\n('x', null, 1.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '2023-01-02 03:04:05.124000'),\n(null, null, null, null, 9.0, null, null)\n");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Runs the CREATE TABLE + INSERT timestamp_ntz scenario once per session time zone, in the order
     * UTC, {@code jvmZone}, {@code vilnius}, {@code kathmandu} and the testing default zone.
     */
    @Test
    public void testTrinoCreateTableWithTimestampNtz() throws Exception {
        ZoneId[] sessionZones = {ZoneOffset.UTC, jvmZone, vilnius, kathmandu, TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()};
        for (ZoneId sessionZone : sessionZones) {
            testTrinoCreateTableWithTimestampNtz(sessionZone);
        }
    }

    /**
     * Runs the shared timestamp_ntz scenario with a table that is created with an explicit
     * {@code timestamp(6)} column and then populated by a separate INSERT.
     *
     * @param sessionZone time zone forwarded to the shared scenario
     */
    private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone) throws Exception {
        Consumer<String> createThenInsert = tableName -> {
            assertUpdate("CREATE TABLE " + tableName + "(x timestamp(6))");
            assertUpdate("INSERT INTO " + tableName + " VALUES timestamp '2023-01-02 03:04:05.123456'", 1L);
        };
        testTrinoCreateTableWithTimestampNtz(sessionZone, createThenInsert);
    }

    /**
     * Runs the CREATE TABLE AS SELECT timestamp_ntz scenario once per session time zone, in the order
     * UTC, {@code jvmZone}, {@code vilnius}, {@code kathmandu} and the testing default zone.
     */
    @Test
    public void testTrinoCreateTableAsSelectWithTimestampNtz() throws Exception {
        ZoneId[] sessionZones = {ZoneOffset.UTC, jvmZone, vilnius, kathmandu, TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()};
        for (ZoneId sessionZone : sessionZones) {
            testTrinoCreateTableAsSelectWithTimestampNtz(sessionZone);
        }
    }

    /**
     * Runs the shared timestamp_ntz scenario with a table that is created and populated by a single
     * {@code CREATE TABLE ... AS SELECT} statement.
     *
     * @param sessionZone time zone forwarded to the shared scenario
     */
    private void testTrinoCreateTableAsSelectWithTimestampNtz(ZoneId sessionZone) throws Exception {
        Consumer<String> createAsSelect = tableName ->
                assertUpdate("CREATE TABLE " + tableName + " AS SELECT timestamp '2023-01-02 03:04:05.123456' AS x", 1L);
        testTrinoCreateTableWithTimestampNtz(sessionZone, createAsSelect);
    }

    /**
     * Shared timestamp_ntz scenario: lets {@code createTable} create a table holding one timestamp,
     * checks the protocol entry written to version 0 of the transaction log (reader version 3, writer
     * version 7, {@code timestampNtz} reader and writer features), then inserts further values with a
     * session in the given zone and verifies the table contents and {@code SHOW STATS}. The table is
     * dropped at the end.
     *
     * @param sessionZone time zone applied to the session used for reading and inserting
     * @param createTable callback that receives the generated table name and creates the table
     */
    private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer<String> createTable) throws IOException {
        String tableName = "test_create_table_timestamp_ntz" + TestingNames.randomNameSuffix();
        Session zonedSession = Session.builder(getSession())
                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
                .build();

        createTable.accept(tableName);
        assertQuery(zonedSession, "SELECT * FROM " + tableName, "VALUES TIMESTAMP '2023-01-02 03:04:05.123456'");

        // Protocol entry of the commit that created the table.
        String logDirectory = getTableLocation(tableName) + "/_delta_log";
        List<DeltaLakeTransactionLogEntry> creationLog = TestDeltaLakeBasic.getEntriesFromJson(0L, logDirectory);
        ProtocolEntry protocol = creationLog.get(1).getProtocol();
        Assertions.assertThat(protocol).isNotNull();
        Assertions.assertThat(protocol.minReaderVersion()).isEqualTo(3);
        Assertions.assertThat(protocol.minWriterVersion()).isEqualTo(7);
        Assertions.assertThat(protocol.readerFeatures()).hasValue(ImmutableSet.of("timestampNtz"));
        Assertions.assertThat(protocol.writerFeatures()).hasValue(ImmutableSet.of("timestampNtz"));

        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES\nNULL,\nTIMESTAMP '-9999-12-31 23:59:59.999999',\nTIMESTAMP '-0001-01-01 00:00:00',\nTIMESTAMP '0000-01-01 00:00:00',\nTIMESTAMP '1582-10-05 00:00:00',\nTIMESTAMP '1582-10-14 23:59:59.999999',\nTIMESTAMP '2020-12-31 01:02:03.123456',\nTIMESTAMP '9999-12-31 23:59:59.999999'\n", 8L);
        Assertions.assertThat(query(zonedSession, "SELECT * FROM " + tableName))
                .matches("VALUES\nNULL,\nTIMESTAMP '-9999-12-31 23:59:59.999999',\nTIMESTAMP '-0001-01-01 00:00:00',\nTIMESTAMP '0000-01-01 00:00:00',\nTIMESTAMP '1582-10-05 00:00:00',\nTIMESTAMP '1582-10-14 23:59:59.999999',\nTIMESTAMP '2020-12-31 01:02:03.123456',\nTIMESTAMP '2023-01-02 03:04:05.123456',\nTIMESTAMP '9999-12-31 23:59:59.999999'\n");
        assertQuery("SHOW STATS FOR " + tableName, "VALUES\n('x', null, 8.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '+10000-01-01 00:00:00.000000'),\n(null, null, null, null, 9.0, null, null)\n");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Runs the complex-type timestamp_ntz scenario once per session time zone, in the order UTC,
     * {@code jvmZone}, {@code vilnius}, {@code kathmandu} and the testing default zone.
     */
    @Test
    public void testTrinoTimestampNtzComplexType() {
        ZoneId[] sessionZones = {ZoneOffset.UTC, jvmZone, vilnius, kathmandu, TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()};
        for (ZoneId sessionZone : sessionZones) {
            testTrinoTimestampNtzComplexType(sessionZone);
        }
    }

    /**
     * Creates a table whose array, map and row columns hold {@code timestamp(6)} values, inserts one
     * row through a session in the given zone, and checks that the row reads back unchanged and that
     * {@code SHOW STATS} matches the expected output. The table is dropped at the end.
     *
     * @param sessionZone time zone applied to the session used for the INSERT and SELECT
     */
    private void testTrinoTimestampNtzComplexType(ZoneId sessionZone) {
        String tableName = "test_timestamp_ntz_complex_type" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + "(id int, array_col array(timestamp(6)), map_col map(timestamp(6), timestamp(6)), row_col row(child timestamp(6)))");

        Session zonedSession = Session.builder(getSession())
                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
                .build();

        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES (\n 1,\n ARRAY[TIMESTAMP '2020-12-31 01:02:03.123456'],\n MAP(ARRAY[TIMESTAMP '2021-12-31 01:02:03.123456'], ARRAY[TIMESTAMP '2022-12-31 01:02:03.123456']),\n ROW(TIMESTAMP '2023-12-31 01:02:03.123456')\n)\n", 1L);
        Assertions.assertThat(query(zonedSession, "SELECT * FROM " + tableName))
                .matches("VALUES (\n 1,\n ARRAY[TIMESTAMP '2020-12-31 01:02:03.123456'],\n MAP(ARRAY[TIMESTAMP '2021-12-31 01:02:03.123456'], ARRAY[TIMESTAMP '2022-12-31 01:02:03.123456']),\n CAST(ROW(TIMESTAMP '2023-12-31 01:02:03.123456') AS ROW(child timestamp(6)))\n)\n");
        assertQuery("SHOW STATS FOR " + tableName, "VALUES\n('id', null, 1.0, 0.0, null, 1, 1),\n('array_col', null, null, null, null, null, null),\n('map_col', null, null, null, null, null, null),\n('row_col', null, null, null, null, null, null),\n(null, null, null, null, 1.0, null, null)\n");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Runs the partitioned timestamp_ntz scenario once per session time zone, in the order UTC,
     * {@code jvmZone}, {@code vilnius}, {@code kathmandu} and the testing default zone.
     */
    @Test
    public void testTimestampNtzPartitioned() throws Exception {
        ZoneId[] sessionZones = {ZoneOffset.UTC, jvmZone, vilnius, kathmandu, TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()};
        for (ZoneId sessionZone : sessionZones) {
            testTimestampNtzPartitioned(sessionZone);
        }
    }

    /**
     * Registers a copy of the {@code databricks131/timestamp_ntz_partition} resource table and, using
     * a session in the given zone, checks its schema, contents, partition filtering and
     * {@code SHOW STATS}. It then inserts one more row and verifies the add-file entry written to
     * version 2 of the transaction log (path prefix and partition values). The table is dropped at
     * the end.
     *
     * @param sessionZone time zone applied to the session used for the SELECT and INSERT statements
     */
    private void testTimestampNtzPartitioned(ZoneId sessionZone) throws Exception {
        String tableName = "timestamp_ntz_partition" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("databricks131/timestamp_ntz_partition").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        Session zonedSession = Session.builder(getSession())
                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
                .build();
        String showStats = "SHOW STATS FOR " + tableName;

        // State of the table as shipped in the resource directory.
        assertQuery("DESCRIBE " + tableName, "VALUES ('id', 'integer', '', ''), ('part', 'timestamp(6)', '', '')");
        String showCreateTable = (String) computeScalar("SHOW CREATE TABLE " + tableName);
        Assertions.assertThat(showCreateTable).contains("partitioned_by = ARRAY['part']");
        Assertions.assertThat(query(zonedSession, "SELECT * FROM " + tableName))
                .matches("    VALUES\n    (1, NULL),\n    (2, TIMESTAMP '-9999-12-31 23:59:59.999999'),\n    (3, TIMESTAMP '-0001-01-01 00:00:00'),\n    (4, TIMESTAMP '0000-01-01 00:00:00'),\n    (5, TIMESTAMP '1582-10-05 00:00:00'),\n    (6, TIMESTAMP '1582-10-14 23:59:59.999999'),\n    (7, TIMESTAMP '2020-12-31 01:02:03.123456'),\n    (8, TIMESTAMP '9999-12-31 23:59:59.999999')\n");
        assertQuery(zonedSession, "SELECT id FROM " + tableName + " WHERE part = TIMESTAMP '2020-12-31 01:02:03.123456'", "VALUES 7");
        assertQuery(showStats, "VALUES\n('id', null, null, 0.0, null, 1, 8),\n('part', null, 7.0, 0.125, null, null, null),\n(null, null, null, null, 8.0, null, null)\n");

        // State after adding a ninth row.
        assertUpdate(zonedSession, "INSERT INTO " + tableName + " VALUES (9, TIMESTAMP '2023-01-02 03:04:05.123456')", 1L);
        assertQuery(zonedSession, "SELECT part FROM " + tableName + " WHERE id = 9", "VALUES TIMESTAMP '2023-01-02 03:04:05.123456'");
        assertQuery(zonedSession, "SELECT id FROM " + tableName + " WHERE part = TIMESTAMP '2023-01-02 03:04:05.123456'", "VALUES 9");
        assertQuery(showStats, "VALUES\n('id', null, 1.0, 0.0, null, 1, 9),\n('part', null, 8.0, 0.1111111111111111, null, null, null),\n(null, null, null, null, 9.0, null, null)\n");

        List<DeltaLakeTransactionLogEntry> insertLog = TestDeltaLakeBasic.getEntriesFromJson(2L, tableLocation.resolve("_delta_log").toString());
        Assertions.assertThat(insertLog).hasSize(2);
        AddFileEntry insertedFile = insertLog.get(1).getAdd();
        Assertions.assertThat(insertedFile).isNotNull();
        Assertions.assertThat(insertedFile.getPath()).startsWith("part=2023-01-02%2003%253A04%253A05.123456/");
        Assertions.assertThat(insertedFile.getPartitionValues()).containsExactly(Map.entry("part", "2023-01-02 03:04:05.123456"));

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Creates a table with a single INT column, adds a {@code timestamp(6)} column and inserts a row,
     * then compares the protocol entries of versions 0 and 1 of the transaction log: version 0 is
     * expected at reader/writer versions 1/2 without features, version 1 at 3/7 with the
     * {@code timestampNtz} reader and writer features. The table is dropped at the end.
     */
    @Test
    public void testAddTimestampNtzColumn() throws Exception {
        String tableName = "test_add_timestamp_ntz_column" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + "(id INT)");
        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN ts timestamp(6)");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, TIMESTAMP '2023-01-02 03:04:05.123456')", 1L);
        assertQuery("SELECT * FROM " + tableName, "VALUES (1, TIMESTAMP '2023-01-02 03:04:05.123456')");

        String logDirectory = getTableLocation(tableName) + "/_delta_log";

        // Protocol written by CREATE TABLE.
        ProtocolEntry createProtocol = TestDeltaLakeBasic.getEntriesFromJson(0L, logDirectory).get(1).getProtocol();
        Assertions.assertThat(createProtocol).isNotNull();
        Assertions.assertThat(createProtocol.minReaderVersion()).isEqualTo(1);
        Assertions.assertThat(createProtocol.minWriterVersion()).isEqualTo(2);
        Assertions.assertThat(createProtocol.readerFeatures()).isEmpty();
        Assertions.assertThat(createProtocol.writerFeatures()).isEmpty();

        // Protocol written by ADD COLUMN.
        ProtocolEntry addColumnProtocol = TestDeltaLakeBasic.getEntriesFromJson(1L, logDirectory).get(1).getProtocol();
        Assertions.assertThat(addColumnProtocol).isNotNull();
        Assertions.assertThat(addColumnProtocol.minReaderVersion()).isEqualTo(3);
        Assertions.assertThat(addColumnProtocol.minWriterVersion()).isEqualTo(7);
        Assertions.assertThat(addColumnProtocol.readerFeatures()).hasValue(ImmutableSet.of("timestampNtz"));
        Assertions.assertThat(addColumnProtocol.writerFeatures()).hasValue(ImmutableSet.of("timestampNtz"));

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code databricks122/identity_columns} resource table and checks the
     * column metadata of {@code b} in version 0 of the transaction log, then adds a column comment
     * and checks that version 1 carries the comment alongside the same identity properties.
     */
    @Test
    public void testIdentityColumns() throws Exception {
        String tableName = "test_identity_columns_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("databricks122/identity_columns").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));
        assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

        String logDirectory = tableLocation.resolve("_delta_log").toString();

        // Version 0: metadata as shipped with the resource table.
        List<DeltaLakeTransactionLogEntry> initialLog = TestDeltaLakeBasic.getEntriesFromJson(0L, logDirectory);
        Assertions.assertThat(initialLog).hasSize(3);
        MetadataEntry initialMetadata = initialLog.get(2).getMetaData();
        Assertions.assertThat(DeltaLakeSchemaSupport.getColumnsMetadata(initialMetadata).get("b"))
                .containsExactly(
                        Assertions.entry("delta.identity.start", 1),
                        Assertions.entry("delta.identity.step", 1),
                        Assertions.entry("delta.identity.allowExplicitInsert", false));

        // Version 1: metadata after the column comment has been set.
        assertUpdate("COMMENT ON COLUMN " + tableName + ".b IS 'test column comment'");
        List<DeltaLakeTransactionLogEntry> commentLog = TestDeltaLakeBasic.getEntriesFromJson(1L, logDirectory);
        Assertions.assertThat(commentLog).hasSize(3);
        MetadataEntry commentMetadata = commentLog.get(2).getMetaData();
        Assertions.assertThat(DeltaLakeSchemaSupport.getColumnsMetadata(commentMetadata).get("b"))
                .containsExactly(
                        Assertions.entry("comment", "test column comment"),
                        Assertions.entry("delta.identity.start", 1),
                        Assertions.entry("delta.identity.step", 1),
                        Assertions.entry("delta.identity.allowExplicitInsert", false));
    }

    /**
     * Registers a copy of the {@code databricks122/identity_columns} resource table and checks that
     * INSERT, UPDATE, DELETE, MERGE and the optimize procedure all fail with the
     * identity-columns-not-supported message.
     */
    @Test
    public void testWritesToTableWithIdentityColumnFails() throws Exception {
        String tableName = "test_identity_columns_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("databricks122/identity_columns").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String expectedError = "Writing to tables with identity columns is not supported";
        assertQueryFails("INSERT INTO " + tableName + " VALUES (4, 4)", expectedError);
        assertQueryFails("UPDATE " + tableName + " SET a = 3", expectedError);
        assertQueryFails("DELETE FROM " + tableName, expectedError);
        assertQueryFails("MERGE INTO " + tableName + " t USING " + tableName + " s ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET a = 1", expectedError);
        assertQueryFails("ALTER TABLE " + tableName + " EXECUTE optimize", expectedError);
    }

    /**
     * Registers a copy of the {@code databricks133/identity_columns_table_feature} resource table and
     * checks that INSERT, UPDATE, DELETE and MERGE all fail with the unsupported-writer-feature
     * message for {@code identityColumns}.
     */
    @Test
    public void testIdentityColumnTableFeature() throws Exception {
        String tableName = "test_identity_columns_table_feature_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource("databricks133/identity_columns_table_feature").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // The leading \Q makes the rest of the expected message a literal in the regex.
        String expectedError = "\\QUnsupported writer features: [identityColumns]";
        assertQueryFails("INSERT INTO " + tableName + " VALUES (4, 4)", expectedError);
        assertQueryFails("UPDATE " + tableName + " SET a = 3", expectedError);
        assertQueryFails("DELETE FROM " + tableName, expectedError);
        assertQueryFails("MERGE INTO " + tableName + " t USING " + tableName + " s ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET a = 1", expectedError);
    }

    /**
     * Checks that the {@code allow_column_defaults} table can be read but that inserts, with or
     * without an explicit column list, fail with the unsupported-writer-feature message for
     * {@code allowColumnDefaults}.
     */
    @Test
    public void testAllowColumnDefaults() {
        assertQuery("SELECT * FROM allow_column_defaults", "VALUES (1, 16)");

        // The leading \Q makes the rest of the expected message a literal in the regex.
        String expectedError = "\\QUnsupported writer features: [allowColumnDefaults]";
        assertQueryFails("INSERT INTO allow_column_defaults VALUES (2, 32)", expectedError);
        assertQueryFails("INSERT INTO allow_column_defaults (a) VALUES (2)", expectedError);
    }

    /**
     * Runs the deletion-vectors-enabled scenario for a plain CREATE TABLE definition and then for a
     * CREATE TABLE AS SELECT definition.
     */
    @Test
    public void testDeletionVectorsEnabledCreateTable() throws Exception {
        List<String> tableDefinitions = List.of(
                "(x int) WITH (deletion_vectors_enabled = true)",
                "WITH (deletion_vectors_enabled = true) AS SELECT 1 x");
        for (String tableDefinition : tableDefinitions) {
            testDeletionVectorsEnabledCreateTable(tableDefinition);
        }
    }

    /**
     * Creates a table from {@code tableDefinition} with deletion vectors enabled and checks that
     * {@code SHOW CREATE TABLE} reports the property, that version 0 of the transaction log carries
     * the {@code deletionVectors} reader and writer features at protocol versions 3/7, that a DELETE
     * leaves a deletion vector file in the table directory, and that the property cannot be switched
     * off afterwards.
     *
     * @param tableDefinition text appended after the table name in the CREATE TABLE statement
     */
    private void testDeletionVectorsEnabledCreateTable(String tableDefinition) throws Exception {
        try (TestTable table = newTrinoTable("deletion_vectors", tableDefinition)) {
            String tableName = table.getName();
            Assertions.assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)).contains("deletion_vectors_enabled = true");

            String tableLocation = getTableLocation(tableName);
            List<DeltaLakeTransactionLogEntry> transactionLogs = TestDeltaLakeBasic.getEntriesFromJson(0L, tableLocation + "/_delta_log");
            ProtocolEntry expectedProtocol = new ProtocolEntry(3, 7, Optional.of(Set.of("deletionVectors")), Optional.of(Set.of("deletionVectors")));
            Assertions.assertThat(transactionLogs.get(1).getProtocol()).isEqualTo(expectedProtocol);

            assertUpdate("INSERT INTO " + tableName + " VALUES 2, 3", 2L);
            assertUpdate("DELETE FROM " + tableName + " WHERE x = 2", 1L);

            // The dot before "bin" is escaped so only a literal ".bin" extension matches;
            // unescaped it would accept any character in that position.
            List<String> tableFiles = org.assertj.core.util.Files.fileNamesIn(new URI(tableLocation).getPath(), false);
            Assertions.assertThat(tableFiles).anyMatch(path -> path.matches(".*/deletion_vector_.*\\.bin"));

            assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES deletion_vectors_enabled = false", "The following properties cannot be updated: deletion_vectors_enabled");
        }
    }

    /**
     * Runs the deletion-vectors-disabled scenario for a plain CREATE TABLE definition and then for a
     * CREATE TABLE AS SELECT definition.
     */
    @Test
    public void testDeletionVectorsDisabledCreateTable() throws Exception {
        List<String> tableDefinitions = List.of(
                "(x int) WITH (deletion_vectors_enabled = false)",
                "WITH (deletion_vectors_enabled = false) AS SELECT 1 x");
        for (String tableDefinition : tableDefinitions) {
            testDeletionVectorsDisabledCreateTable(tableDefinition);
        }
    }

    /**
     * Creates a table from {@code tableDefinition} with deletion vectors disabled and checks that
     * {@code SHOW CREATE TABLE} does not mention the property, that version 0 of the transaction log
     * carries protocol versions 1/2 without features, that a DELETE leaves no deletion vector file in
     * the table directory, and that the property cannot be switched on afterwards.
     *
     * @param tableDefinition text appended after the table name in the CREATE TABLE statement
     */
    private void testDeletionVectorsDisabledCreateTable(String tableDefinition) throws Exception {
        try (TestTable table = newTrinoTable("deletion_vectors", tableDefinition)) {
            String tableName = table.getName();
            Assertions.assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)).doesNotContain("deletion_vectors_enabled");

            String tableLocation = getTableLocation(tableName);
            List<DeltaLakeTransactionLogEntry> transactionLogs = TestDeltaLakeBasic.getEntriesFromJson(0L, tableLocation + "/_delta_log");
            ProtocolEntry expectedProtocol = new ProtocolEntry(1, 2, Optional.empty(), Optional.empty());
            Assertions.assertThat(transactionLogs.get(1).getProtocol()).isEqualTo(expectedProtocol);

            assertUpdate("INSERT INTO " + tableName + " VALUES 2, 3", 2L);
            assertUpdate("DELETE FROM " + tableName + " WHERE x = 2", 1L);

            // The dot before "bin" is escaped so the pattern names exactly the "deletion_vector_*.bin"
            // files; unescaped it would stand for any character in that position.
            List<String> tableFiles = org.assertj.core.util.Files.fileNamesIn(new URI(tableLocation).getPath(), false);
            Assertions.assertThat(tableFiles).noneSatisfy(path -> Assertions.assertThat(path).matches(".*/deletion_vector_.*\\.bin"));

            assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES deletion_vectors_enabled = true", "The following properties cannot be updated: deletion_vectors_enabled");
        }
    }

    /**
     * Registers a copy of the {@code databricks122/deletion_vectors} resource table and verifies
     * its contents after INSERT, DELETE, UPDATE (issued twice with the same statement) and MERGE.
     */
    @Test
    public void testDeletionVectors() throws Exception {
        String tableName = "deletion_vectors" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("databricks122/deletion_vectors").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        String showCreateTable = (String) computeScalar("SHOW CREATE TABLE " + tableName);
        Assertions.assertThat(showCreateTable).contains("deletion_vectors_enabled = true");

        String selectAll = "SELECT * FROM " + tableName;
        assertQuery(selectAll, "VALUES (1, 11)");

        assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2L);
        assertQuery(selectAll, "VALUES (1, 11), (3, 31), (3, 32)");

        assertUpdate("DELETE FROM " + tableName + " WHERE a = 3 AND b = 31", 1L);
        assertQuery(selectAll, "VALUES (1, 11), (3, 32)");

        // The same UPDATE is issued twice; both runs are expected to touch one row.
        String updateStatement = "UPDATE " + tableName + " SET a = -3 WHERE b = 32";
        assertUpdate(updateStatement, 1L);
        assertQuery(selectAll, "VALUES (1, 11), (-3, 32)");
        assertUpdate(updateStatement, 1L);
        assertQuery(selectAll, "VALUES (1, 11), (-3, 32)");

        assertUpdate("MERGE INTO " + tableName + " t USING (SELECT * FROM (VALUES 1)) AS s(a) ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -11", 1L);
        assertQuery(selectAll, "VALUES (1, -11), (-3, 32)");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code databricks122/deletion_vectors} resource table and exercises
     * statements that affect every row: DELETE, UPDATE, MERGE and TRUNCATE. Also checks that the
     * deletion vector on the add entry read from log version 2 equals the one on the remove entry
     * read from log version 3.
     */
    @Test
    public void testDeletionVectorsAllRows() throws Exception {
        String tableName = "deletion_vectors" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("databricks122/deletion_vectors").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        String deleteEverything = "DELETE FROM " + tableName + " WHERE a != 999";
        String selectAll = "SELECT * FROM " + tableName;
        String transactionLogDirectory = tableLocation + "/_delta_log";

        assertUpdate(deleteEverything, 1L);
        DeletionVectorEntry deletionVector = (DeletionVectorEntry) TestDeltaLakeBasic.getEntriesFromJson(2L, transactionLogDirectory)
                .get(2).getAdd().getDeletionVector().orElseThrow();
        DeletionVectorEntry removedDeletionVector = (DeletionVectorEntry) TestDeltaLakeBasic.getEntriesFromJson(3L, transactionLogDirectory)
                .get(1).getRemove().deletionVector().orElseThrow();
        Assertions.assertThat((Object) removedDeletionVector).isEqualTo((Object) deletionVector);

        assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2L);
        assertUpdate(deleteEverything, 2L);
        assertQueryReturnsEmptyResult(selectAll);

        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20)", 2L);
        assertUpdate("UPDATE " + tableName + " SET a = a + 10", 2L);
        assertQuery(selectAll, "VALUES (11, 10), (12, 20)");

        assertUpdate("MERGE INTO " + tableName + " t USING (SELECT * FROM (VALUES 11, 12)) AS s(a) ON (t.a = s.a) WHEN MATCHED AND t.a = 11 THEN UPDATE SET b = 100 WHEN MATCHED AND t.a = 12 THEN DELETE", 2L);
        assertQuery(selectAll, "VALUES (11, 100)");

        assertUpdate("TRUNCATE TABLE " + tableName);
        assertQueryReturnsEmptyResult(selectAll);

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code databricks122/deletion_vectors_empty} resource table, loads
     * 15000 rows from {@code tpch.tiny.orders}, deletes all but one of them and verifies the
     * remaining row.
     */
    @Test
    public void testDeletionVectorsLargeDelete() throws Exception {
        String tableName = "deletion_vectors" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("databricks122/deletion_vectors_empty").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        assertUpdate("INSERT INTO " + tableName + " SELECT orderkey, custkey FROM tpch.tiny.orders", 15000L);
        assertUpdate("DELETE FROM " + tableName + " WHERE a != 1", 14999L);

        QueryAssertions.QueryAssert remainingRows = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT * FROM " + tableName));
        remainingRows.matches("SELECT CAST(orderkey AS integer), CAST(custkey AS integer) FROM tpch.tiny.orders WHERE orderkey = 1");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code databricks122/deletion_vectors_empty} resource table, performs
     * nine single-row inserts followed by a DELETE, and verifies that the version 10 checkpoint
     * file exists only after the DELETE and that the table remains readable.
     */
    @Test
    public void testDeletionVectorsCheckPoint() throws Exception {
        String tableName = "deletion_vectors" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("databricks122/deletion_vectors_empty").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        for (int value = 0; value < 9; value++) {
            assertUpdate("INSERT INTO %s VALUES (%s, %s)".formatted(tableName, value, value), 1L);
        }

        Path checkpointFile = tableLocation.resolve("_delta_log/00000000000000000010.checkpoint.parquet");
        Assertions.assertThat(checkpointFile).doesNotExist();
        assertUpdate("DELETE FROM " + tableName + " WHERE a != 1", 8L);
        Assertions.assertThat(checkpointFile).exists();

        assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1)");
        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Registers a copy of the {@code deltalake/deletion_vector_random_prefix} resource table,
     * inserts and deletes rows, and verifies that a deletion vector file is written below a
     * three-character subdirectory of the table location.
     */
    @Test
    public void testDeletionVectorsRandomPrefix() throws Exception {
        String tableName = "deletion_vectors_random_prefix" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("deltalake/deletion_vector_random_prefix").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20), (3, 30)", 3L);
        assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1L);
        assertQuery("SELECT * FROM " + tableName, "VALUES (2, 20), (3, 30)");

        // Quote the table location so that any regex metacharacters in the directory name are
        // matched literally, and escape the dot of the ".bin" extension.
        String deletionVectorPattern = java.util.regex.Pattern.quote(tableLocation.toString())
                + "/[a-zA-Z0-9_]{3}/deletion_vector_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\.bin";
        // tableLocation is already a filesystem path; passing it through java.net.URI would fail
        // for names that are not valid URI syntax (for example, names containing spaces).
        List<String> tableFiles = org.assertj.core.util.Files.fileNamesIn(tableLocation.toString(), true);
        Assertions.assertThat(tableFiles).anyMatch(path -> path.matches(deletionVectorPattern));

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Deletes two rows with two separate DELETE statements from a table that has deletion vectors
     * enabled and verifies that only the third row remains.
     */
    @Test
    void testDeletionVectorsRepeat() {
        try (TestTable table = newTrinoTable("test_dv", "(x int) WITH (deletion_vectors_enabled = true)", List.of("1", "2", "3"))) {
            String tableName = table.getName();
            assertUpdate("DELETE FROM " + tableName + " WHERE x = 1", 1L);
            assertUpdate("DELETE FROM " + tableName + " WHERE x = 2", 1L);

            QueryAssertions.QueryAssert remainingRows = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT * FROM " + tableName));
            remainingRows.matches("VALUES 3");
        }
    }

    /**
     * Runs the deletion vector page checks with the Parquet column index enabled and then
     * disabled.
     */
    @Test
    void testDeletionVectorsPages() throws Exception {
        for (boolean parquetUseColumnIndex : new boolean[] {true, false}) {
            testDeletionVectorsPages(parquetUseColumnIndex);
        }
    }

    /**
     * Registers a copy of the {@code deltalake/deletion_vector_pages} resource table and queries it
     * with the {@code parquet_use_column_index} session property set to the given value.
     *
     * @param parquetUseColumnIndex value for the {@code delta} catalog session property
     *        {@code parquet_use_column_index}
     */
    private void testDeletionVectorsPages(boolean parquetUseColumnIndex) throws Exception {
        String tableName = "deletion_vectors_pages" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("deltalake/deletion_vector_pages").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        Session session = Session.builder(getSession())
                .setCatalogSessionProperty("delta", "parquet_use_column_index", String.valueOf(parquetUseColumnIndex))
                .build();

        assertQueryReturnsEmptyResult(session, "SELECT * FROM " + tableName + " WHERE id = 20001");

        QueryAssertions.QueryAssert lastRow = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(session, "SELECT * FROM " + tableName + " WHERE id = 99999"));
        lastRow.matches("VALUES 99999");

        QueryAssertions.QueryAssert changes = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(session, "SELECT id, _change_type, _commit_version FROM TABLE(system.table_changes('tpch', '" + tableName + "')) WHERE id = 20001"));
        changes.matches("VALUES (20001, VARCHAR 'insert', BIGINT '1'), (20001, VARCHAR 'update_preimage', BIGINT '2')");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Verifies that the vacuum procedure is rejected for a registered copy of the
     * {@code databricks122/deletion_vectors_empty} resource table.
     */
    @Test
    public void testUnsupportedVacuumDeletionVectors() throws Exception {
        String tableName = "deletion_vectors" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("databricks122/deletion_vectors_empty").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        assertQueryFails(
                "CALL delta.system.vacuum('tpch', '" + tableName + "', '7d')",
                "Cannot execute vacuum procedure with deletionVectors writer features");

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Reads the {@code liquid_clustering} table, including filtered reads and time travel to
     * versions 0 through 3, and verifies that writing to it is rejected.
     */
    @Test
    public void testLiquidClustering() {
        String firstRow = "VALUES ('test 1', 2024, 1)";
        String bothRows = "VALUES ('test 1', 2024, 1), ('test 2', 2024, 2)";

        assertQuery("SELECT * FROM liquid_clustering", bothRows);
        assertQuery("SELECT data FROM liquid_clustering WHERE year = 2024 AND month = 1", "VALUES 'test 1'");
        assertQuery("SELECT data FROM liquid_clustering WHERE year = 2024 AND month = 2", "VALUES 'test 2'");

        assertQueryReturnsEmptyResult("SELECT * FROM liquid_clustering FOR VERSION AS OF 0");
        assertQuery("SELECT * FROM liquid_clustering FOR VERSION AS OF 1", firstRow);
        assertQuery("SELECT * FROM liquid_clustering FOR VERSION AS OF 2", bothRows);
        assertQuery("SELECT * FROM liquid_clustering FOR VERSION AS OF 3", bothRows);

        assertQueryFails("INSERT INTO liquid_clustering VALUES ('test 3', 2024, 3)", "Unsupported writer features: .*");
    }

    /**
     * Verifies that the {@code uniform_hudi} table is readable and that INSERT and vacuum fail
     * with the unsupported universal format message.
     */
    @Test
    public void testUniFormHudi() {
        String unsupportedFormat = "\\QUnsupported universal formats: [hudi]";

        assertQuery("SELECT * FROM uniform_hudi", "VALUES (123)");
        assertQueryFails("INSERT INTO uniform_hudi VALUES (456)", unsupportedFormat);
        assertQueryFails("CALL system.vacuum(CURRENT_SCHEMA, 'uniform_hudi', '7d')", unsupportedFormat);
    }

    /**
     * Verifies that the {@code uniform_iceberg_v1} table is readable and that INSERT fails with
     * the unsupported universal format message.
     */
    @Test
    public void testUniFormIcebergV1() {
        String table = "uniform_iceberg_v1";
        assertQuery("SELECT * FROM " + table, "VALUES (1, 'test data')");
        assertQueryFails(
                "INSERT INTO " + table + " VALUES (2, 'new data')",
                "\\QUnsupported universal formats: [iceberg]");
    }

    /**
     * Verifies that the {@code uniform_iceberg_v2} table is readable and that INSERT fails with
     * the unsupported universal format message.
     */
    @Test
    public void testUniFormIcebergV2() {
        String table = "uniform_iceberg_v2";
        assertQuery("SELECT * FROM " + table, "VALUES (1, 'test data')");
        assertQueryFails(
                "INSERT INTO " + table + " VALUES (2, 'new data')",
                "\\QUnsupported universal formats: [iceberg]");
    }

    /**
     * Checks the column types reported for the {@code variant} table, reads its rows, and verifies
     * that writing to it is rejected.
     */
    @Test
    public void testVariant() {
        QueryAssertions.QueryAssert describe = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("DESCRIBE variant"));
        describe.result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('col_int', 'integer'),"
                        + "('simple_variant', 'json'),"
                        + "('array_variant', 'array(json)'),"
                        + "('map_variant', 'map(varchar, json)'),"
                        + "('struct_variant', 'row(x json)'),"
                        + "('col_string', 'varchar')");

        QueryAssertions.QueryAssert rows = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT col_int, simple_variant, array_variant[1], map_variant['key1'], struct_variant.x, col_string FROM variant"));
        rows.skippingTypesCheck()
                .matches("VALUES (1, JSON '{\"col\":1}', JSON '{\"array\":2}', JSON '{\"map\":3}', JSON '{\"struct\":4}', 'test data'),"
                        + "(2, JSON '{\"col\":null}', JSON '{\"array\":null}', JSON '{\"map\":null}', JSON '{\"struct\":null}', 'test null data'),"
                        + "(3, NULL, NULL, NULL, NULL, 'test null'),"
                        + "(4, JSON '1', JSON '2', JSON '3', JSON '4', 'test without fields')");

        assertQueryFails("INSERT INTO variant VALUES (2, null, null, null, null, 'new data')", "Unsupported writer features: .*");
    }

    /**
     * Registers a copy of the {@code databricks154/variant_read_after_optimization} resource table
     * and reads its variant column, both at version 1 and at the current version, directly and
     * through {@code json_extract}.
     */
    @Test
    public void testVariantReadAfterOptimization() throws Exception {
        String tableName = "test_variant_read_after_optimization_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("databricks154/variant_read_after_optimization").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("DESCRIBE " + tableName)))
                .result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('id', 'integer'),('col_variant', 'json')");

        // Reads pinned to version 1 of the table.
        String versionOne = " FOR VERSION AS OF 1";
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT col_variant FROM " + tableName + versionOne)))
                .matches("VALUES\n"
                        + "JSON '[\"a\",\"b\",\"c\"]',\n"
                        + "JSON '[1,\"a\",true,null]',\n"
                        + "JSON '{\"a\":1,\"b\":2}',\n"
                        + "JSON '{\"a\":[1,2],\"b\":{\"x\":true}}',\n"
                        + "JSON '[{\"x\":1},{\"y\":\"z\"}]'\n");
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT json_extract(col_variant, '$[1]') AS second_value FROM " + tableName + versionOne)))
                .skippingTypesCheck()
                .matches("VALUES '\"b\"', '\"a\"', NULL, NULL, '{\"y\":\"z\"}'");
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT json_extract(col_variant, '$.a') AS second_value FROM " + tableName + versionOne)))
                .skippingTypesCheck()
                .matches("VALUES NULL, NULL, '1', '[1,2]', NULL");

        // Reads of the current version of the table.
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT col_variant FROM " + tableName)))
                .matches("VALUES\n"
                        + "JSON '[\"a\",\"b\",\"c\"]',\n"
                        + "JSON '[1,\"a\",true,null]',\n"
                        + "JSON '{\"a\":1,\"b\":2}',\n"
                        + "JSON '{\"a\":[1,2],\"b\":{\"x\":true}}',\n"
                        + "JSON '[{\"x\":1},{\"y\":\"z\"}]',\n"
                        + "JSON '{\"nested\":[{\"x\":1},2,null]}',\n"
                        + "JSON '{\"deep\":{\"deeper_a\":{\"value\":\"va\"}}}',\n"
                        + "JSON '{\"deep\":{\"deeper_a\":{\"value\":\"vaa\"},\"deeper_b\":{\"value\":\"vbb\"}}}',\n"
                        + "JSON '{\"deep\":{\"deeper_c\":{\"value\":\"vc\"}}}'\n");
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT json_extract(col_variant, '$[1]') AS second_value FROM " + tableName)))
                .skippingTypesCheck()
                .matches("VALUES '\"b\"', '\"a\"', NULL, NULL, '{\"y\":\"z\"}', NULL, NULL, NULL, NULL");
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT json_extract(col_variant, '$.a') as variant_a FROM " + tableName + " WHERE id <= 4")))
                .skippingTypesCheck()
                .matches("VALUES NULL, NULL, '1', '[1,2]'");
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT json_extract(col_variant, '$.deep.deeper_a') as variant_deeper_a FROM " + tableName + " WHERE id >= 7")))
                .skippingTypesCheck()
                .matches("VALUES '{\"value\":\"va\"}', '{\"value\":\"vaa\"}', NULL");
        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT json_extract(col_variant, '$.deep.deeper_c') as variant_deeper_c FROM " + tableName + " WHERE id >= 7")))
                .skippingTypesCheck()
                .matches("VALUES NULL, NULL, '{\"value\":\"vc\"}'");
    }

    /**
     * Reads every column of the {@code variant_types} table and compares the single row with the
     * expected textual values.
     */
    @Test
    public void testVariantTypes() {
        String selectAllColumns = "SELECT\n"
                + " col_boolean,\n"
                + " col_long,\n"
                + " col_float,\n"
                + " col_double,\n"
                + " col_decimal,\n"
                + " col_string,\n"
                + " col_binary,\n"
                + " col_date,\n"
                + " col_timestamp,\n"
                + " col_timestampntz,\n"
                + " col_array,\n"
                + " col_map,\n"
                + " col_struct\n"
                + "FROM variant_types";
        String expectedRow = "VALUES\n"
                + "('true',\n"
                + "'1',\n"
                + "'0.2',\n"
                + "'0.3',\n"
                + "'0.4',\n"
                + "'\"test data\"',\n"
                + "'\"ZWg/\"',\n"
                + "'\"2021-02-03\"',\n"
                + "'\"2001-08-21 19:02:03.321-06:00\"',\n"
                + "'\"2021-01-02 12:34:56.123456\"',\n"
                + "'[1]',\n"
                + "'{\"key1\":1,\"key2\":2}',\n"
                + "'{\"x\":1}')";

        ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAllColumns)))
                .skippingTypesCheck()
                .matches(expectedRow);
    }

    /**
     * Creates a table with CREATE TABLE AS, derives its location from the {@code $path} of its data
     * files, and runs the corrupted-location checks with {@code isManaged = true}.
     */
    @Test
    public void testCorruptedManagedTableLocation() throws Exception {
        String tableName = "bad_person_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 id, 'person1' name", 1L);

        // Strip the file name from "$path" to obtain the directory that holds the data files.
        String dataFileDirectory = (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*$', '') FROM " + tableName);
        Path tableLocation = Path.of(URI.create(dataFileDirectory));

        testCorruptedTableLocation(tableName, tableLocation, true);
    }

    /**
     * Registers a copy of the {@code databricks73/person} resource table and runs the
     * corrupted-location checks with {@code isManaged = false}.
     */
    @Test
    public void testCorruptedExternalTableLocation() throws Exception {
        String tableName = "bad_person_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        Path resourceDirectory = Path.of(getResourceLocation("databricks73/person").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);

        String registerTable = "CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation);
        getQueryRunner().execute(registerTable);

        testCorruptedTableLocation(tableName, tableLocation, false);
    }

    /**
     * Deletes the {@code _delta_log} directory of the given table and verifies that statements
     * touching the table fail with a "Metadata not found in transaction log" error, that the table
     * is still listed but exposes no columns, and that it can still be dropped.
     *
     * @param tableName name of the table whose transaction log is removed
     * @param tableLocation directory that contains the table's {@code _delta_log}
     * @param isManaged when {@code true} the table location is expected to be gone after DROP
     *        TABLE; otherwise it is expected to still exist
     */
    private void testCorruptedTableLocation(String tableName, Path tableLocation, boolean isManaged) throws Exception {
        Path transactionLogDirectory = tableLocation.resolve("_delta_log");
        MoreFiles.deleteRecursively(transactionLogDirectory, RecursiveDeleteOption.ALLOW_INSECURE);
        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')");

        String metadataNotFound = "Metadata not found in transaction log for tpch." + tableName;

        // Reads of the table and of its system tables
        assertQueryFails("TABLE " + tableName, metadataNotFound);
        assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", metadataNotFound);
        assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", metadataNotFound);
        assertQueryFails("SELECT * FROM \"" + tableName + "$partitions\"", metadataNotFound + "\\$partitions");
        assertQueryFails("SELECT * FROM " + tableName + " WHERE false", metadataNotFound);
        assertQueryFails("SELECT 1 FROM " + tableName + " WHERE false", metadataNotFound);

        // Metadata statements
        assertQueryFails("SHOW CREATE TABLE " + tableName, metadataNotFound);
        assertQueryFails("CREATE TABLE a_new_table (LIKE " + tableName + " EXCLUDING PROPERTIES)", metadataNotFound);
        assertQueryFails("DESCRIBE " + tableName, metadataNotFound);
        assertQueryFails("SHOW COLUMNS FROM " + tableName, metadataNotFound);
        assertQueryFails("SHOW STATS FOR " + tableName, metadataNotFound);
        assertQueryFails("ANALYZE " + tableName, metadataNotFound);

        // ALTER TABLE variants
        assertQueryFails("ALTER TABLE " + tableName + " EXECUTE optimize", metadataNotFound);
        assertQueryFails("ALTER TABLE " + tableName + " EXECUTE vacuum", metadataNotFound);
        assertQueryFails("ALTER TABLE " + tableName + " RENAME TO bad_person_some_new_name", metadataNotFound);
        assertQueryFails("ALTER TABLE " + tableName + " ADD COLUMN foo int", metadataNotFound);
        assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN foo", metadataNotFound);
        assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN foo.bar", metadataNotFound);
        assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = true", metadataNotFound);

        // Data modification statements
        assertQueryFails("INSERT INTO " + tableName + " VALUES (NULL)", metadataNotFound);
        assertQueryFails("UPDATE " + tableName + " SET foo = 'bar'", metadataNotFound);
        assertQueryFails("DELETE FROM " + tableName, metadataNotFound);
        assertQueryFails("MERGE INTO  " + tableName + " USING (SELECT 1 a) input ON true WHEN MATCHED THEN DELETE", metadataNotFound);
        assertQueryFails("TRUNCATE TABLE " + tableName, metadataNotFound);

        // Comments, procedures, table functions and replacement
        assertQueryFails("COMMENT ON TABLE " + tableName + " IS NULL", metadataNotFound);
        assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", metadataNotFound);
        assertQueryFails("CALL system.vacuum(CURRENT_SCHEMA, '" + tableName + "', '7d')", metadataNotFound);
        assertQueryFails("SELECT * FROM TABLE(system.table_changes(CURRENT_SCHEMA, '" + tableName + "'))", metadataNotFound);
        assertQueryFails("CREATE OR REPLACE TABLE " + tableName + " (id INTEGER)", metadataNotFound);

        // Statements that still succeed against the corrupted table
        assertQuerySucceeds("CALL system.drop_extended_stats(CURRENT_SCHEMA, '" + tableName + "')");
        assertQuery("SHOW TABLES LIKE 'bad\\_person\\_%' ESCAPE '\\'", "VALUES '" + tableName + "'");
        assertQueryReturnsEmptyResult("SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name LIKE 'bad\\_person\\_%' ESCAPE '\\'");
        assertQueryReturnsEmptyResult("SELECT column_name, data_type FROM system.jdbc.columns WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA AND table_name LIKE 'bad\\_person\\_%' ESCAPE '\\'");

        getQueryRunner().execute("DROP TABLE " + tableName);
        Assertions.assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();

        // The description must be set before the assertion is invoked; AssertJ ignores a
        // description that is attached after the check has already run.
        if (isManaged) {
            Assertions.assertThat(tableLocation.toFile()).as("Table location should not exist").doesNotExist();
        } else {
            Assertions.assertThat(tableLocation.toFile()).as("Table location should exist").exists();
        }
    }

    /**
     * Reads the {@code stats_with_minmax_nulls} table and checks the output of SHOW STATS for it.
     */
    @Test
    public void testStatsWithMinMaxValuesAsNulls() {
        String expectedRows = "VALUES\n"
                + "(0, 1),\n"
                + "(1, 2),\n"
                + "(3, 4),\n"
                + "(3, 7),\n"
                + "(NULL, NULL),\n"
                + "(NULL, NULL)\n";
        assertQuery("SELECT * FROM stats_with_minmax_nulls", expectedRows);

        String expectedStats = "VALUES\n"
                + "('id', null, null, 0.3333333333333333, null, 0, 3),\n"
                + "('id2', null, null, 0.3333333333333333, null, 1, 7),\n"
                + "(null, null, null, null, 6.0, null, null)\n";
        assertQuery("SHOW STATS FOR stats_with_minmax_nulls", expectedStats);
    }

    /**
     * Registers a copy of the {@code deltalake/multipart_checkpoint} resource table and verifies
     * its schema and contents.
     */
    @Test
    public void testReadMultipartCheckpoint() throws Exception {
        String tableName = "test_multipart_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("deltalake/multipart_checkpoint").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        QueryAssertions.QueryAssert describe = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("DESCRIBE " + tableName));
        describe.result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('c', 'integer')");

        QueryAssertions.QueryAssert rows = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT * FROM " + tableName));
        rows.matches("VALUES 1, 2, 3, 4, 5, 6, 7");
    }

    /**
     * Registers a copy of the {@code deltalake/multipart_v2_checkpoint} resource table and verifies
     * its schema and contents.
     */
    @Test
    public void testReadMultipartV2Checkpoint() throws Exception {
        String tableName = "test_multipart_v2_checkpoint_" + TestingNames.randomNameSuffix();
        // Place the table under the catalog directory instead of creating a system temp file as
        // the copy target: the temp file was never removed by this test and had to be replaced
        // by a directory during the copy.
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("deltalake/multipart_v2_checkpoint").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        String schema = getSession().getSchema().orElseThrow();
        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schema, tableName, tableLocation.toUri()));

        QueryAssertions.QueryAssert describe = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("DESCRIBE " + tableName));
        describe.result()
                .projected("Column", "Type")
                .skippingTypesCheck()
                .matches("VALUES ('a', 'integer'), ('b', 'integer')");

        QueryAssertions.QueryAssert rows = (QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT * FROM " + tableName));
        rows.matches("VALUES (1,2), (3,4), (5,6), (7,8)");
    }

    /**
     * Registers a copy of the {@code deltalake/multipart_checkpoint} resource table and runs
     * version- and timestamp-based time travel queries, first with the {@code _last_checkpoint}
     * file present and then again after deleting it and flushing the metadata cache.
     */
    @Test
    public void testTimeTravelWithMultipartCheckpoint() throws Exception {
        String tableName = "test_time_travel_multipart_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource("deltalake/multipart_checkpoint").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        // The same six time travel reads are performed before and after removing _last_checkpoint.
        Runnable verifyTimeTravel = () -> {
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAll + " FOR VERSION AS OF 5"))).matches("VALUES 1, 2, 3, 4, 5");
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAll + " FOR VERSION AS OF 6"))).matches("VALUES 1, 2, 3, 4, 5, 6");
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAll + " FOR VERSION AS OF 7"))).matches("VALUES 1, 2, 3, 4, 5, 6, 7");
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAll + " FOR TIMESTAMP AS OF TIMESTAMP '2023-10-16 06:53:09.907 UTC'"))).matches("VALUES 1, 2, 3, 4, 5");
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAll + " FOR TIMESTAMP AS OF TIMESTAMP '2023-10-16 06:53:14.248 UTC'"))).matches("VALUES 1, 2, 3, 4, 5, 6");
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query(selectAll + " FOR TIMESTAMP AS OF TIMESTAMP '2023-10-16 06:53:26.526 UTC'"))).matches("VALUES 1, 2, 3, 4, 5, 6, 7");
        };

        verifyTimeTravel.run();

        Files.delete(tableLocation.resolve("_delta_log/_last_checkpoint"));
        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')");

        verifyTimeTravel.run();

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Runs the V2 checkpoint time travel checks against each of the four V2 checkpoint resource
     * tables.
     */
    @Test
    public void testTimeTravelWithV2Checkpoint() throws Exception {
        List<String> resourceNames = List.of(
                "deltalake/v2_checkpoint_json",
                "deltalake/v2_checkpoint_parquet",
                "databricks133/v2_checkpoint_json",
                "databricks133/v2_checkpoint_parquet");
        for (String resourceName : resourceNames) {
            testTimeTravelWithV2Checkpoint(resourceName);
        }
    }

    /**
     * Registers a copy of the given resource table and reads versions 0 and 1, first with the
     * {@code _last_checkpoint} file present and then again after deleting it and flushing the
     * metadata cache.
     *
     * @param resourceName classpath resource directory holding the table to copy
     */
    private void testTimeTravelWithV2Checkpoint(String resourceName) throws Exception {
        String tableName = "test_time_travel_v2_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = catalogDir.resolve(tableName);
        File resourceDirectory = new File(Resources.getResource(resourceName).toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);
        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // The same two reads are performed before and after removing _last_checkpoint.
        Runnable verifyVersions = () -> {
            assertQueryReturnsEmptyResult("SELECT * FROM " + tableName + " FOR VERSION AS OF 0");
            ((QueryAssertions.QueryAssert) Assertions.assertThat((AssertProvider) query("SELECT * FROM " + tableName + " FOR VERSION AS OF 1"))).matches("VALUES (1, 2)");
        };

        verifyVersions.run();

        Files.delete(tableLocation.resolve("_delta_log/_last_checkpoint"));
        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')");

        verifyVersions.run();

        assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Runs the timestamp-based time travel scenario for each resource table below.
     * Each row holds the resource directory followed by the two timestamps passed
     * as the second and third arguments of the helper.
     */
    @Test
    public void testTimeTravelWithV2CheckpointUsingTemporal() throws Exception {
        String[][] scenarios = {
            {"deltalake/v2_checkpoint_json_using_temporal", "2025-01-30 13:14:58.530 UTC", "2025-01-30 13:15:05.942 UTC"},
            {"deltalake/v2_checkpoint_parquet_using_temporal", "2025-01-31 02:35:49.788 UTC", "2025-01-31 02:36:28.433 UTC"},
        };
        for (String[] scenario : scenarios) {
            this.testTimeTravelWithV2CheckpointWithTemporalVersion(scenario[0], scenario[1], scenario[2]);
        }
    }

    /**
     * Copies and registers the given resource table, then queries it with
     * {@code FOR TIMESTAMP AS OF} at the two supplied timestamps. The first must
     * return {@code (1, 2), (3, 4)} and the second {@code (1, 2), (3, 4), (5, 6)}.
     * The checks are repeated after {@code _delta_log/_last_checkpoint} is deleted
     * and the metadata cache is flushed. The table is dropped at the end.
     *
     * @param resourceName classpath resource directory holding the table files
     * @param v2TimestampWithZone timestamp literal used for the two-row expectation
     * @param v3TimestampWithZone timestamp literal used for the three-row expectation
     */
    private void testTimeTravelWithV2CheckpointWithTemporalVersion(String resourceName, String v2TimestampWithZone, String v3TimestampWithZone) throws Exception {
        String tableName = "test_time_travel_v2_checkpoint_using_temporal_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)resourceName).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAtV2 = "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '" + v2TimestampWithZone + "'";
        String selectAtV3 = "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '" + v3TimestampWithZone + "'";

        // First pass: the _last_checkpoint file is still present.
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectAtV2))).matches("VALUES (1, 2), (3, 4)");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectAtV3))).matches("VALUES (1, 2), (3, 4), (5, 6)");

        // Second pass: remove the _last_checkpoint file and flush cached metadata.
        Path lastCheckpoint = tableLocation.resolve("_delta_log/_last_checkpoint");
        Files.delete(lastCheckpoint);
        this.assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectAtV2))).matches("VALUES (1, 2), (3, 4)");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectAtV3))).matches("VALUES (1, 2), (3, 4), (5, 6)");

        this.assertUpdate("DROP TABLE " + tableName);
    }

    /**
     * Calls {@code TemporalTimeTravelUtil.findLatestVersionUsingTemporal} directly
     * against a registered copy of the
     * {@code deltalake/v2_checkpoint_json_using_temporal} table and checks three
     * epoch-millisecond inputs: one that must fail with a "No temporal version
     * history" message, one that resolves to version 2, and one that resolves to
     * version 3. The table is dropped in a {@code finally} block.
     *
     * <p>NOTE: this is decompiler output. CFR reported "Removed try catching
     * itself - possible behaviour change" for this method, so the try structure
     * may differ from the original source.
     */
    @Test
    public void testTemporalTimeTravelUtilParallelSearch() throws Exception {
        String tableName = "test_time_travel_util_parallel_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"deltalake/v2_checkpoint_json_using_temporal").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String location = tableLocation.toString();
        long beforeVersionTwoMillis = 1738242898529L;
        long versionTwoMillis = 1738242898530L;
        long versionThreeMillis = 1738242905942L;

        try (ExecutorService executorService = Executors.newCachedThreadPool();){
            // One millisecond earlier than the version 2 timestamp: the lookup must fail.
            Assertions.assertThatThrownBy(() -> TemporalTimeTravelUtil.findLatestVersionUsingTemporal((TrinoFileSystem)FILE_SYSTEM, (String)location, (long)beforeVersionTwoMillis, (Executor)executorService, (int)1))
                    .hasMessage("No temporal version history at or before %s".formatted(Instant.ofEpochMilli(beforeVersionTwoMillis)));

            long resolvedVersionTwo = TemporalTimeTravelUtil.findLatestVersionUsingTemporal((TrinoFileSystem)FILE_SYSTEM, (String)location, (long)versionTwoMillis, (Executor)executorService, (int)1);
            Assertions.assertThat((long)resolvedVersionTwo).isEqualTo(2L);

            long resolvedVersionThree = TemporalTimeTravelUtil.findLatestVersionUsingTemporal((TrinoFileSystem)FILE_SYSTEM, (String)location, (long)versionThreeMillis, (Executor)executorService, (int)1);
            Assertions.assertThat((long)resolvedVersionThree).isEqualTo(3L);
        }
        finally {
            this.assertUpdate("DROP TABLE " + tableName);
        }
    }

    /**
     * Runs the shared partition-values-parsed scenario against the
     * {@code deltalake/partition_values_parsed} resource table.
     */
    @Test
    public void testDeltaLakeWithPartitionValuesParsed() throws Exception {
        String resourceName = "deltalake/partition_values_parsed";
        this.testPartitionValuesParsed(resourceName);
    }

    /**
     * Runs the shared partition-values-parsed scenario against the
     * {@code trino432/partition_values_parsed} resource table.
     */
    @Test
    public void testTrinoWithoutPartitionValuesParsed() throws Exception {
        String resourceName = "trino432/partition_values_parsed";
        this.testPartitionValuesParsed(resourceName);
    }

    /**
     * Copies and registers the given resource table, then checks which {@code id}
     * values are returned for a series of predicates on the {@code int_part} and
     * {@code string_part} columns.
     *
     * @param resourceName classpath resource directory holding the table files
     */
    private void testPartitionValuesParsed(String resourceName) throws Exception {
        String tableName = "test_partition_values_parsed_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)resourceName).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // Session with the delta catalog property checkpoint_filtering_enabled set to "false".
        Session session = Session.builder((Session)this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "false")
                .build();
        String selectId = "SELECT id FROM " + tableName;

        // NOTE(review): the queries in this group run with the default session, while the
        // group below uses the session built above - confirm this split is intentional.
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectId + " WHERE int_part = 10 AND string_part = 'part1'"))).matches("VALUES 1");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectId + " WHERE int_part != 10"))).matches("VALUES 2");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectId + " WHERE int_part > 10"))).matches("VALUES 2");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectId + " WHERE int_part >= 10"))).matches("VALUES 1, 2");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectId + " WHERE int_part IN (10, 20)"))).matches("VALUES 1, 2");

        // Null handling and non-matching predicates, using the explicit session.
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(session, selectId + " WHERE int_part IS NULL AND string_part IS NULL"))).matches("VALUES 3");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(session, selectId + " WHERE int_part IS NOT NULL AND string_part IS NOT NULL"))).matches("VALUES 1, 2");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(session, selectId + " WHERE int_part = 10 AND string_part = 'unmatched partition condition'"))).returnsEmptyResult();
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(session, selectId + " WHERE int_part IS NULL AND string_part IS NOT NULL"))).returnsEmptyResult();
    }

    /**
     * Registers a copy of {@code databricks133/parsed_stats_struct} and checks full
     * reads, reads filtered on {@code part} (including a nested {@code root.entry_two}
     * field), and the {@code SHOW STATS} output for a single-partition query.
     */
    @Test
    public void testCheckpointFilteringForParsedStatsContainingNestedRows() throws Exception {
        String tableName = "test_parsed_stats_struct_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"databricks133/parsed_stats_struct").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String expectedAllRows = "VALUES\n(100, 1, row(1, 'ala')),\n(200, 2, row(2, 'kota')),\n(300, 3, row(3, 'osla')),\n(400, 4, row(4, 'zulu'))\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).skippingTypesCheck().matches(expectedAllRows);

        String partitionRange = " WHERE part BETWEEN 100 AND 300";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT id FROM " + tableName + partitionRange))).matches("VALUES 1, 2, 3");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT root.entry_two FROM " + tableName + partitionRange))).skippingTypesCheck().matches("VALUES 'ala', 'kota', 'osla'");

        String expectedStats = "VALUES\n('id', NULL, NULL, DOUBLE '0.0' , NULL, '1', '1'),\n(NULL, NULL, NULL, NULL, DOUBLE '1.0', NULL, NULL)\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SHOW STATS FOR (SELECT id FROM " + tableName + " WHERE part = 100)"))).skippingTypesCheck().matches(expectedStats);
    }

    /**
     * Registers a copy of {@code databricks133/parsed_stats_case_sensitive} and
     * checks full reads, reads of the mixed-case columns {@code a_NuMbEr} and
     * {@code a_StRiNg} filtered on {@code part}, and the {@code SHOW STATS} output
     * for the filtered {@code a_NuMbEr} query.
     */
    @Test
    public void testCheckpointFilteringForParsedStatsWithCaseSensitiveColumnNames() throws Exception {
        String tableName = "test_parsed_stats_case_sensitive_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"databricks133/parsed_stats_case_sensitive").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String expectedAllRows = "VALUES\n(100, 1, 'ala'),\n(200, 2, 'kota'),\n(300, 3, 'osla'),\n(400, 4, 'zulu')\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).skippingTypesCheck().matches(expectedAllRows);

        String partitionRange = " WHERE part BETWEEN 100 AND 300";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT a_NuMbEr FROM " + tableName + partitionRange))).matches("VALUES 1, 2, 3");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT a_StRiNg FROM " + tableName + partitionRange))).skippingTypesCheck().matches("VALUES 'ala', 'kota', 'osla'");

        String expectedStats = "VALUES\n('a_NuMbEr', NULL, NULL, DOUBLE '0.0' , NULL, '1', '3'),\n(NULL, NULL, NULL, NULL, DOUBLE '3.0', NULL, NULL)\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SHOW STATS FOR (SELECT a_NuMbEr FROM " + tableName + " WHERE part BETWEEN 100 AND 300)"))).skippingTypesCheck().matches(expectedStats);
    }

    /**
     * Registers a copy of {@code deltalake/partition_values_parsed_all_types} and
     * checks, one partition column at a time, that an equality predicate selects
     * the row with id 1 and an {@code IS NULL} predicate selects the row with id 3.
     */
    @Test
    public void testDeltaLakeWithPartitionValuesParsedAllTypes() throws Exception {
        String tableName = "test_partition_values_parsed_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"deltalake/partition_values_parsed_all_types").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // Equality predicates: each one is expected to return only id 1.
        String[] equalityConditions = {
            "part_boolean = true",
            "part_tinyint = 1",
            "part_smallint = 10",
            "part_int = 100",
            "part_bigint = 1000",
            "part_short_decimal = CAST('123.12' AS DECIMAL(5,2))",
            "part_long_decimal = CAST('123456789012345678.123' AS DECIMAL(21,3))",
            "part_double = 1.2",
            "part_float = 3.4",
            "part_varchar = 'a'",
            "part_date = DATE '2020-08-21'",
            "part_timestamp = TIMESTAMP '2020-10-21 01:00:00.123 UTC'",
            "part_timestamp_ntz = TIMESTAMP '2023-01-02 01:02:03.456'",
        };
        for (String condition : equalityConditions) {
            this.assertPartitionValuesParsedCondition(tableName, 1, condition);
        }

        // IS NULL predicates: each one is expected to return only id 3.
        String[] nullConditions = {
            "part_boolean IS NULL",
            "part_tinyint IS NULL",
            "part_smallint IS NULL",
            "part_int IS NULL",
            "part_bigint IS NULL",
            "part_short_decimal IS NULL",
            "part_long_decimal IS NULL",
            "part_double IS NULL",
            "part_float IS NULL",
            "part_varchar IS NULL",
            "part_date IS NULL",
            "part_timestamp IS NULL",
            "part_timestamp_ntz IS NULL",
        };
        for (String condition : nullConditions) {
            this.assertPartitionValuesParsedCondition(tableName, 3, condition);
        }
    }

    /**
     * Registers a copy of {@code deltalake/partition_values_parsed_all_types},
     * verifies its three initial rows, inserts three more rows (ids 4, 5 and 6),
     * verifies all six rows, and finally checks that a query constraining every
     * partition column at once returns only id 1.
     */
    @Test
    public void testDeltaLakeWritePartitionValuesParsedAllTypesInCheckpoint() throws Exception {
        String tableName = "test_write_partition_values_parsed_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"deltalake/partition_values_parsed_all_types").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        String insertInto = "INSERT INTO " + tableName;

        // Rows present in the table as copied from the resource directory.
        String expectedInitialRows = "VALUES\n    (1, true, TINYINT '1', SMALLINT '10', 100, BIGINT '1000', CAST('123.12' AS DECIMAL(5,2)), CAST('123456789012345678.123' AS DECIMAL(21,3)), DOUBLE '1.2', REAL '3.4', 'a', DATE '2020-08-21', TIMESTAMP '2020-10-21 01:00:00.123 UTC', TIMESTAMP '2023-01-02 01:02:03.456'),\n    (2, false, TINYINT '2', SMALLINT '20', 200, BIGINT '2000', CAST('223.12' AS DECIMAL (5,2)), CAST('223456789012345678.123' AS DECIMAL(21,3)), DOUBLE '10.2', REAL '30.4', 'b', DATE '2020-08-22', TIMESTAMP '2020-10-22 01:00:00.123 UTC', TIMESTAMP '2023-01-03 01:02:03.456'),\n    (3, null, null, null, null, null, null, null, null, null, null, null, null, null)\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectAll))).skippingTypesCheck().matches(expectedInitialRows);

        // Three single-row inserts: two fully populated rows and one all-null row.
        this.assertUpdate(insertInto + " VALUES (4, false, TINYINT '4', SMALLINT '40', 400, BIGINT '4000', CAST('444.44' AS DECIMAL(5,2)), CAST('4444444.444' AS DECIMAL(21,3)), DOUBLE '4.4', REAL '4.4', 'd', DATE '2020-08-24', TIMESTAMP '2020-10-24 01:00:00.123 UTC', TIMESTAMP '2023-01-04 01:02:03.456')", 1L);
        this.assertUpdate(insertInto + " VALUES (5, false, TINYINT '5', SMALLINT '50', 500, BIGINT '5000', CAST('555.5' AS DECIMAL(5,2)), CAST('55555.55' AS DECIMAL(21,3)), DOUBLE '5.55', REAL '5.5555', 'd', DATE '2020-08-25', TIMESTAMP '2020-10-25 01:00:00.123 UTC', TIMESTAMP '2023-01-05 01:02:03.456')", 1L);
        this.assertUpdate(insertInto + " VALUES (6, null, null, null, null, null, null, null, null, null, null, null, null, null)", 1L);

        String expectedRowsAfterInserts = "VALUES\n    (1, true, TINYINT '1', SMALLINT '10', 100, BIGINT '1000', CAST('123.12' AS DECIMAL(5,2)), CAST('123456789012345678.123' AS DECIMAL(21,3)), DOUBLE '1.2', REAL '3.4', 'a', DATE '2020-08-21', TIMESTAMP '2020-10-21 01:00:00.123 UTC', TIMESTAMP '2023-01-02 01:02:03.456'),\n    (2, false, TINYINT '2', SMALLINT '20', 200, BIGINT '2000', CAST('223.12' AS DECIMAL (5,2)), CAST('223456789012345678.123' AS DECIMAL(21,3)), DOUBLE '10.2', REAL '30.4', 'b', DATE '2020-08-22', TIMESTAMP '2020-10-22 01:00:00.123 UTC', TIMESTAMP '2023-01-03 01:02:03.456'),\n    (3, null, null, null, null, null, null, null, null, null, null, null, null, null),\n    (4, false, TINYINT '4', SMALLINT '40', 400, BIGINT '4000', CAST('444.44' AS DECIMAL(5,2)), CAST('4444444.444' AS DECIMAL(21,3)), DOUBLE '4.4', REAL '4.4', 'd', DATE '2020-08-24', TIMESTAMP '2020-10-24 01:00:00.123 UTC', TIMESTAMP '2023-01-04 01:02:03.456'),\n    (5, false, TINYINT '5', SMALLINT '50', 500, BIGINT '5000', CAST('555.5' AS DECIMAL(5,2)), CAST('55555.55' AS DECIMAL(21,3)), DOUBLE '5.55', REAL '5.5555', 'd', DATE '2020-08-25', TIMESTAMP '2020-10-25 01:00:00.123 UTC', TIMESTAMP '2023-01-05 01:02:03.456'),\n    (6, null, null, null, null, null, null, null, null, null, null, null, null, null)\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(selectAll))).skippingTypesCheck().matches(expectedRowsAfterInserts);

        // A predicate on every partition column at once must still select only id 1.
        String allPartitionColumnsQuery = "SELECT id\nFROM %s\nWHERE\n    part_boolean = true AND\n    part_tinyint = TINYINT '1' AND\n    part_smallint= SMALLINT '10' AND\n    part_int = 100 AND\n    part_bigint = BIGINT '1000' AND\n    part_short_decimal = CAST('123.12' AS DECIMAL(5,2)) AND\n    part_long_decimal = CAST('123456789012345678.123' AS DECIMAL(21,3)) AND\n    part_double = DOUBLE '1.2' AND\n    part_float = REAL '3.4' AND\n    part_varchar = 'a' AND\n    part_date = DATE '2020-08-21' AND\n    part_timestamp = TIMESTAMP '2020-10-21 01:00:00.123 UTC' AND\n    part_timestamp_ntz =TIMESTAMP '2023-01-02 01:02:03.456'".formatted(tableName);
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(allPartitionColumnsQuery))).matches("VALUES 1");
    }

    /**
     * Registers a copy of {@code databricks133/partition_values_parsed_case_sensitive},
     * verifies its rows before and after inserting one extra row, and checks that a
     * predicate on the mixed-case partition columns {@code part_NuMbEr} and
     * {@code part_StRiNg} returns id 100.
     */
    @Test
    public void testDeltaLakeWritePartitionValuesParsedCaseSensitiveInCheckpoint() throws Exception {
        String tableName = "test_write_partition_values_parsed_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"databricks133/partition_values_parsed_case_sensitive").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        // The full-table reads run with checkpoint_filtering_enabled set to "false".
        Session session = Session.builder((Session)this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty("delta", "checkpoint_filtering_enabled", "false")
                .build();
        String selectAll = "SELECT * FROM " + tableName;

        String expectedBeforeInsert = "VALUES\n    (100, 1, 'ala'),\n    (200, 2,'kota'),\n    (300, 3, 'osla')\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(session, selectAll))).skippingTypesCheck().matches(expectedBeforeInsert);

        this.assertUpdate("INSERT INTO " + tableName + " VALUES (400, 4, 'kon')", 1L);

        String expectedAfterInsert = "VALUES\n    (100, 1, 'ala'),\n    (200, 2,'kota'),\n    (300, 3, 'osla'),\n    (400, 4, 'kon')\n";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(session, selectAll))).skippingTypesCheck().matches(expectedAfterInsert);

        // This final lookup uses the default session.
        String partitionLookup = "SELECT id FROM " + tableName + " WHERE part_NuMbEr = 1 AND part_StRiNg = 'ala'";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(partitionLookup))).matches("VALUES 100");
    }

    /**
     * Asserts that selecting {@code id} from the table with the given
     * {@code WHERE} condition yields exactly the single expected id.
     *
     * @param tableName table to query
     * @param id the only id the query is expected to return
     * @param condition SQL predicate placed after {@code WHERE}
     */
    private void assertPartitionValuesParsedCondition(String tableName, int id, @Language(value="SQL") String condition) {
        String sql = "SELECT id FROM " + tableName + " WHERE " + condition;
        String expected = "VALUES " + id;
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(sql))).matches(expected);
    }

    /**
     * Runs the V2 checkpoint read scenario once for each resource directory
     * listed below, in order.
     */
    @Test
    public void testReadV2Checkpoint() throws Exception {
        String[] resourceNames = {
            "deltalake/v2_checkpoint_json",
            "deltalake/v2_checkpoint_parquet",
            "databricks133/v2_checkpoint_json",
            "databricks133/v2_checkpoint_parquet",
        };
        for (String resourceName : resourceNames) {
            this.testReadV2Checkpoint(resourceName);
        }
    }

    /**
     * Copies and registers the given resource table after confirming that its
     * {@code _last_checkpoint} file mentions both {@code v2Checkpoint} and
     * {@code sidecar}. It then checks that the table can be described and read,
     * that every write statement is rejected with an "Unsupported writer features"
     * error, and that the data is unchanged afterwards.
     *
     * @param resourceName classpath resource directory holding the table files
     */
    private void testReadV2Checkpoint(String resourceName) throws Exception {
        String tableName = "test_v2_checkpoint_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path source = new File(Resources.getResource((String)resourceName).toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(source, tableLocation);

        // The check reads the _last_checkpoint file in the source resource directory.
        Path lastCheckpoint = source.resolve("_delta_log/_last_checkpoint");
        ((AbstractStringAssert)Assertions.assertThat((Path)lastCheckpoint).content().contains(new CharSequence[]{"v2Checkpoint"})).contains(new CharSequence[]{"sidecar"});

        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("DESCRIBE " + tableName))).result().projected(new String[]{"Column", "Type"}).skippingTypesCheck().matches("VALUES ('a', 'integer'), ('b', 'integer')");

        String selectAll = "SELECT * FROM " + tableName;
        this.assertQuery(selectAll, "VALUES (1, 2)");

        // Every kind of write must fail with the same message.
        String unsupportedWriterFeature = "\\QUnsupported writer features: [v2Checkpoint]";
        String[] rejectedWrites = {
            "INSERT INTO " + tableName + " VALUES (3, 4)",
            "UPDATE " + tableName + " SET a = 10",
            "DELETE FROM " + tableName,
            "TRUNCATE TABLE " + tableName,
            "MERGE INTO " + tableName + " USING (VALUES 42) t(dummy) ON false WHEN NOT MATCHED THEN INSERT VALUES (3, 4)",
        };
        for (String rejectedWrite : rejectedWrites) {
            this.assertQueryFails(rejectedWrite, unsupportedWriterFeature);
        }

        // The rejected writes must have left the data untouched.
        this.assertQuery(selectAll, "VALUES (1, 2)");
    }

    /**
     * Registers a copy of {@code deltalake/type_widening} and checks the current
     * schema and data, then reads versions 0 through 5 with
     * {@code FOR VERSION AS OF} and compares each against its expected typed values.
     */
    @Test
    public void testTypeWidening() throws Exception {
        String tableName = "test_type_widening_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"deltalake/type_widening").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("DESCRIBE " + tableName))).result().projected(new String[]{"Column", "Type"}).skippingTypesCheck().matches("VALUES ('col', 'integer')");
        this.assertQuery("SELECT * FROM " + tableName, "VALUES 127, 32767, 2147483647");

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName + " FOR VERSION AS OF 0"))).returnsEmptyResult();

        // Expected contents of versions 1..5; index i holds the expectation for version i + 1.
        String[] expectedByVersion = {
            "VALUES tinyint '127'",
            "VALUES smallint '127'",
            "VALUES smallint '127', smallint '32767'",
            "VALUES integer '127', integer '32767'",
            "VALUES integer '127', integer '32767', integer '2147483647'",
        };
        for (int version = 1; version <= expectedByVersion.length; version++) {
            String versionedQuery = "SELECT * FROM " + tableName + " FOR VERSION AS OF " + version;
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(versionedQuery))).matches(expectedByVersion[version - 1]);
        }
    }

    /**
     * Checks the {@code type_widening_partition} table: the column names and types
     * reported by {@code DESCRIBE}, and the rows returned for versions 0, 1 and 18.
     */
    @Test
    public void testTypeWideningNotSkippingUnsupportedPartitionColumns() {
        String expectedColumns = "VALUES ('col', 'tinyint'), ('byte_to_short', 'smallint'), ('byte_to_int', 'integer'), ('byte_to_long', 'bigint'), ('byte_to_decimal', 'decimal(10,0)'), ('byte_to_double', 'double'), ('short_to_int', 'integer'), ('short_to_long', 'bigint'), ('short_to_decimal', 'decimal(10,0)'), ('short_to_double', 'double'), ('int_to_long', 'bigint'), ('int_to_decimal', 'decimal(10,0)'), ('int_to_double', 'double'), ('long_to_decimal', 'decimal(20,0)'), ('float_to_double', 'double'), ('decimal_to_decimal', 'decimal(12,2)'), ('date_to_timestamp', 'timestamp(6)')";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("DESCRIBE type_widening_partition"))).result().projected(new String[]{"Column", "Type"}).skippingTypesCheck().matches(expectedColumns);

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_partition FOR VERSION AS OF 0"))).returnsEmptyResult();

        String expectedVersionOne = "VALUES (tinyint '1', tinyint '1', tinyint '1', tinyint '1', tinyint '1', tinyint '1', smallint '1', smallint '1', smallint '1', smallint '1', integer '1', integer '1', integer '1', bigint '1', real '1', CAST('1' AS decimal(10,0)), DATE '2024-06-19')";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_partition FOR VERSION AS OF 1"))).matches(expectedVersionOne);

        String expectedVersionEighteen = "VALUES (tinyint '1', smallint '1', integer '1', bigint '1', CAST('1' AS decimal(10,0)), double '1', integer '1', bigint '1', CAST('1' AS decimal(10,0)), double '1', bigint '1', CAST('1' AS decimal(10,0)), double '1', CAST('1' AS decimal(20,0)), double '1', CAST('1' AS decimal(12,2)), CAST('2024-06-19' AS timestamp(6))), (tinyint '2', smallint '256', integer '35000', bigint '2147483650', CAST('9223372036' AS decimal(10,0)), double '9.323372040456E9', integer '35000', bigint '2147483650', CAST('9223372036' AS decimal(10,0)), double '9.323372040456E9', bigint '2147483650', CAST('9223372036' AS decimal(10,0)), double '9.323372040456E9', CAST('92233720368547758073' AS decimal(20,0)), double '3.403E38', CAST('9223372036.25' AS decimal(12,2)), TIMESTAMP '2021-07-01 08:43:28.123456')";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_partition FOR VERSION AS OF 18"))).matches(expectedVersionEighteen);
    }

    /**
     * Checks the {@code type_widening} table: version 1 returns a single
     * sixteen-column row, while the current table is described and read as
     * having only three columns.
     */
    @Test
    public void testTypeWideningSkippingUnsupportedColumns() {
        String expectedVersionOne = "VALUES (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1, DATE '2024-06-19')";
        this.assertQuery("SELECT * FROM type_widening FOR VERSION AS OF 1", expectedVersionOne);

        String expectedColumns = "VALUES ('byte_to_short', 'smallint'), ('byte_to_int', 'integer'), ('short_to_int', 'integer')";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("DESCRIBE type_widening"))).result().projected(new String[]{"Column", "Type"}).skippingTypesCheck().matches(expectedColumns);

        this.assertQuery("SELECT * FROM type_widening", "VALUES (1, 1, 1), (2, 2, 2)");
    }

    /**
     * Checks the {@code type_widening_nested} table: {@code DESCRIBE} reports no
     * columns and selecting from the current table fails, while versions 0, 1, 7
     * and 8 are readable. Version 13 fails with the same "no columns" error as the
     * current table.
     */
    @Test
    public void testTypeWideningSkippingUnsupportedNested() {
        String noColumnsError = "(.*)SELECT \\* not allowed from relation that has no columns";

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("DESCRIBE type_widening_nested"))).result().projected(new String[]{"Column", "Type"}).skippingTypesCheck().isEmpty();
        this.assertQueryFails("SELECT s FROM type_widening_nested", "(.*)Column 's' cannot be resolved");
        this.assertQueryFails("SELECT * FROM type_widening_nested", noColumnsError);

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_nested FOR VERSION AS OF 0"))).returnsEmptyResult();

        String expectedVersionOne = "VALUES (CAST(ROW(127) AS ROW (field tinyint)), CAST(ROW(127, ROW(15)) AS ROW (field tinyint, field2 ROW(inner_field tinyint))), MAP(ARRAY[tinyint '-128'], ARRAY[tinyint '127']), ARRAY[tinyint '127'])";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_nested FOR VERSION AS OF 1"))).matches(expectedVersionOne);

        String expectedVersionSeven = "VALUES (CAST(ROW(127) AS ROW (field smallint)), CAST(ROW(127, ROW(15)) AS ROW (field smallint, field2 ROW(inner_field smallint))), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127'])";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_nested FOR VERSION AS OF 7"))).matches(expectedVersionSeven);

        String expectedVersionEight = "VALUES (CAST(ROW(127) AS ROW (field smallint)), CAST(ROW(127, ROW(15)) AS ROW (field smallint, field2 ROW(inner_field smallint))), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127']),(CAST(ROW(32767) AS ROW (field smallint)), CAST(ROW(32767, ROW(32767)) AS ROW (field smallint, field2 ROW(inner_field smallint))), MAP(ARRAY[smallint '-32768'], ARRAY[smallint '32767']), ARRAY[smallint '32767'])";
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM type_widening_nested FOR VERSION AS OF 8"))).matches(expectedVersionEight);

        this.assertQueryFails("SELECT * FROM type_widening_nested FOR VERSION AS OF 13", noColumnsError);
    }

    /**
     * Registers a copy of {@code deltalake/type_widening_nested} and checks the
     * current schema and data, then reads versions 0, 1, 5, 6, 10 and 11 with
     * {@code FOR VERSION AS OF} and compares each against its expected typed values.
     */
    @Test
    public void testTypeWideningNested() throws Exception {
        String tableName = "test_type_widening_nestd_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"deltalake/type_widening_nested").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("DESCRIBE " + tableName))).result().projected(new String[]{"Column", "Type"}).skippingTypesCheck().matches("VALUES ('s', 'row(field integer)'), ('m', 'map(integer, integer)'), ('a', 'array(integer)')");
        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName))).matches("VALUES (CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[-128], ARRAY[127]), ARRAY[127]),(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[-32768], ARRAY[32767]), ARRAY[32767]),(CAST(ROW(2147483647) AS ROW(field integer)), MAP(ARRAY[-2147483648], ARRAY[2147483647]), ARRAY[2147483647])");

        ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query("SELECT * FROM " + tableName + " FOR VERSION AS OF 0"))).returnsEmptyResult();

        // Parallel arrays: expectedRows[i] is the expected result when reading versions[i].
        int[] versions = {1, 5, 6, 10, 11};
        String[] expectedRows = {
            "VALUES (CAST(ROW(127) AS ROW(field tinyint)), MAP(ARRAY[tinyint '-128'], ARRAY[tinyint '127']), ARRAY[tinyint '127'])",
            "VALUES (CAST(ROW(127) AS ROW(field smallint)), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127'])",
            "VALUES (CAST(ROW(127) AS ROW(field smallint)), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127']),(CAST(ROW(32767) AS ROW(field smallint)), MAP(ARRAY[smallint '-32768'], ARRAY[smallint '32767']), ARRAY[smallint '32767'])",
            "VALUES (CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[integer '-128'], ARRAY[integer '127']), ARRAY[integer '127']),(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[integer '-32768'], ARRAY[integer '32767']), ARRAY[integer '32767'])",
            "VALUES (CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[-128], ARRAY[127]), ARRAY[127]),(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[-32768], ARRAY[32767]), ARRAY[32767]),(CAST(ROW(2147483647) AS ROW(field integer)), MAP(ARRAY[-2147483648], ARRAY[2147483647]), ARRAY[2147483647])",
        };
        for (int i = 0; i < versions.length; i++) {
            String versionedQuery = "SELECT * FROM " + tableName + " FOR VERSION AS OF " + versions[i];
            ((QueryAssertions.QueryAssert)Assertions.assertThat((AssertProvider)this.query(versionedQuery))).matches(expectedRows[i]);
        }
    }

    /**
     * Registers a copy of {@code deltalake/type_widening_unsupported} and checks
     * that both {@code SELECT *} and an explicit {@code SELECT col} fail with the
     * expected analysis errors.
     */
    @Test
    public void testTypeWideningUnsupported() throws Exception {
        String tableName = "test_type_widening_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);
        Path resourceDirectory = new File(Resources.getResource((String)"deltalake/type_widening_unsupported").toURI()).toPath();
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory, tableLocation);
        this.assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

        String selectAll = "SELECT * FROM " + tableName;
        String selectCol = "SELECT col FROM " + tableName;
        this.assertQueryFails(selectAll, "(.*)SELECT \\* not allowed from relation that has no columns");
        this.assertQueryFails(selectCol, "(.*)Column 'col' cannot be resolved");
    }

    /**
     * Verifies that the {@code unsupported_writer_feature} table can be read (and is empty), while
     * schema changes, OPTIMIZE, comments and vacuum are rejected with the expected messages.
     */
    @Test
    public void testUnsupportedWriterFeature() {
        // \Q starts a regex literal section so the brackets in the message are matched verbatim.
        String unsupportedFeatures = "\\QUnsupported writer features: [generatedColumns]";

        // Reads are still allowed.
        this.assertQueryReturnsEmptyResult("SELECT * FROM unsupported_writer_feature");

        // Schema evolution and maintenance statements.
        this.assertQueryFails("ALTER TABLE unsupported_writer_feature ADD COLUMN new_col int", unsupportedFeatures);
        this.assertQueryFails("ALTER TABLE unsupported_writer_feature RENAME COLUMN a TO renamed", unsupportedFeatures);
        this.assertQueryFails("ALTER TABLE unsupported_writer_feature DROP COLUMN b", unsupportedFeatures);
        this.assertQueryFails("ALTER TABLE unsupported_writer_feature ALTER COLUMN b DROP NOT NULL", unsupportedFeatures);
        this.assertQueryFails("ALTER TABLE unsupported_writer_feature EXECUTE OPTIMIZE", unsupportedFeatures);

        // Changing a column type fails with a different, connector-level message.
        this.assertQueryFails(
                "ALTER TABLE unsupported_writer_feature ALTER COLUMN b SET DATA TYPE bigint",
                "This connector does not support setting column types");

        // Comments.
        this.assertQueryFails("COMMENT ON TABLE unsupported_writer_feature IS 'test comment'", unsupportedFeatures);
        this.assertQueryFails("COMMENT ON COLUMN unsupported_writer_feature.a IS 'test column comment'", unsupportedFeatures);

        // Vacuum reports the unsupported features with its own wording.
        this.assertQueryFails(
                "CALL delta.system.vacuum('tpch', 'unsupported_writer_feature', '7d')",
                "\\QCannot execute vacuum procedure with [generatedColumns] writer features");
    }

    /**
     * Verifies that the {@code unsupported_writer_version} table can be read (and is empty), while
     * write-side statements are rejected because the table requires writer version 8.
     */
    @Test
    public void testUnsupportedWriterVersion() {
        String unsupportedVersion = "Table .* requires Delta Lake writer version 8 which is not supported";

        // Reads are still allowed.
        this.assertQueryReturnsEmptyResult("SELECT * FROM unsupported_writer_version");

        this.assertQueryFails("ALTER TABLE unsupported_writer_version ADD COLUMN new_col int", unsupportedVersion);
        this.assertQueryFails("COMMENT ON TABLE unsupported_writer_version IS 'test comment'", unsupportedVersion);
        this.assertQueryFails("COMMENT ON COLUMN unsupported_writer_version.col IS 'test column comment'", unsupportedVersion);
        this.assertQueryFails("ALTER TABLE unsupported_writer_version EXECUTE OPTIMIZE", unsupportedVersion);

        // Vacuum reports the unsupported version with its own wording.
        this.assertQueryFails(
                "CALL delta.system.vacuum('tpch', 'unsupported_writer_version', '7d')",
                "Cannot execute vacuum procedure with 8 writer version");
    }

    /**
     * Registers the {@code deltalake/in_commit_timestamp_history_read} resource as a table and checks
     * both its rows and the {@code timestamp} values of its {@code $history} table, expressed as
     * milliseconds since the epoch.
     */
    @Test
    public void testReadInCommitTimestampInHistoryTable() throws Exception {
        String tableName = "in_commit_timestamp_history_read_" + TestingNames.randomNameSuffix();
        Path tableLocation = this.catalogDir.resolve(tableName);

        File resourceDirectory = new File(Resources.getResource("deltalake/in_commit_timestamp_history_read").toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(resourceDirectory.toPath(), tableLocation);

        String registerTableSql = String.format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", tableName, tableLocation.toUri());
        this.assertUpdate(registerTableSql);

        this.assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1), (5, 5)");

        String historyTimestampsSql = String.format(
                "SELECT date_diff('millisecond', TIMESTAMP '1970-01-01 00:00:00 UTC', timestamp) FROM \"%s$history\"",
                tableName);
        this.assertQuery(historyTimestampsSql, "VALUES 1739859668531L, 1739859684775L, 1739859743394L, 1739859755480L");
    }

    /**
     * Runs the cloned-table UPDATE/MERGE scenario twice: once with the {@code clone_merge_source}
     * resources and once with the {@code clone_merge_deletion_vector_source} resources.
     */
    @Test
    public void testMergeOnClonedTable() throws Exception {
        String plainSourceResource = "deltalake/clone_merge/clone_merge_source";
        String plainClonedResource = "deltalake/clone_merge/clone_merge_cloned";
        this.testMergeOnClonedTable(
                plainSourceResource,
                plainClonedResource,
                "clone_merge_source",
                "spark_catalog.tiny.clone_merge_source");

        String deletionVectorSourceResource = "deltalake/clone_merge/clone_merge_deletion_vector_source";
        String deletionVectorClonedResource = "deltalake/clone_merge/clone_merge_deletion_vector_cloned";
        this.testMergeOnClonedTable(
                deletionVectorSourceResource,
                deletionVectorClonedResource,
                "clone_merge_deletion_vector_source",
                "spark_catalog.tiny.clone_merge_deletion_vector_source");
    }

    /**
     * Registers a source table and a clone of it from bundled resources, then checks that UPDATE and
     * MERGE on the clone produce the expected rows while the source table's rows stay unchanged.
     *
     * @param sourceResourceName classpath resource directory holding the source table
     * @param clonedResourceName classpath resource directory holding the cloned table
     * @param sourceTableDir directory name the source table is copied into; also the prefix of the
     *        registered source table name
     * @param oldSchemaTableName source table identifier recorded in the clone's JSON files, which is
     *        rewritten to point at the newly registered source table
     */
    private void testMergeOnClonedTable(String sourceResourceName, String clonedResourceName, String sourceTableDir, String oldSchemaTableName) throws Exception {
        // --- source table ---
        String sourceTableName = sourceTableDir + TestingNames.randomNameSuffix();
        Path sourcePath = this.catalogDir.resolve("clone_test_dir" + TestingNames.randomNameSuffix()).resolve(sourceTableDir);
        FILE_SYSTEM.createDirectory(Location.of(sourcePath.toString()));
        File sourceResourceDirectory = new File(Resources.getResource(sourceResourceName).toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(sourceResourceDirectory.toPath(), sourcePath);
        this.assertUpdate(String.format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", sourceTableName, sourcePath.toUri()));

        String initialRows = "VALUES\n"
                + "(1, 'A', TIMESTAMP '2024-01-01'),\n"
                + "(2, 'B', TIMESTAMP '2024-01-01'),\n"
                + "(3, 'C', TIMESTAMP '2024-02-02'),\n"
                + "(4, 'D', TIMESTAMP '2024-02-02')\n";
        this.assertQuery("SELECT * FROM " + sourceTableName, initialRows);

        // --- cloned table, placed next to the source directory ---
        String clonedTableName = "test_clone_merge_cloned_" + TestingNames.randomNameSuffix();
        Path clonedPath = sourcePath.resolveSibling(clonedTableName);
        File clonedResourceDirectory = new File(Resources.getResource(clonedResourceName).toURI());
        TestingDeltaLakeUtils.copyDirectoryContents(clonedResourceDirectory.toPath(), clonedPath);

        // Point the clone's JSON files at the local copy of the source table.
        String currentSchema = this.getSession().getSchema().orElseThrow();
        updateClonedTableDeletionVectorPathPrefixAndSource(
                clonedPath,
                "file://" + sourcePath,
                oldSchemaTableName,
                currentSchema + "." + sourceTableName);
        this.assertUpdate(String.format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", clonedTableName, clonedPath.toUri()));

        String selectCloned = "SELECT * FROM " + clonedTableName;
        this.assertQuery(selectCloned, initialRows);

        // --- UPDATE on the clone ---
        String rowsAfterUpdate = "        VALUES\n"
                + "        (1, 'A', TIMESTAMP '2024-01-01'),\n"
                + "        (2, 'updated', TIMESTAMP '2024-01-01'),\n"
                + "        (3, 'C', TIMESTAMP '2024-02-02'),\n"
                + "        (4, 'updated', TIMESTAMP '2024-02-02')\n";
        this.assertUpdate("UPDATE " + clonedTableName + " SET v = 'updated' WHERE id IN (2, 4)", 2L);
        this.assertQuery(selectCloned, rowsAfterUpdate);

        // --- MERGE on the clone: one delete, one update, one insert ---
        String mergeStatement = String.format(
                "MERGE INTO %s t\n"
                        + "USING (VALUES (1, 'yyy', TIMESTAMP '2025-01-01'), (2, 'merged', TIMESTAMP '2025-02-02'), (5, 'kkk', TIMESTAMP '2025-03-03')) AS s(id, v, part)\n"
                        + "ON (t.id = s.id)\n"
                        + "WHEN MATCHED AND s.v = 'yyy' THEN DELETE\n"
                        + "WHEN MATCHED THEN UPDATE SET v = s.v\n"
                        + "WHEN NOT MATCHED THEN INSERT (id, v, part) VALUES(s.id, s.v, s.part)\n",
                clonedTableName);
        String rowsAfterMerge = "VALUES\n"
                + "(2, 'merged', TIMESTAMP '2024-01-01'),\n"
                + "(3, 'C', TIMESTAMP '2024-02-02'),\n"
                + "(4, 'updated', TIMESTAMP '2024-02-02'),\n"
                + "(5, 'kkk', TIMESTAMP '2025-03-03')\n";
        this.assertUpdate(mergeStatement, 3L);
        this.assertQuery(selectCloned, rowsAfterMerge);

        // The source table must still return its original rows.
        this.assertQuery("SELECT * FROM " + sourceTableName, initialRows);
    }

    /**
     * Rewrites every {@code .json} file found under {@code location} (recursively) so that:
     * <ul>
     *   <li>the hard-coded prefix {@code s3://test-bucket/tiny/clone_merge_deletion_vector_source}
     *       directly following a {@code "path":"} or {@code "pathOrInlineDv":"} key is replaced
     *       by {@code newPrefix};</li>
     *   <li>{@code oldSchemaTableName} directly following a {@code "source":"} key is replaced
     *       by {@code "spark_catalog." + newSchemaTableName}.</li>
     * </ul>
     * Files whose content does not change are left untouched.
     *
     * @param location root directory to scan
     * @param newPrefix literal text that replaces the old path prefix
     * @param oldSchemaTableName literal source identifier to look for
     * @param newSchemaTableName literal identifier appended to {@code spark_catalog.} as the new source
     * @throws IOException if the directory tree cannot be walked
     * @throws RuntimeException wrapping the {@link IOException} if a file cannot be read or written
     */
    public static void updateClonedTableDeletionVectorPathPrefixAndSource(Path location, String newPrefix, String oldSchemaTableName, String newSchemaTableName) throws IOException {
        String oldPrefix = "s3://test-bucket/tiny/clone_merge_deletion_vector_source";
        Pattern pattern = Pattern.compile("(?<=\"(pathOrInlineDv|path)\":\")" + Pattern.quote(oldPrefix));
        Pattern patternForSchemaTableName = Pattern.compile("(?<=\"source\":\")" + Pattern.quote(oldSchemaTableName));
        // Matcher.replaceAll interprets '\' and '$' in the replacement as escapes and group references.
        // Quote both replacements so that paths (e.g. with backslashes) and names containing '$'
        // are inserted literally instead of being mangled or rejected.
        String prefixReplacement = Matcher.quoteReplacement(newPrefix);
        String sourceReplacement = Matcher.quoteReplacement("spark_catalog." + newSchemaTableName);
        try (Stream<Path> stream = Files.walk(location)) {
            stream.filter(file -> !Files.isDirectory(file))
                    .filter(file -> file.getFileName().toString().endsWith(".json"))
                    .forEach(file -> {
                        try {
                            String content = Files.readString(file);
                            String newContent = pattern.matcher(content).replaceAll(prefixReplacement);
                            newContent = patternForSchemaTableName.matcher(newContent).replaceAll(sourceReplacement);
                            // Skip the write when nothing matched.
                            if (!content.equals(newContent)) {
                                Files.write(file, newContent.getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING);
                            }
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Failed to rewrite " + file, e);
                        }
                    });
        }
    }

    /**
     * Reads transaction log entry {@code entryNumber} from the table's {@code _delta_log} directory
     * and returns the metadata of the single log entry that carries one.
     *
     * @param entryNumber number of the transaction log entry to read
     * @param tableLocation root directory of the table
     * @return the metadata of the only entry whose metadata is non-null
     */
    private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation) throws IOException {
        String transactionLogDir = tableLocation.resolve("_delta_log").toString();
        List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(entryNumber, transactionLogDir);
        // onlyElement() fails unless exactly one entry carries metadata.
        return entries.stream()
                .filter(entry -> entry.getMetaData() != null)
                .collect(MoreCollectors.onlyElement())
                .getMetaData();
    }

    /**
     * Reads transaction log entry {@code entryNumber} from the table's {@code _delta_log} directory
     * and returns the protocol of the single log entry that carries one.
     *
     * @param entryNumber number of the transaction log entry to read
     * @param tableLocation root directory of the table
     * @return the protocol of the only entry whose protocol is non-null
     */
    private static ProtocolEntry loadProtocolEntry(long entryNumber, Path tableLocation) throws IOException {
        String transactionLogDir = tableLocation.resolve("_delta_log").toString();
        List<DeltaLakeTransactionLogEntry> entries = getEntriesFromJson(entryNumber, transactionLogDir);
        // onlyElement() fails unless exactly one entry carries a protocol.
        return entries.stream()
                .filter(entry -> entry.getProtocol() != null)
                .collect(MoreCollectors.onlyElement())
                .getProtocol();
    }

    /**
     * Extracts the value of the {@code location = '...'} property from the output of
     * {@code SHOW CREATE TABLE} for the given table.
     *
     * @param tableName table to inspect
     * @return the text between the quotes of the first {@code location = '...'} occurrence
     * @throws IllegalStateException if the output contains no such property
     */
    private String getTableLocation(String tableName) {
        // DOTALL lets '.' span line breaks, since the statement is printed over several lines.
        Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL);
        String showCreateTable = (String) this.computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue();
        Matcher matcher = locationPattern.matcher(showCreateTable);
        if (!matcher.find()) {
            throw new IllegalStateException("Location not found in SHOW CREATE TABLE result");
        }
        String location = matcher.group(1);
        Verify.verify(!matcher.find(), "Unexpected second match");
        return location;
    }

    /**
     * Loads the entries of transaction log entry {@code entryNumber} from {@code transactionLogDir}
     * using the shared {@code FILE_SYSTEM} and the default maximum cached transaction log size.
     *
     * @param entryNumber number of the transaction log entry to read
     * @param transactionLogDir directory containing the transaction log files
     * @return the entries of that transaction log entry
     * @throws java.util.NoSuchElementException if {@code TransactionLogTail} returns no entries
     */
    private static List<DeltaLakeTransactionLogEntry> getEntriesFromJson(long entryNumber, String transactionLogDir) throws IOException {
        TransactionLogEntries logEntries = TransactionLogTail.getEntriesFromJson(
                        entryNumber,
                        transactionLogDir,
                        FILE_SYSTEM,
                        DeltaLakeConfig.DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE)
                .orElseThrow();
        return logEntries.getEntriesList(FILE_SYSTEM);
    }
}

