/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.flink.core.execution.JobClient;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionITCaseBase;
import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class MySqlSyncTableActionITCase
extends MySqlActionITCaseBase {
    private static final String DATABASE_NAME = "paimon_sync_table";

    @BeforeAll
    public static void startContainers() {
        MYSQL_CONTAINER.withSetupSQL("mysql/sync_table_setup.sql");
        MySqlSyncTableActionITCase.start();
    }

    @Test
    @Timeout(value=60L)
    public void testSchemaEvolution() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "schema_evolution_\\d+");
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withCatalogConfig(Collections.singletonMap(CatalogOptions.METASTORE.key(), "test-alter-table")).withTableConfig(this.getBasicTableConfig()).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.checkTableSchema("[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
        try (Statement statement = this.getStatement();){
            this.testSchemaEvolutionImpl(statement);
        }
    }

    private void checkTableSchema(String excepted) throws Exception {
        FileStoreTable table = this.getFileStoreTable();
        Assertions.assertThat((String)JsonSerdeUtil.toFlatJson((Object)table.schema().fields())).isEqualTo(excepted);
    }

    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
        FileStoreTable table = this.getFileStoreTable();
        statement.executeUpdate("USE paimon_sync_table");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 'one')");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 'four')");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), (1, 5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 'six', 60)");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second' WHERE _id = 2");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30]", "+I[2, 4, four, NULL]", "+I[1, 5, five, 50]", "+I[1, 6, six, 60]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)");
        statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5");
        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 80000000000)");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3 NUMERIC(8, 3)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4 VARBINARY(10)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5 FLOAT");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine', 90000000000, 99999.999, 'nine.bin', 9.9)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3 NUMERIC(8, 3)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 VARBINARY(10)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)10), DataTypes.FLOAT()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v4 VARBINARY(20)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v5 DOUBLE");
        statement.executeUpdate("UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 = 9.00000000009 WHERE _id = 9");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v4 VARBINARY(20)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v5 DOUBLE");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 = 4.00000000004 WHERE _id = 4");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)20), DataTypes.DOUBLE()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        Assertions.assertThat((Map)this.getFileStoreTable().options()).containsEntry((Object)"alter-table-test", (Object)"true");
    }

    @Test
    @Timeout(value=60L)
    public void testMultipleSchemaEvolutions() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "schema_evolution_multiple");
        MySqlSyncTableAction action = this.syncTableActionBuilder(mySqlConfig).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.checkTableSchema("[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
        try (Statement statement = this.getStatement();){
            this.testSchemaEvolutionMultipleImpl(statement);
        }
    }

    private void testSchemaEvolutionMultipleImpl(Statement statement) throws Exception {
        FileStoreTable table = this.getFileStoreTable();
        statement.executeUpdate("USE paimon_sync_table");
        statement.executeUpdate("INSERT INTO schema_evolution_multiple VALUES (1, 'one', 10, 'string_1')");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"_id", "v1", "v2", "v3"});
        List<String> primaryKeys = Collections.singletonList("_id");
        List<String> expected = Collections.singletonList("+I[1, one, 10, string_1]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_multiple ADD v4 INT, MODIFY COLUMN v1 VARCHAR(20), ADD COLUMN (v5 DOUBLE, v6 DECIMAL(5, 3), `$% ^,& *(` VARCHAR(10) COMMENT 'Hi, v700 DOUBLE \\', v701 INT a test'), MODIFY v2 BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_multiple VALUES (2, 'long_string_two', 2000000000000, 'string_2', 20, 20.5, 20.002, 'test_2')");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.VARCHAR((int)10), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)5, (int)3), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("});
        expected = Arrays.asList("+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]", "+I[2, long_string_two, 2000000000000, string_2, 20, 20.5, 20.002, test_2]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=90L)
    public void testAllTypes() throws Exception {
        for (int i = 0; i < 2; ++i) {
            this.testAllTypesOnce();
        }
    }

    private void testAllTypesOnce() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "all_types_table");
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build();
        JobClient client = this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            this.testAllTypesImpl(statement);
        }
        client.cancel().get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testAllTypesImpl(Statement statement) throws Exception {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL((int)2, (int)1).notNull(), DataTypes.BOOLEAN(), DataTypes.BINARY((int)8), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0).notNull(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)38, (int)10), DataTypes.DATE(), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)2), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)0), DataTypes.CHAR((int)10), DataTypes.VARCHAR((int)20), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.VARBINARY((int)10), DataTypes.VARBINARY((int)20), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.ARRAY((DataType)DataTypes.STRING())}, (String[])new String[]{"_id", "pt", "_bit1", "_bit", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", "_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_big_decimal", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"});
        FileStoreTable table = this.getFileStoreTable();
        String bits = Arrays.toString(new byte[]{0, 0, 0, 0, 0, 0, 7, -57});
        List<String> expected = Arrays.asList("+I[1, 1.1, " + String.format("true, %s, ", bits) + "true, true, false, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.11, 123456789876543212345678987654321.22, 123456789876543212345678987654321.33, 11111, 22222, 33333, 2222222222222222300000001111.1234567890, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T18:00:10.123456, 2023-03-23T03:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115, 0, 0, 0, 0, 0], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, [a, b]]", "+I[2, 2.2, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 50000000000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]");
        this.waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
        try {
            statement.executeUpdate("USE paimon_sync_table");
            statement.executeUpdate("ALTER TABLE all_types_table ADD COLUMN v INT");
            ArrayList<DataField> newFields = new ArrayList<DataField>(rowType.getFields());
            newFields.add(new DataField(rowType.getFieldCount(), "v", (DataType)DataTypes.INT()));
            RowType newRowType = new RowType(newFields);
            List<String> newExpected = expected.stream().map(s -> s.substring(0, s.length() - 1) + ", NULL]").collect(Collectors.toList());
            this.waitForResult(newExpected, table, newRowType, Arrays.asList("pt", "_id"));
        }
        catch (Throwable throwable) {
            statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v");
            SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
            schemaManager.commitChanges(new SchemaChange[]{SchemaChange.dropColumn((String)"v")});
            throw throwable;
        }
        statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v");
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.dropColumn((String)"v")});
    }

    @Test
    public void testIncompatibleMySqlTable() {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "incompatible_field_\\d+");
        MySqlSyncTableAction action = this.syncTableActionBuilder(mySqlConfig).build();
        Assertions.assertThatThrownBy(() -> ((MySqlSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"Column v1 have different types when merging schemas.\nCurrent table '{paimon_sync_table.incompatible_field_2}' field: `v1` INT ''\nTo be merged table 'paimon_sync_table.incompatible_field_1' field: `v1` TIMESTAMP(0) ''")});
    }

    @Test
    public void testIncompatiblePaimonTable() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "incompatible_pk_\\d+");
        this.createFileStoreTable(RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()}, (String[])new String[]{"a", "b", "c"}), Collections.emptyList(), Collections.singletonList("a"), new HashMap());
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPrimaryKeys("a").build();
        Assertions.assertThatThrownBy(() -> ((MySqlSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"Paimon schema and source table schema are not compatible.")});
    }

    @Test
    public void testInvalidPrimaryKey() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "schema_evolution_\\d+");
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPrimaryKeys("pk").build();
        Assertions.assertThatThrownBy(() -> ((MySqlSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"Specified primary key 'pk' does not exist in source tables or computed columns.")});
    }

    @Test
    public void testNoPrimaryKey() {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "incompatible_pk_\\d+");
        MySqlSyncTableAction action = this.syncTableActionBuilder(mySqlConfig).build();
        Assertions.assertThatThrownBy(() -> ((MySqlSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"Primary keys are not specified. Also, can't infer primary keys from source table schemas because source tables have no primary keys or have different primary keys.")});
    }

    @Test
    @Timeout(value=60L)
    public void testComputedColumn() throws Exception {
        for (int i = 0; i < 2; ++i) {
            this.innerTestComputedColumn(i == 0);
        }
    }

    private void innerTestComputedColumn(boolean executeMysql) throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "test_computed_column");
        List<String> computedColumnDefs = Arrays.asList("_year_date=year(_date)", "_year_datetime=year(_datetime)", "_year_timestamp=year(_timestamp)", "_month_date=month(_date)", "_month_datetime=month(_datetime)", "_month_timestamp=month(_timestamp)", "_day_date=day(_date)", "_day_datetime=day(_datetime)", "_day_timestamp=day(_timestamp)", "_hour_date=hour(_date)", "_hour_datetime=hour(_datetime)", "_hour_timestamp=hour(_timestamp)", "_date_format_date=date_format(_date,yyyy)", "_date_format_datetime=date_format(_datetime,yyyy-MM-dd)", "_date_format_timestamp=date_format(_timestamp,yyyyMMdd)", "_substring_date1=substring(_date,2)", "_substring_date2=substring(_timestamp,5,10)", "_truncate_date=trUNcate(pk,2)");
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPartitionKeys("_year_date").withPrimaryKeys("pk", "_year_date").withComputedColumnArgs(computedColumnDefs).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        if (executeMysql) {
            try (Statement statement = this.getStatement();){
                statement.execute("USE paimon_sync_table");
                statement.executeUpdate("INSERT INTO test_computed_column VALUES (1, '2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
                statement.executeUpdate("INSERT INTO test_computed_column VALUES (2, '2023-03-23', null, null)");
            }
        }
        FileStoreTable table = this.getFileStoreTable();
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)0), DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT().notNull()}, (String[])new String[]{"pk", "_date", "_datetime", "_timestamp", "_year_date", "_year_datetime", "_year_timestamp", "_month_date", "_month_datetime", "_month_timestamp", "_day_date", "_day_datetime", "_day_timestamp", "_hour_date", "_hour_datetime", "_hour_timestamp", "_date_format_date", "_date_format_datetime", "_date_format_timestamp", "_substring_date1", "_substring_date2", "_truncate_date"});
        List<String> expected = Arrays.asList("+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0]", "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
        this.waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
    }

    @Test
    @Timeout(value=60L)
    public void testSyncShards() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String dbPattern = random.nextBoolean() ? "shard_.+" : "shard_1|shard_2";
        String tblPattern = random.nextBoolean() ? "t.+" : "t1|t2";
        mySqlConfig.put("database-name", dbPattern);
        mySqlConfig.put("table-name", tblPattern);
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            statement.execute("USE shard_1");
            statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30')");
            statement.executeUpdate("INSERT INTO t2 VALUES (2, '2023-07-30')");
            statement.execute("USE shard_2");
            statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31')");
            statement.executeUpdate("INSERT INTO t1 VALUES (4, '2023-07-31')");
        }
        FileStoreTable table = this.getFileStoreTable();
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.STRING().notNull()}, (String[])new String[]{"pk", "_date", "pt"});
        this.waitForResult(Arrays.asList("+I[1, 2023-07-30, 07-30]", "+I[2, 2023-07-30, 07-30]", "+I[3, 2023-07-31, 07-31]", "+I[4, 2023-07-31, 07-31]"), table, rowType, Arrays.asList("pk", "pt"));
    }

    @Test
    @Timeout(value=60L)
    public void testOptionsChange() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", DATABASE_NAME);
        mySqlConfig.put("table-name", "test_options_change");
        HashMap<String, String> tableConfig = new HashMap<String, String>();
        tableConfig.put("bucket", "1");
        tableConfig.put("sink.parallelism", "1");
        MySqlSyncTableAction action1 = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").withTableConfig(tableConfig).build();
        JobClient jobClient = this.runActionWithDefaultEnv((ActionBase)action1);
        try (Statement statement = this.getStatement();){
            statement.execute("USE paimon_sync_table");
            statement.executeUpdate("INSERT INTO test_options_change VALUES (1, '2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
            statement.executeUpdate("INSERT INTO test_options_change VALUES (2, '2023-03-23', null, null)");
        }
        this.waitingTables(this.tableName);
        jobClient.cancel();
        tableConfig.put("sink.savepoint.auto-tag", "true");
        tableConfig.put("tag.num-retained-max", "5");
        tableConfig.put("tag.automatic-creation", "process-time");
        tableConfig.put("tag.creation-period", "hourly");
        tableConfig.put("tag.creation-delay", "600000");
        tableConfig.put("snapshot.time-retained", "1h");
        tableConfig.put("snapshot.num-retained.min", "5");
        tableConfig.put("snapshot.num-retained.max", "10");
        tableConfig.put("changelog-producer", "input");
        MySqlSyncTableAction action2 = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").withTableConfig(tableConfig).build();
        this.runActionWithDefaultEnv((ActionBase)action2);
        Map dynamicOptions = action2.fileStoreTable().options();
        Assertions.assertThat((Map)dynamicOptions).containsAllEntriesOf(tableConfig);
    }

    @Test
    @Timeout(value=60L)
    public void testMetadataColumns() throws Exception {
        try (Statement statement = this.getStatement();){
            statement.execute("USE metadata");
            statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (1, '2023-07-30')");
            statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (2, '2023-07-30')");
        }
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "metadata");
        mySqlConfig.put("table-name", "test_metadata_columns");
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(mySqlConfig).withPrimaryKeys("pk").withMetadataColumns("table_name", "database_name", "op_ts").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable();
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.STRING().notNull(), DataTypes.STRING().notNull(), DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE((int)3).notNull()}, (String[])new String[]{"pk", "_date", "table_name", "database_name", "op_ts"});
        this.waitForResult(Arrays.asList("+I[1, 2023-07-30, test_metadata_columns, metadata, 1970-01-01T00:00]", "+I[2, 2023-07-30, test_metadata_columns, metadata, 1970-01-01T00:00]"), table, rowType, Collections.singletonList("pk"));
    }

    @Test
    public void testCatalogAndTableConfig() {
        MySqlSyncTableAction action = (MySqlSyncTableAction)this.syncTableActionBuilder(this.getBasicMySqlConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat((Map)action.catalogConfig()).containsEntry((Object)"catalog-key", (Object)"catalog-value");
        Assertions.assertThat((Map)action.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    private FileStoreTable getFileStoreTable() throws Exception {
        return this.getFileStoreTable(this.tableName);
    }
}

