/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;

public class KafkaSyncTableActionITCase
extends KafkaActionITCaseBase {
    protected void runSingleTableSchemaEvolution(String sourceDir, String format) throws Exception {
        String topic = "schema_evolution";
        this.createTestTopic("schema_evolution", 1, 1);
        this.writeRecordsToKafka("schema_evolution", "kafka/%s/table/%s/%s-data-1.txt", format, sourceDir, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "schema_evolution");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl("schema_evolution", sourceDir, format);
    }

    private void testSchemaEvolutionImpl(String topic, String sourceDir, String format) throws Exception {
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-2.txt", format, sourceDir, format);
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[102, car battery, 12V car battery, 8.1, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-3.txt", format, sourceDir, format);
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "age", "address"});
        expected = Arrays.asList("+I[102, car battery, 12V car battery, 8.1, NULL, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]", "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]", "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    protected void runSingleTableSchemaEvolutionWithSchemaIncludeRecord(String sourceDir, String format) throws Exception {
        String topic = "schema_evolution";
        this.createTestTopic("schema_evolution", 1, 1);
        this.writeRecordsToKafka("schema_evolution", "kafka/%s/table/schema/%s/%s-data-1.txt", format, sourceDir, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "schema_evolution");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        JobClient jobClient = this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Collections.singletonList("+I[101, scooter, Small 2-wheel scooter, 3.14]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.writeRecordsToKafka("schema_evolution", "kafka/%s/table/schema/%s/%s-data-2.txt", format, sourceDir, format);
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.INT()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.writeRecordsToKafka("schema_evolution", "kafka/%s/table/schema/%s/%s-data-3.txt", format, sourceDir, format);
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.BIGINT()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.writeRecordsToKafka("schema_evolution", "kafka/%s/table/schema/%s/%s-data-4.txt", format, sourceDir, format);
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.BIGINT()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]", "+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.writeRecordsToKafka("schema_evolution", "kafka/%s/table/schema/%s/%s-data-5.txt", format, sourceDir, format);
        while (true) {
            JobStatus status;
            if ((status = (JobStatus)jobClient.getJobStatus().get()) != JobStatus.RUNNING) break;
            Thread.sleep(1000L);
        }
        Assertions.assertThatThrownBy(() -> {
            JobExecutionResult cfr_ignored_0 = (JobExecutionResult)jobClient.getJobExecutionResult().get();
        }).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class, (String)"Cannot convert field age from type BIGINT to STRING of Paimon table")});
    }

    public void testNotSupportFormat(String format) throws Exception {
        String topic = "not_support";
        this.createTestTopic("not_support", 1, 1);
        this.writeRecordsToKafka("not_support", "kafka/%s/table/schemaevolution/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "togg-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "not_support");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class, (String)"This format: togg-json is not supported.")});
    }

    protected void testAssertSchemaCompatible(String format) throws Exception {
        String topic = "assert_schema_compatible";
        this.createTestTopic("assert_schema_compatible", 1, 1);
        this.writeRecordsToKafka("assert_schema_compatible", "kafka/%s/table/schemaevolution/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "assert_schema_compatible");
        this.createFileStoreTable(RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"k", "v1"}), Collections.emptyList(), Collections.singletonList("k"), Collections.emptyList(), Collections.emptyMap());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)"Paimon schema and source table schema are not compatible.\nPaimon fields are: [`k` STRING NOT NULL, `v1` STRING].\nSource table fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]")});
    }

    protected void testStarUpOptionSpecific(String format) throws Exception {
        String topic = "start_up_specific";
        this.createTestTopic("start_up_specific", 1, 1);
        this.writeRecordsToKafka("start_up_specific", "kafka/%s/table/startupmode/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_specific");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS.toString());
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "partition:0,offset:1");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Collections.singletonList("+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    protected void testStarUpOptionLatest(String format) throws Exception {
        String topic = "start_up_latest";
        this.createTestTopic("start_up_latest", 1, 1);
        this.writeRecordsToKafka("start_up_latest", true, "kafka/%s/table/startupmode/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_latest");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        Thread.sleep(5000L);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        this.writeRecordsToKafka("start_up_latest", "kafka/%s/table/startupmode/%s-data-2.txt", format, format);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    public void testStarUpOptionTimestamp(String format) throws Exception {
        String topic = "start_up_timestamp";
        this.createTestTopic("start_up_timestamp", 1, 1);
        this.writeRecordsToKafka("start_up_timestamp", true, "kafka/%s/table/startupmode/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_timestamp");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.TIMESTAMP.toString());
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.writeRecordsToKafka("start_up_timestamp", "kafka/%s/table/startupmode/%s-data-2.txt", format, format);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    public void testStarUpOptionEarliest(String format) throws Exception {
        String topic = "start_up_earliest";
        this.createTestTopic("start_up_earliest", 1, 1);
        this.writeRecordsToKafka("start_up_earliest", "kafka/%s/table/startupmode/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_earliest");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.writeRecordsToKafka("start_up_earliest", "kafka/%s/table/startupmode/%s-data-2.txt", format, format);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    public void testStarUpOptionGroup(String format) throws Exception {
        String topic = "start_up_group";
        this.createTestTopic("start_up_group", 1, 1);
        this.writeRecordsToKafka("start_up_group", "kafka/%s/table/startupmode/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "start_up_group");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.GROUP_OFFSETS.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.writeRecordsToKafka("start_up_group", "kafka/%s/table/startupmode/%s-data-2.txt", format, format);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    public void testComputedColumn(String format) throws Exception {
        String topic = "computed_column";
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, "kafka/%s/table/computedcolumn/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("_year").withPrimaryKeys("_id", "_year").withComputedColumnArgs("_year=year(_date)").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT().notNull()}, (String[])new String[]{"_id", "_date", "_year"});
        this.waitForResult(Collections.singletonList("+I[101, 2023-03-23, 2023]"), this.getFileStoreTable(this.tableName), rowType, Arrays.asList("_id", "_year"));
    }

    protected void testCDCOperations(String format) throws Exception {
        String topic = "event";
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, "kafka/%s/table/event/event-insert.txt", format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "ogg-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        List<String> primaryKeys = Collections.singletonList("id");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> expectedInsert = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]", "+I[103, scooter, Big 2-wheel scooter , 5.1]");
        this.waitForResult(expectedInsert, table, rowType, primaryKeys);
        this.writeRecordsToKafka(topic, "kafka/%s/table/event/event-update.txt", format);
        List<String> expectedUpdate = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]", "+I[103, scooter, Big 2-wheel scooter , 8.1]");
        this.waitForResult(expectedUpdate, table, rowType, primaryKeys);
        this.writeRecordsToKafka(topic, "kafka/%s/table/event/event-delete.txt", format);
        List<String> expectedReplace = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expectedReplace, table, rowType, primaryKeys);
    }

    public void testKafkaBuildSchemaWithDelete(String format) throws Exception {
        String topic = "test_kafka_schema";
        this.createTestTopic("test_kafka_schema", 1, 1);
        this.writeRecordsToKafka("test_kafka_schema", "kafka/%s/table/schema/schemaevolution/%s-data-with-delete.txt", format, format);
        Configuration kafkaConfig = Configuration.fromMap(this.getBasicKafkaConfig());
        kafkaConfig.setString(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.setString(KafkaConnectorOptions.TOPIC.key(), "test_kafka_schema");
        Schema kafkaSchema = MessageQueueSchemaUtils.getSchema((MessageQueueSchemaUtils.ConsumerWrapper)KafkaActionUtils.getKafkaEarliestConsumer((Configuration)kafkaConfig, (KafkaDeserializationSchema)new KafkaDebeziumJsonDeserializationSchema()), (DataFormat)KafkaActionUtils.getDataFormat((Configuration)kafkaConfig), (TypeMapping)TypeMapping.defaultMapping());
        ArrayList<DataField> fields = new ArrayList<DataField>();
        fields.add(new DataField(0, "id", (DataType)DataTypes.STRING()));
        fields.add(new DataField(1, "name", (DataType)DataTypes.STRING()));
        fields.add(new DataField(2, "description", (DataType)DataTypes.STRING()));
        fields.add(new DataField(3, "weight", (DataType)DataTypes.STRING()));
        Assertions.assertThat((List)kafkaSchema.fields()).isEqualTo(fields);
    }

    public void testWaterMarkSyncTable(String format) throws Exception {
        String topic = "watermark";
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, "kafka/%s/table/watermark/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        Map<String, String> config = this.getBasicTableConfig();
        if ("debezium".equals(format)) {
            config.remove("bucket");
            config.put("write-only", "true");
        }
        config.put("tag.automatic-creation", "watermark");
        config.put("tag.creation-period", "hourly");
        config.put("scan.watermark.alignment.group", "alignment-group-1");
        config.put("scan.watermark.alignment.max-drift", "20 s");
        config.put("scan.watermark.alignment.update-interval", "1 s");
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = (FileStoreTable)this.catalog.getTable(new Identifier(this.database, this.tableName));
        while (table.snapshotManager().snapshotCount() <= 0L || table.snapshotManager().latestSnapshot().watermark() == Long.MIN_VALUE) {
            Thread.sleep(1000L);
        }
        return;
    }

    public void testSchemaIncludeRecord(String format) throws Exception {
        String topic = "schema_include";
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, "kafka/debezium/table/schema/include/debezium-data-1.txt", new Object[0]);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Collections.singletonList("+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    public void testAllTypesWithSchemaImpl(String format) throws Exception {
        String topic = "schema_include_all_type";
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, "kafka/debezium/table/schema/alltype/debezium-data-1.txt", new Object[0]);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables(this.tableName);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL((int)2, (int)1).notNull(), DataTypes.BOOLEAN(), DataTypes.BINARY((int)8), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)38, (int)10), DataTypes.DATE(), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)6), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"_id", "pt", "_bit1", "_bit", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", "_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_big_decimal", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"});
        String bits = Arrays.toString(new byte[]{0, 0, 0, 0, 0, 0, 7, -57});
        List<String> expected = Collections.singletonList("+I[1, 1.1, " + String.format("true, %s, ", bits) + "1, 1, 0, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.11, 123456789876543212345678987654321.22, 123456789876543212345678987654321.33, 11111, 22222, 33333, 2222222222222222300000001111.1234567890, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T22:00:10.123456, 2023-03-23T07:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115, 0, 0, 0, 0, 0], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, a,b]");
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    protected void testTableFiledValNull(String format) throws Exception {
        String topic = "table_filed_val_null";
        this.createTestTopic("table_filed_val_null", 1, 1);
        this.writeRecordsToKafka("table_filed_val_null", "kafka/%s/table/schemaevolution/%s-data-4.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "table_filed_val_null");
        kafkaConfig.put(KafkaConnectorOptions.SCAN_STARTUP_MODE.key(), KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET.toString());
        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPrimaryKeys("id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        Thread.sleep(5000L);
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, null]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }
}

