/*
 * Decompiled with CFR 0.152.
 */
package io.trino.tests.product.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import io.airlift.concurrent.MoreFutures;
import io.trino.jdbc.Row;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreClient;
import io.trino.tempto.AfterTestWithContext;
import io.trino.tempto.BeforeTestWithContext;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.hadoop.hdfs.HdfsClient;
import io.trino.tempto.query.QueryExecutionException;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tempto.query.QueryResult;
import io.trino.testing.DataProviders;
import io.trino.testing.TestingNames;
import io.trino.tests.product.hive.Engine;
import io.trino.tests.product.hive.TestHiveMetastoreClientFactory;
import io.trino.tests.product.iceberg.util.IcebergTestUtils;
import io.trino.tests.product.utils.QueryExecutors;
import java.math.BigDecimal;
import java.net.URI;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.thrift.TException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TestIcebergSparkCompatibility
extends ProductTest {
    @Inject
    private HdfsClient hdfsClient;
    @Inject
    private TestHiveMetastoreClientFactory testHiveMetastoreClientFactory;
    private ThriftMetastoreClient metastoreClient;
    private static final String SPARK_CATALOG = "iceberg_test";
    private static final String TRINO_CATALOG = "iceberg";
    private static final String TEST_SCHEMA_NAME = "default";
    private static final List<String> SPECIAL_CHARACTER_VALUES = ImmutableList.of((Object)"with-hyphen", (Object)"with.dot", (Object)"with:colon", (Object)"with/slash", (Object)"with\\\\backslashes", (Object)"with\\backslash", (Object)"with=equal", (Object)"with?question", (Object)"with!exclamation", (Object)"with%percent", (Object)"with%%percents", (Object)"with$dollar", (Object[])new String[]{"with#hash", "with*star", "with=equals", "with\"quote", "with'apostrophe", "with space", " with space prefix", "with space suffix ", "with\u20aceuro", "with non-ascii \u0105\u0119\u0142\u00f3\u015b\u0107 \u0398 \u03a6 \u0394", "with\ud83d\udc68\u200d\ud83c\udfedcombining character", " \ud83d\udc68\u200d\ud83c\udfed", "\ud83d\udc68\u200d\ud83c\udfed "});
    private static final String TRINO_INSERTED_PARTITION_VALUES = Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), (value, index) -> String.format("(%d, '%s')", index, TestIcebergSparkCompatibility.escapeTrinoString(value))).collect(Collectors.joining(", "));
    private static final String SPARK_INSERTED_PARTITION_VALUES = Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), (value, index) -> String.format("(%d, '%s')", index, TestIcebergSparkCompatibility.escapeSparkString(value))).collect(Collectors.joining(", "));
    private static final List<QueryAssert.Row> EXPECTED_PARTITION_VALUES = (List)Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), (value, index) -> QueryAssert.Row.row((Object[])new Object[]{(int)index, value})).collect(ImmutableList.toImmutableList());

    @BeforeTestWithContext
    public void setup() throws TException {
        this.metastoreClient = this.testHiveMetastoreClientFactory.createMetastoreClient();
        QueryExecutors.onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS iceberg.default WITH (location = 'hdfs://hadoop-master:9000/user/hive/warehouse/default')", new QueryExecutor.QueryParam[0]);
    }

    @AfterTestWithContext
    public void tearDown() {
        this.metastoreClient.close();
        this.metastoreClient = null;
    }

    @BeforeTestWithContext
    public void setUp() {
        QueryExecutors.onTrino().executeQuery(String.format("CREATE SCHEMA IF NOT EXISTS %s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_trino_reading_primitive_types_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (  _string STRING, _bigint BIGINT, _integer INTEGER, _real REAL, _double DOUBLE, _short_decimal decimal(8,2), _long_decimal decimal(38,19), _boolean BOOLEAN, _timestamp TIMESTAMP, _date DATE, _binary BINARY) USING ICEBERG TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)", new Object[]{sparkTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", TestIcebergSparkCompatibility.trinoTableName("\"" + baseTableName + "$snapshots\"")), new QueryExecutor.QueryParam[0])).hasNoRows();
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).hasNoRows();
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s WHERE _integer > 0", trinoTableName), new QueryExecutor.QueryParam[0])).hasNoRows();
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s VALUES ('a_string', 1000000000000000, 1000000000, 10000000.123, 100000000000.123, CAST('123456.78' AS decimal(8,2)), CAST('1234567890123456789.0123456789012345678' AS decimal(38,19)), true, TIMESTAMP '2020-06-28 14:16:00.456', DATE '1950-06-28', X'000102f0feff')", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{"a_string", 1000000000000000L, 1000000000, Float.valueOf(1.0E7f), 1.00000000000123E11, new BigDecimal("123456.78"), new BigDecimal("1234567890123456789.0123456789012345678"), true, Timestamp.valueOf("2020-06-28 14:16:00.456"), Date.valueOf("1950-06-28"), new byte[]{0, 1, 2, -16, -2, -1}});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT   _string, _bigint, _integer, _real, _double, _short_decimal, _long_decimal, _boolean, _timestamp, _date, _binary FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT   _string, _bigint, _integer, _real, _double, _short_decimal, _long_decimal, _boolean, CAST(_timestamp AS TIMESTAMP), _date, _binary FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="testSparkReadingTrinoDataDataProvider")
    public void testSparkReadingTrinoData(StorageFormat storageFormat, CreateMode createMode) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reading_primitive_types_" + storageFormat + "_" + createMode);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        String namedValues = "SELECT   VARCHAR 'a_string' _string , 1000000000000000 _bigint , 1000000000 _integer , REAL '10000000.123' _real , DOUBLE '100000000000.123' _double , DECIMAL '123456.78' _short_decimal , DECIMAL '1234567890123456789.0123456789012345678' _long_decimal , true _boolean , TIMESTAMP '2021-08-03 08:32:21.123456 Europe/Warsaw' _timestamptz , DATE '1950-06-28' _date , X'000102f0feff' _binary ";
        switch (createMode) {
            case CREATE_TABLE_AND_INSERT: {
                QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (  _string VARCHAR, _bigint BIGINT, _integer INTEGER, _real REAL, _double DOUBLE, _short_decimal decimal(8,2), _long_decimal decimal(38,19), _boolean BOOLEAN, _timestamptz timestamp(6) with time zone, _date DATE, _binary VARBINARY) WITH (format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
                QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s %s", trinoTableName, namedValues), new QueryExecutor.QueryParam[0]);
                break;
            }
            case CREATE_TABLE_AS_SELECT: {
                QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s AS %s", trinoTableName, namedValues), new QueryExecutor.QueryParam[0]);
                break;
            }
            case CREATE_TABLE_WITH_NO_DATA_AND_INSERT: {
                QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s AS %s WITH NO DATA", trinoTableName, namedValues), new QueryExecutor.QueryParam[0]);
                QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s %s", trinoTableName, namedValues), new QueryExecutor.QueryParam[0]);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unsupported create mode: " + createMode);
            }
        }
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{"a_string", 1000000000000000L, 1000000000, Float.valueOf(1.0E7f), 1.00000000000123E11, new BigDecimal("123456.78"), new BigDecimal("1234567890123456789.0123456789012345678"), true, "2021-08-03 06:32:21.123456 UTC", "1950-06-28", new byte[]{0, 1, 2, -16, -2, -1}});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT   _string, _bigint, _integer, _real, _double, _short_decimal, _long_decimal, _boolean, CAST(_timestamptz AS varchar), CAST(_date AS varchar), _binary FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT   _string, _bigint, _integer, _real, _double, _short_decimal, _long_decimal, _boolean, CAST(_timestamptz AS string) || ' UTC', CAST(_date AS string), _binary FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @DataProvider
    public Object[][] testSparkReadingTrinoDataDataProvider() {
        return (Object[][])Stream.of(TestIcebergSparkCompatibility.storageFormats()).map(array -> Iterables.getOnlyElement(Arrays.asList(array))).flatMap(storageFormat -> Stream.of({storageFormat, CreateMode.CREATE_TABLE_AND_INSERT}, {storageFormat, CreateMode.CREATE_TABLE_AS_SELECT}, {storageFormat, CreateMode.CREATE_TABLE_WITH_NO_DATA_AND_INSERT})).toArray(x$0 -> new Object[x$0][]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadTrinoUuid(StorageFormat storageFormat) {
        String tableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_read_trino_uuid_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s AS SELECT UUID '406caec7-68b9-4778-81b2-a12ece70c8b1' u", trinoTableName), new QueryExecutor.QueryParam[0]);
        ((AbstractThrowableAssert)QueryAssert.assertQueryFailure(() -> QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).isInstanceOf(SQLException.class)).hasMessageMatching("org.apache.hive.service.cli.HiveSQLException: Error running query:.*\\Q java.lang.ClassCastException\\E(?s:.*)");
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="specVersions")
    public void testSparkCreatesTrinoDrops(int specVersion) {
        String baseTableName = "test_spark_creates_trino_drops";
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES('format-version' = %s)", TestIcebergSparkCompatibility.sparkTableName(baseTableName), specVersion), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + TestIcebergSparkCompatibility.trinoTableName(baseTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testTrinoCreatesSparkDrops() {
        String baseTableName = "test_trino_creates_spark_drops";
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT)", TestIcebergSparkCompatibility.trinoTableName(baseTableName)), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + TestIcebergSparkCompatibility.sparkTableName(baseTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reads_trino_partitioned_table_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (_string VARCHAR, _varbinary VARBINARY, _bigint BIGINT) WITH (partitioning = ARRAY['_string', '_varbinary'], format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row1 = QueryAssert.Row.row((Object[])new Object[]{"b", new byte[]{15, -15, 2, -16, -2, -2}, 1002});
        String selectByString = "SELECT * FROM %s WHERE _string = 'b'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByString, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row1});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByString, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row1});
        QueryAssert.Row row2 = QueryAssert.Row.row((Object[])new Object[]{"a", new byte[]{15, -15, 2, -16, -2, -1}, 1001});
        String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102f0feff'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByVarbinary, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row2});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByVarbinary, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row2});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_trino_reads_spark_partitioned_table_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_string STRING, _varbinary BINARY, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string, _varbinary) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)", new Object[]{sparkTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row1 = QueryAssert.Row.row((Object[])new Object[]{"a", new byte[]{15, -15, 2, -16, -2, -1}, 1001});
        String select = "SELECT * FROM %s WHERE _string = 'a'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(select, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row1});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(select, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row1});
        QueryAssert.Row row2 = QueryAssert.Row.row((Object[])new Object[]{"c", new byte[]{15, -15, 2, -3, -2, -1}, 1003});
        String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102fdfeff'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByVarbinary, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row2});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByVarbinary, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row2});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testTrinoPartitionedByRealWithNaN(StorageFormat storageFormat) {
        this.testTrinoPartitionedByNaN("REAL", storageFormat, Float.valueOf(Float.NaN));
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testTrinoPartitionedByDoubleWithNaN(StorageFormat storageFormat) {
        this.testTrinoPartitionedByNaN("DOUBLE", storageFormat, Double.NaN);
    }

    private void testTrinoPartitionedByNaN(String typeName, StorageFormat storageFormat, Object expectedValue) {
        String baseTableName = "test_trino_partitioned_by_nan_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + " WITH (format = '" + storageFormat + "', partitioning = ARRAY['col'])AS SELECT " + typeName + " 'NaN' AS col", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{expectedValue})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{expectedValue})});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testSparkPartitionedByRealWithNaN(StorageFormat storageFormat) {
        this.testSparkPartitionedByNaN("FLOAT", storageFormat, Float.valueOf(Float.NaN));
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testSparkPartitionedByDoubleWithNaN(StorageFormat storageFormat) {
        this.testSparkPartitionedByNaN("DOUBLE", storageFormat, Double.NaN);
    }

    private void testSparkPartitionedByNaN(String typeName, StorageFormat storageFormat, Object expectedValue) {
        String baseTableName = "test_spark_partitioned_by_nan_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " PARTITIONED BY (col) TBLPROPERTIES ('write.format.default' = '" + storageFormat + "')AS SELECT CAST('NaN' AS " + typeName + ") AS col", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{expectedValue})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{expectedValue})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testPartitionedByNestedField() {
        String baseTableName = "test_trino_nested_field_partition_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (  id INT,  parent STRUCT<nested:STRING, nested_another:STRING>)  USING ICEBERG  PARTITIONED BY (parent.nested)  TBLPROPERTIES ('format-version'=2)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (2, ROW('b'))", new QueryExecutor.QueryParam[0])).hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET id = 2", new QueryExecutor.QueryParam[0])).hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("MERGE INTO " + trinoTableName + " t USING " + trinoTableName + " s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET id = 2", new QueryExecutor.QueryParam[0])).hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE OPTIMIZE", new QueryExecutor.QueryParam[0])).hasMessageContaining("Partitioning by nested field is unsupported: parent.nested");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " DROP COLUMN parent.nested", new QueryExecutor.QueryParam[0])).hasMessageContaining("Cannot drop partition field: parent.nested");
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_trino_reading_spark_composites_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (  doc_id string,\n  info MAP<STRING, INT>,\n  pets ARRAY<STRING>,\n  user_info STRUCT<name:STRING, surname:STRING, age:INT, gender:STRING>)  USING ICEBERG TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)", new Object[]{sparkTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO TABLE %s SELECT 'Doc213', map('age', 28, 'children', 3), array('Dog', 'Cat', 'Pig'), \nnamed_struct('name', 'Santa', 'surname', 'Claus','age', 1000,'gender', 'MALE')", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT doc_id, info['age'], pets[2], user_info.surname FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"Doc213", 28, "Cat", "Claus"})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reading_trino_composites_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (  doc_id VARCHAR,\n  info MAP(VARCHAR, INTEGER),\n  pets ARRAY(VARCHAR),\n  user_info ROW(name VARCHAR, surname VARCHAR, age INTEGER, gender VARCHAR))   WITH (format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES('Doc213', MAP(ARRAY['age', 'children'], ARRAY[28, 3]), ARRAY['Dog', 'Cat', 'Pig'], ROW('Santa', 'Claus', 1000, 'MALE'))", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT doc_id, info['age'], pets[1], user_info.surname FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"Doc213", 28, "Cat", "Claus"})});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_trino_reading_spark_iceberg_table_properties_" + storageFormat);
        String propertiesTableName = "\"" + baseTableName + "$properties\"";
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        String trinoPropertiesTableName = TestIcebergSparkCompatibility.trinoTableName(propertiesTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (\n doc_id STRING)\n USING ICEBERG TBLPROPERTIES ( 'write.format.default'='%s', 'format-version' = %s, 'custom.table-property' = 'my_custom_value')", sparkTableName, storageFormat.toString(), specVersion), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT key, value FROM " + trinoPropertiesTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"custom.table-property", "my_custom_value"}), QueryAssert.Row.row((Object[])new Object[]{"write.format.default", storageFormat.name()}), QueryAssert.Row.row((Object[])new Object[]{"owner", "hive"})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_trino_reading_nested_spark_data_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (\n  doc_id STRING\n, nested_map MAP<STRING, ARRAY<STRUCT<sname: STRING, snumber: INT>>>\n, nested_array ARRAY<MAP<STRING, ARRAY<STRUCT<mname: STRING, mnumber: INT>>>>\n, nested_struct STRUCT<name:STRING, complicated: ARRAY<MAP<STRING, ARRAY<STRUCT<mname: STRING, mnumber: INT>>>>>)\n USING ICEBERG TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)", new Object[]{sparkTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO TABLE %s SELECT  'Doc213', map('s1', array(named_struct('sname', 'ASName1', 'snumber', 201), named_struct('sname', 'ASName2', 'snumber', 202))), array(map('m1', array(named_struct('mname', 'MAS1Name1', 'mnumber', 301), named_struct('mname', 'MAS1Name2', 'mnumber', 302)))       ,map('m2', array(named_struct('mname', 'MAS2Name1', 'mnumber', 401), named_struct('mname', 'MAS2Name2', 'mnumber', 402)))), named_struct('name', 'S1',               'complicated', array(map('m1', array(named_struct('mname', 'SAMA1Name1', 'mnumber', 301), named_struct('mname', 'SAMA1Name2', 'mnumber', 302)))                                   ,map('m2', array(named_struct('mname', 'SAMA2Name1', 'mnumber', 401), named_struct('mname', 'SAMA2Name2', 'mnumber', 402)))))", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{"Doc213", "ASName2", 201, "MAS2Name1", 302, "SAMA1Name1", 402});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT  doc_id, nested_map['s1'][1].sname, nested_map['s1'][0].snumber, nested_array[1]['m2'][0].mname, nested_array[0]['m1'][1].mnumber, nested_struct.complicated[0]['m1'][0].mname, nested_struct.complicated[1]['m2'][1].mnumber  FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT  doc_id, nested_map['s1'][2].sname, nested_map['s1'][1].snumber, nested_array[2]['m2'][1].mname, nested_array[1]['m1'][2].mnumber, nested_struct.complicated[1]['m1'][1].mname, nested_struct.complicated[2]['m2'][2].mnumber  FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reading_nested_trino_data_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (\n  doc_id VARCHAR\n, nested_map MAP(VARCHAR, ARRAY(ROW(sname VARCHAR, snumber INT)))\n, nested_array ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))\n, nested_struct ROW(name VARCHAR, complicated ARRAY(MAP(VARCHAR, ARRAY(ROW(mname VARCHAR, mnumber INT))))))  WITH (format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s SELECT  'Doc213', map(array['s1'], array[array[row('ASName1', 201), row('ASName2', 202)]]), array[map(array['m1'], array[array[row('MAS1Name1', 301), row('MAS1Name2', 302)]])       ,map(array['m2'], array[array[row('MAS2Name1', 401), row('MAS2Name2', 402)]])], row('S1'      ,array[map(array['m1'], array[array[row('SAMA1Name1', 301), row('SAMA1Name2', 302)]])            ,map(array['m2'], array[array[row('SAMA2Name1', 401), row('SAMA2Name2', 402)]])])", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{"Doc213", "ASName2", 201, "MAS2Name1", 302, "SAMA1Name1", 402});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT  doc_id, nested_map['s1'][2].sname, nested_map['s1'][1].snumber, nested_array[2]['m2'][1].mname, nested_array[1]['m1'][2].mnumber, nested_struct.complicated[1]['m1'][1].mname, nested_struct.complicated[2]['m2'][2].mnumber  FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryResult sparkResult = QueryExecutors.onSpark().executeQuery("SELECT  doc_id, nested_map['s1'][1].sname, nested_map['s1'][0].snumber, nested_array[1]['m2'][0].mname, nested_array[0]['m1'][1].mnumber, nested_struct.complicated[0]['m1'][0].mname, nested_struct.complicated[1]['m2'][1].mnumber  FROM " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)sparkResult).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_schema_evolution_for_nested_fields_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (remove_col BIGINT, rename_col BIGINT, keep_col BIGINT, drop_and_add_col BIGINT, CaseSensitiveCol BIGINT, a_struct STRUCT<removed: BIGINT, rename:BIGINT, keep:BIGINT, drop_and_add:BIGINT, CaseSensitive:BIGINT>, a_partition BIGINT)  USING ICEBERG PARTITIONED BY (a_partition) TBLPROPERTIES ('write.format.default' = '%s', 'format-version' = %s)", new Object[]{sparkTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO TABLE %s SELECT 1, 2, 3, 4, 5,  named_struct('removed', 10, 'rename', 11, 'keep', 12, 'drop_and_add', 13, 'CaseSensitive', 14), 1001", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP COLUMN remove_col", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s RENAME COLUMN rename_col TO quite_renamed_col", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP COLUMN drop_and_add_col", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD COLUMN drop_and_add_col BIGINT", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD COLUMN add_col BIGINT", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP COLUMN a_struct.removed", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s RENAME COLUMN a_struct.rename TO renamed", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP COLUMN a_struct.drop_and_add", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD COLUMN a_struct.drop_and_add BIGINT", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD COLUMN a_struct.added BIGINT", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("DESCRIBE " + trinoTableName, new QueryExecutor.QueryParam[0]).project(new int[]{1, 2})).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"quite_renamed_col", "bigint"}), QueryAssert.Row.row((Object[])new Object[]{"keep_col", "bigint"}), QueryAssert.Row.row((Object[])new Object[]{"drop_and_add_col", "bigint"}), QueryAssert.Row.row((Object[])new Object[]{"add_col", "bigint"}), QueryAssert.Row.row((Object[])new Object[]{"casesensitivecol", "bigint"}), QueryAssert.Row.row((Object[])new Object[]{"a_struct", "row(renamed bigint, keep bigint, CaseSensitive bigint, drop_and_add bigint, added bigint)"}), QueryAssert.Row.row((Object[])new Object[]{"a_partition", "bigint"})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT quite_renamed_col, keep_col, drop_and_add_col, add_col, casesensitivecol, a_struct, a_partition FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{2L, 3L, null, null, 5L, this.rowBuilder().addField("renamed", (Object)11L).addField("keep", (Object)12L).addField("CaseSensitive", (Object)14L).addField("drop_and_add", null).addField("added", null).build(), 1001L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT a_struct.renamed FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{11L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT a_struct.keep FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{12L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT a_struct.casesensitive FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{14L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT a_struct.drop_and_add FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT a_struct.added FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT keep_col FROM %s WHERE a_struct.renamed = 11", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT keep_col FROM %s WHERE a_struct.keep = 12", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT keep_col FROM %s WHERE a_struct.casesensitive = 14", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT keep_col FROM %s WHERE a_struct.drop_and_add IS NULL", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT keep_col FROM %s WHERE a_struct.added IS NULL", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3L})});
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO TABLE %s SELECT 12, 13, 15, named_struct('renamed', 111, 'keep', 112, 'CaseSensitive', 113, 'drop_and_add', 114, 'added', 115), 1001, 14, 15", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT DISTINCT a_struct.renamed, a_struct.added, a_struct.keep FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{11L, null, 12L}), QueryAssert.Row.row((Object[])new Object[]{111L, 115L, 112L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT DISTINCT a_struct.renamed, a_struct.keep FROM " + trinoTableName + " WHERE a_struct.added IS NULL", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{11L, 12L})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT a_struct FROM " + trinoTableName + " WHERE a_struct.added IS NOT NULL", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{this.rowBuilder().addField("renamed", (Object)111L).addField("keep", (Object)112L).addField("CaseSensitive", (Object)113L).addField("drop_and_add", (Object)114L).addField("added", (Object)115L).build()})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_read_after_partition_evolution_" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (int_col BIGINT, struct_col STRUCT<field_one: INT, field_two: INT>, timestamp_col TIMESTAMP)  USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s', 'format-version' = %s)", new Object[]{sparkTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, named_struct('field_one', 1, 'field_two', 1), TIMESTAMP '2021-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD bucket(3, int_col)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (2, named_struct('field_one', 2, 'field_two', 2), TIMESTAMP '2022-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD struct_col.field_one", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, named_struct('field_one', 3, 'field_two', 3), TIMESTAMP '2023-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD struct_col.field_one", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD struct_col.field_two", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (4, named_struct('field_one', 4, 'field_two', 4), TIMESTAMP '2024-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD bucket(3, int_col)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD struct_col.field_two", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD days(timestamp_col)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (5, named_struct('field_one', 5, 'field_two', 5), TIMESTAMP '2025-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD hours(timestamp_col)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (6, named_struct('field_one', 6, 'field_two', 6), TIMESTAMP '2026-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD int_col", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (7, named_struct('field_one', 7, 'field_two', 7), TIMESTAMP '2027-06-28 14:16:00.456')", new QueryExecutor.QueryParam[0]);
        Function<Integer, Row> buildStructColValue = intValue -> this.rowBuilder().addField("field_one", intValue).addField("field_two", intValue).build();
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT int_col, struct_col, CAST(timestamp_col AS TIMESTAMP) FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, buildStructColValue.apply(1), Timestamp.valueOf("2021-06-28 14:16:00.456")}), QueryAssert.Row.row((Object[])new Object[]{2, buildStructColValue.apply(2), Timestamp.valueOf("2022-06-28 14:16:00.456")}), QueryAssert.Row.row((Object[])new Object[]{3, buildStructColValue.apply(3), Timestamp.valueOf("2023-06-28 14:16:00.456")}), QueryAssert.Row.row((Object[])new Object[]{4, buildStructColValue.apply(4), Timestamp.valueOf("2024-06-28 14:16:00.456")}), QueryAssert.Row.row((Object[])new Object[]{5, buildStructColValue.apply(5), Timestamp.valueOf("2025-06-28 14:16:00.456")}), QueryAssert.Row.row((Object[])new Object[]{6, buildStructColValue.apply(6), Timestamp.valueOf("2026-06-28 14:16:00.456")}), QueryAssert.Row.row((Object[])new Object[]{7, buildStructColValue.apply(7), Timestamp.valueOf("2027-06-28 14:16:00.456")})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT struct_col.field_two FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1}), QueryAssert.Row.row((Object[])new Object[]{2}), QueryAssert.Row.row((Object[])new Object[]{3}), QueryAssert.Row.row((Object[])new Object[]{4}), QueryAssert.Row.row((Object[])new Object[]{5}), QueryAssert.Row.row((Object[])new Object[]{6}), QueryAssert.Row.row((Object[])new Object[]{7})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT CAST(timestamp_col AS TIMESTAMP) FROM " + trinoTableName + " WHERE struct_col.field_two = 2", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{Timestamp.valueOf("2022-06-28 14:16:00.456")})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE int_col = 2", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE int_col % 2 = 0", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE struct_col.field_one = 2", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE struct_col.field_one % 2 = 0", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE year(timestamp_col) = 2022", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE year(timestamp_col) % 2 = 0", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{3})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="specVersions")
    public void testTrinoShowingSparkCreatedTables(int specVersion) {
        String sparkTable = "test_table_listing_for_spark";
        String trinoTable = "test_table_listing_for_trino";
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTable, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTable, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG TBLPROPERTIES('format-version' = %s)", TestIcebergSparkCompatibility.sparkTableName(sparkTable), specVersion), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (_integer INTEGER )", TestIcebergSparkCompatibility.trinoTableName(trinoTable)), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SHOW TABLES FROM %s.%s LIKE '%s'", TRINO_CATALOG, TEST_SCHEMA_NAME, "test_table_listing_for_%"), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{sparkTable}), QueryAssert.Row.row((Object[])new Object[]{trinoTable})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + TestIcebergSparkCompatibility.sparkTableName(sparkTable), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + TestIcebergSparkCompatibility.trinoTableName(trinoTable), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="specVersions")
    public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion) {
        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data";
        String tableSameLocation1 = "test_same_location_spark_1_" + TestingNames.randomNameSuffix();
        String tableSameLocation2 = "test_same_location_spark_2_" + TestingNames.randomNameSuffix();
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)", TestIcebergSparkCompatibility.sparkTableName(tableSameLocation1), dataPath, specVersion), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)", TestIcebergSparkCompatibility.sparkTableName(tableSameLocation2), dataPath, specVersion), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("DROP TABLE IF EXISTS %s", TestIcebergSparkCompatibility.sparkTableName(tableSameLocation1)), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", TestIcebergSparkCompatibility.trinoTableName(tableSameLocation2)), new QueryExecutor.QueryParam[0])).hasNoRows();
        QueryExecutors.onSpark().executeQuery(String.format("DROP TABLE %s", TestIcebergSparkCompatibility.sparkTableName(tableSameLocation2)), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="specVersions")
    public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) {
        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data";
        String tableSameLocation1 = "test_same_location_trino_1_" + TestingNames.randomNameSuffix();
        String tableSameLocation2 = "test_same_location_trino_2_" + TestingNames.randomNameSuffix();
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)", TestIcebergSparkCompatibility.sparkTableName(tableSameLocation1), dataPath, specVersion), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)", TestIcebergSparkCompatibility.sparkTableName(tableSameLocation2), dataPath, specVersion), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", TestIcebergSparkCompatibility.trinoTableName(tableSameLocation1)), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", TestIcebergSparkCompatibility.trinoTableName(tableSameLocation2)), new QueryExecutor.QueryParam[0])).hasMessageMatching(".*Failed to open input stream for file.*");
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_object_storage_location_provider_" + storageFormat);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_object_storage_location_provider/obj-data";
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES ('write.object-storage.enabled'=true,'write.object-storage.path'='%s','write.format.default' = '%s','format-version' = %s)", new Object[]{sparkTableName, dataPath, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row result = QueryAssert.Row.row((Object[])new Object[]{"a_string", 1000000000000000L});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format("SELECT _string, _bigint FROM %s", sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{result});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT _string, _bigint FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{result});
        QueryResult queryResult = QueryExecutors.onTrino().executeQuery(String.format("SELECT file_path FROM %s", TestIcebergSparkCompatibility.trinoTableName("\"" + baseTableName + "$files\"")), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)queryResult).hasRowsCount(1).hasColumnsCount(1);
        Assert.assertTrue((boolean)((String)queryResult.getOnlyValue()).contains(dataPath));
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0])).hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_writer_data_path_" + storageFormat);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_writer_data_path_/obj-data";
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES ('write.data.path'='%s','write.format.default' = '%s','format-version' = %s)", new Object[]{sparkTableName, dataPath, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row result = QueryAssert.Row.row((Object[])new Object[]{"a_string", 1000000000000000L});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format("SELECT _string, _bigint FROM %s", sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{result});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT _string, _bigint FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{result});
        QueryResult queryResult = QueryExecutors.onTrino().executeQuery(String.format("SELECT file_path FROM %s", TestIcebergSparkCompatibility.trinoTableName("\"" + baseTableName + "$files\"")), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)queryResult).hasRowsCount(1).hasColumnsCount(1);
        Assert.assertTrue((boolean)((String)queryResult.getOnlyValue()).contains(dataPath));
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0])).hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testStringPartitioningWithSpecialCharactersCtasInTrino() {
        String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino";
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (id, part_col) WITH (partitioning = ARRAY['part_col']) AS VALUES %s", trinoTableName, TRINO_INSERTED_PARTITION_VALUES), new QueryExecutor.QueryParam[0]);
        this.assertSelectsOnSpecialCharacters(trinoTableName, sparkTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testStringPartitioningWithSpecialCharactersInsertInTrino() {
        String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino";
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (id BIGINT, part_col VARCHAR) WITH (partitioning = ARRAY['part_col'])", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES %s", trinoTableName, TRINO_INSERTED_PARTITION_VALUES), new QueryExecutor.QueryParam[0]);
        this.assertSelectsOnSpecialCharacters(trinoTableName, sparkTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testStringPartitioningWithSpecialCharactersInsertInSpark() {
        String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_spark";
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (id BIGINT, part_col VARCHAR) WITH (partitioning = ARRAY['part_col'])", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s VALUES %s", sparkTableName, SPARK_INSERTED_PARTITION_VALUES), new QueryExecutor.QueryParam[0]);
        this.assertSelectsOnSpecialCharacters(trinoTableName, sparkTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testPartitionedByNonLowercaseColumn() {
        String baseTableName = "test_partitioned_by_non_lowercase_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " USING ICEBERG PARTITIONED BY (`PART`) TBLPROPERTIES ('format-version'='2') AS SELECT 1 AS data, 2 AS `PART`", new QueryExecutor.QueryParam[0]);
        try {
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).contains(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2})});
            QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (3, 4)", new QueryExecutor.QueryParam[0]);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).contains(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2}), QueryAssert.Row.row((Object[])new Object[]{3, 4})});
            QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE data = 3", new QueryExecutor.QueryParam[0]);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).contains(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2})});
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET part = 20", new QueryExecutor.QueryParam[0]);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).contains(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 20})});
            QueryExecutors.onTrino().executeQuery("MERGE INTO " + trinoTableName + " USING (SELECT 1 a) input ON true WHEN MATCHED THEN DELETE", new QueryExecutor.QueryParam[0]);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).hasNoRows();
        }
        finally {
            QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
        }
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"})
    public void testPartitioningWithMixedCaseColumnUnsupportedInTrino() {
        String baseTableName = "test_partitioning_with_mixed_case_column_in_spark";
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (id INTEGER, `mIxEd_COL` STRING) USING ICEBERG", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " SET PROPERTIES partitioning = ARRAY['mIxEd_COL']", new QueryExecutor.QueryParam[0])).hasMessageContaining("Unable to parse partitioning value");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " SET PROPERTIES partitioning = ARRAY['\"mIxEd_COL\"']", new QueryExecutor.QueryParam[0])).hasMessageContaining("Unable to parse partitioning value");
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testInsertReadingFromParquetTableWithNestedRowFieldNotPresentInDataFile() {
        String sourceTableNameBase = "test_nested_missing_row_field_source";
        String trinoSourceTableName = TestIcebergSparkCompatibility.trinoTableName(sourceTableNameBase);
        String sparkSourceTableName = TestIcebergSparkCompatibility.sparkTableName(sourceTableNameBase);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoSourceTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoSourceTableName + " WITH (format = 'PARQUET') AS  SELECT CAST(    ROW(1, ROW(2, 3)) AS     ROW(foo BIGINT, a_sub_struct ROW(x BIGINT, y BIGINT)) ) AS a_struct", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkSourceTableName + " ADD COLUMN a_struct.a_sub_struct_2 STRUCT<z: BIGINT>", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoSourceTableName + " SELECT CAST(    ROW(1, ROW(2, 3), ROW(4)) AS     ROW(foo BIGINT,\n        a_sub_struct ROW(x BIGINT, y BIGINT),         a_sub_struct_2 ROW(z BIGINT)    )) AS a_struct", new QueryExecutor.QueryParam[0]);
        String trinoTargetTableName = TestIcebergSparkCompatibility.trinoTableName("test_nested_missing_row_field_target");
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTargetTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTargetTableName + " WITH (format = 'PARQUET') AS SELECT * FROM " + trinoSourceTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTargetTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{this.rowBuilder().addField("foo", (Object)1L).addField("a_sub_struct", (Object)this.rowBuilder().addField("x", (Object)2L).addField("y", (Object)3L).build()).addField("a_sub_struct_2", null).build()}), QueryAssert.Row.row((Object[])new Object[]{this.rowBuilder().addField("foo", (Object)1L).addField("a_sub_struct", (Object)this.rowBuilder().addField("x", (Object)2L).addField("y", (Object)3L).build()).addField("a_sub_struct_2", (Object)this.rowBuilder().addField("z", (Object)4L).build()).build()})});
    }

    private void assertSelectsOnSpecialCharacters(String trinoTableName, String sparkTableName) {
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(EXPECTED_PARTITION_VALUES);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(EXPECTED_PARTITION_VALUES);
        for (String value : SPECIAL_CHARACTER_VALUES) {
            String trinoValue = TestIcebergSparkCompatibility.escapeTrinoString(value);
            String sparkValue = TestIcebergSparkCompatibility.escapeSparkString(value);
            ((QueryAssert)QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT count(*) FROM " + sparkTableName + " WHERE part_col = '" + sparkValue + "'", new QueryExecutor.QueryParam[0])).withFailMessage("Spark query with predicate containing '" + value + "' contained no matches, expected one", new Object[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
            ((QueryAssert)QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE part_col = '" + trinoValue + "'", new QueryExecutor.QueryParam[0])).withFailMessage("Trino query with predicate containing '" + value + "' contained no matches, expected one", new Object[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        }
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testTrinoReadsSparkSortOrder() {
        String sourceTableNameBase = "test_insert_into_sorted_table_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(sourceTableNameBase);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(sourceTableNameBase);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " (a INT, b INT, c INT) USING ICEBERG", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " WRITE ORDERED BY b, c DESC NULLS LAST", new QueryExecutor.QueryParam[0]);
        Assertions.assertThat((String)((String)QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]).getOnlyValue())).contains(new CharSequence[]{"sorted_by = ARRAY['b ASC NULLS FIRST','c DESC NULLS LAST']"});
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (3, 2, 1), (1, 2, 3), (NULL, NULL, NULL)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT _pos, a, b, c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).contains(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{0, null, null, null}), QueryAssert.Row.row((Object[])new Object[]{1, 1, 2, 3}), QueryAssert.Row.row((Object[])new Object[]{2, 3, 2, 1})});
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testTrinoIgnoresUnsupportedSparkSortOrder() {
        String sourceTableNameBase = "test_insert_into_sorted_table_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(sourceTableNameBase);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(sourceTableNameBase);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " (a INT, b INT, c INT) USING ICEBERG", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " WRITE ORDERED BY truncate(b, 3), a NULLS LAST", new QueryExecutor.QueryParam[0]);
        Assertions.assertThat((String)((String)QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]).getOnlyValue())).doesNotContain(new CharSequence[]{"sorted_by"});
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (3, 2333, 1), (NULL, NULL, NULL), (1, 2222, 3)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT _pos, a, b, c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).contains(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{0, 1, 2222, 3}), QueryAssert.Row.row((Object[])new Object[]{1, 3, 2333, 1}), QueryAssert.Row.row((Object[])new Object[]{2, null, null, null})});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, timeOut=60000L)
    public void testTrinoSparkConcurrentInsert() throws Exception {
        int insertsPerEngine = 7;
        String baseTableName = "trino_spark_insert_concurrent_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(e varchar, a bigint)", new QueryExecutor.QueryParam[0]);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CyclicBarrier barrier = new CyclicBarrier(2);
            QueryExecutor onTrino = QueryExecutors.onTrino();
            QueryExecutor onSpark = QueryExecutors.onSpark();
            List allInserted = (List)executor.invokeAll((Collection)Stream.of(Engine.TRINO, Engine.SPARK).map(engine -> () -> {
                ArrayList<QueryAssert.Row> inserted = new ArrayList<QueryAssert.Row>();
                block6: for (int i = 0; i < insertsPerEngine; ++i) {
                    barrier.await(20L, TimeUnit.SECONDS);
                    String engineName = engine.name().toLowerCase(Locale.ENGLISH);
                    long value = i;
                    switch (engine) {
                        case TRINO: {
                            try {
                                onTrino.executeQuery(String.format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value), new QueryExecutor.QueryParam[0]);
                                break;
                            }
                            catch (QueryExecutionException queryExecutionException) {
                                continue block6;
                            }
                        }
                        case SPARK: {
                            onSpark.executeQuery(String.format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value), new QueryExecutor.QueryParam[0]);
                            break;
                        }
                        default: {
                            throw new UnsupportedOperationException("Unexpected engine: " + engine);
                        }
                    }
                    inserted.add(QueryAssert.Row.row((Object[])new Object[]{engineName, value}));
                }
                return inserted;
            }).collect(ImmutableList.toImmutableList())).stream().map(MoreFutures::getDone).flatMap(Collection::stream).collect(ImmutableList.toImmutableList());
            Assertions.assertThat((List)allInserted).hasSizeBetween(insertsPerEngine, insertsPerEngine * 2);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE e = 'spark'", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{insertsPerEngine})});
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(allInserted);
            QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
        }
        finally {
            executor.shutdownNow();
        }
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsAndCompressionCodecs")
    public void testTrinoReadingSparkCompressedData(StorageFormat storageFormat, String compressionCodec) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_compression_" + storageFormat + "_" + compressionCodec + "_" + TestingNames.randomNameSuffix());
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        List rows = (List)IntStream.range(0, 555).mapToObj(i -> QueryAssert.Row.row((Object[])new Object[]{"a" + i, i})).collect(ImmutableList.toImmutableList());
        switch (storageFormat) {
            case PARQUET: {
                QueryExecutors.onSpark().executeQuery("SET spark.sql.parquet.compression.codec = " + compressionCodec, new QueryExecutor.QueryParam[0]);
                break;
            }
            case ORC: {
                if ("GZIP".equals(compressionCodec)) {
                    QueryExecutors.onSpark().executeQuery("SET spark.sql.orc.compression.codec = zlib", new QueryExecutor.QueryParam[0]);
                    break;
                }
                QueryExecutors.onSpark().executeQuery("SET spark.sql.orc.compression.codec = " + compressionCodec, new QueryExecutor.QueryParam[0]);
                break;
            }
            case AVRO: {
                if ("NONE".equals(compressionCodec)) {
                    QueryExecutors.onSpark().executeQuery("SET spark.sql.avro.compression.codec = uncompressed", new QueryExecutor.QueryParam[0]);
                    break;
                }
                if ("SNAPPY".equals(compressionCodec)) {
                    QueryExecutors.onSpark().executeQuery("SET spark.sql.avro.compression.codec = snappy", new QueryExecutor.QueryParam[0]);
                    break;
                }
                if ("ZSTD".equals(compressionCodec)) {
                    QueryExecutors.onSpark().executeQuery("SET spark.sql.avro.compression.codec = zstandard", new QueryExecutor.QueryParam[0]);
                    break;
                }
                QueryAssert.assertQueryFailure(() -> QueryExecutors.onSpark().executeQuery("SET spark.sql.avro.compression.codec = " + compressionCodec, new QueryExecutor.QueryParam[0])).hasMessageContaining("The value of spark.sql.avro.compression.codec should be one of bzip2, deflate, uncompressed, xz, snappy, zstandard");
                throw new SkipException("Unsupported compression codec");
            }
            default: {
                throw new UnsupportedOperationException("Unsupported storage format: " + storageFormat);
            }
        }
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " (a string, b bigint) USING ICEBERG TBLPROPERTIES ('write.format.default' = '" + storageFormat + "')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES " + rows.stream().map(row -> String.format("('%s', %s)", row.getValues().get(0), row.getValues().get(1))).collect(Collectors.joining(", ")), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(rows);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(rows);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsAndCompressionCodecs")
    public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, String compressionCodec) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_trino_compression_" + storageFormat + "_" + compressionCodec + "_" + TestingNames.randomNameSuffix());
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.compression_codec = '" + compressionCodec + "'", new QueryExecutor.QueryParam[0]);
        String createTable = "CREATE TABLE " + trinoTableName + " WITH (format = '" + storageFormat + "') AS TABLE tpch.tiny.nation";
        if (storageFormat == StorageFormat.PARQUET && "LZ4".equals(compressionCodec)) {
            QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery(createTable, new QueryExecutor.QueryParam[0])).hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Unsupported codec: LZ4");
            return;
        }
        if (storageFormat == StorageFormat.AVRO && compressionCodec.equals("LZ4")) {
            QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery(createTable, new QueryExecutor.QueryParam[0])).hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Unsupported compression codec: " + compressionCodec);
            return;
        }
        QueryExecutors.onTrino().executeQuery(createTable, new QueryExecutor.QueryParam[0]);
        List expected = (List)QueryExecutors.onTrino().executeQuery("TABLE tpch.tiny.nation", new QueryExecutor.QueryParam[0]).rows().stream().map(row -> QueryAssert.Row.row((Object[])row.toArray())).collect(ImmutableList.toImmutableList());
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"})
    public void verifyCompressionCodecsDataProvider() {
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SHOW SESSION LIKE 'iceberg.compression_codec'", new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"iceberg.compression_codec", "ZSTD", "ZSTD", "varchar", "Compression codec to use when writing files. Possible values: " + this.compressionCodecs()})});
    }

    @DataProvider
    public Object[][] storageFormatsAndCompressionCodecs() {
        List<String> compressionCodecs = this.compressionCodecs();
        return (Object[][])Stream.of(StorageFormat.values()).flatMap(storageFormat -> compressionCodecs.stream().map(compressionCodec -> new Object[]{storageFormat, compressionCodec})).toArray(x$0 -> new Object[x$0][]);
    }

    private List<String> compressionCodecs() {
        return List.of("NONE", "SNAPPY", "LZ4", "ZSTD", "GZIP");
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) {
        String baseTableName;
        block2: {
            baseTableName = "test_trino_reading_migrated_nested_data_" + TestingNames.randomNameSuffix();
            String defaultCatalogTableName = TestIcebergSparkCompatibility.sparkDefaultCatalogTableName(baseTableName);
            String sparkTableDefinition = "CREATE TABLE %s (\n  doc_id STRING\n, nested_map MAP<STRING, ARRAY<STRUCT<sName: STRING, sNumber: INT>>>\n, nested_array ARRAY<MAP<STRING, ARRAY<STRUCT<mName: STRING, mNumber: INT>>>>\n, nested_struct STRUCT<id:INT, name:STRING, address:STRUCT<street_number:INT, street_name:STRING>>)\n USING %s";
            QueryExecutors.onSpark().executeQuery(String.format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name().toLowerCase(Locale.ENGLISH)), new QueryExecutor.QueryParam[0]);
            String insert = "INSERT INTO TABLE %s SELECT  'Doc213', map('s1', array(named_struct('sName', 'ASName1', 'sNumber', 201), named_struct('sName', 'ASName2', 'sNumber', 202))), array(map('m1', array(named_struct('mName', 'MAS1Name1', 'mNumber', 301), named_struct('mName', 'MAS1Name2', 'mNumber', 302)))       ,map('m2', array(named_struct('mName', 'MAS2Name1', 'mNumber', 401), named_struct('mName', 'MAS2Name2', 'mNumber', 402)))), named_struct('id', 1, 'name', 'P. Sherman', 'address', named_struct('street_number', 42, 'street_name', 'Wallaby Way'))";
            QueryExecutors.onSpark().executeQuery(String.format(insert, defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            try {
                QueryExecutors.onSpark().executeQuery(String.format("CALL system.migrate('%s')", defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            }
            catch (QueryExecutionException e) {
                if (!e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) break block2;
                throw new SkipException("This catalog doesn't support calling system.migrate procedure");
            }
        }
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{"Doc213", "ASName2", 201, "MAS2Name1", 302, "P. Sherman", 42, "Wallaby Way"});
        String sparkSelect = "SELECT  doc_id, nested_map['s1'][1].sName, nested_map['s1'][0].sNumber, nested_array[1]['m2'][0].mName, nested_array[0]['m1'][1].mNumber, nested_struct.name, nested_struct.address.street_number, nested_struct.address.street_name  FROM ";
        QueryResult sparkResult = QueryExecutors.onSpark().executeQuery(sparkSelect + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)sparkResult).containsOnly(new QueryAssert.Row[]{row});
        String trinoSelect = "SELECT  doc_id, nested_map['s1'][2].sName, nested_map['s1'][1].sNumber, nested_array[2]['m2'][1].mName, nested_array[1]['m1'][2].mNumber, nested_struct.name, nested_struct.address.street_number, nested_struct.address.street_name  FROM ";
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        QueryResult trinoResult = QueryExecutors.onTrino().executeQuery(trinoSelect + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)trinoResult).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s UNSET TBLPROPERTIES ('schema.name-mapping.default')", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(trinoSelect + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, null, null, null, null, null, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, null, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT nested_struct.address.street_number, nested_struct.address.street_name FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null})});
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) {
        String baseTableName;
        block2: {
            baseTableName = "test_migrated_data_with_altered_schema_" + TestingNames.randomNameSuffix();
            String defaultCatalogTableName = TestIcebergSparkCompatibility.sparkDefaultCatalogTableName(baseTableName);
            String sparkTableDefinition = "CREATE TABLE %s (\n  doc_id STRING\n, nested_struct STRUCT<id:INT, name:STRING, address:STRUCT<a:INT, b:STRING>>)\n USING %s";
            QueryExecutors.onSpark().executeQuery(String.format(sparkTableDefinition, new Object[]{defaultCatalogTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
            String insert = "INSERT INTO TABLE %s SELECT  'Doc213', named_struct('id', 1, 'name', 'P. Sherman', 'address', named_struct('a', 42, 'b', 'Wallaby Way'))";
            QueryExecutors.onSpark().executeQuery(String.format(insert, defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            try {
                QueryExecutors.onSpark().executeQuery(String.format("CALL system.migrate('%s')", defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            }
            catch (QueryExecutionException e) {
                if (!e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) break block2;
                throw new SkipException("This catalog doesn't support calling system.migrate procedure");
            }
        }
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " RENAME COLUMN nested_struct TO nested_struct_moved", new QueryExecutor.QueryParam[0]);
        String select = "SELECT nested_struct_moved.name, nested_struct_moved.address.a, nested_struct_moved.address.b  FROM ";
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{"P. Sherman", 42, "Wallaby Way"});
        QueryResult sparkResult = QueryExecutors.onSpark().executeQuery(select + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)sparkResult).containsOnly((List)ImmutableList.of((Object)row));
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(select + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)ImmutableList.of((Object)row));
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s UNSET TBLPROPERTIES ('schema.name-mapping.default')", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(select + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, null})});
    }

    @Test(groups={"iceberg", "iceberg_jdbc", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) {
        String baseTableName;
        block2: {
            baseTableName = "test_migrated_data_with_partial_name_mapping_" + TestingNames.randomNameSuffix();
            String defaultCatalogTableName = TestIcebergSparkCompatibility.sparkDefaultCatalogTableName(baseTableName);
            String sparkTableDefinition = "CREATE TABLE %s (a INT, b INT) USING " + storageFormat.name().toLowerCase(Locale.ENGLISH);
            QueryExecutors.onSpark().executeQuery(String.format(sparkTableDefinition, defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            String insert = "INSERT INTO TABLE %s SELECT 1, 2";
            QueryExecutors.onSpark().executeQuery(String.format(insert, defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            try {
                QueryExecutors.onSpark().executeQuery(String.format("CALL system.migrate('%s')", defaultCatalogTableName), new QueryExecutor.QueryParam[0]);
            }
            catch (QueryExecutionException e) {
                if (!e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) break block2;
                throw new SkipException("This catalog doesn't support calling system.migrate procedure");
            }
        }
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s SET TBLPROPERTIES ('schema.name-mapping.default'='[{\"field-id\": 1, \"names\": [\"a\"]}, {\"field-id\": 2, \"names\": [\"c\"]} ]')", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT a, b FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null})});
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testPartialStats() {
        String tableName = "test_partial_stats_" + TestingNames.randomNameSuffix();
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col0 INT, col1 INT, col2 STRING, col3 BINARY)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2, 'col2Value0', X'000102f0feff')", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SHOW STATS FOR " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"col0", null, null, 0.0, null, "1", "1"}), QueryAssert.Row.row((Object[])new Object[]{"col1", null, null, 0.0, null, "2", "2"}), QueryAssert.Row.row((Object[])new Object[]{"col2", 151.0, null, 0.0, null, null, null}), QueryAssert.Row.row((Object[])new Object[]{"col3", 72.0, null, 0.0, null, null, null}), QueryAssert.Row.row((Object[])new Object[]{null, null, null, null, 1.0, null, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " SET TBLPROPERTIES (write.metadata.metrics.column.col1='none')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 4, 'col2Value1', X'000102f0feee')", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SHOW STATS FOR " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"col0", null, null, 0.0, null, "1", "3"}), QueryAssert.Row.row((Object[])new Object[]{"col1", null, null, null, null, null, null}), QueryAssert.Row.row((Object[])new Object[]{"col2", 305.0, null, 0.0, null, null, null}), QueryAssert.Row.row((Object[])new Object[]{"col3", 145.0, null, 0.0, null, null, null}), QueryAssert.Row.row((Object[])new Object[]{null, null, null, null, 2.0, null, null})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testStatsAfterAddingPartitionField() {
        String tableName = "test_stats_after_adding_partition_field_" + TestingNames.randomNameSuffix();
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col0 INT, col1 INT)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SHOW STATS FOR " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"col0", null, null, 0.0, null, "1", "1"}), QueryAssert.Row.row((Object[])new Object[]{"col1", null, null, 0.0, null, "2", "2"}), QueryAssert.Row.row((Object[])new Object[]{null, null, null, null, 1.0, null, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD col1", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 4)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SHOW STATS FOR " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"col0", null, null, 0.0, null, "1", "3"}), QueryAssert.Row.row((Object[])new Object[]{"col1", null, null, 0.0, null, "2", "4"}), QueryAssert.Row.row((Object[])new Object[]{null, null, null, null, 2.0, null, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD col1", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD bucket(3, col1)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (5, 6)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SHOW STATS FOR " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{"col0", null, null, 0.0, null, "1", "5"}), QueryAssert.Row.row((Object[])new Object[]{"col1", null, null, 0.0, null, "2", "6"}), QueryAssert.Row.row((Object[])new Object[]{null, null, null, null, 3.0, null, null})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="tableFormatWithDeleteFormat")
    public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) {
        String tableName = TestIcebergSparkCompatibility.toLowerCase(String.format("test_trino_reads_spark_row_level_deletes_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), TestingNames.randomNameSuffix()));
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) USING ICEBERG PARTITIONED BY (b) TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read','write.format.default'='" + tableStorageFormat.name() + "','write.delete.format.default'='" + deleteFileStorageFormat.name() + "')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("CALL iceberg_test.system.rewrite_data_files(table=>'default." + tableName + "', options => map('min-input-files','1'))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("DELETE FROM " + sparkTableName + " WHERE a = 13", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("DELETE FROM " + sparkTableName + " WHERE b = 2", new QueryExecutor.QueryParam[0]);
        ImmutableList expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{11, 12}), (Object)QueryAssert.Row.row((Object[])new Object[]{12, 12}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("DELETE FROM " + sparkTableName + " WHERE a = 12", new QueryExecutor.QueryParam[0]);
        expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{11, 12}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="tableFormatWithDeleteFormat")
    public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) {
        String tableName = TestIcebergSparkCompatibility.toLowerCase(String.format("test_trino_reads_spark_row_level_deletes_row_types_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), TestingNames.randomNameSuffix()));
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(part_key INT, int_t INT, row_t STRUCT<a:INT, b:INT>) USING ICEBERG PARTITIONED BY (part_key) TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read','write.format.default'='" + tableStorageFormat.name() + "','write.delete.format.default'='" + deleteFileStorageFormat.name() + "')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 1, named_struct('a', 1, 'b', 2)), (1, 2, named_struct('a', 3, 'b', 4)), (1, 3, named_struct('a', 5, 'b', 6)), (2, 4, named_struct('a', 1, 'b',2))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("CALL iceberg_test.system.rewrite_data_files(table=>'default." + tableName + "', options => map('min-input-files','1'))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("DELETE FROM " + sparkTableName + " WHERE int_t = 2", new QueryExecutor.QueryParam[0]);
        ImmutableList expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{1, 2}), (Object)QueryAssert.Row.row((Object[])new Object[]{1, 6}), (Object)QueryAssert.Row.row((Object[])new Object[]{2, 2}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT part_key, row_t.b FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT part_key, row_t.b FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) {
        String tableName = TestIcebergSparkCompatibility.toLowerCase(String.format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), TestingNames.randomNameSuffix()));
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = 2, format = '" + storageFormat.name() + "')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE a = 13", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE b = 2", new QueryExecutor.QueryParam[0]);
        ImmutableList expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{11, 12}), (Object)QueryAssert.Row.row((Object[])new Object[]{12, 12}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE a = 12", new QueryExecutor.QueryParam[0]);
        expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{11, 12}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat) {
        String tableName = TestIcebergSparkCompatibility.toLowerCase(String.format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), TestingNames.randomNameSuffix()));
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(part_key INT, int_t INT, row_t ROW(a INT, b INT)) WITH(partitioning = ARRAY['part_key'], format_version = 2, format = '" + storageFormat.name() + "') ", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 1, row(1, 2)), (1, 2, row(3, 4)), (1, 3, row(5, 6)), (2, 4, row(1, 2))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE int_t = 2", new QueryExecutor.QueryParam[0]);
        ImmutableList expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{1, 2}), (Object)QueryAssert.Row.row((Object[])new Object[]{1, 6}), (Object)QueryAssert.Row.row((Object[])new Object[]{2, 2}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT part_key, row_t.b FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT part_key, row_t.b FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testDeleteAfterPartitionEvolution(StorageFormat storageFormat) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_delete_after_partition_evolution_" + storageFormat + TestingNames.randomNameSuffix());
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (col0 BIGINT, col1 BIGINT, col2 BIGINT)  USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s', 'format-version' = 2, 'write.delete.mode' = 'merge-on-read')", new Object[]{sparkTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 11, 21)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD bucket(3, col0)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (2, 12, 22)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD col1", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 13, 23)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD col1", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD col2", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (4, 14, 24)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD bucket(3, col0)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD col2", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD col0", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (5, 15, 25)", new QueryExecutor.QueryParam[0]);
        ArrayList<QueryAssert.Row> expected = new ArrayList<QueryAssert.Row>();
        expected.add(QueryAssert.Row.row((Object[])new Object[]{1, 11, 21}));
        expected.add(QueryAssert.Row.row((Object[])new Object[]{2, 12, 22}));
        expected.add(QueryAssert.Row.row((Object[])new Object[]{3, 13, 23}));
        expected.add(QueryAssert.Row.row((Object[])new Object[]{4, 14, 24}));
        expected.add(QueryAssert.Row.row((Object[])new Object[]{5, 15, 25}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
        for (int columnValue = 1; columnValue <= 5; ++columnValue) {
            QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE col0 = " + columnValue, new QueryExecutor.QueryParam[0]);
            expected.remove(0);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
            QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
        }
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest"})
    public void testMissingMetrics() {
        String tableName = "test_missing_metrics_" + TestingNames.randomNameSuffix();
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " (name STRING, country STRING) USING ICEBERG PARTITIONED BY (country) TBLPROPERTIES ('write.metadata.metrics.default'='none')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES ('Christoph', 'AT'), (NULL, 'RO')", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT count(*) FROM %s.%s.\"%s$partitions\" WHERE data IS NOT NULL", TRINO_CATALOG, TEST_SCHEMA_NAME, tableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{0})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testOptimizeOnV2IcebergTable() {
        String tableName = String.format("test_optimize_on_v2_iceberg_table_%s", TestingNames.randomNameSuffix());
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(tableName);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(tableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) USING ICEBERG PARTITIONED BY (b) TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2}), QueryAssert.Row.row((Object[])new Object[]{2, 2}), QueryAssert.Row.row((Object[])new Object[]{3, 2}), QueryAssert.Row.row((Object[])new Object[]{11, 12}), QueryAssert.Row.row((Object[])new Object[]{12, 12}), QueryAssert.Row.row((Object[])new Object[]{13, 12})});
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testAlterTableExecuteProceduresOnEmptyTable() {
        String baseTableName = "test_alter_table_execute_procedures_on_empty_table_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (  _string STRING, _bigint BIGINT, _integer INTEGER) USING ICEBERG", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE optimize", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE expire_snapshots(retention_threshold => '7d')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE remove_orphan_files(retention_threshold => '7d')", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).hasNoRows();
    }

    private static String escapeSparkString(String value) {
        return value.replace("\\", "\\\\").replace("'", "\\'");
    }

    private static String escapeTrinoString(String value) {
        return value.replace("'", "''");
    }

    private static String sparkTableName(String tableName) {
        return String.format("%s.%s.%s", SPARK_CATALOG, TEST_SCHEMA_NAME, tableName);
    }

    private static String sparkDefaultCatalogTableName(String tableName) {
        return String.format("%s.%s", TEST_SCHEMA_NAME, tableName);
    }

    private static String trinoTableName(String tableName) {
        return String.format("%s.%s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME, tableName);
    }

    private Row.Builder rowBuilder() {
        return Row.builder();
    }

    @DataProvider
    public static Object[][] specVersions() {
        return new Object[][]{{1}, {2}};
    }

    /*
     * Exception decompiling
     */
    @DataProvider
    public static Object[][] storageFormats() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * java.lang.UnsupportedOperationException
         *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.NewAnonymousArray.getDimSize(NewAnonymousArray.java:142)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.isNewArrayLambda(LambdaRewriter.java:455)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewriteDynamicExpression(LambdaRewriter.java:409)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewriteDynamicExpression(LambdaRewriter.java:167)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewriteExpression(LambdaRewriter.java:105)
         *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExpressionRewriterHelper.applyForwards(ExpressionRewriterHelper.java:12)
         *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.AbstractMemberFunctionInvokation.applyExpressionRewriterToArgs(AbstractMemberFunctionInvokation.java:101)
         *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.AbstractMemberFunctionInvokation.applyExpressionRewriter(AbstractMemberFunctionInvokation.java:88)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewriteExpression(LambdaRewriter.java:103)
         *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.AbstractMemberFunctionInvokation.applyExpressionRewriter(AbstractMemberFunctionInvokation.java:87)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewriteExpression(LambdaRewriter.java:103)
         *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.CastExpression.applyExpressionRewriter(CastExpression.java:128)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewriteExpression(LambdaRewriter.java:103)
         *     at org.benf.cfr.reader.bytecode.analysis.structured.statement.StructuredReturn.rewriteExpressions(StructuredReturn.java:99)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op4rewriters.LambdaRewriter.rewrite(LambdaRewriter.java:88)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.rewriteLambdas(Op04StructuredStatement.java:1137)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:912)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @DataProvider
    public static Object[][] tableFormatWithDeleteFormat() {
        return (Object[][])Stream.of(StorageFormat.values()).flatMap(tableStorageFormat -> Arrays.stream(StorageFormat.values()).map(deleteFileStorageFormat -> new Object[]{tableStorageFormat, deleteFileStorageFormat})).toArray(x$0 -> new Object[x$0][]);
    }

    @DataProvider
    public static Object[][] storageFormatsWithSpecVersion() {
        List storageFormats = (List)Stream.of(StorageFormat.values()).collect(ImmutableList.toImmutableList());
        ImmutableList specVersions = ImmutableList.of((Object)1, (Object)2);
        return (Object[][])storageFormats.stream().flatMap(arg_0 -> TestIcebergSparkCompatibility.lambda$storageFormatsWithSpecVersion$36((List)specVersions, arg_0)).toArray(x$0 -> new Object[x$0][]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormats")
    public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reads_trino_partitioned_table_after_expiring_snapshots" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('b', 1004)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('b', 1005)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('b', 1006)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('c', 1007)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('c', 1008)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('c', 1009)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("DELETE FROM %s WHERE _string = '%s'", trinoTableName, Character.valueOf('b')), new QueryExecutor.QueryParam[0]);
        int initialNumberOfMetadataFiles = this.calculateMetadataFilesForPartitionedTable(baseTableName);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.expire_snapshots_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.remove_orphan_files_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        int updatedNumberOfMetadataFiles = this.calculateMetadataFilesForPartitionedTable(baseTableName);
        Assertions.assertThat((int)updatedNumberOfMetadataFiles).isLessThan(initialNumberOfMetadataFiles);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{3006});
        String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByString, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByString, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = %s)", new Object[]{trinoTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('b', 1004)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('b', 1005)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('b', 1006)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('c', 1007)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('c', 1008)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('c', 1009)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("DELETE FROM %s WHERE _string = '%s'", trinoTableName, Character.valueOf('b')), new QueryExecutor.QueryParam[0]);
        int initialNumberOfFiles = QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM iceberg.default.\"%s$files\"", baseTableName), new QueryExecutor.QueryParam[0]).getRowsCount();
        int initialNumberOfMetadataFiles = this.calculateMetadataFilesForPartitionedTable(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.expire_snapshots_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.remove_orphan_files_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        int updatedNumberOfFiles = QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM iceberg.default.\"%s$files\"", baseTableName), new QueryExecutor.QueryParam[0]).getRowsCount();
        Assertions.assertThat((int)updatedNumberOfFiles).isLessThan(initialNumberOfFiles);
        int updatedNumberOfMetadataFiles = this.calculateMetadataFilesForPartitionedTable(baseTableName);
        Assertions.assertThat((int)updatedNumberOfMetadataFiles).isLessThan(initialNumberOfMetadataFiles);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{3006});
        String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByString, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByString, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="storageFormatsWithSpecVersion")
    public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat, int specVersion) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = %s)", new Object[]{trinoTableName, storageFormat, specVersion}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("DELETE FROM %s WHERE _bigint = 1002", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES ('a', 1004)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.expire_snapshots_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.remove_orphan_files_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{3008});
        String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByString, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByString, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"}, dataProvider="tableFormatWithDeleteFormat")
    public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) {
        String baseTableName = TestIcebergSparkCompatibility.toLowerCase("test_cleaning_up_iceberg_table_fails_for_table_v2" + tableStorageFormat);
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(part_key INT, int_t INT, row_t STRUCT<a:INT, b:INT>) USING ICEBERG PARTITIONED BY (part_key) TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read','write.format.default'='" + tableStorageFormat.name() + "','write.delete.format.default'='" + deleteFileStorageFormat.name() + "')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 1, named_struct('a', 1, 'b', 2)), (1, 2, named_struct('a', 3, 'b', 4)), (1, 3, named_struct('a', 5, 'b', 6)), (2, 4, named_struct('a', 1, 'b', 2)), (2, 2, named_struct('a', 2, 'b', 3))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("CALL iceberg_test.system.rewrite_data_files(table=>'default." + baseTableName + "', options => map('min-input-files','1'))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("DELETE FROM " + sparkTableName + " WHERE int_t = 2", new QueryExecutor.QueryParam[0]);
        QueryAssert.Row row = QueryAssert.Row.row((Object[])new Object[]{4});
        String selectByString = "SELECT SUM(int_t) FROM %s WHERE part_key = 1";
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByString, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByString, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.expire_snapshots_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("SET SESSION iceberg.remove_orphan_files_min_retention = '0s'", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("ALTER TABLE %s EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format(selectByString, trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format(selectByString, sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{row});
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testUpdateAfterSchemaEvolution() {
        String baseTableName = "test_update_after_schema_evolution_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(part_key INT, a INT, b INT, c INT) USING ICEBERG PARTITIONED BY (part_key) TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2, 3, 4), (11, 12, 13, 14)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD part_key", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD a", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP COLUMN b", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP COLUMN c", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD COLUMN c INT", new QueryExecutor.QueryParam[0]);
        ImmutableList expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{1, 2, null}), (Object)QueryAssert.Row.row((Object[])new Object[]{11, 12, null}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET c = c + 1", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + 1 WHERE c = 4", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        List filePaths = QueryExecutors.onTrino().executeQuery("SELECT DISTINCT file_path FROM iceberg.default.\"" + baseTableName + "$files\"", new QueryExecutor.QueryParam[0]).column(1);
        Assert.assertEquals((long)filePaths.stream().map(String::valueOf).filter(path -> path.contains("/a=") && !path.contains("/part_key=")).count(), (long)2L);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testUpdateOnPartitionColumn() {
        String baseTableName = "test_update_on_partition_column" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b STRING) USING ICEBERG PARTITIONED BY (a) TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 'first'), (1, 'second'), (2, 'third'), (2, 'forth'), (2, 'fifth')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + 1", new QueryExecutor.QueryParam[0]);
        ImmutableList expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{2, "first"}), (Object)QueryAssert.Row.row((Object[])new Object[]{2, "second"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "third"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "forth"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "fifth"}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + (CASE b WHEN 'first' THEN 1 ELSE 0 END)", new QueryExecutor.QueryParam[0]);
        expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{3, "first"}), (Object)QueryAssert.Row.row((Object[])new Object[]{2, "second"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "third"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "forth"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "fifth"}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("CALL iceberg_test.system.rewrite_data_files(table=>'default." + baseTableName + "', options => map('min-input-files','1'))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTableName + " SET a = a + (CASE b WHEN 'forth' THEN -1 ELSE 1 END)", new QueryExecutor.QueryParam[0]);
        expected = ImmutableList.of((Object)QueryAssert.Row.row((Object[])new Object[]{4, "first"}), (Object)QueryAssert.Row.row((Object[])new Object[]{3, "second"}), (Object)QueryAssert.Row.row((Object[])new Object[]{4, "third"}), (Object)QueryAssert.Row.row((Object[])new Object[]{2, "forth"}), (Object)QueryAssert.Row.row((Object[])new Object[]{4, "fifth"}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly((List)expected);
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testAddNotNullColumn() {
        String baseTableName = "test_add_not_null_column_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + " AS SELECT 1 col", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ADD COLUMN new_col INT NOT NULL", new QueryExecutor.QueryParam[0])).hasMessageMatching(".*This connector does not support adding not null columns");
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD COLUMN new_col INT NOT NULL", new QueryExecutor.QueryParam[0])).hasMessageMatching("(?s).*Unsupported table change: Incompatible change: cannot add required column.*");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testDropNestedField() {
        String baseTableName = "test_drop_nested_field_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + " AS SELECT CAST(row(1, 2, row(10, 20)) AS row(a integer, b integer, c row(x integer, y integer))) AS col", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " DROP COLUMN col.b", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.c.x, col.c.y FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 10, 20})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.c.x, col.c.y FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 10, 20})});
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " DROP COLUMN col.c", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testDropPastPartitionedField() {
        String baseTableName = "test_drop_past_partitioned_field_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(id INTEGER, parent ROW(nested VARCHAR, nested_another VARCHAR))", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD parent.nested", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " SET PROPERTIES partitioning = ARRAY[]", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " DROP COLUMN parent.nested", new QueryExecutor.QueryParam[0])).hasMessageContaining("Cannot drop column which is used by an old partition spec: parent.nested");
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests", "iceberg_rest", "iceberg_jdbc"})
    public void testHandlingPartitionSchemaEvolutionInPartitionMetadata() {
        String baseTableName = "test_handling_partition_schema_evolution_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (old_partition_key INT, new_partition_key INT, value date) WITH (PARTITIONING = array['old_partition_key'])", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES (1, 10, date '2022-04-10'), (2, 20, date '2022-05-11'), (3, 30, date '2022-06-12'), (2, 20, date '2022-06-13')", trinoTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3")));
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP PARTITION FIELD old_partition_key", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD PARTITION FIELD new_partition_key", sparkTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null")));
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES (4, 40, date '2022-08-15')", trinoTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null")));
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP PARTITION FIELD new_partition_key", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD PARTITION FIELD old_partition_key", sparkTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null")));
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES (5, 50, date '2022-08-15')", trinoTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"5", (Object)"new_partition_key", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null")));
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP PARTITION FIELD old_partition_key", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD PARTITION FIELD days(value)", sparkTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"5", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null")));
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES (6, 60, date '2022-08-16')", trinoTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"2022-08-16"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"5", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null")));
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s DROP PARTITION FIELD value_day", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("ALTER TABLE %s ADD PARTITION FIELD months(value)", sparkTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"2022-08-16", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"5", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null")));
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s VALUES (7, 70, date '2022-08-17')", trinoTableName), new QueryExecutor.QueryParam[0]);
        this.validatePartitioning(baseTableName, sparkTableName, (List<Map<String, String>>)ImmutableList.of((Object)ImmutableMap.of((Object)"old_partition_key", (Object)"1", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"631"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"2", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"40", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"null", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"2022-08-16", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"5", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null"), (Object)ImmutableMap.of((Object)"old_partition_key", (Object)"3", (Object)"new_partition_key", (Object)"null", (Object)"value_day", (Object)"null", (Object)"value_month", (Object)"null")));
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testMetadataCompressionCodecGzip() {
        String baseTableName = "test_metadata_compression_codec_gzip" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col int) USING iceberg TBLPROPERTIES ('write.metadata.compression-codec'='gzip')", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (2)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1}), QueryAssert.Row.row((Object[])new Object[]{2})});
        String tableLocation = IcebergTestUtils.stripNamenodeURI(IcebergTestUtils.getTableLocation(trinoTableName));
        List metadataFiles = (List)this.hdfsClient.listDirectory(tableLocation + "/metadata").stream().filter(file -> file.endsWith("metadata.json")).collect(ImmutableList.toImmutableList());
        ((ListAssert)((ListAssert)Assertions.assertThat((List)metadataFiles).isNotEmpty()).filteredOn(file -> file.endsWith("gz.metadata.json"))).isEqualTo((Object)metadataFiles);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " SET TBLPROPERTIES ('write.metadata.compression-codec'='none')", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1}), QueryAssert.Row.row((Object[])new Object[]{2})});
        QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (3)", new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1}), QueryAssert.Row.row((Object[])new Object[]{2}), QueryAssert.Row.row((Object[])new Object[]{3})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    private void validatePartitioning(String baseTableName, String sparkTableName, List<Map<String, String>> expectedValues) {
        List trinoResult = (List)expectedValues.stream().map(m -> m.entrySet().stream().map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue())).collect(Collectors.joining(", ", "{", "}"))).collect(ImmutableList.toImmutableList());
        List partitioning = QueryExecutors.onTrino().executeQuery(String.format("SELECT partition, record_count FROM iceberg.default.\"%s$partitions\"", baseTableName), new QueryExecutor.QueryParam[0]).column(1);
        Set partitions = partitioning.stream().map(String::valueOf).collect(Collectors.toUnmodifiableSet());
        Assertions.assertThat((int)partitions.size()).isEqualTo(expectedValues.size());
        Assertions.assertThat(partitions).containsAll((Iterable)trinoResult);
        List sparkResult = (List)expectedValues.stream().map(m -> m.entrySet().stream().map(entry -> String.format("\"%s\":%s", entry.getKey(), entry.getValue())).collect(Collectors.joining(",", "{", "}"))).collect(ImmutableList.toImmutableList());
        partitioning = QueryExecutors.onSpark().executeQuery(String.format("SELECT partition from %s.files", sparkTableName), new QueryExecutor.QueryParam[0]).column(1);
        partitions = partitioning.stream().map(String::valueOf).collect(Collectors.toUnmodifiableSet());
        Assertions.assertThat((int)partitions.size()).isEqualTo(expectedValues.size());
        Assertions.assertThat(partitions).containsAll((Iterable)sparkResult);
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testTrinoAnalyze() {
        String baseTableName = "test_trino_analyze_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + " AS SELECT regionkey, name FROM tpch.tiny.region", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ANALYZE " + trinoTableName, new QueryExecutor.QueryParam[0]);
        List<QueryAssert.Row> expected = List.of(QueryAssert.Row.row((Object[])new Object[]{0, "AFRICA"}), QueryAssert.Row.row((Object[])new Object[]{1, "AMERICA"}), QueryAssert.Row.row((Object[])new Object[]{2, "ASIA"}), QueryAssert.Row.row((Object[])new Object[]{3, "EUROPE"}), QueryAssert.Row.row((Object[])new Object[]{4, "MIDDLE EAST"}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testTrinoAnalyzeWithNonLowercaseColumnName() {
        String baseTableName = "test_trino_analyze_with_uppercase_field" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col1 INT, COL2 INT) USING ICEBERG", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 1)", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ANALYZE " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 1})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithTableLocation(StorageFormat storageFormat) throws TException {
        String baseTableName = "test_register_table_with_table_location_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", new Object[]{sparkTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        List<QueryAssert.Row> expected = List.of(QueryAssert.Row.row((Object[])new Object[]{1, "INDIA", true}), QueryAssert.Row.row((Object[])new Object[]{2, "USA", false}));
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        this.dropTableFromMetastore(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format("SELECT * FROM %s", sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithComments(StorageFormat storageFormat) throws TException {
        String baseTableName = "test_register_table_with_comments_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("COMMENT ON TABLE %s is 'my-table-comment'", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("COMMENT ON COLUMN %s.a is 'a-comment'", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("COMMENT ON COLUMN %s.b is 'b-comment'", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("COMMENT ON COLUMN %s.c is 'c-comment'", trinoTableName), new QueryExecutor.QueryParam[0]);
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        this.dropTableFromMetastore(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation), new QueryExecutor.QueryParam[0]);
        Assertions.assertThat((String)this.getTableComment(baseTableName)).isEqualTo("my-table-comment");
        Assertions.assertThat((String)this.getColumnComment(baseTableName, "a")).isEqualTo("a-comment");
        Assertions.assertThat((String)this.getColumnComment(baseTableName, "b")).isEqualTo("b-comment");
        Assertions.assertThat((String)this.getColumnComment(baseTableName, "c")).isEqualTo("c-comment");
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithShowCreateTable(StorageFormat storageFormat) throws TException {
        String baseTableName = "test_register_table_with_show_create_table_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", new Object[]{sparkTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryResult expectedDescribeTable = QueryExecutors.onSpark().executeQuery("DESCRIBE TABLE EXTENDED " + sparkTableName, new QueryExecutor.QueryParam[0]);
        List expectedDescribeTableRows = (List)expectedDescribeTable.rows().stream().map(columns -> QueryAssert.Row.row((Object[])columns.toArray())).collect(ImmutableList.toImmutableList());
        QueryResult expectedShowCreateTable = QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
        List expectedShowCreateTableRows = (List)expectedShowCreateTable.rows().stream().map(columns -> QueryAssert.Row.row((Object[])columns.toArray())).collect(ImmutableList.toImmutableList());
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        this.dropTableFromMetastore(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation), new QueryExecutor.QueryParam[0]);
        QueryResult actualDescribeTable = QueryExecutors.onSpark().executeQuery("DESCRIBE TABLE EXTENDED " + sparkTableName, new QueryExecutor.QueryParam[0]);
        QueryResult actualShowCreateTable = QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
        QueryAssert.assertThat((QueryResult)actualDescribeTable).hasColumns(expectedDescribeTable.getColumnTypes()).containsExactlyInOrder(expectedDescribeTableRows);
        QueryAssert.assertThat((QueryResult)actualShowCreateTable).hasColumns(expectedShowCreateTable.getColumnTypes()).containsExactlyInOrder(expectedShowCreateTableRows);
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithReInsert(StorageFormat storageFormat) throws TException {
        String baseTableName = "test_register_table_with_re_insert_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", new Object[]{trinoTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        this.dropTableFromMetastore(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(3, 'POLAND', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        List<QueryAssert.Row> expected = List.of(QueryAssert.Row.row((Object[])new Object[]{1, "INDIA", true}), QueryAssert.Row.row((Object[])new Object[]{2, "USA", false}), QueryAssert.Row.row((Object[])new Object[]{3, "POLAND", true}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format("SELECT * FROM %s", sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithDroppedTable(StorageFormat storageFormat) {
        String baseTableName = "test_register_table_with_dropped_table_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", new Object[]{sparkTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        String baseTableNameNew = baseTableName + "_new";
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableName), new QueryExecutor.QueryParam[0]);
        QueryAssert.assertQueryFailure(() -> QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableNameNew, tableLocation), new QueryExecutor.QueryParam[0])).hasMessageMatching(".*No versioned metadata file exists at location.*");
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithDifferentTableName(StorageFormat storageFormat) throws TException {
        String baseTableName = "test_register_table_with_different_table_name_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", new Object[]{sparkTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        String baseTableNameNew = baseTableName + "_new";
        String trinoTableNameNew = TestIcebergSparkCompatibility.trinoTableName(baseTableNameNew);
        String sparkTableNameNew = TestIcebergSparkCompatibility.sparkTableName(baseTableNameNew);
        this.dropTableFromMetastore(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableNameNew, tableLocation), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(3, 'POLAND', true)", sparkTableNameNew), new QueryExecutor.QueryParam[0]);
        List<QueryAssert.Row> expected = List.of(QueryAssert.Row.row((Object[])new Object[]{1, "INDIA", true}), QueryAssert.Row.row((Object[])new Object[]{2, "USA", false}), QueryAssert.Row.row((Object[])new Object[]{3, "POLAND", true}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", trinoTableNameNew), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format("SELECT * FROM %s", sparkTableNameNew), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableNameNew), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testRegisterTableWithMetadataFile(StorageFormat storageFormat) throws TException {
        String baseTableName = "test_register_table_with_metadata_file_" + storageFormat.name().toLowerCase(Locale.ENGLISH) + "_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery(String.format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", new Object[]{sparkTableName, storageFormat}), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery(String.format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(2, 'USA', false)", trinoTableName), new QueryExecutor.QueryParam[0]);
        String tableLocation = IcebergTestUtils.getTableLocation(trinoTableName);
        String metadataLocation = (String)this.metastoreClient.getTable(TEST_SCHEMA_NAME, baseTableName).getParameters().get("metadata_location");
        String metadataFileName = metadataLocation.substring(metadataLocation.lastIndexOf("/") + 1);
        this.dropTableFromMetastore(baseTableName);
        QueryExecutors.onTrino().executeQuery(String.format("CALL iceberg.system.register_table ('%s', '%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation, metadataFileName), new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s values(3, 'POLAND', true)", trinoTableName), new QueryExecutor.QueryParam[0]);
        List<QueryAssert.Row> expected = List.of(QueryAssert.Row.row((Object[])new Object[]{1, "INDIA", true}), QueryAssert.Row.row((Object[])new Object[]{2, "USA", false}), QueryAssert.Row.row((Object[])new Object[]{3, "POLAND", true}));
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery(String.format("SELECT * FROM %s", trinoTableName), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery(String.format("SELECT * FROM %s", sparkTableName), new QueryExecutor.QueryParam[0])).containsOnly(expected);
        QueryExecutors.onTrino().executeQuery(String.format("DROP TABLE %s", trinoTableName), new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"})
    public void testUnregisterNotIcebergTable() {
        String baseTableName = "test_unregister_not_iceberg_table_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String hiveTableName = "default." + baseTableName;
        QueryExecutors.onHive().executeQuery("CREATE TABLE " + hiveTableName + " AS SELECT 1 a", new QueryExecutor.QueryParam[0]);
        Assertions.assertThatThrownBy(() -> QueryExecutors.onTrino().executeQuery("CALL iceberg.system.unregister_table('default', '" + baseTableName + "')", new QueryExecutor.QueryParam[0])).hasMessageContaining("Not an Iceberg table");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + hiveTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM hive.default." + baseTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1})});
        Assertions.assertThatThrownBy(() -> QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).hasMessageContaining("Not an Iceberg table");
        QueryExecutors.onHive().executeQuery("DROP TABLE " + hiveTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="testSetColumnTypeDataProvider")
    public void testTrinoSetColumnType(StorageFormat storageFormat, String sourceColumnType, String sourceValueLiteral, String newColumnType, Object newValue) {
        this.testTrinoSetColumnType(false, storageFormat, sourceColumnType, sourceValueLiteral, newColumnType, newValue);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="testSetColumnTypeDataProvider")
    public void testTrinoSetPartitionedColumnType(StorageFormat storageFormat, String sourceColumnType, String sourceValueLiteral, String newColumnType, Object newValue) {
        this.testTrinoSetColumnType(true, storageFormat, sourceColumnType, sourceValueLiteral, newColumnType, newValue);
    }

    private void testTrinoSetColumnType(boolean partitioned, StorageFormat storageFormat, String sourceColumnType, String sourceValueLiteral, String newColumnType, Object newValue) {
        String baseTableName = "test_set_column_type_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + " WITH (format = '" + storageFormat + "'" + (partitioned ? ", partitioning = ARRAY['col']" : "") + ")AS SELECT CAST(" + sourceValueLiteral + " AS " + sourceColumnType + ") AS col", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ALTER COLUMN col SET DATA TYPE " + newColumnType, new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)newColumnType);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{newValue})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{newValue})});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @DataProvider
    public static Object[][] testSetColumnTypeDataProvider() {
        return DataProviders.cartesianProduct((Object[][][])new Object[][][]{(Object[][])Stream.of(StorageFormat.values()).collect(DataProviders.toDataProvider()), {{"integer", "2147483647", "bigint", Integer.MAX_VALUE}, {"real", "10.3", "double", 10.3}, {"real", "'NaN'", "double", Double.NaN}, {"decimal(5,3)", "'12.345'", "decimal(10,3)", BigDecimal.valueOf(12.345)}}});
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testTrinoAlterStructColumnType(StorageFormat storageFormat) {
        String baseTableName = "test_trino_alter_row_column_type_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onTrino().executeQuery("CREATE TABLE " + trinoTableName + " WITH (format = '" + storageFormat + "')AS SELECT CAST(row(1, 2) AS row(a integer, b integer)) AS col", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ALTER COLUMN col SET DATA TYPE row(a integer, b integer, c integer)", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, b integer, c integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.b, col.c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.b, col.c FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ALTER COLUMN col SET DATA TYPE row(a integer, b bigint, c integer)", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, b bigint, c integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.b, col.c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.b, col.c FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ALTER COLUMN col SET DATA TYPE row(a integer, c integer)", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, c integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.c FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null})});
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ALTER COLUMN col SET DATA TYPE row(a integer, c integer, b bigint)", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, c integer, b bigint)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.c, col.b FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.c, col.b FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null, null})});
        QueryExecutors.onTrino().executeQuery("ALTER TABLE " + trinoTableName + " ALTER COLUMN col SET DATA TYPE row(c integer, b bigint, a integer)", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(c integer, b bigint, a integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.b, col.c, col.a FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, 1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.b, col.c, col.a FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, 1})});
        QueryExecutors.onTrino().executeQuery("DROP TABLE " + trinoTableName, new QueryExecutor.QueryParam[0]);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="testSparkAlterColumnType")
    public void testSparkAlterColumnType(StorageFormat storageFormat, String sourceColumnType, String sourceValueLiteral, String newColumnType, Object newValue) {
        this.testSparkAlterColumnType(false, storageFormat, sourceColumnType, sourceValueLiteral, newColumnType, newValue);
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="testSparkAlterColumnType")
    public void testSparkAlterPartitionedColumnType(StorageFormat storageFormat, String sourceColumnType, String sourceValueLiteral, String newColumnType, Object newValue) {
        this.testSparkAlterColumnType(true, storageFormat, sourceColumnType, sourceValueLiteral, newColumnType, newValue);
    }

    private void testSparkAlterColumnType(boolean partitioned, StorageFormat storageFormat, String sourceColumnType, String sourceValueLiteral, String newColumnType, Object newValue) {
        String baseTableName = "test_spark_alter_column_type_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + (partitioned ? " PARTITIONED BY (col)" : "") + " TBLPROPERTIES ('write.format.default' = '" + storageFormat + "')AS SELECT CAST(" + sourceValueLiteral + " AS " + sourceColumnType + ") AS col", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ALTER COLUMN col TYPE " + newColumnType, new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)newColumnType);
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT * FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{newValue})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{newValue})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    @DataProvider
    public static Object[][] testSparkAlterColumnType() {
        return DataProviders.cartesianProduct((Object[][][])new Object[][][]{(Object[][])Stream.of(StorageFormat.values()).collect(DataProviders.toDataProvider()), {{"integer", "2147483647", "bigint", Integer.MAX_VALUE}, {"float", "10.3", "double", 10.3}, {"float", "'NaN'", "double", Double.NaN}, {"decimal(5,3)", "'12.345'", "decimal(10,3)", BigDecimal.valueOf(12.345)}}});
    }

    @Test(groups={"iceberg", "profile_specific_tests"}, dataProvider="storageFormats")
    public void testSparkAlterStructColumnType(StorageFormat storageFormat) {
        String baseTableName = "test_spark_alter_struct_column_type_" + TestingNames.randomNameSuffix();
        String trinoTableName = TestIcebergSparkCompatibility.trinoTableName(baseTableName);
        String sparkTableName = TestIcebergSparkCompatibility.sparkTableName(baseTableName);
        QueryExecutors.onSpark().executeQuery("CREATE TABLE " + sparkTableName + " TBLPROPERTIES ('write.format.default' = '" + storageFormat + "')AS SELECT named_struct('a', 1, 'b', 2) AS col", new QueryExecutor.QueryParam[0]);
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD COLUMN col.c integer", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, b integer, c integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.b, col.c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.b, col.c FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ALTER COLUMN col.b TYPE bigint", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, b bigint, c integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.b, col.c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.b, col.c FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, 2, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP COLUMN col.b", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, c integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.c FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.c FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD COLUMN col.b bigint", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(a integer, c integer, b bigint)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.a, col.c, col.b FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null, null})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.a, col.c, col.b FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{1, null, null})});
        QueryExecutors.onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ALTER COLUMN col.a AFTER b", new QueryExecutor.QueryParam[0]);
        Assert.assertEquals((String)this.getColumnType(baseTableName, "col"), (String)"row(c integer, b bigint, a integer)");
        QueryAssert.assertThat((QueryResult)QueryExecutors.onSpark().executeQuery("SELECT col.b, col.c, col.a FROM " + sparkTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, 1})});
        QueryAssert.assertThat((QueryResult)QueryExecutors.onTrino().executeQuery("SELECT col.b, col.c, col.a FROM " + trinoTableName, new QueryExecutor.QueryParam[0])).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{null, null, 1})});
        QueryExecutors.onSpark().executeQuery("DROP TABLE " + sparkTableName, new QueryExecutor.QueryParam[0]);
    }

    private String getColumnType(String tableName, String columnName) {
        return (String)QueryExecutors.onTrino().executeQuery("SELECT data_type FROM iceberg.information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'", new QueryExecutor.QueryParam[0]).getOnlyValue();
    }

    private int calculateMetadataFilesForPartitionedTable(String tableName) {
        String dataFilePath = (String)QueryExecutors.onTrino().executeQuery(String.format("SELECT file_path FROM iceberg.default.\"%s$files\" limit 1", tableName), new QueryExecutor.QueryParam[0]).getOnlyValue();
        String partitionPath = dataFilePath.substring(0, dataFilePath.lastIndexOf("/"));
        String dataFolderPath = partitionPath.substring(0, partitionPath.lastIndexOf("/"));
        String tableFolderPath = dataFolderPath.substring(0, dataFolderPath.lastIndexOf("/"));
        String metadataFolderPath = tableFolderPath + "/metadata";
        return this.hdfsClient.listDirectory(URI.create(metadataFolderPath).getPath()).size();
    }

    private void dropTableFromMetastore(String tableName) throws TException {
        this.metastoreClient.dropTable(TEST_SCHEMA_NAME, tableName, false);
        Assertions.assertThatThrownBy(() -> this.metastoreClient.getTable(TEST_SCHEMA_NAME, tableName)).hasMessageContaining("table not found");
    }

    private String getTableComment(String tableName) {
        return (String)QueryExecutors.onTrino().executeQuery("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = 'iceberg' AND schema_name = 'default' AND table_name = '" + tableName + "'", new QueryExecutor.QueryParam[0]).getOnlyValue();
    }

    private String getColumnComment(String tableName, String columnName) {
        return (String)QueryExecutors.onTrino().executeQuery("SELECT comment FROM iceberg.information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'", new QueryExecutor.QueryParam[0]).getOnlyValue();
    }

    private static String toLowerCase(String name) {
        return name.toLowerCase(Locale.ENGLISH);
    }

    private static /* synthetic */ Stream lambda$storageFormatsWithSpecVersion$36(List specVersions, StorageFormat storageFormat) {
        return specVersions.stream().map(specVersion -> new Object[]{storageFormat, specVersion});
    }

    private static /* synthetic */ Object[][] lambda$storageFormats$31(int x$0) {
        return new Object[x$0][];
    }

    public static enum StorageFormat {
        PARQUET,
        ORC,
        AVRO;

    }

    public static enum CreateMode {
        CREATE_TABLE_AND_INSERT,
        CREATE_TABLE_AS_SELECT,
        CREATE_TABLE_WITH_NO_DATA_AND_INSERT;

    }
}

