/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class KafkaCanalSyncTableActionITCase
extends KafkaActionITCaseBase {
    @Test
    @Timeout(value=60L)
    public void testSchemaEvolution() throws Exception {
        this.runSingleTableSchemaEvolution("schemaevolution");
    }

    @Test
    @Timeout(value=60L)
    public void testSchemaEvolutionWithMissingDdl() throws Exception {
        this.runSingleTableSchemaEvolution("schemaevolutionmissingddl");
    }

    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
        String topic = "schema_evolution";
        this.createTestTopic("schema_evolution", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir));
        try {
            this.writeRecordsToKafka("schema_evolution", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "schema_evolution");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl("schema_evolution", sourceDir);
    }

    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        try {
            this.writeRecordsToKafka(topic, KafkaCanalSyncTableActionITCase.readLines(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30]", "+I[2, 4, four, NULL]", "+I[1, 5, five, 50]", "+I[1, 6, six, 60]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        try {
            this.writeRecordsToKafka(topic, KafkaCanalSyncTableActionITCase.readLines(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        try {
            this.writeRecordsToKafka(topic, KafkaCanalSyncTableActionITCase.readLines(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)10), DataTypes.FLOAT()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        try {
            this.writeRecordsToKafka(topic, KafkaCanalSyncTableActionITCase.readLines(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)20), DataTypes.DOUBLE()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testMultipleSchemaEvolutions() throws Exception {
        String topic = "schema_evolution_multiple";
        this.createTestTopic("schema_evolution_multiple", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("schema_evolution_multiple", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "schema_evolution_multiple");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("_id").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionMultipleImpl("schema_evolution_multiple");
    }

    private void testSchemaEvolutionMultipleImpl(String topic) throws Exception {
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"_id", "v1", "v2", "v3"});
        List<String> primaryKeys = Collections.singletonList("_id");
        List<String> expected = Collections.singletonList("+I[1, one, 10, string_1]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        try {
            this.writeRecordsToKafka(topic, KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.VARCHAR((int)10), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)5, (int)3), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("});
        expected = Arrays.asList("+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]", "+I[2, long_string_two, 2000000000000, string_2, 20, 20.5, 20.002, test_2]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testAllTypes() throws Exception {
        for (int i = 0; i < 2; ++i) {
            this.testAllTypesOnce();
            Thread.sleep(3000L);
        }
    }

    private void testAllTypesOnce() throws Exception {
        String topic = "all_type" + UUID.randomUUID();
        this.createTestTopic(topic, 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/alltype/canal-data.txt");
        try {
            this.writeRecordsToKafka(topic, lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build();
        JobClient client = this.runActionWithDefaultEnv((ActionBase)action);
        this.testAllTypesImpl();
        client.cancel().get();
    }

    private void testAllTypesImpl() throws Exception {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL((int)2, (int)1).notNull(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DATE(), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)2), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)0), DataTypes.CHAR((int)10), DataTypes.VARCHAR((int)20), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.VARBINARY((int)10), DataTypes.VARBINARY((int)20), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.ARRAY((DataType)DataTypes.STRING())}, (String[])new String[]{"_id", "pt", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", "_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"});
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        List<String> expected = Arrays.asList("+I[1, 1.1, true, true, false, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.110, 123456789876543212345678987654321.220, 123456789876543212345678987654321.330, 11111, 22222, 33333, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T15:00:10.123456, 2023-03-23T00:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, [a, b]]", "+I[2, 2.2, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 50000000000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]");
        this.waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
    }

    @Test
    @Timeout(value=60L)
    public void testNotSupportFormat() throws Exception {
        String topic = "not_support";
        this.createTestTopic("not_support", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("not_support", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "togg-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "not_support");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(UnsupportedOperationException.class, (String)"This format: togg-json is not supported.")});
    }

    @Test
    @Timeout(value=120L)
    public void testKafkaNoNonDdlData() throws Exception {
        String topic = "no_non_ddl_data";
        this.createTestTopic("no_non_ddl_data", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/nononddldata/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("no_non_ddl_data", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "no_non_ddl_data");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(Exception.class, (String)"Could not get metadata from server, topic: no_non_ddl_data")});
    }

    @Test
    @Timeout(value=60L)
    public void testAssertSchemaCompatible() throws Exception {
        String topic = "assert_schema_compatible";
        this.createTestTopic("assert_schema_compatible", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("assert_schema_compatible", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "assert_schema_compatible");
        this.createFileStoreTable(RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"k", "v1"}), Collections.emptyList(), Collections.singletonList("k"), Collections.emptyMap());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"Paimon schema and source table schema are not compatible.\nPaimon fields are: [`k` STRING NOT NULL, `v1` STRING].\nSource table fields are: [`pt` INT NOT NULL, `_id` INT NOT NULL, `v1` VARCHAR(10)]")});
    }

    @Test
    @Timeout(value=60L)
    public void testStarUpOptionSpecific() throws Exception {
        String topic = "start_up_specific";
        this.createTestTopic("start_up_specific", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("start_up_specific", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_specific");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS.toString());
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "partition:0,offset:1");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Collections.singletonList("+I[1, 2, two]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testStarUpOptionLatest() throws Exception {
        String topic = "start_up_latest";
        this.createTestTopic("start_up_latest", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("start_up_latest", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_latest");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try {
            this.writeRecordsToKafka("start_up_latest", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 3, three]", "+I[1, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testStarUpOptionTimestamp() throws Exception {
        String topic = "start_up_timestamp";
        this.createTestTopic("start_up_timestamp", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("start_up_timestamp", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_timestamp");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.TIMESTAMP.toString());
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try {
            this.writeRecordsToKafka("start_up_timestamp", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 3, three]", "+I[1, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testStarUpOptionEarliest() throws Exception {
        String topic = "start_up_earliest";
        this.createTestTopic("start_up_earliest", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("start_up_earliest", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_earliest");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try {
            this.writeRecordsToKafka("start_up_earliest", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", "+I[1, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testStarUpOptionGroup() throws Exception {
        String topic = "start_up_group";
        this.createTestTopic("start_up_group", 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-1.txt");
        try {
            this.writeRecordsToKafka("start_up_group", lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_group");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.GROUP_OFFSETS.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try {
            this.writeRecordsToKafka("start_up_group", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", "+I[1, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testComputedColumn() throws Exception {
        String topic = "computed_column";
        this.createTestTopic(topic, 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/computedcolumn/canal-data-1.txt");
        try {
            this.writeRecordsToKafka(topic, lines);
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("_year").withPrimaryKeys("_id", "_year").withTableConfig(this.getBasicTableConfig()).withComputedColumnArgs("_year=year(_date)").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.INT().notNull()}, (String[])new String[]{"_id", "_date", "_year"});
        this.waitForResult(Collections.singletonList("+I[1, 19439, 2023]"), this.getFileStoreTable(this.tableName), rowType, Arrays.asList("_id", "_year"));
    }

    @Test
    @Timeout(value=60L)
    public void testTypeMappingToString() throws Exception {
        String topic = "map-to-string";
        this.createTestTopic("map-to-string", 1, 1);
        this.writeRecordsToKafka("map-to-string", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/tostring/canal-data-1.txt"));
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "map-to-string");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).withTypeMappingModes(TypeMapping.TypeMappingMode.TO_STRING.configString()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"k1", "v0", "v1"});
        this.waitForResult(Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"), this.getFileStoreTable(this.tableName), rowType, Collections.singletonList("k1"));
    }

    @Test
    public void testCatalogAndTableConfig() {
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(this.getBasicKafkaConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat((Map)action.catalogConfig()).containsEntry((Object)"catalog-key", (Object)"catalog-value");
        Assertions.assertThat((Map)action.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    @Test
    @Timeout(value=60L)
    public void testCDCOperations() throws Exception {
        String topic = "event-insert";
        this.createTestTopic("event-insert", 1, 1);
        this.writeRecordsToKafka("event-insert", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/event/event-row.txt"));
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "event-insert");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        List<String> primaryKeys = Collections.singletonList("_id");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)10), DataTypes.FLOAT()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        List<String> expectedRow = Collections.singletonList("+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
        this.waitForResult(expectedRow, table, rowType, primaryKeys);
        this.writeRecordsToKafka("event-insert", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/event/event-insert.txt"));
        List<String> expectedInsert = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, two, NULL, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]", "+I[2, 4, four, NULL, NULL, NULL, NULL]");
        this.waitForResult(expectedInsert, table, rowType, primaryKeys);
        this.writeRecordsToKafka("event-insert", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/event/event-update.txt"));
        List<String> expectedUpdate = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]", "+I[2, 4, four, NULL, NULL, NULL, NULL]");
        this.waitForResult(expectedUpdate, table, rowType, primaryKeys);
        this.writeRecordsToKafka("event-insert", KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/event/event-delete.txt"));
        List<String> expectedDelete = Arrays.asList("+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]", "+I[2, 4, four, NULL, NULL, NULL, NULL]");
        this.waitForResult(expectedDelete, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=120L)
    public void testSyncWithInitialEmptyTopic() throws Exception {
        String topic = "initial_empty_topic";
        this.createTestTopic(topic, 1, 1);
        this.createFileStoreTable(RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.INT().notNull()}, (String[])new String[]{"_id", "_date", "_year"}), Collections.singletonList("_year"), Arrays.asList("_id", "_year"), Collections.emptyMap());
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).withComputedColumnArgs("_year=year(_date)").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/initialemptytopic/canal-data-1.txt");
        this.writeRecordsToKafka(topic, lines);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"_id", "_date", "_year", "v"});
        this.waitForResult(Collections.singletonList("+I[1, 19439, 2023, paimon]"), this.getFileStoreTable(this.tableName), rowType, Arrays.asList("_id", "_year"));
    }

    @Test
    @Timeout(value=60L)
    public void testSynchronizeIncompleteJson() throws Exception {
        String topic = "incomplete";
        this.createTestTopic(topic, 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/incomplete/canal-data-1.txt");
        this.writeRecordsToKafka(topic, lines);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("k").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING()}, (String[])new String[]{"k", "v"});
        this.waitForResult(Arrays.asList("+I[1, Hi]", "+I[2, Hello]"), this.getFileStoreTable(this.tableName), rowType, Collections.singletonList("k"));
    }

    @Test
    @Timeout(value=60L)
    public void testSynchronizeNonPkTable() throws Exception {
        String topic = "non_pk";
        this.createTestTopic(topic, 1, 1);
        List<String> lines = KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/nonpk/canal-data-1.txt");
        this.writeRecordsToKafka(topic, lines);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"v0", "v1"});
        this.waitForResult(Arrays.asList("+I[five, 50]", "+I[five, 50]"), this.getFileStoreTable(this.tableName), rowType, Collections.emptyList());
    }

    @Test
    @Timeout(value=60L)
    public void testMissingDecimalPrecision() throws Exception {
        String topic = "missing-decimal-precision";
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, KafkaCanalSyncTableActionITCase.readLines("kafka/canal/table/incomplete/canal-data-2.txt"));
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL((int)38, (int)18)}, (String[])new String[]{"k", "v"});
        this.waitForResult(Collections.singletonList("+I[1, 1.200000000000000000]"), this.getFileStoreTable(this.tableName), rowType, Collections.singletonList("k"));
    }
}

