/*
 * Decompiled with CFR 0.152.
 */
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.BeforeTestWithContext;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tempto.query.QueryResult;
import io.trino.testing.TestingNames;
import io.trino.tests.product.deltalake.BaseTestDeltaLakeS3Storage;
import io.trino.tests.product.deltalake.S3ClientFactory;
import io.trino.tests.product.deltalake.util.DeltaLakeTestUtils;
import io.trino.tests.product.utils.QueryExecutors;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

public class TestDeltaLakeDatabricksChangeDataFeedCompatibility
extends BaseTestDeltaLakeS3Storage {
    // Injected value bound to the name "s3.server_type"; setup() passes it to S3ClientFactory.
    @Inject
    @Named(value="s3.server_type")
    private String s3ServerType;
    // S3 client assigned in setup() from the injected server type.
    private AmazonS3 s3Client;

    /**
     * Runs the inherited set-up and then builds the S3 client for the injected server type.
     */
    @BeforeTestWithContext
    public void setup() {
        super.setUp();
        S3ClientFactory clientFactory = new S3ClientFactory();
        this.s3Client = clientFactory.createS3Client(this.s3ServerType);
    }

    /**
     * Creates a table through Trino with {@code change_data_feed_enabled = true}, inserts three rows
     * through Delta, updates the same row twice through Trino, and asserts the rows that Delta returns
     * from {@code table_changes} for the table.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testUpdateTableWithCdf() {
        String tableName = "test_updates_to_table_with_cdf_" + TestingNames.randomNameSuffix();
        String trinoTable = "delta.default." + tableName;
        String deltaTable = "default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onTrino().executeQuery(
                    "CREATE TABLE " + trinoTable + " (col1 VARCHAR, updated_column INT) "
                            + "WITH (location = '" + location + "', change_data_feed_enabled = true)");

            // The table property must be visible in the DDL that Trino reports back.
            String showCreateTable = QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString();
            Assertions.assertThat(showCreateTable).contains("change_data_feed_enabled = true");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue3', 3)");

            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 4, col1 = 'testValue4' WHERE col1 = 'testValue3'");

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "insert", 2L),
                    QueryAssert.Row.row("testValue3", 3, "insert", 3L),
                    QueryAssert.Row.row("testValue3", 3, "update_preimage", 4L),
                    QueryAssert.Row.row("testValue3", 5, "update_postimage", 4L),
                    QueryAssert.Row.row("testValue3", 5, "update_preimage", 5L),
                    QueryAssert.Row.row("testValue4", 4, "update_postimage", 5L));
        }
        finally {
            QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTable);
        }
    }

    /**
     * Creates a partitioned, CDF-enabled table through Delta, inserts three rows with separate statements,
     * updates one partition through Trino, and asserts the rows that Delta returns from
     * {@code table_changes}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testUpdatePartitionedTableWithCdf() {
        String tableName = "test_updates_to_partitioned_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) "
                            + "USING DELTA PARTITIONED BY (partitioning_column_1, partitioning_column_2) "
                            + "LOCATION '" + location + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue2', 2, 'partition2')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue3', 3, 'partition3')");

            QueryExecutors.onTrino().executeQuery("UPDATE delta." + deltaTable + " SET updated_column = 'testValue5' WHERE partitioning_column_1 = 3");

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version "
                            + "FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 2L),
                    QueryAssert.Row.row("testValue3", 3, "partition3", "insert", 3L),
                    QueryAssert.Row.row("testValue3", 3, "partition3", "update_preimage", 4L),
                    QueryAssert.Row.row("testValue5", 3, "partition3", "update_postimage", 4L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates a CDF-enabled table through Delta, inserts three rows in a single statement, updates one
     * row through Trino, and asserts the rows that Delta returns from {@code table_changes}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled() {
        String tableName = "test_updates_to_table_with_many_rows_inserted_in_one_query_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (col1 STRING, updated_column INT) "
                            + "USING DELTA LOCATION '" + location + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
            QueryExecutors.onTrino().executeQuery("UPDATE delta." + deltaTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, "update_preimage", 2L),
                    QueryAssert.Row.row("testValue3", 5, "update_postimage", 2L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates a partitioned, CDF-enabled table through Delta, inserts three rows in a single statement,
     * updates one partition through Trino, and asserts the rows that Delta returns from
     * {@code table_changes}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled() {
        String tableName = "test_updates_to_partitioned_table_with_many_rows_inserted_in_one_query_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) "
                            + "USING DELTA PARTITIONED BY (partitioning_column_1, partitioning_column_2) "
                            + "LOCATION '" + location + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery(
                    "INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1'), ('testValue2', 2, 'partition2'), ('testValue3', 3, 'partition3')");
            QueryExecutors.onTrino().executeQuery("UPDATE delta." + deltaTable + " SET updated_column = 'testValue5' WHERE partitioning_column_1 = 3");

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version "
                            + "FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, "partition3", "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, "partition3", "update_preimage", 2L),
                    QueryAssert.Row.row("testValue5", 3, "partition3", "update_postimage", 2L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates a partitioned, CDF-enabled table through Delta, inserts three rows, then issues two Trino
     * updates that change a partitioning column, and asserts the rows that Delta returns from
     * {@code table_changes}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated() {
        String tableName = "test_updates_partitioning_column_in_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta.default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) "
                            + "USING DELTA PARTITIONED BY (partitioning_column_1, partitioning_column_2) "
                            + "LOCATION '" + location + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery(
                    "INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1'), ('testValue2', 2, 'partition2'), ('testValue3', 3, 'partition3')");

            QueryExecutors.onTrino().executeQuery(
                    "UPDATE " + trinoTable + " SET partitioning_column_1 = 5 WHERE partitioning_column_2 = 'partition1'");
            QueryExecutors.onTrino().executeQuery(
                    "UPDATE " + trinoTable + " SET partitioning_column_1 = 4, updated_column = 'testValue4' WHERE partitioning_column_2 = 'partition2'");

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version "
                            + "FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, "partition3", "insert", 1L),
                    QueryAssert.Row.row("testValue1", 1, "partition1", "update_preimage", 2L),
                    QueryAssert.Row.row("testValue1", 5, "partition1", "update_postimage", 2L),
                    QueryAssert.Row.row("testValue2", 2, "partition2", "update_preimage", 3L),
                    QueryAssert.Row.row("testValue4", 4, "partition2", "update_postimage", 3L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

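    /*
     * WARNING (emitted by the CFR decompiler): a try block that caught itself was removed while
     * decompiling the following method, so its behaviour may differ from the compiled original.
     */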
    /**
     * Creates a table through Delta without the change data feed, writes to it, enables the feed with
     * {@code ALTER TABLE}, and then asserts that:
     * <ul>
     * <li>reading {@code table_changes} from version 0 fails with the expected message;</li>
     * <li>reading from the version at which the feed was enabled returns the later Trino changes;</li>
     * <li>after the feed is disabled again, a bounded read over the recorded versions returns the same rows.</li>
     * </ul>
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated() {
        String tableName = "test_updates_to_table_with_cdf_enabled_later_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta.default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        String latestHistoryEntry = "DESCRIBE HISTORY " + deltaTable + " LIMIT 1";
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (col1 STRING, updated_column INT) USING DELTA LOCATION '" + location + "'");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue3', 3)");
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");

            // Enable the feed only now and remember the first history value reported right after.
            QueryExecutors.onDelta().executeQuery("ALTER TABLE " + deltaTable + " SET TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            long versionWithCdfEnabled = (Long) QueryExecutors.onDelta().executeQuery(latestHistoryEntry).row(0).get(0);

            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 4 WHERE col1 = 'testValue3'");

            QueryAssert.assertQueryFailure(() -> QueryExecutors.onDelta().executeQuery(
                            "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)"))
                    .hasMessageMatching("(?s)(.*Error getting change data for range \\[0 , 6] as change data was not\nrecorded for version \\[0].*)");

            QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTable + " VALUES ('testValue6', 6)");

            String changesSinceEnabled = String.format(
                    "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('default.%s', %d)",
                    tableName,
                    versionWithCdfEnabled);
            QueryResult openEndedChanges = QueryExecutors.onDelta().executeQuery(changesSinceEnabled);
            QueryAssert.assertThat(openEndedChanges).containsOnly(
                    QueryAssert.Row.row("testValue3", 5, "update_preimage", 6L),
                    QueryAssert.Row.row("testValue3", 4, "update_postimage", 6L),
                    QueryAssert.Row.row("testValue6", 6, "insert", 7L));

            // Capture the latest history value before the feed is switched off again.
            long lastVersionWithCdf = (Long) QueryExecutors.onDelta().executeQuery(latestHistoryEntry).row(0).get(0);
            QueryExecutors.onDelta().executeQuery("ALTER TABLE " + deltaTable + " SET TBLPROPERTIES (delta.enableChangeDataFeed = false)");
            QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTable + " VALUES ('testValue7', 7)");

            String boundedChanges = "SELECT col1, updated_column, _change_type, _commit_version "
                    + String.format("FROM table_changes('default.%s', %d, %d)", tableName, versionWithCdfEnabled, lastVersionWithCdf);
            QueryResult rangeChanges = QueryExecutors.onDelta().executeQuery(boundedChanges);
            QueryAssert.assertThat(rangeChanges).containsOnly(
                    QueryAssert.Row.row("testValue3", 5, "update_preimage", 6L),
                    QueryAssert.Row.row("testValue3", 4, "update_postimage", 6L),
                    QueryAssert.Row.row("testValue6", 6, "insert", 7L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates a CDF-enabled table through Delta, inserts three rows with separate statements, deletes
     * one row through Trino, and asserts the rows that Delta returns from {@code table_changes}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testDeleteFromTableWithCdf() {
        String tableName = "test_deletes_from_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (col1 STRING, updated_column INT) "
                            + "USING DELTA LOCATION '" + location + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES('testValue3', 3)");

            QueryExecutors.onTrino().executeQuery("DELETE FROM delta." + deltaTable + " WHERE col1 = 'testValue3'");

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "insert", 2L),
                    QueryAssert.Row.row("testValue3", 3, "insert", 3L),
                    QueryAssert.Row.row("testValue3", 3, "delete", 4L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates two CDF-enabled tables through Delta, fills both, runs a Trino {@code MERGE} that updates
     * matched rows and inserts unmatched ones, and asserts both the resulting table content and the
     * rows that Delta returns from {@code table_changes} for the merge target.
     *
     * <p>Cleanup drops both tables; the second drop runs even when the first one throws, so a failed
     * drop of the target table no longer leaves the source table behind.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testMergeUpdateIntoTableWithCdfEnabled() {
        String tableName1 = "test_merge_update_into_table_with_cdf_" + TestingNames.randomNameSuffix();
        String tableName2 = "test_merge_update_into_table_with_cdf_data_table_" + TestingNames.randomNameSuffix();
        String locationPrefix = "s3://" + this.bucketName + "/databricks-compatibility-test-";
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE default." + tableName1 + " (nationkey INT, name STRING, regionkey INT) "
                            + "USING DELTA LOCATION '" + locationPrefix + tableName1 + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE default." + tableName2 + " (nationkey INT, name STRING, regionkey INT) "
                            + "USING DELTA LOCATION '" + locationPrefix + tableName2 + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName1 + " VALUES (1, 'nation1', 100)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName1 + " VALUES (2, 'nation2', 200)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName1 + " VALUES (3, 'nation3', 300)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (1000, 'nation1000', 1000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (2, 'nation2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (3000, 'nation3000', 3000)");

            QueryExecutors.onTrino().executeQuery(
                    "MERGE INTO delta.default." + tableName1 + " cdf USING delta.default." + tableName2 + " n "
                            + "ON (cdf.nationkey = n.nationkey) "
                            + "WHEN MATCHED THEN UPDATE SET nationkey = (cdf.nationkey + n.nationkey + n.regionkey) "
                            + "WHEN NOT MATCHED THEN INSERT (nationkey, name, regionkey) VALUES (n.nationkey, n.name, n.regionkey)");

            QueryResult mergedContent = QueryExecutors.onDelta().executeQuery("SELECT * FROM " + tableName1);
            QueryAssert.assertThat(mergedContent).containsOnly(
                    QueryAssert.Row.row(1000, "nation1000", 1000),
                    QueryAssert.Row.row(3000, "nation3000", 3000),
                    QueryAssert.Row.row(1, "nation1", 100),
                    QueryAssert.Row.row(3, "nation3", 300),
                    QueryAssert.Row.row(20004, "nation2", 200));

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT nationkey, name, regionkey, _change_type, _commit_version FROM table_changes('default." + tableName1 + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row(1, "nation1", 100, "insert", 1),
                    QueryAssert.Row.row(2, "nation2", 200, "insert", 2),
                    QueryAssert.Row.row(3, "nation3", 300, "insert", 3),
                    QueryAssert.Row.row(1000, "nation1000", 1000, "insert", 4),
                    QueryAssert.Row.row(3000, "nation3000", 3000, "insert", 4),
                    QueryAssert.Row.row(2, "nation2", 200, "update_preimage", 4),
                    QueryAssert.Row.row(20004, "nation2", 200, "update_postimage", 4));
        }
        finally {
            // Nested finally: the second table must be dropped even if dropping the first one throws.
            try {
                DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName1);
            }
            finally {
                DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName2);
            }
        }
    }

    /**
     * Creates two CDF-enabled tables through Delta, fills both, runs a Trino {@code MERGE} that deletes
     * matched rows and inserts unmatched ones, and asserts both the resulting table content and the
     * rows that Delta returns from {@code table_changes} for the merge target.
     *
     * <p>Cleanup drops both tables; the second drop runs even when the first one throws, so a failed
     * drop of the target table no longer leaves the source table behind.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testMergeDeleteIntoTableWithCdfEnabled() {
        String tableName1 = "test_merge_delete_into_table_with_cdf_" + TestingNames.randomNameSuffix();
        String tableName2 = "test_merge_delete_into_table_with_cdf_data_table_" + TestingNames.randomNameSuffix();
        String locationPrefix = "s3://" + this.bucketName + "/databricks-compatibility-test-";
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE default." + tableName1 + " (nationkey INT, name STRING, regionkey INT) "
                            + "USING DELTA LOCATION '" + locationPrefix + tableName1 + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE default." + tableName2 + " (nationkey INT, name STRING, regionkey INT) "
                            + "USING DELTA LOCATION '" + locationPrefix + tableName2 + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName1 + " VALUES (1, 'nation1', 100)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName1 + " VALUES (2, 'nation2', 200)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName1 + " VALUES (3, 'nation3', 300)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (1000, 'nation1000', 1000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (2, 'nation2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (3000, 'nation3000', 3000)");

            QueryExecutors.onTrino().executeQuery(
                    "MERGE INTO delta.default." + tableName1 + " cdf USING delta.default." + tableName2 + " n "
                            + "ON (cdf.nationkey = n.nationkey) "
                            + "WHEN MATCHED THEN DELETE "
                            + "WHEN NOT MATCHED THEN INSERT (nationkey, name, regionkey) VALUES (n.nationkey, n.name, n.regionkey)");

            QueryResult mergedContent = QueryExecutors.onDelta().executeQuery("SELECT * FROM " + tableName1);
            QueryAssert.assertThat(mergedContent).containsOnly(
                    QueryAssert.Row.row(1000, "nation1000", 1000),
                    QueryAssert.Row.row(3000, "nation3000", 3000),
                    QueryAssert.Row.row(1, "nation1", 100),
                    QueryAssert.Row.row(3, "nation3", 300));

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT nationkey, name, regionkey, _change_type, _commit_version FROM table_changes('default." + tableName1 + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row(1, "nation1", 100, "insert", 1),
                    QueryAssert.Row.row(2, "nation2", 200, "insert", 2),
                    QueryAssert.Row.row(3, "nation3", 300, "insert", 3),
                    QueryAssert.Row.row(1000, "nation1000", 1000, "insert", 4),
                    QueryAssert.Row.row(3000, "nation3000", 3000, "insert", 4),
                    QueryAssert.Row.row(2, "nation2", 200, "delete", 4));
        }
        finally {
            // Nested finally: the second table must be dropped even if dropping the first one throws.
            try {
                DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName1);
            }
            finally {
                DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + tableName2);
            }
        }
    }

    /**
     * Creates a CDF-enabled target table and a plain source table through Delta, fills both, runs a
     * Trino {@code MERGE} that deletes one matched row, updates another and inserts the unmatched rows,
     * and asserts both the resulting target content and the rows that Delta returns from
     * {@code table_changes} for the target.
     *
     * <p>Cleanup drops both tables; the source table is dropped even when dropping the target throws,
     * so a failed first drop no longer leaves the source table behind.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled() {
        String targetTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_" + TestingNames.randomNameSuffix();
        String sourceTableName = "test_merge_mixed_delete_and_update_into_table_with_cdf_data_table_" + TestingNames.randomNameSuffix();
        String locationPrefix = "s3://" + this.bucketName + "/databricks-compatibility-test-";
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE default." + targetTableName + " (page_id INT, page_url STRING, views INT) "
                            + "USING DELTA LOCATION '" + locationPrefix + targetTableName + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            // The source table is created without the change data feed property.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE default." + sourceTableName + " (page_id INT, page_url STRING, views INT) "
                            + "USING DELTA LOCATION '" + locationPrefix + sourceTableName + "'");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (1, 'pageUrl1', 100)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (2, 'pageUrl2', 200)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (3, 'pageUrl3', 300)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (4, 'pageUrl4', 400)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + sourceTableName + " VALUES (1000, 'pageUrl1000', 1000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + sourceTableName + " VALUES (2, 'pageUrl2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + sourceTableName + " VALUES (3000, 'pageUrl3000', 3000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + sourceTableName + " VALUES (4, 'pageUrl4000', 4000)");

            QueryExecutors.onTrino().executeQuery(
                    "MERGE INTO delta.default." + targetTableName + " targetTable USING delta.default." + sourceTableName + " sourceTable "
                            + "ON (targetTable.page_id = sourceTable.page_id) "
                            + "WHEN MATCHED AND targetTable.page_id = 2 THEN DELETE "
                            + "WHEN MATCHED AND targetTable.page_id > 2 THEN UPDATE SET views = (targetTable.views + sourceTable.views) "
                            + "WHEN NOT MATCHED THEN INSERT (page_id, page_url, views) VALUES (sourceTable.page_id, sourceTable.page_url, sourceTable.views)");

            QueryResult mergedContent = QueryExecutors.onDelta().executeQuery("SELECT * FROM " + targetTableName);
            QueryAssert.assertThat(mergedContent).containsOnly(
                    QueryAssert.Row.row(1000, "pageUrl1000", 1000),
                    QueryAssert.Row.row(3000, "pageUrl3000", 3000),
                    QueryAssert.Row.row(4, "pageUrl4", 4400),
                    QueryAssert.Row.row(1, "pageUrl1", 100),
                    QueryAssert.Row.row(3, "pageUrl3", 300));

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT page_id, page_url, views, _change_type, _commit_version FROM table_changes('default." + targetTableName + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row(1, "pageUrl1", 100, "insert", 1),
                    QueryAssert.Row.row(2, "pageUrl2", 200, "insert", 2),
                    QueryAssert.Row.row(3, "pageUrl3", 300, "insert", 3),
                    QueryAssert.Row.row(4, "pageUrl4", 400, "insert", 4),
                    QueryAssert.Row.row(1000, "pageUrl1000", 1000, "insert", 5),
                    QueryAssert.Row.row(3000, "pageUrl3000", 3000, "insert", 5),
                    QueryAssert.Row.row(2, "pageUrl2", 200, "delete", 5),
                    QueryAssert.Row.row(4, "pageUrl4", 4400, "update_postimage", 5),
                    QueryAssert.Row.row(4, "pageUrl4", 400, "update_preimage", 5));
        }
        finally {
            // Nested finally: the source table must be dropped even if dropping the target throws.
            try {
                DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + targetTableName);
            }
            finally {
                DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + sourceTableName);
            }
        }
    }

    /**
     * Creates a partitioned, CDF-enabled table through Delta, inserts three rows of which one has a
     * {@code NULL} partition value, deletes that row through Trino, and asserts both the remaining
     * rows read through Trino and the rows that Delta returns from {@code table_changes}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testDeleteFromNullPartitionWithCdfEnabled() {
        String tableName = "test_delete_from_null_partition_with_cdf_enabled" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta.default." + tableName;
        String location = "s3://" + this.bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) "
                            + "USING DELTA PARTITIONED BY (partitioning_column_1, partitioning_column_2) "
                            + "LOCATION '" + location + "'"
                            + "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery(
                    "INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1'), ('testValue2', 2, 'partition2'), ('testValue3', 3, NULL)");
            QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTable + " WHERE partitioning_column_2 IS NULL");

            QueryResult remainingRows = QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTable);
            QueryAssert.assertThat(remainingRows).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "partition1"),
                    QueryAssert.Row.row("testValue2", 2, "partition2"));

            QueryResult changes = QueryExecutors.onDelta().executeQuery(
                    "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version "
                            + "FROM table_changes('" + deltaTable + "', 0)");
            QueryAssert.assertThat(changes).containsOnly(
                    QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                    QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, null, "insert", 1L),
                    QueryAssert.Row.row("testValue3", 3, null, "delete", 2L));
        }
        finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Runs the "no CDF files" scenario twice: once for a table created without any extra table
     * property, and once for a table created with {@code change_data_feed_enabled = false}.
     */
    @Test(groups={"delta-lake-databricks", "delta-lake-exclude-73", "profile_specific_tests"})
    public void testThatCdfDoesntWorkWhenPropertyIsNotSet() {
        // Both names are generated up front, before either scenario runs.
        String tableWithoutProperty = "test_cdf_doesnt_work_when_property_is_not_set_1_" + TestingNames.randomNameSuffix();
        String tableWithCdfDisabled = "test_cdf_doesnt_work_when_property_is_not_set_2_" + TestingNames.randomNameSuffix();

        this.assertThereIsNoCdfFileGenerated(tableWithoutProperty, "");
        this.assertThereIsNoCdfFileGenerated(tableWithCdfDisabled, "change_data_feed_enabled = false");
    }

    /**
     * Creates a table through Trino, writes to it from both Delta and Trino, verifies the final
     * contents and asserts that nothing was written under the table's {@code _change_data/}
     * prefix. The table is dropped through Trino afterwards regardless of the outcome.
     *
     * @param tableName unqualified name of the table to create in the {@code default} schema
     * @param tableProperty extra entry appended to the {@code WITH (...)} clause, or an empty
     *        string to create the table with the location property only
     */
    private void assertThereIsNoCdfFileGenerated(String tableName, String tableProperty) {
        String trinoTable = "delta.default." + tableName;
        String deltaTable = "default." + tableName;
        try {
            String extraProperty = tableProperty.isEmpty() ? "" : ", " + tableProperty;
            QueryExecutors.onTrino().executeQuery(
                    "CREATE TABLE " + trinoTable + " (col1 VARCHAR, updated_column INT)"
                            + " WITH (location = 's3://" + this.bucketName + "/databricks-compatibility-test-" + tableName + "'"
                            + extraProperty + ")");

            // With no explicit property the CDF setting must not show up at all;
            // otherwise the property must be echoed back exactly as it was given.
            String showCreateTable = QueryExecutors.onTrino()
                    .executeQuery("SHOW CREATE TABLE " + tableName)
                    .getOnlyValue()
                    .toString();
            if (tableProperty.isEmpty()) {
                Assertions.assertThat(showCreateTable).doesNotContain("change_data_feed_enabled");
            } else {
                Assertions.assertThat(showCreateTable).contains(tableProperty);
            }

            // Three single-row inserts: ('testValue1', 1), ('testValue2', 2), ('testValue3', 3).
            for (int rowNumber = 1; rowNumber <= 3; rowNumber++) {
                QueryExecutors.onDelta().executeQuery(
                        "INSERT INTO " + deltaTable + " VALUES ('testValue" + rowNumber + "', " + rowNumber + ")");
            }
            QueryExecutors.onTrino().executeQuery(
                    "UPDATE " + trinoTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");
            QueryExecutors.onDelta().executeQuery(
                    "UPDATE " + deltaTable + " SET updated_column = 4 WHERE col1 = 'testValue2'");

            QueryResult tableContents = QueryExecutors.onDelta().executeQuery("SELECT * FROM " + deltaTable);
            QueryAssert.assertThat(tableContents).containsOnly(
                    QueryAssert.Row.row("testValue1", 1),
                    QueryAssert.Row.row("testValue2", 4),
                    QueryAssert.Row.row("testValue3", 5));

            this.assertThatThereIsNoChangeDataFiles(tableName);
        }
        finally {
            QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTable);
        }
    }

    /**
     * Lists the objects in the test bucket under the table's {@code _change_data/} prefix and
     * asserts that the returned object summaries are empty.
     *
     * @param tableName unqualified table name used to build the storage prefix
     */
    private void assertThatThereIsNoChangeDataFiles(String tableName) {
        ListObjectsV2Result changeDataListing = this.s3Client.listObjectsV2(
                this.bucketName,
                "databricks-compatibility-test-" + tableName + "/_change_data/");
        Assertions.assertThat(changeDataListing.getObjectSummaries()).isEmpty();
    }
}

