/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mysql;

import java.nio.file.Path;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionITCaseBase;
import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseAction;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

public class MySqlSyncDatabaseActionITCase
extends MySqlActionITCaseBase {
    @TempDir
    Path tempDir;

    @BeforeAll
    public static void startContainers() {
        MYSQL_CONTAINER.withSetupSQL("mysql/sync_database_setup.sql");
        MySqlSyncDatabaseActionITCase.start();
    }

    @Test
    @Timeout(value=60L)
    public void testSchemaEvolution() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database");
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            this.testSchemaEvolutionImpl(statement);
        }
    }

    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
        FileStoreTable table1 = this.getFileStoreTable("t1");
        FileStoreTable table2 = this.getFileStoreTable("t2");
        statement.executeUpdate("USE paimon_sync_database");
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
        statement.executeUpdate("INSERT INTO t3 VALUES (-1)");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
        List<String> primaryKeys1 = Collections.singletonList("k");
        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"k1", "k2", "v1", "v2"});
        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v3 VARCHAR(10)");
        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 60, 600, 'string_6')");
        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 80, 800, 'string_8')");
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"k", "v1", "v2"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "k2", "v1", "v2", "v3"});
        expected = Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v3 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 100, 1000, 'long_long_string_10')");
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"k", "v1", "v2"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR((int)20)}, (String[])new String[]{"k1", "k2", "v1", "v2", "v3"});
        expected = Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]", "+I[10, ten, 100, 1000, long_long_string_10]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    @Test
    public void testSpecifiedMySqlTable() {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database");
        mySqlConfig.put("table-name", "my_table");
        MySqlSyncDatabaseAction action = this.syncDatabaseActionBuilder(mySqlConfig).build();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((MySqlSyncDatabaseAction)action).run()).isInstanceOf(IllegalArgumentException.class)).hasMessage("table-name cannot be set for mysql-sync-database. If you want to sync several MySQL tables into one Paimon table, use mysql-sync-table instead.");
    }

    @Test
    public void testInvalidDatabase() {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "invalid");
        MySqlSyncDatabaseAction action = this.syncDatabaseActionBuilder(mySqlConfig).build();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((MySqlSyncDatabaseAction)action).run()).isInstanceOf(IllegalArgumentException.class)).hasMessage("No tables found in MySQL database invalid, or MySQL database does not exist.");
    }

    @Test
    @Timeout(value=60L)
    public void testIgnoreIncompatibleTables() throws Exception {
        this.createFileStoreTable("incompatible", RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"k", "v1"}), Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap());
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database_ignore_incompatible");
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).ignoreIncompatible(true).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            FileStoreTable table = this.getFileStoreTable("compatible");
            statement.executeUpdate("USE paimon_sync_database_ignore_incompatible");
            statement.executeUpdate("INSERT INTO compatible VALUES (2, 'two', 20, 200)");
            statement.executeUpdate("INSERT INTO compatible VALUES (4, 'four', 40, 400)");
            RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"k1", "k2", "v1", "v2"});
            List<String> primaryKeys2 = Arrays.asList("k1", "k2");
            List<String> expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
            this.waitForResult(expected, table, rowType, primaryKeys2);
        }
    }

    @Test
    @Timeout(value=60L)
    public void testTableAffix() throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "v0"}), Collections.emptyList(), Collections.singletonList("k1"), Collections.emptyMap());
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database_affix");
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            this.testTableAffixImpl(statement);
        }
    }

    private void testTableAffixImpl(Statement statement) throws Exception {
        FileStoreTable table1 = this.getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable table2 = this.getFileStoreTable("test_prefix_t2_test_suffix");
        statement.executeUpdate("USE paimon_sync_database_affix");
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two')");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four')");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "v0"});
        List<String> primaryKeys1 = Collections.singletonList("k1");
        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k2", "v0"});
        List<String> primaryKeys2 = Collections.singletonList("k2");
        expected = Arrays.asList("+I[2, two]", "+I[4, four]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 INT");
        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v1 VARCHAR(10)");
        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 's_6')");
        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 's_8')");
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"k1", "v0", "v1"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k2", "v0", "v1"});
        expected = Arrays.asList("+I[2, two, NULL]", "+I[4, four, NULL]", "+I[6, six, s_6]", "+I[8, eight, s_8]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v1 BIGINT");
        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 'long_s_10')");
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"k1", "v0", "v1"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.VARCHAR((int)20)}, (String[])new String[]{"k2", "v0", "v1"});
        expected = Arrays.asList("+I[2, two, NULL]", "+I[4, four, NULL]", "+I[6, six, s_6]", "+I[8, eight, s_8]", "+I[10, ten, long_s_10]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    @Test
    @Timeout(value=60L)
    public void testIncludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("paimon_sync_database_including", "flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignored"));
    }

    @Test
    @Timeout(value=60L)
    public void testExcludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("paimon_sync_database_excluding", null, "flink|paimon.+", Collections.singletonList("sync"), Arrays.asList("flink", "paimon_1", "paimon_2"));
    }

    @Test
    @Timeout(value=60L)
    public void testIncludingAndExcludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("paimon_sync_database_in_excluding", "flink|paimon.+", "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "test"));
    }

    private void includingAndExcludingTablesImpl(String databaseName, @Nullable String includingTables, @Nullable String excludingTables, List<String> existedTables, List<String> notExistedTables) throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", databaseName);
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).includingTables(includingTables).excludingTables(excludingTables).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.assertExactlyExistTables(existedTables);
        this.assertTableNotExists(notExistedTables);
    }

    @Test
    @Timeout(value=60L)
    public void testIgnoreCase() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_ignore_CASE");
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withCatalogConfig(Collections.singletonMap(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable("t");
        Assertions.assertThat((String)JsonSerdeUtil.toFlatJson((Object)table.schema().fields())).isEqualTo("[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
        try (Statement statement = this.getStatement();){
            statement.executeUpdate("USE paimon_ignore_CASE");
            statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi')");
            RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20)}, (String[])new String[]{"k", "uppercase_v0"});
            this.waitForResult(Collections.singletonList("+I[1, Hi]"), table, rowType1, Collections.singletonList("k"));
            statement.executeUpdate("ALTER TABLE T MODIFY COLUMN UPPERCASE_V0 VARCHAR(30)");
            statement.executeUpdate("INSERT INTO T VALUES (2, 'Paimon')");
            RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)30)}, (String[])new String[]{"k", "uppercase_v0"});
            this.waitForResult(Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"), table, rowType2, Collections.singletonList("k"));
            statement.executeUpdate("ALTER TABLE T ADD COLUMN UPPERCASE_V1 DOUBLE");
            statement.executeUpdate("INSERT INTO T VALUES (3, 'Test', 0.5)");
            RowType rowType3 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)30), DataTypes.DOUBLE()}, (String[])new String[]{"k", "uppercase_v0", "uppercase_v1"});
            this.waitForResult(Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]", "+I[3, Test, 0.5]"), table, rowType3, Collections.singletonList("k"));
        }
    }

    @Test
    @Timeout(value=600L)
    public void testNewlyAddedTables() throws Exception {
        this.testNewlyAddedTable(1, true, false, "paimon_sync_database_newly_added_tables");
    }

    @Test
    @Timeout(value=600L)
    public void testNewlyAddedTableSingleTable() throws Exception {
        this.testNewlyAddedTable(1, false, false, "paimon_sync_database_newly_added_tables_1");
    }

    @Test
    @Timeout(value=600L)
    public void testNewlyAddedTableMultipleTables() throws Exception {
        this.testNewlyAddedTable(3, false, false, "paimon_sync_database_newly_added_tables_2");
    }

    @Test
    @Timeout(value=600L)
    public void testNewlyAddedTableSchemaChange() throws Exception {
        this.testNewlyAddedTable(1, false, true, "paimon_sync_database_newly_added_tables_3");
    }

    @Test
    @Timeout(value=600L)
    public void testNewlyAddedTableSingleTableWithSavepoint() throws Exception {
        this.testNewlyAddedTable(1, true, true, "paimon_sync_database_newly_added_tables_4");
    }

    @Test
    @Timeout(value=120L)
    public void testAddIgnoredTable() throws Exception {
        String mySqlDatabase = "paimon_sync_database_add_ignored_table";
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", mySqlDatabase);
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).includingTables("t.+").excludingTables(".*a$").withMode(MultiTablesSinkMode.COMBINED.configString()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            FileStoreTable table1 = this.getFileStoreTable("t1");
            statement.executeUpdate("USE " + mySqlDatabase);
            statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
            statement.executeUpdate("INSERT INTO a VALUES (1, 'one')");
            RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
            List<String> primaryKeys = Collections.singletonList("k");
            this.waitForResult(Collections.singletonList("+I[1, one]"), table1, rowType, primaryKeys);
            statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
            statement.executeUpdate("CREATE TABLE t22 LIKE t2");
            statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");
            statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
            statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
            statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
            statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
            statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");
            statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
            this.waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, rowType, primaryKeys);
            this.assertExactlyExistTables("t1", "t2", "t22");
            this.assertTableNotExists("a", "ta", "t3", "t4");
            FileStoreTable newTable = this.getFileStoreTable("t2");
            this.waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, rowType, primaryKeys);
            newTable = this.getFileStoreTable("t22");
            this.waitForResult(Collections.singletonList("+I[1, Hello]"), newTable, rowType, primaryKeys);
        }
    }

    public void testNewlyAddedTable(int numOfNewlyAddedTables, boolean testSavepointRecovery, boolean testSchemaChange, String databaseName) throws Exception {
        JobClient client = this.buildSyncDatabaseActionWithNewlyAddedTables(databaseName, testSchemaChange);
        this.waitJobRunning(client);
        try (Statement statement = this.getStatement();){
            this.testNewlyAddedTableImpl(client, statement, numOfNewlyAddedTables, testSavepointRecovery, testSchemaChange, databaseName);
        }
    }

    private void testNewlyAddedTableImpl(JobClient client, Statement statement, int newlyAddedTableCount, boolean testSavepointRecovery, boolean testSchemaChange, String databaseName) throws Exception {
        FileStoreTable table1 = this.getFileStoreTable("t1");
        FileStoreTable table2 = this.getFileStoreTable("t2");
        statement.executeUpdate("USE " + databaseName);
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
        List<String> primaryKeys1 = Collections.singletonList("k");
        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"k1", "k2", "v1", "v2"});
        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        HashMap<String, List<Tuple2<Integer, String>>> recordsMap = new HashMap<String, List<Tuple2<Integer, String>>>();
        List<String> newTablePrimaryKeys = Collections.singletonList("k");
        RowType newTableRowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
        int newTableCount = 0;
        String newTableName = this.getNewTableName(newTableCount);
        this.createNewTable(statement, newTableName);
        statement.executeUpdate(String.format("INSERT INTO `%s`.`t2` VALUES (8, 'eight', 80, 800)", databaseName));
        List<Tuple2<Integer, String>> newTableRecords = this.getNewTableRecords();
        recordsMap.put(newTableName, newTableRecords);
        List<String> newTableExpected = this.getNewTableExpected(newTableRecords);
        this.insertRecordsIntoNewTable(statement, databaseName, newTableName, newTableRecords);
        if (testSavepointRecovery) {
            String savepoint = (String)client.stopWithSavepoint(false, this.tempDir.toUri().toString(), SavepointFormatType.CANONICAL).join();
            Assertions.assertThat((String)savepoint).isNotBlank();
            client = this.buildSyncDatabaseActionWithNewlyAddedTables(savepoint, databaseName, testSchemaChange);
            this.waitJobRunning(client);
        }
        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]", "+I[8, eight, 80, 800]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        FileStoreTable newTable = this.getFileStoreTable(newTableName);
        this.waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
        for (newTableCount = 1; newTableCount < newlyAddedTableCount; ++newTableCount) {
            newTableName = this.getNewTableName(newTableCount);
            this.createNewTable(statement, newTableName);
            Thread.sleep(5000L);
            newTableRecords = this.getNewTableRecords();
            recordsMap.put(newTableName, newTableRecords);
            this.insertRecordsIntoNewTable(statement, databaseName, newTableName, newTableRecords);
            newTable = this.getFileStoreTable(newTableName);
            newTableExpected = this.getNewTableExpected(newTableRecords);
            this.waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
        }
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int pick = random.nextInt(newlyAddedTableCount);
        String tableName = this.getNewTableName(pick);
        List records = (List)recordsMap.get(tableName);
        records.add(Tuple2.of((Object)80, (Object)"eighty"));
        newTable = this.getFileStoreTable(tableName);
        newTableExpected = this.getNewTableExpected(records);
        statement.executeUpdate(String.format("INSERT INTO `%s`.`%s` VALUES (80, 'eighty')", databaseName, tableName));
        this.waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
        if (testSchemaChange) {
            pick = random.nextInt(newlyAddedTableCount);
            tableName = this.getNewTableName(pick);
            records = (List)recordsMap.get(tableName);
            statement.executeUpdate(String.format("ALTER TABLE `%s`.`%s` ADD COLUMN v2 INT", databaseName, tableName));
            statement.executeUpdate(String.format("INSERT INTO `%s`.`%s` VALUES (100, 'hundred', 10000)", databaseName, tableName));
            List<String> expectedRecords = records.stream().map(tuple -> String.format("+I[%d, %s, NULL]", tuple.f0, tuple.f1)).collect(Collectors.toList());
            expectedRecords.add("+I[100, hundred, 10000]");
            newTable = this.getFileStoreTable(tableName);
            RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"k", "v1", "v2"});
            this.waitForResult(expectedRecords, newTable, rowType, newTablePrimaryKeys);
            Assertions.assertThat((Map)this.getFileStoreTable(tableName).options()).containsEntry((Object)"alter-table-test", (Object)"true");
        }
    }

    private List<String> getNewTableExpected(List<Tuple2<Integer, String>> newTableRecords) {
        return newTableRecords.stream().map(tuple -> String.format("+I[%d, %s]", tuple.f0, tuple.f1)).collect(Collectors.toList());
    }

    private List<Tuple2<Integer, String>> getNewTableRecords() {
        LinkedList<Tuple2<Integer, String>> records = new LinkedList<Tuple2<Integer, String>>();
        int count = ThreadLocalRandom.current().nextInt(10) + 1;
        for (int i = 0; i < count; ++i) {
            records.add((Tuple2<Integer, String>)Tuple2.of((Object)i, (Object)("varchar_" + i)));
        }
        return records;
    }

    private void insertRecordsIntoNewTable(Statement statement, String databaseName, String newTableName, List<Tuple2<Integer, String>> newTableRecords) throws SQLException {
        String sql = String.format("INSERT INTO `%s`.`%s` VALUES %s", databaseName, newTableName, newTableRecords.stream().map(tuple -> String.format("(%d, '%s')", tuple.f0, tuple.f1)).collect(Collectors.joining(", ")));
        statement.executeUpdate(sql);
    }

    private String getNewTableName(int newTableCount) {
        return "t_new_table_" + newTableCount;
    }

    private void createNewTable(Statement statement, String newTableName) throws SQLException {
        statement.executeUpdate(String.format("CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY (k))", newTableName));
    }

    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String databaseName, boolean testSchemaChange) throws Exception {
        return this.buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName, testSchemaChange);
    }

    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String savepointPath, String databaseName, boolean testSchemaChange) throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", databaseName);
        mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
        Map<String, String> catalogConfig = testSchemaChange ? Collections.singletonMap(CatalogOptions.METASTORE.key(), "test-alter-table") : Collections.emptyMap();
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withCatalogConfig(catalogConfig).withTableConfig(this.getBasicTableConfig()).includingTables("t.+").withMode(MultiTablesSinkMode.COMBINED.configString()).build();
        action.withStreamExecutionEnvironment(this.env).build();
        if (Objects.nonNull(savepointPath)) {
            StreamGraph streamGraph = this.env.getStreamGraph();
            JobGraph jobGraph = streamGraph.getJobGraph();
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath, (boolean)true));
            return this.env.executeAsync(streamGraph);
        }
        return this.env.executeAsync();
    }

    @Test
    @Timeout(value=240L)
    public void testSyncManyTableWithLimitedMemory() throws Exception {
        String databaseName = "many_table_sync_test";
        int newTableCount = 100;
        int recordsCount = 100;
        ArrayList<Tuple2<Integer, String>> newTableRecords = new ArrayList<Tuple2<Integer, String>>();
        ArrayList<String> expectedRecords = new ArrayList<String>();
        for (int i = 0; i < recordsCount; ++i) {
            newTableRecords.add(Tuple2.of((Object)i, (Object)("string_" + i)));
            expectedRecords.add(String.format("+I[%d, %s]", i, "string_" + i));
        }
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", databaseName);
        mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
        Map<String, String> tableConfig = this.getBasicTableConfig();
        tableConfig.put("sink.parallelism", "1");
        tableConfig.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "4 mb");
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(tableConfig).withMode(MultiTablesSinkMode.COMBINED.configString()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            statement.executeUpdate("USE " + databaseName);
            Thread.sleep(2000L);
            ArrayList<String> tables = new ArrayList<String>();
            tables.add("a");
            for (int i = 0; i < newTableCount; ++i) {
                tables.add("t" + i);
                Thread thread = new Thread(new SyncNewTableJob(i, statement, newTableRecords));
                thread.start();
            }
            this.waitingTables(tables);
            RowType newTableRowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
            List<String> newTablePrimaryKeys = Collections.singletonList("k");
            for (int i = 0; i < newTableCount; ++i) {
                FileStoreTable newTable = this.getFileStoreTable("t" + i);
                this.waitForResult(expectedRecords, newTable, newTableRowType, newTablePrimaryKeys);
            }
        }
    }

    @Test
    @Timeout(value=60L)
    public void testSyncMultipleShards() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", ThreadLocalRandom.current().nextBoolean() ? "database_shard_.*" : "database_shard_1|database_shard_2");
        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? MultiTablesSinkMode.DIVIDED : MultiTablesSinkMode.COMBINED;
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).withMode(mode.configString()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES (1, 'db1_1')");
            statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES (2, 'db1_2')");
            statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES (3, 'db2_3', 300)");
            statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES (4, 'db2_4', 400)");
            FileStoreTable table = this.getFileStoreTable("t1");
            RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT()}, (String[])new String[]{"k", "v1", "v2"});
            this.waitForResult(Arrays.asList("+I[1, db1_1, NULL]", "+I[2, db1_2, NULL]", "+I[3, db2_3, 300]", "+I[4, db2_4, 400]"), table, rowType, Collections.singletonList("k"));
            statement.executeUpdate("ALTER TABLE database_shard_1.t2 ADD COLUMN v2 INT");
            statement.executeUpdate("ALTER TABLE database_shard_2.t2 ADD COLUMN v3 VARCHAR(10)");
            statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES (1, 1.1, 1)");
            statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES (2, 2.2, 2)");
            statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES (3, 3.3, 'db2_3')");
            statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES (4, 4.4, 'db2_4')");
            table = this.getFileStoreTable("t2");
            rowType = RowType.of((DataType[])new DataType[]{DataTypes.BIGINT().notNull(), DataTypes.DOUBLE(), DataTypes.INT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1", "v2", "v3"});
            this.waitForResult(Arrays.asList("+I[1, 1.1, 1, NULL]", "+I[2, 2.2, 2, NULL]", "+I[3, 3.3, NULL, db2_3]", "+I[4, 4.4, NULL, db2_4]"), table, rowType, Collections.singletonList("k"));
            statement.executeUpdate("INSERT INTO database_shard_2.t3 VALUES (1, 'db2_1'), (2, 'db2_2')");
            statement.executeUpdate("INSERT INTO database_shard_1.t3 VALUES (3, 'db1_3'), (4, 'db1_4')");
            table = this.getFileStoreTable("t3");
            rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
            this.waitForResult(Arrays.asList("+I[3, db1_3]", "+I[4, db1_4]"), table, rowType, Collections.singletonList("k"));
            if (mode == MultiTablesSinkMode.COMBINED) {
                statement.executeUpdate("CREATE TABLE database_shard_1.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO database_shard_1.t4 VALUES (1, 'db1_1')");
                statement.executeUpdate("CREATE TABLE database_shard_2.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO database_shard_2.t4 VALUES (2, 'db2_2')");
                this.waitingTables("t4");
                table = this.getFileStoreTable("t4");
                rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
                this.waitForResult(Arrays.asList("+I[1, db1_1]", "+I[2, db2_2]"), table, rowType, Collections.singletonList("k"));
            }
        }
    }

    @Test
    @Timeout(value=60L)
    public void testSyncMultipleShardsWithoutMerging() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "without_merging_shard_.*");
        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? MultiTablesSinkMode.DIVIDED : MultiTablesSinkMode.COMBINED;
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).mergeShards(false).withMode(mode.configString()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            Thread.sleep(5000L);
            this.assertExactlyExistTables("without_merging_shard_1_t1", "without_merging_shard_1_t2", "without_merging_shard_2_t1");
            statement.executeUpdate("INSERT INTO without_merging_shard_1.t1 VALUES (1, 'db1_1'), (2, 'db1_2')");
            FileStoreTable table = this.getFileStoreTable("without_merging_shard_1_t1");
            RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
            this.waitForResult(Arrays.asList("+I[1, db1_1]", "+I[2, db1_2]"), table, rowType, Collections.singletonList("k"));
            statement.executeUpdate("INSERT INTO without_merging_shard_2.t1 VALUES (3, 'db2_3', 300), (4, 'db2_4', 400)");
            table = this.getFileStoreTable("without_merging_shard_2_t1");
            rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT()}, (String[])new String[]{"k", "v1", "v2"});
            this.waitForResult(Arrays.asList("+I[3, db2_3, 300]", "+I[4, db2_4, 400]"), table, rowType, Collections.singletonList("k"));
            statement.executeUpdate("ALTER TABLE without_merging_shard_1.t2 ADD COLUMN v2 DOUBLE");
            statement.executeUpdate("INSERT INTO without_merging_shard_1.t2 VALUES (1, 'Apache', 1.1)");
            statement.executeUpdate("INSERT INTO without_merging_shard_1.t2 VALUES (2, 'Paimon', 2.2)");
            table = this.getFileStoreTable("without_merging_shard_1_t2");
            rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.DOUBLE()}, (String[])new String[]{"k", "v1", "v2"});
            this.waitForResult(Arrays.asList("+I[1, Apache, 1.1]", "+I[2, Paimon, 2.2]"), table, rowType, Collections.singletonList("k"));
            if (mode == MultiTablesSinkMode.COMBINED) {
                statement.executeUpdate("CREATE TABLE without_merging_shard_1.t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO without_merging_shard_1.t3 VALUES (1, 'test')");
                statement.executeUpdate("CREATE TABLE without_merging_shard_2.t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO without_merging_shard_2.t3 VALUES (2, 'test')");
                this.waitingTables("without_merging_shard_1_t3", "without_merging_shard_2_t3");
                table = this.getFileStoreTable("without_merging_shard_1_t3");
                rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
                this.waitForResult(Collections.singletonList("+I[1, test]"), table, rowType, Collections.singletonList("k"));
                table = this.getFileStoreTable("without_merging_shard_2_t3");
                this.waitForResult(Collections.singletonList("+I[2, test]"), table, rowType, Collections.singletonList("k"));
            }
        }
    }

    @Test
    public void testMonitoredAndExcludedTablesWithMering() throws Exception {
        this.createFileStoreTable("t2", RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"k", "v1"}), Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap());
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "monitored_and_excluded_shard_.*");
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).ignoreIncompatible(true).withMode(MultiTablesSinkMode.COMBINED.configString()).build();
        action.build();
        Assertions.assertThat((List)action.monitoredTables()).containsOnly((Object[])new Identifier[]{Identifier.create((String)"monitored_and_excluded_shard_1", (String)"t1"), Identifier.create((String)"monitored_and_excluded_shard_1", (String)"t3"), Identifier.create((String)"monitored_and_excluded_shard_2", (String)"t1")});
        Assertions.assertThat((List)action.excludedTables()).containsOnly((Object[])new Identifier[]{Identifier.create((String)"monitored_and_excluded_shard_1", (String)"t2"), Identifier.create((String)"monitored_and_excluded_shard_2", (String)"t2"), Identifier.create((String)"monitored_and_excluded_shard_2", (String)"t3")});
    }

    @Test
    @Timeout(value=60L)
    public void testNewlyAddedTablesOptionsChange() throws Exception {
        try (Statement statement = this.getStatement();){
            statement.execute("USE newly_added_tables_option_schange");
            statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
            statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        }
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "newly_added_tables_option_schange");
        HashMap<String, String> tableConfig = new HashMap<String, String>();
        tableConfig.put("bucket", "1");
        tableConfig.put("sink.parallelism", "1");
        MySqlSyncDatabaseAction action1 = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(tableConfig).withMode(MultiTablesSinkMode.COMBINED.configString()).build();
        JobClient jobClient = this.runActionWithDefaultEnv((ActionBase)action1);
        this.waitingTables("t1");
        jobClient.cancel();
        tableConfig.put("sink.savepoint.auto-tag", "true");
        tableConfig.put("tag.num-retained-max", "5");
        tableConfig.put("tag.automatic-creation", "process-time");
        tableConfig.put("tag.creation-period", "hourly");
        tableConfig.put("tag.creation-delay", "600000");
        tableConfig.put("snapshot.time-retained", "1h");
        tableConfig.put("snapshot.num-retained.min", "5");
        tableConfig.put("snapshot.num-retained.max", "10");
        tableConfig.put("changelog-producer", "input");
        try (Statement statement = this.getStatement();){
            statement.execute("USE newly_added_tables_option_schange");
            statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
        }
        MySqlSyncDatabaseAction action2 = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(tableConfig).build();
        this.runActionWithDefaultEnv((ActionBase)action2);
        this.waitingTables("t2");
        Map tableOptions = this.getFileStoreTable("t2").options();
        ((MapAssert)Assertions.assertThat((Map)tableOptions).containsAllEntriesOf(tableConfig)).containsKey((Object)"path");
    }

    @Test
    public void testCatalogAndTableConfig() {
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(this.getBasicMySqlConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat((Map)action.catalogConfig()).containsEntry((Object)"catalog-key", (Object)"catalog-value");
        Assertions.assertThat((Map)action.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    @Test
    @Timeout(value=60L)
    public void testMetadataColumns() throws Exception {
        Map<String, String> mySqlConfig = this.getBasicMySqlConfig();
        mySqlConfig.put("database-name", "metadata");
        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? MultiTablesSinkMode.DIVIDED : MultiTablesSinkMode.COMBINED;
        MySqlSyncDatabaseAction action = (MySqlSyncDatabaseAction)this.syncDatabaseActionBuilder(mySqlConfig).withTableConfig(this.getBasicTableConfig()).withMode(mode.configString()).withMetadataColumn(Arrays.asList("table_name", "database_name")).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement();){
            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (1, 'db1_1')");
            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (2, 'db1_2')");
            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (3, 'db2_3')");
            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (4, 'db2_4')");
            FileStoreTable table = this.getFileStoreTable("t1");
            RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.STRING().notNull(), DataTypes.STRING().notNull()}, (String[])new String[]{"k", "v1", "table_name", "database_name"});
            this.waitForResult(Arrays.asList("+I[1, db1_1, t1, metadata]", "+I[2, db1_2, t1, metadata]", "+I[3, db2_3, t1, metadata]", "+I[4, db2_4, t1, metadata]"), table, rowType, Collections.singletonList("k"));
            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (1, 'db1_1')");
            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (2, 'db1_2')");
            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (3, 'db1_3')");
            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (4, 'db1_4')");
            table = this.getFileStoreTable("t2");
            rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.STRING().notNull(), DataTypes.STRING().notNull()}, (String[])new String[]{"k", "v1", "table_name", "database_name"});
            this.waitForResult(Arrays.asList("+I[1, db1_1, t2, metadata]", "+I[2, db1_2, t2, metadata]", "+I[3, db1_3, t2, metadata]", "+I[4, db1_4, t2, metadata]"), table, rowType, Collections.singletonList("k"));
        }
    }

    private class SyncNewTableJob
    implements Runnable {
        private final int ith;
        private final Statement statement;
        private final List<Tuple2<Integer, String>> records;

        SyncNewTableJob(int ith, Statement statement, List<Tuple2<Integer, String>> records) {
            this.ith = ith;
            this.statement = statement;
            this.records = records;
        }

        @Override
        public void run() {
            String newTableName = "t" + this.ith;
            try {
                MySqlSyncDatabaseActionITCase.this.createNewTable(this.statement, newTableName);
                String sql = String.format("INSERT INTO %s VALUES %s", newTableName, this.records.stream().map(tuple -> String.format("(%d, '%s')", tuple.f0, tuple.f1)).collect(Collectors.joining(", ")));
                this.statement.executeUpdate(sql);
            }
            catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

