/*
 * Decompiled with CFR 0.152.
 */
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.BeforeTestWithContext;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tempto.query.QueryResult;
import io.trino.testing.TestingNames;
import io.trino.testng.services.Flaky;
import io.trino.tests.product.deltalake.BaseTestDeltaLakeS3Storage;
import io.trino.tests.product.deltalake.S3ClientFactory;
import io.trino.tests.product.deltalake.TransactionLogAssertions;
import io.trino.tests.product.deltalake.util.DatabricksVersion;
import io.trino.tests.product.deltalake.util.DeltaLakeTestUtils;
import io.trino.tests.product.utils.QueryExecutors;
import java.util.List;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TestDeltaLakeDatabricksCheckpointsCompatibility
extends BaseTestDeltaLakeS3Storage {
    // Injected value bound to the name "s3.server_type"; setup() passes it to S3ClientFactory.
    @Inject
    @Named(value="s3.server_type")
    private String s3ServerType;
    // S3 client created in setup(); handed to TransactionLogAssertions by the row-type tests.
    private AmazonS3 s3;
    // Databricks runtime version resolved in setup(); selects version-specific expectations.
    private DatabricksVersion databricksRuntimeVersion;

    /**
     * Initialises per-test state: runs the base-class {@code setUp()}, creates the S3 client for
     * the injected server type, and resolves the Databricks runtime version (failing via
     * {@code orElseThrow()} when none is available).
     */
    @BeforeTestWithContext
    public void setup() {
        super.setUp();
        S3ClientFactory clientFactory = new S3ClientFactory();
        s3 = clientFactory.createS3Client(s3ServerType);
        databricksRuntimeVersion = DeltaLakeTestUtils.getDatabricksRuntimeVersion().orElseThrow();
    }

    /**
     * Checks that rows written and deleted through both Databricks and Trino read back identically
     * from each engine, first while {@code listCheckpointFiles} reports no checkpoint and again
     * after four further Trino inserts make it report exactly one.
     *
     * <p>The table is dropped in a {@code finally} block whatever the outcome.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksCanReadTrinoCheckpoint() {
        String tableName = "test_dl_checkpoints_compat_" + TestingNames.randomNameSuffix();
        String tableDirectory = "databricks-compatibility-test-" + tableName;

        // Mixed-case column names are part of the DDL on purpose; queries below use lower case.
        QueryExecutors.onDelta().executeQuery(
                String.format("CREATE TABLE default.%s      (a_NuMbEr INT, a_StRiNg STRING)      USING delta      PARTITIONED BY (a_NuMbEr)      LOCATION 's3://%s/%s'", tableName, bucketName, tableDirectory));
        try {
            // Interleave writes and deletes from both engines.
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,'ala'), (2, 'kota')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, 'osla')");
            QueryExecutors.onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 'psa'), (4, 'bobra')");
            QueryExecutors.onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (4, 'lwa'), (5, 'jeza')");
            QueryExecutors.onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a_string = 'jeza'");
            QueryExecutors.onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_string = 'bobra'");

            List<QueryAssert.Row> expectedRows = ImmutableList.of(
                    QueryAssert.Row.row(1, "ala"),
                    QueryAssert.Row.row(2, "kota"),
                    QueryAssert.Row.row(3, "osla"),
                    QueryAssert.Row.row(3, "psa"),
                    QueryAssert.Row.row(4, "lwa"));

            // No checkpoint expected yet; both engines must agree on the table contents.
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(0);
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM default." + tableName))
                    .containsOnly(expectedRows);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                    .containsOnly(expectedRows);
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM default." + tableName + " WHERE a_string <> 'fill'"))
                    .containsOnly(expectedRows);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT * FROM delta.default." + tableName + " WHERE a_string <> 'fill'"))
                    .containsOnly(expectedRows);

            // Four filler commits from Trino; afterwards exactly one checkpoint file is expected.
            for (int commit = 0; commit < 4; commit++) {
                QueryExecutors.onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 'fill')");
            }
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(1);

            // The filler rows are filtered out, so the visible data must be unchanged.
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM default." + tableName + " WHERE a_string <> 'fill'"))
                    .containsOnly(expectedRows);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT * FROM delta.default." + tableName + " WHERE a_string <> 'fill'"))
                    .containsOnly(expectedRows);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Creates a table from Databricks with {@code delta.checkpointInterval = 5}, checks that Trino's
     * {@code SHOW CREATE TABLE} reports {@code checkpoint_interval = 5}, and asserts on the number of
     * checkpoint files seen after Trino inserts, including after Databricks lowers the interval
     * to 2.
     *
     * <p>The table is dropped in a {@code finally} block whatever the outcome.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoUsesCheckpointInterval() {
        String tableName = "test_dl_checkpoints_compat_" + TestingNames.randomNameSuffix();
        String tableDirectory = "databricks-compatibility-test-" + tableName;

        QueryExecutors.onDelta().executeQuery(
                String.format("CREATE TABLE default.%s      (a_NuMbEr INT, a_StRiNg STRING)      USING delta      PARTITIONED BY (a_NuMbEr)      LOCATION 's3://%s/%s'      TBLPROPERTIES ('delta.checkpointInterval' = '5')", tableName, bucketName, tableDirectory));
        try {
            // Trino must surface the interval that was set through Databricks.
            String showCreateTable = String.format("CREATE TABLE delta.default.%s (\n   a_number integer,\n   a_string varchar\n)\nWITH (\n   checkpoint_interval = 5,\n   location = 's3://%s/%s',\n   partitioned_by = ARRAY['a_number']\n)", tableName, bucketName, tableDirectory);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName))
                    .containsExactlyInOrder(QueryAssert.Row.row(showCreateTable));

            // Four Trino inserts: still no checkpoint file expected.
            fillWithInserts("delta.default." + tableName, "(1, 'trino')", 4);
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(0);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT * FROM delta.default." + tableName + " WHERE a_string <> 'trino'"))
                    .hasNoRows();

            // One more Trino insert: exactly one checkpoint file expected.
            QueryExecutors.onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 'ala'), (2, 'kota')");
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(1);

            // More commits from both engines, then Databricks lowers the interval to 2.
            fillWithInserts("delta.default." + tableName, "(2, 'trino')", 3);
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, 'psa'), (4, 'bobra')");
            QueryExecutors.onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a_string = 'trino'");
            QueryExecutors.onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES ('delta.checkpointInterval' = '2')");

            // Four further Trino inserts must add exactly two checkpoint files.
            int initialCheckpointCount = listCheckpointFiles(bucketName, tableDirectory).size();
            fillWithInserts("delta.default." + tableName, "(3, 'trino')", 4);
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(initialCheckpointCount + 2);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Creates a table from Trino with {@code checkpoint_interval = 3}, checks that Databricks'
     * {@code SHOW CREATE TABLE} output carries the matching table property (the expected text
     * depends on the Databricks runtime version), and asserts on the number of checkpoint files
     * seen after inserts from both engines.
     *
     * <p>The table is dropped in a {@code finally} block whatever the outcome.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksUsesCheckpointInterval() {
        String tableName = "test_dl_checkpoints_compat_" + TestingNames.randomNameSuffix();
        String tableDirectory = "databricks-compatibility-test-" + tableName;

        QueryExecutors.onTrino().executeQuery(
                String.format("CREATE TABLE delta.default.%s (a_number bigint, a_string varchar) WITH (      location = 's3://%s/%s',      partitioned_by = ARRAY['a_number'],      checkpoint_interval = 3)", tableName, bucketName, tableDirectory));
        try {
            // The SHOW CREATE TABLE text differs between runtimes at or above 10.4 and older ones.
            String showCreateTable;
            if (databricksRuntimeVersion.isAtLeast(DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION)) {
                showCreateTable = String.format("CREATE TABLE spark_catalog.default.%s (\n  a_number BIGINT,\n  a_string STRING)\nUSING delta\nPARTITIONED BY (a_number)\nLOCATION 's3://%s/%s'\n%s", tableName, bucketName, tableDirectory, getDatabricksTablePropertiesWithCheckpointInterval());
            } else {
                showCreateTable = String.format("CREATE TABLE `default`.`%s` (\n  `a_number` BIGINT,\n  `a_string` STRING)\nUSING DELTA\nPARTITIONED BY (a_number)\nLOCATION 's3://%s/%s'\nTBLPROPERTIES (\n  'delta.checkpointInterval' = '3')\n", tableName, bucketName, tableDirectory);
            }
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SHOW CREATE TABLE default." + tableName))
                    .containsExactlyInOrder(QueryAssert.Row.row(showCreateTable));

            // Two Databricks inserts: no checkpoint file expected yet.
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 'databricks')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (2, 'databricks')");
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(0);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT * FROM delta.default." + tableName + " WHERE a_string <> 'databricks'"))
                    .hasNoRows();

            // Third Databricks insert: exactly one checkpoint file expected.
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, 'databricks')");
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(1);

            // Three more commits (one from Trino, two from Databricks): two checkpoint files expected.
            QueryExecutors.onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 'ala'), (2, 'kota')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, 'psa'), (4, 'bobra')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (5, 'osla'), (6, 'lwa')");
            Assertions.assertThat(listCheckpointFiles(bucketName, tableDirectory)).hasSize(2);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Returns the {@code TBLPROPERTIES} clause expected in Databricks' {@code SHOW CREATE TABLE}
     * output for the current runtime version.
     *
     * @return the expected clause text for the 11.3 or 10.4 runtime constants
     * @throws IllegalArgumentException if the runtime version equals neither of those constants
     */
    private String getDatabricksTablePropertiesWithCheckpointInterval() {
        DatabricksVersion runtime = databricksRuntimeVersion;
        if (runtime.equals(DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION)) {
            return "TBLPROPERTIES (\n  'delta.checkpointInterval' = '3',\n  'delta.minReaderVersion' = '1',\n  'delta.minWriterVersion' = '2')\n";
        } else if (runtime.equals(DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION)) {
            // The 10.4 expectation additionally lists the 'Type' = 'EXTERNAL' property.
            return "TBLPROPERTIES (\n  'Type' = 'EXTERNAL',\n  'delta.checkpointInterval' = '3',\n  'delta.minReaderVersion' = '1',\n  'delta.minWriterVersion' = '2')\n";
        } else {
            throw new IllegalArgumentException("Unsupported databricks runtime version: " + runtime);
        }
    }

    /**
     * Runs the row-type min/max statistics scenario with the DELETE statement issued through Trino.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoCheckpointMinMaxStatisticsForRowType() {
        String tableName = "test_dl_checkpoints_row_compat_min_max_trino_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnTrino = sql -> QueryExecutors.onTrino().executeQuery(sql);
        String qualifiedTableName = "delta.default." + tableName;
        testCheckpointMinMaxStatisticsForRowType(runOnTrino, tableName, qualifiedTableName);
    }

    /**
     * Runs the row-type min/max statistics scenario with the DELETE statement issued through
     * Databricks.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksCheckpointMinMaxStatisticsForRowType() {
        String tableName = "test_dl_checkpoints_row_compat_min_max_databricks_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnDelta = sql -> QueryExecutors.onDelta().executeQuery(sql);
        String qualifiedTableName = "default." + tableName;
        testCheckpointMinMaxStatisticsForRowType(runOnDelta, tableName, qualifiedTableName);
    }

    /**
     * Shared scenario for min/max statistics on a struct column: Databricks creates and fills a
     * table with {@code delta.checkpointInterval = 1}, a row is deleted through
     * {@code sqlExecutor}, and then the Databricks plan for {@code max(root.entry_one)} is matched
     * against a {@code LocalTableScan} pattern and both engines must return the same max/min
     * values.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     *
     * @param sqlExecutor runs the DELETE statement on the engine under test
     * @param tableName unqualified table name, also used in the table's S3 location
     * @param qualifiedTableName table name as it must appear in the DELETE passed to {@code sqlExecutor}
     */
    private void testCheckpointMinMaxStatisticsForRowType(Consumer<String> sqlExecutor, String tableName, String qualifiedTableName) {
        List<QueryAssert.Row> expectedRows = ImmutableList.of(
                QueryAssert.Row.row(1, "ala"),
                QueryAssert.Row.row(2, "kota"),
                QueryAssert.Row.row(3, "osla"),
                QueryAssert.Row.row(4, "zulu"));

        QueryExecutors.onDelta().executeQuery(
                String.format("CREATE TABLE default.%s      (id INT, root STRUCT<entry_one : INT, entry_two : STRING>)      USING DELTA       LOCATION 's3://%s/databricks-compatibility-test-%1$s'       TBLPROPERTIES (delta.checkpointInterval = 1)", tableName, bucketName));
        try {
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, STRUCT(1,'ala')), (2, STRUCT(2, 'kota'))");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, STRUCT(3, 'osla'))");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (4, STRUCT(4, 'zulu'))");

            // Three inserts with interval 1: three checkpoint files and log version 3 expected.
            Assertions.assertThat(listCheckpointFiles(bucketName, "databricks-compatibility-test-" + tableName)).hasSize(3);
            TransactionLogAssertions.assertTransactionLogVersion(s3, bucketName, tableName, 3);

            // Both engines must see the same struct contents before the delete.
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT DISTINCT root.entry_one, root.entry_two FROM default." + tableName))
                    .containsOnly(expectedRows);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT DISTINCT root.entry_one, root.entry_two FROM delta.default." + tableName))
                    .containsOnly(expectedRows);

            // The delete is issued by the engine under test and must itself be checkpointed.
            sqlExecutor.accept("DELETE FROM " + qualifiedTableName + " WHERE id = 4");
            TransactionLogAssertions.assertLastEntryIsCheckpointed(s3, bucketName, tableName);

            QueryResult explainResult = QueryExecutors.onDelta().executeQuery("EXPLAIN SELECT max(root.entry_one) FROM default." + tableName);
            String explainSelectMax = (String) Iterables.getOnlyElement(explainResult.column(1));
            // Runtimes older than 10.4 are expected to render the aggregated column with an alias.
            String column;
            if (databricksRuntimeVersion.isAtLeast(DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION)) {
                column = "root.entry_one";
            } else {
                column = "root.entry_one AS `entry_one`";
            }
            Assertions.assertThat(explainSelectMax).matches("== Physical Plan ==\\s*LocalTableScan \\[max\\(" + column + "\\).*]\\s*");

            List<QueryAssert.Row> maxMin = ImmutableList.of(QueryAssert.Row.row(3, "ala"));
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT max(root.entry_one), min(root.entry_two) FROM default." + tableName))
                    .containsOnly(maxMin);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT max(root.entry_one), min(root.entry_two) FROM delta.default." + tableName))
                    .containsOnly(maxMin);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Runs the row-type null statistics scenario with the DELETE statement issued through Trino.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoCheckpointNullStatisticsForRowType() {
        String tableName = "test_dl_checkpoints_row_compat_trino_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnTrino = sql -> QueryExecutors.onTrino().executeQuery(sql);
        String qualifiedTableName = "delta.default." + tableName;
        testCheckpointNullStatisticsForRowType(runOnTrino, tableName, qualifiedTableName);
    }

    /**
     * Runs the row-type null statistics scenario with the DELETE statement issued through
     * Databricks.
     */
    @Test(groups = {"delta-lake-databricks", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksCheckpointNullStatisticsForRowType() {
        String tableName = "test_dl_checkpoints_row_compat_databricks_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnDelta = sql -> QueryExecutors.onDelta().executeQuery(sql);
        String qualifiedTableName = "default." + tableName;
        testCheckpointNullStatisticsForRowType(runOnDelta, tableName, qualifiedTableName);
    }

    /**
     * Shared scenario for null-count statistics on a struct column: Databricks creates and fills a
     * table (including a row whose struct fields are both null) with
     * {@code delta.checkpointInterval = 1}, a row is deleted through {@code sqlExecutor}, and then
     * the Databricks plan for {@code count(root.entry_two)} is matched against a
     * {@code LocalTableScan} pattern and both engines must return a count of 2.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     *
     * @param sqlExecutor runs the DELETE statement on the engine under test
     * @param tableName unqualified table name, also used in the table's S3 location
     * @param qualifiedTableName table name as it must appear in the DELETE passed to {@code sqlExecutor}
     */
    private void testCheckpointNullStatisticsForRowType(Consumer<String> sqlExecutor, String tableName, String qualifiedTableName) {
        List<QueryAssert.Row> expectedRows = ImmutableList.of(
                QueryAssert.Row.row(1, "ala"),
                QueryAssert.Row.row(2, "kota"),
                QueryAssert.Row.row(null, null),
                QueryAssert.Row.row(4, "zulu"));

        QueryExecutors.onDelta().executeQuery(
                String.format("CREATE TABLE default.%s      (id INT, root STRUCT<entry_one : INT, entry_two : STRING>)      USING DELTA       LOCATION 's3://%s/databricks-compatibility-test-%1$s'       TBLPROPERTIES (delta.checkpointInterval = 1)", tableName, bucketName));
        try {
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, STRUCT(1,'ala')), (2, STRUCT(2, 'kota'))");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3, STRUCT(null, null))");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (4, STRUCT(4, 'zulu'))");

            // Three inserts with interval 1: three checkpoint files and log version 3 expected.
            Assertions.assertThat(listCheckpointFiles(bucketName, "databricks-compatibility-test-" + tableName)).hasSize(3);
            TransactionLogAssertions.assertTransactionLogVersion(s3, bucketName, tableName, 3);

            // Both engines must see the same struct contents before the delete.
            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT DISTINCT root.entry_one, root.entry_two FROM default." + tableName))
                    .containsOnly(expectedRows);
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT DISTINCT root.entry_one, root.entry_two FROM delta.default." + tableName))
                    .containsOnly(expectedRows);

            // The delete is issued by the engine under test and must itself be checkpointed.
            sqlExecutor.accept("DELETE FROM " + qualifiedTableName + " WHERE id = 4");
            TransactionLogAssertions.assertLastEntryIsCheckpointed(s3, bucketName, tableName);

            QueryResult explainResult = QueryExecutors.onDelta().executeQuery("EXPLAIN SELECT count(root.entry_two) FROM default." + tableName);
            String explainCountNotNull = (String) Iterables.getOnlyElement(explainResult.column(1));
            // Runtimes older than 10.4 are expected to render the aggregated column with an alias.
            String column;
            if (databricksRuntimeVersion.isAtLeast(DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION)) {
                column = "root.entry_two";
            } else {
                column = "root.entry_two AS `entry_two`";
            }
            Assertions.assertThat(explainCountNotNull).matches("== Physical Plan ==\\s*LocalTableScan \\[count\\(" + column + "\\).*]\\s*");

            QueryAssert.assertThat(QueryExecutors.onDelta().executeQuery("SELECT count(root.entry_two) FROM default." + tableName))
                    .containsOnly(QueryAssert.Row.row(2));
            QueryAssert.assertThat(QueryExecutors.onTrino().executeQuery("SELECT count(root.entry_two) FROM delta.default." + tableName))
                    .containsOnly(QueryAssert.Row.row(2));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Runs the "writeStatsAsJson disabled" scenario with the insert issued through Trino,
     * expecting 3.0 and 1.0 as the second and third values of the {@code a_string} stats row.
     */
    @Test(groups = {"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoWriteStatsAsJsonDisabled() {
        String tableName = "test_dl_checkpoints_write_stats_as_json_disabled_trino_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnTrino = sql -> QueryExecutors.onTrino().executeQuery(sql);
        String qualifiedTableName = "delta.default." + tableName;
        testWriteStatsAsJsonDisabled(runOnTrino, tableName, qualifiedTableName, 3.0, 1.0);
    }

    /**
     * Runs the "writeStatsAsJson disabled" scenario with the insert issued through Databricks,
     * expecting null as the second and third values of the {@code a_string} stats row.
     */
    @Test(groups = {"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksWriteStatsAsJsonDisabled() {
        String tableName = "test_dl_checkpoints_write_stats_as_json_disabled_databricks_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnDelta = sql -> QueryExecutors.onDelta().executeQuery(sql);
        String qualifiedTableName = "default." + tableName;
        testWriteStatsAsJsonDisabled(runOnDelta, tableName, qualifiedTableName, null, null);
    }

    /**
     * Shared scenario: Databricks creates a partitioned table with
     * {@code delta.checkpoint.writeStatsAsJson = false}, one row is inserted through
     * {@code sqlExecutor}, and Trino's {@code SHOW STATS} output is compared with the expected rows.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     *
     * @param sqlExecutor runs the INSERT statement on the engine under test
     * @param tableName unqualified table name, also used in the table's S3 location
     * @param qualifiedTableName table name as it must appear in the INSERT passed to {@code sqlExecutor}
     * @param dataSize expected second value of the {@code a_string} stats row (may be null)
     * @param distinctValues expected third value of the {@code a_string} stats row (may be null)
     */
    private void testWriteStatsAsJsonDisabled(Consumer<String> sqlExecutor, String tableName, String qualifiedTableName, Double dataSize, Double distinctValues) {
        QueryExecutors.onDelta().executeQuery(
                String.format("CREATE TABLE default.%s(a_number INT, a_string STRING) USING DELTA PARTITIONED BY (a_number) LOCATION 's3://%s/databricks-compatibility-test-%1$s' TBLPROPERTIES ( delta.checkpointInterval = 5,  delta.checkpoint.writeStatsAsJson = false)", tableName, bucketName));
        try {
            sqlExecutor.accept("INSERT INTO " + qualifiedTableName + " VALUES (1,'ala')");

            QueryResult stats = QueryExecutors.onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName);
            // One row per column plus a final summary row whose only non-null value is 1.0.
            List<QueryAssert.Row> expectedStats = ImmutableList.of(
                    QueryAssert.Row.row("a_number", null, 1.0, 0.0, null, null, null),
                    QueryAssert.Row.row("a_string", dataSize, distinctValues, 0.0, null, null, null),
                    QueryAssert.Row.row(null, null, null, null, 1.0, null, null));
            QueryAssert.assertThat(stats).containsOnly(expectedStats);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Runs the "writeStatsAsStruct disabled" scenario with the insert issued through Trino.
     */
    @Test(groups = {"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoWriteStatsAsStructDisabled() {
        String tableName = "test_dl_checkpoints_write_stats_as_struct_disabled_trino_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnTrino = sql -> QueryExecutors.onTrino().executeQuery(sql);
        String qualifiedTableName = "delta.default." + tableName;
        testWriteStatsAsStructDisabled(runOnTrino, tableName, qualifiedTableName);
    }

    /**
     * Runs the "writeStatsAsStruct disabled" scenario with the insert issued through Databricks.
     */
    @Test(groups = {"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksWriteStatsAsStructDisabled() {
        String tableName = "test_dl_checkpoints_write_stats_as_struct_disabled_databricks_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnDelta = sql -> QueryExecutors.onDelta().executeQuery(sql);
        String qualifiedTableName = "default." + tableName;
        testWriteStatsAsStructDisabled(runOnDelta, tableName, qualifiedTableName);
    }

    /**
     * Shared scenario: Databricks creates a partitioned table with both
     * {@code delta.checkpoint.writeStatsAsJson} and {@code delta.checkpoint.writeStatsAsStruct} set
     * to false and {@code delta.checkpointInterval = 1}, one row is inserted through
     * {@code sqlExecutor}, and Trino's {@code SHOW STATS} output is expected to contain only nulls
     * apart from the column names.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     *
     * @param sqlExecutor runs the INSERT statement on the engine under test
     * @param tableName unqualified table name, also used in the table's S3 location
     * @param qualifiedTableName table name as it must appear in the INSERT passed to {@code sqlExecutor}
     */
    private void testWriteStatsAsStructDisabled(Consumer<String> sqlExecutor, String tableName, String qualifiedTableName) {
        QueryExecutors.onDelta().executeQuery(
                String.format("CREATE TABLE default.%s(a_number INT, a_string STRING) USING DELTA PARTITIONED BY (a_number) LOCATION 's3://%s/databricks-compatibility-test-%1$s' TBLPROPERTIES ( delta.checkpointInterval = 1,  delta.checkpoint.writeStatsAsJson = false,  delta.checkpoint.writeStatsAsStruct = false)", tableName, bucketName));
        try {
            sqlExecutor.accept("INSERT INTO " + qualifiedTableName + " VALUES (1,'ala')");

            QueryResult stats = QueryExecutors.onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName);
            // One row per column plus a final summary row; every statistic is expected to be null.
            List<QueryAssert.Row> expectedStats = ImmutableList.of(
                    QueryAssert.Row.row("a_number", null, null, null, null, null, null),
                    QueryAssert.Row.row("a_string", null, null, null, null, null, null),
                    QueryAssert.Row.row(null, null, null, null, null, null, null));
            QueryAssert.assertThat(stats).containsOnly(expectedStats);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Runs the "writeStatsAsJson enabled" scenario with the inserts issued through Trino, once per
     * row supplied by the {@code testTrinoCheckpointWriteStatsAsJson} data provider.
     *
     * @param type column type used in the CREATE TABLE statement
     * @param inputValue SQL expression inserted into the column
     * @param dataSize expected second value of the column's SHOW STATS row
     * @param distinctValues expected third value of the column's SHOW STATS row
     * @param nullsFraction expected fourth value of the column's SHOW STATS row
     * @param statsValue expected value for both of the last two entries of the column's SHOW STATS row
     */
    @Test(groups = {"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"}, dataProvider = "testTrinoCheckpointWriteStatsAsJson")
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoWriteStatsAsJsonEnabled(String type, String inputValue, Double dataSize, Double distinctValues, Double nullsFraction, Object statsValue) {
        String tableName = "test_dl_checkpoints_write_stats_as_json_enabled_trino_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnTrino = sql -> QueryExecutors.onTrino().executeQuery(sql);
        String qualifiedTableName = "delta.default." + tableName;
        testWriteStatsAsJsonEnabled(runOnTrino, tableName, qualifiedTableName, type, inputValue, dataSize, distinctValues, nullsFraction, statsValue);
    }

    /**
     * Runs the "writeStatsAsJson enabled" scenario with the inserts issued through Databricks, once
     * per row supplied by the {@code testDeltaCheckpointWriteStatsAsJson} data provider. The data
     * size and distinct-values expectations are passed as null.
     *
     * @param type column type used in the CREATE TABLE statement
     * @param inputValue SQL expression inserted into the column
     * @param nullsFraction expected fourth value of the column's SHOW STATS row
     * @param statsValue expected value for both of the last two entries of the column's SHOW STATS row
     */
    @Test(groups = {"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"}, dataProvider = "testDeltaCheckpointWriteStatsAsJson")
    @Flaky(issue = "https://github.com/trinodb/trino/issues/14391", match = "\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksWriteStatsAsJsonEnabled(String type, String inputValue, Double nullsFraction, Object statsValue) {
        String tableName = "test_dl_checkpoints_write_stats_as_json_enabled_databricks_" + TestingNames.randomNameSuffix();
        Consumer<String> runOnDelta = sql -> QueryExecutors.onDelta().executeQuery(sql);
        String qualifiedTableName = "default." + tableName;
        testWriteStatsAsJsonEnabled(runOnDelta, tableName, qualifiedTableName, type, inputValue, null, null, nullsFraction, statsValue);
    }

    /**
     * Shared scenario: Databricks creates a single-column table with struct stats enabled and JSON
     * stats disabled ({@code delta.checkpointInterval = 2}), two rows are inserted through
     * {@code sqlExecutor}, the two stats properties are flipped via ALTER TABLE, a third row is
     * inserted, and Trino's {@code SHOW STATS} output is compared with the expected rows.
     *
     * <p>On the 9.1 runtime with type {@code struct<x bigint>} the CREATE TABLE is expected to fail
     * with a {@code ParseException}; the test then skips.
     *
     * <p>Decompiler note: CFR flagged this method with "Removed try catching itself - possible
     * behaviour change", so the try/finally shape may not match the original source exactly.
     *
     * @param sqlExecutor runs the INSERT statements on the engine under test
     * @param tableName unqualified table name, also used in the table's S3 location
     * @param qualifiedTableName table name as it must appear in the INSERTs passed to {@code sqlExecutor}
     * @param type column type used in the CREATE TABLE statement
     * @param inputValue SQL expression selected by each INSERT
     * @param dataSize expected second value of the {@code col} stats row
     * @param distinctValues expected third value of the {@code col} stats row
     * @param nullsFraction expected fourth value of the {@code col} stats row
     * @param statsValue expected value for both of the last two entries of the {@code col} stats row
     * @throws SkipException on the 9.1 runtime when {@code type} is {@code struct<x bigint>}
     */
    private void testWriteStatsAsJsonEnabled(Consumer<String> sqlExecutor, String tableName, String qualifiedTableName, String type, String inputValue, Double dataSize, Double distinctValues, Double nullsFraction, Object statsValue) {
        String createTableSql = String.format("CREATE TABLE default.%s(col %s) USING DELTA LOCATION 's3://%s/databricks-compatibility-test-%1$s' TBLPROPERTIES ( delta.checkpointInterval = 2,  delta.checkpoint.writeStatsAsJson = false,  delta.checkpoint.writeStatsAsStruct = true)", tableName, type, bucketName);

        boolean isRuntime91 = databricksRuntimeVersion.equals(DatabricksVersion.DATABRICKS_91_RUNTIME_VERSION);
        if (isRuntime91 && type.equals("struct<x bigint>")) {
            // This combination is expected to be rejected at parse time, so there is nothing to test.
            Assertions.assertThatThrownBy(() -> QueryExecutors.onDelta().executeQuery(createTableSql))
                    .hasStackTraceContaining("ParseException");
            throw new SkipException("New runtime version covers the type");
        }

        QueryExecutors.onDelta().executeQuery(createTableSql);
        try {
            String insertSql = "INSERT INTO " + qualifiedTableName + " SELECT " + inputValue;
            sqlExecutor.accept(insertSql);
            sqlExecutor.accept(insertSql);

            // Flip the two stats properties before the third insert.
            QueryExecutors.onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES ('delta.checkpoint.writeStatsAsJson' = true, 'delta.checkpoint.writeStatsAsStruct' = false)");
            sqlExecutor.accept("INSERT INTO " + qualifiedTableName + " SELECT " + inputValue);

            QueryResult stats = QueryExecutors.onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName);
            // One row for the column plus a final summary row whose only non-null value is 3.0.
            List<QueryAssert.Row> expectedStats = ImmutableList.of(
                    QueryAssert.Row.row("col", dataSize, distinctValues, nullsFraction, null, statsValue, statsValue),
                    QueryAssert.Row.row(null, null, null, null, 3.0, null, null));
            QueryAssert.assertThat(stats).containsOnly(expectedStats);
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    @DataProvider
    public Object[][] testTrinoCheckpointWriteStatsAsJson() {
        // One row per column type, six values each. The layout matches the parameter list of
        // testTrinoWriteStatsAsJsonEnabled (presumably its consumer):
        // type, inputValue, dataSize, distinctValues, nullsFraction, statsValue
        return new Object[][] {
                {"boolean", "true", null, 1.0, 0.0, null},
                {"integer", "1", null, 1.0, 0.0, "1"},
                {"tinyint", "2", null, 1.0, 0.0, "2"},
                {"smallint", "3", null, 1.0, 0.0, "3"},
                {"bigint", "1000", null, 1.0, 0.0, "1000"},
                {"real", "0.1", null, 1.0, 0.0, "0.1"},
                {"double", "1.0", null, 1.0, 0.0, "1.0"},
                {"decimal(3,2)", "3.14", null, 1.0, 0.0, "3.14"},
                {"decimal(30,1)", "12345", null, 1.0, 0.0, "12345.0"},
                {"string", "'test'", 12.0, 1.0, 0.0, null},
                {"binary", "X'65683F'", 9.0, 1.0, 0.0, null},
                {"date", "date '2021-02-03'", null, 1.0, 0.0, "2021-02-03"},
                {"timestamp", "timestamp '2001-08-22 11:04:05.321 UTC'", null, 1.0, 0.0, "2001-08-22 11:04:05.321 UTC"},
                // Nested types: every expected stats value is null.
                {"array<int>", "array[1]", null, null, null, null},
                {"map<string,int>", "map(array['key1', 'key2'], array[1, 2])", null, null, null, null},
                {"struct<x bigint>", "cast(row(1) as row(x bigint))", null, null, null, null},
        };
    }

    @DataProvider
    public Object[][] testDeltaCheckpointWriteStatsAsJson() {
        // One row per column type, four values each, matching the parameters of
        // testDatabricksWriteStatsAsJsonEnabled: type, inputValue, nullsFraction, statsValue
        return new Object[][] {
                {"boolean", "true", 0.0, null},
                {"integer", "1", 0.0, "1"},
                {"tinyint", "2", 0.0, "2"},
                {"smallint", "3", 0.0, "3"},
                {"bigint", "1000", 0.0, "1000"},
                {"real", "0.1", 0.0, "0.1"},
                {"double", "1.0", 0.0, "1.0"},
                {"decimal(3,2)", "3.14", 0.0, "3.14"},
                {"decimal(30,1)", "12345", 0.0, "12345.0"},
                {"string", "'test'", 0.0, null},
                {"binary", "X'65683F'", 0.0, null},
                {"date", "date '2021-02-03'", 0.0, "2021-02-03"},
                {"timestamp", "timestamp '2001-08-22 11:04:05.321 UTC'", 0.0, "2001-08-22 11:04:05.321 UTC"},
                {"array<int>", "array(1)", 0.0, null},
                {"map<string,int>", "map('key1', 1, 'key2', 2)", 0.0, null},
                // The struct row is the only one whose nullsFraction is null.
                {"struct<x bigint>", "named_struct('x', 1)", null, null},
        };
    }

    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    @Flaky(issue="https://github.com/trinodb/trino/issues/14391", match="\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testTrinoWriteStatsAsStructEnabled() {
        // Runs the shared writeStatsAsStruct scenario with the INSERT issued through Trino.
        String name = "test_dl_checkpoints_write_stats_as_struct_enabled_trino_" + TestingNames.randomNameSuffix();
        String trinoQualifiedName = "delta.default." + name;
        Consumer<String> trinoExecutor = statement -> QueryExecutors.onTrino().executeQuery(statement);
        // Expected data size and distinct-value count for the a_string column.
        Double expectedDataSize = 3.0;
        Double expectedDistinctValues = 1.0;
        this.testWriteStatsAsStructEnabled(trinoExecutor, name, trinoQualifiedName, expectedDataSize, expectedDistinctValues);
    }

    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    @Flaky(issue="https://github.com/trinodb/trino/issues/14391", match="\\Q[Databricks][DatabricksJDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503, Error message: Unknown.")
    public void testDatabricksWriteStatsAsStructEnabled() {
        // Runs the shared writeStatsAsStruct scenario with the INSERT issued through the onDelta() executor.
        String name = "test_dl_checkpoints_write_stats_as_struct_enabled_databricks_" + TestingNames.randomNameSuffix();
        String deltaQualifiedName = "default." + name;
        Consumer<String> deltaExecutor = statement -> QueryExecutors.onDelta().executeQuery(statement);
        // Data size and distinct-value count for a_string are expected to be null in this variant.
        Double expectedDataSize = null;
        Double expectedDistinctValues = null;
        this.testWriteStatsAsStructEnabled(deltaExecutor, name, deltaQualifiedName, expectedDataSize, expectedDistinctValues);
    }

    /**
     * Shared scenario behind the two {@code *WriteStatsAsStructEnabled} tests.
     * <p>
     * Creates a Delta table partitioned by {@code a_number} via the {@code onDelta()} executor
     * with {@code checkpointInterval = 1}, {@code writeStatsAsJson = false} and
     * {@code writeStatsAsStruct = true}, inserts one row through {@code sqlExecutor}, and
     * compares the output of Trino's {@code SHOW STATS} with the expected rows. The table is
     * dropped in a {@code finally} block.
     * <p>
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param sqlExecutor runs the INSERT statement
     * @param tableName unqualified table name, also used in the S3 location
     * @param qualifiedTableName table name as {@code sqlExecutor} must address it
     * @param dataSize expected second value of the {@code a_string} stats row
     * @param distinctValues expected third value of the {@code a_string} stats row
     */
    private void testWriteStatsAsStructEnabled(Consumer<String> sqlExecutor, String tableName, String qualifiedTableName, Double dataSize, Double distinctValues) {
        String createTableSql = String.format("CREATE TABLE default.%s(a_number INT, a_string STRING) USING DELTA PARTITIONED BY (a_number) LOCATION 's3://%s/databricks-compatibility-test-%1$s' TBLPROPERTIES ( delta.checkpointInterval = 1,  delta.checkpoint.writeStatsAsJson = false,  delta.checkpoint.writeStatsAsStruct = true)", tableName, this.bucketName);
        QueryExecutors.onDelta().executeQuery(createTableSql);
        try {
            sqlExecutor.accept("INSERT INTO " + qualifiedTableName + " VALUES (1,'ala')");

            QueryResult stats = QueryExecutors.onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName);
            QueryAssert.Row numberColumnStats = QueryAssert.Row.row("a_number", null, 1.0, 0.0, null, null, null);
            QueryAssert.Row stringColumnStats = QueryAssert.Row.row("a_string", dataSize, distinctValues, 0.0, null, null, null);
            // Summary row: only the fifth position is set, and it is 1.0 after the single insert.
            QueryAssert.Row tableStats = QueryAssert.Row.row(null, null, null, null, 1.0, null, null);
            QueryAssert.assertThat(stats).containsOnly(ImmutableList.of(numberColumnStats, stringColumnStats, tableStats));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName);
        }
    }

    /**
     * Runs the same {@code INSERT INTO ... VALUES ...} statement through Trino
     * {@code toCreate} times, asserting after each run that exactly one row was written.
     *
     * @param tableName table name as it should appear in the INSERT statement
     * @param values text placed after {@code VALUES}
     * @param toCreate number of INSERT statements to run; nothing runs when it is not positive
     */
    private void fillWithInserts(String tableName, String values, int toCreate) {
        // The statement text does not change between iterations, so build it once.
        String insertSql = String.format("INSERT INTO %s VALUES %s", tableName, values);
        for (int remaining = toCreate; remaining > 0; --remaining) {
            QueryResult result = QueryExecutors.onTrino().executeQuery(insertSql);
            QueryAssert.assertThat(result).updatedRowsCountIsEqualTo(1);
        }
    }

    /**
     * Returns the S3 keys under {@code tableDirectory + "/_delta_log"} that contain
     * {@code "checkpoint.parquet"}.
     *
     * @param bucketName bucket to list
     * @param tableDirectory key prefix of the table, without a trailing slash
     * @return immutable list of the matching keys, in listing order
     */
    private List<String> listCheckpointFiles(String bucketName, String tableDirectory) {
        String transactionLogDirectory = tableDirectory + "/_delta_log";
        List<String> logFiles = this.listS3Directory(bucketName, transactionLogDirectory);
        return logFiles.stream()
                .filter(key -> key.contains("checkpoint.parquet"))
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Lists the keys of all objects that {@code s3.listObjects(bucketName, directory)}
     * reports, following pagination until the listing is no longer truncated.
     * <p>
     * Every page, including the final one, contributes its keys. The next page is requested
     * only while the current page reports itself as truncated.
     *
     * @param bucketName bucket to list
     * @param directory key prefix passed to {@code listObjects}
     * @return immutable list of object keys in the order the pages returned them
     */
    private List<String> listS3Directory(String bucketName, String directory) {
        ImmutableList.Builder<String> keys = ImmutableList.builder();
        ObjectListing page = this.s3.listObjects(bucketName, directory);
        while (true) {
            for (S3ObjectSummary summary : page.getObjectSummaries()) {
                keys.add(summary.getKey());
            }
            // Check truncation only after consuming the page, so the last page is never dropped.
            if (!page.isTruncated()) {
                break;
            }
            page = this.s3.listNextBatchOfObjects(page);
        }
        return keys.build();
    }
}

