/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.execution.JobClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.Base64Variants;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class KafkaDebeziumAvroSyncTableActionITCase
extends KafkaActionITCaseBase {
    /** Avro schema of the {@code null} type, created once and shared by the class. */
    protected static final Schema NULL_AVRO_SCHEMA = Schema.create((Schema.Type)Schema.Type.NULL);
    /** Avro serializer for record keys; instantiated and configured in {@link #setup()}. */
    protected KafkaAvroSerializer kafkaKeyAvroSerializer;
    /** Avro serializer for record values; instantiated and configured in {@link #setup()}. */
    protected KafkaAvroSerializer kafkaValueAvroSerializer;
    /**
     * JSON text of the Avro key schema: a record named {@code Key} in namespace
     * {@code test_avro.workdb.all_types_table} with a single {@code int} field {@code _id}.
     */
    private static final String ALL_TYPES_TABLE_KEY_SCHEMA = "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"test_avro.workdb.all_types_table\",\"fields\":[{\"name\":\"_id\",\"type\":\"int\"}],\"connect.name\":\"test_avro.workdb.all_types_table.Key\"}";
    /**
     * JSON text of the Avro value schema: a record named {@code Envelope} whose top-level
     * fields are {@code before}, {@code after}, {@code source}, {@code op}, {@code ts_ms} and
     * {@code transaction}. The nested {@code Value} record lists one field per column of the
     * all-types table.
     */
    private static final String ALL_TYPES_TABLE_VALUE_SCHEMA = "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"test_avro.workdb.all_types_table\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"_id\",\"type\":{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"INT\",\"__debezium.source.column.name\":\"_id\"}}},{\"name\":\"pt\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DECIMAL\",\"__debezium.source.column.length\":\"2\",\"__debezium.source.column.scale\":\"1\",\"__debezium.source.column.name\":\"pt\"}}],\"default\":null},{\"name\":\"_tinyint1\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TINYINT\",\"__debezium.source.column.length\":\"1\",\"__debezium.source.column.name\":\"_tinyint1\"},\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"_boolean\",\"type\":[\"null\",{\"type\":\"boolean\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BOOLEAN\",\"__debezium.source.column.name\":\"_boolean\"}}],\"default\":null},{\"name\":\"_bool\",\"type\":[\"null\",{\"type\":\"boolean\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BOOL\",\"__debezium.source.column.name\":\"_bool\"}}],\"default\":null},{\"name\":\"_tinyint\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TINYINT\",\"__debezium.source.column.name\":\"_tinyint\"},\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"_tinyint_unsigned\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TINYINT 
UNSIGNED\",\"__debezium.source.column.length\":\"2\",\"__debezium.source.column.name\":\"_tinyint_unsigned\"},\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"_tinyint_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TINYINT UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"2\",\"__debezium.source.column.name\":\"_tinyint_unsigned_zerofill\"},\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"_smallint\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"SMALLINT\",\"__debezium.source.column.name\":\"_smallint\"},\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"_smallint_unsigned\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"SMALLINT UNSIGNED\",\"__debezium.source.column.name\":\"_smallint_unsigned\"}}],\"default\":null},{\"name\":\"_smallint_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"SMALLINT UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"4\",\"__debezium.source.column.name\":\"_smallint_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_mediumint\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"MEDIUMINT\",\"__debezium.source.column.name\":\"_mediumint\"}}],\"default\":null},{\"name\":\"_mediumint_unsigned\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"MEDIUMINT UNSIGNED\",\"__debezium.source.column.name\":\"_mediumint_unsigned\"}}],\"default\":null},{\"name\":\"_mediumint_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"MEDIUMINT UNSIGNED 
ZEROFILL\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.name\":\"_mediumint_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_int\",\"type\":[\"null\",{\"type\":\"int\",\"connect.parameters\":{\"__debezium.source.column.type\":\"INT\",\"__debezium.source.column.name\":\"_int\"}}],\"default\":null},{\"name\":\"_int_unsigned\",\"type\":[\"null\",{\"type\":\"long\",\"connect.parameters\":{\"__debezium.source.column.type\":\"INT UNSIGNED\",\"__debezium.source.column.name\":\"_int_unsigned\"}}],\"default\":null},{\"name\":\"_int_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"long\",\"connect.parameters\":{\"__debezium.source.column.type\":\"INT UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.name\":\"_int_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_bigint\",\"type\":[\"null\",{\"type\":\"long\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BIGINT\",\"__debezium.source.column.name\":\"_bigint\"}}],\"default\":null},{\"name\":\"_bigint_unsigned\",\"type\":[\"null\",{\"type\":\"long\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BIGINT UNSIGNED\",\"__debezium.source.column.name\":\"_bigint_unsigned\"}}],\"default\":null},{\"name\":\"_bigint_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"long\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BIGINT UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"16\",\"__debezium.source.column.name\":\"_bigint_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_serial\",\"type\":{\"type\":\"long\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BIGINT 
UNSIGNED\",\"__debezium.source.column.name\":\"_serial\"}}},{\"name\":\"_float\",\"type\":[\"null\",{\"type\":\"float\",\"connect.parameters\":{\"__debezium.source.column.type\":\"FLOAT\",\"__debezium.source.column.name\":\"_float\"}}],\"default\":null},{\"name\":\"_float_unsigned\",\"type\":[\"null\",{\"type\":\"float\",\"connect.parameters\":{\"__debezium.source.column.type\":\"FLOAT UNSIGNED\",\"__debezium.source.column.name\":\"_float_unsigned\"}}],\"default\":null},{\"name\":\"_float_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"float\",\"connect.parameters\":{\"__debezium.source.column.type\":\"FLOAT UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"4\",\"__debezium.source.column.name\":\"_float_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_real\",\"type\":[\"null\",{\"type\":\"float\",\"connect.parameters\":{\"__debezium.source.column.type\":\"REAL\",\"__debezium.source.column.name\":\"_real\"}}],\"default\":null},{\"name\":\"_real_unsigned\",\"type\":[\"null\",{\"type\":\"float\",\"connect.parameters\":{\"__debezium.source.column.type\":\"REAL UNSIGNED\",\"__debezium.source.column.name\":\"_real_unsigned\"}}],\"default\":null},{\"name\":\"_real_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"float\",\"connect.parameters\":{\"__debezium.source.column.type\":\"REAL UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"10\",\"__debezium.source.column.scale\":\"7\",\"__debezium.source.column.name\":\"_real_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_double\",\"type\":[\"null\",{\"type\":\"double\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DOUBLE\",\"__debezium.source.column.name\":\"_double\"}}],\"default\":null},{\"name\":\"_double_unsigned\",\"type\":[\"null\",{\"type\":\"double\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DOUBLE 
UNSIGNED\",\"__debezium.source.column.name\":\"_double_unsigned\"}}],\"default\":null},{\"name\":\"_double_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"double\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DOUBLE UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"10\",\"__debezium.source.column.scale\":\"7\",\"__debezium.source.column.name\":\"_double_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_double_precision\",\"type\":[\"null\",{\"type\":\"double\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DOUBLE PRECISION\",\"__debezium.source.column.name\":\"_double_precision\"}}],\"default\":null},{\"name\":\"_double_precision_unsigned\",\"type\":[\"null\",{\"type\":\"double\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DOUBLE PRECISION UNSIGNED\",\"__debezium.source.column.name\":\"_double_precision_unsigned\"}}],\"default\":null},{\"name\":\"_double_precision_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"double\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DOUBLE PRECISION UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"10\",\"__debezium.source.column.scale\":\"7\",\"__debezium.source.column.name\":\"_double_precision_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_numeric\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"NUMERIC\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.scale\":\"3\",\"__debezium.source.column.name\":\"_numeric\"}}],\"default\":null},{\"name\":\"_numeric_unsigned\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"NUMERIC 
UNSIGNED\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.scale\":\"3\",\"__debezium.source.column.name\":\"_numeric_unsigned\"}}],\"default\":null},{\"name\":\"_numeric_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"NUMERIC UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.scale\":\"3\",\"__debezium.source.column.name\":\"_numeric_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_fixed\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"FIXED\",\"__debezium.source.column.length\":\"40\",\"__debezium.source.column.scale\":\"3\",\"__debezium.source.column.name\":\"_fixed\"}}],\"default\":null},{\"name\":\"_fixed_unsigned\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"FIXED UNSIGNED\",\"__debezium.source.column.length\":\"40\",\"__debezium.source.column.scale\":\"3\",\"__debezium.source.column.name\":\"_fixed_unsigned\"}}],\"default\":null},{\"name\":\"_fixed_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"FIXED UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"40\",\"__debezium.source.column.scale\":\"3\",\"__debezium.source.column.name\":\"_fixed_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_decimal\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DECIMAL\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.scale\":\"0\",\"__debezium.source.column.name\":\"_decimal\"}}],\"default\":null},{\"name\":\"_decimal_unsigned\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DECIMAL 
UNSIGNED\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.scale\":\"0\",\"__debezium.source.column.name\":\"_decimal_unsigned\"}}],\"default\":null},{\"name\":\"_decimal_unsigned_zerofill\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"DECIMAL UNSIGNED ZEROFILL\",\"__debezium.source.column.length\":\"8\",\"__debezium.source.column.scale\":\"0\",\"__debezium.source.column.name\":\"_decimal_unsigned_zerofill\"}}],\"default\":null},{\"name\":\"_date\",\"type\":[\"null\",{\"type\":\"int\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"DATE\",\"__debezium.source.column.name\":\"_date\"},\"connect.name\":\"io.debezium.time.Date\"}],\"default\":null},{\"name\":\"_datetime\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"DATETIME\",\"__debezium.source.column.name\":\"_datetime\"},\"connect.name\":\"io.debezium.time.Timestamp\"}],\"default\":null},{\"name\":\"_datetime3\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"DATETIME\",\"__debezium.source.column.length\":\"3\",\"__debezium.source.column.name\":\"_datetime3\"},\"connect.name\":\"io.debezium.time.Timestamp\"}],\"default\":null},{\"name\":\"_datetime6\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"DATETIME\",\"__debezium.source.column.length\":\"6\",\"__debezium.source.column.name\":\"_datetime6\"},\"connect.name\":\"io.debezium.time.MicroTimestamp\"}],\"default\":null},{\"name\":\"_datetime_p\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"DATETIME\",\"__debezium.source.column.name\":\"_datetime_p\"},\"connect.name\":\"io.debezium.time.Timestamp\"}],\"default\":null},{\"name\":\"_datetime_p2\",\"type\":[\"null\",{\"typ
e\":\"long\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"DATETIME\",\"__debezium.source.column.length\":\"2\",\"__debezium.source.column.name\":\"_datetime_p2\"},\"connect.name\":\"io.debezium.time.Timestamp\"}],\"default\":null},{\"name\":\"_timestamp\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"TIMESTAMP\",\"__debezium.source.column.length\":\"6\",\"__debezium.source.column.name\":\"_timestamp\"},\"connect.name\":\"io.debezium.time.ZonedTimestamp\"}],\"default\":null},{\"name\":\"_timestamp0\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"TIMESTAMP\",\"__debezium.source.column.name\":\"_timestamp0\"},\"connect.name\":\"io.debezium.time.ZonedTimestamp\"}],\"default\":null},{\"name\":\"_char\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"CHAR\",\"__debezium.source.column.length\":\"10\",\"__debezium.source.column.name\":\"_char\"}}],\"default\":null},{\"name\":\"_varchar\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"VARCHAR\",\"__debezium.source.column.length\":\"20\",\"__debezium.source.column.name\":\"_varchar\"}}],\"default\":null},{\"name\":\"_tinytext\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TINYTEXT\",\"__debezium.source.column.name\":\"_tinytext\"}}],\"default\":null},{\"name\":\"_text\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TEXT\",\"__debezium.source.column.name\":\"_text\"}}],\"default\":null},{\"name\":\"_mediumtext\",\"type\":[\"null\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"MEDIUMTEXT\",\"__debezium.source.column.name\":\"_mediumtext\"}}],\"default\":null},{\"name\":\"_longtext\",\"type\":[\"nul
l\",{\"type\":\"string\",\"connect.parameters\":{\"__debezium.source.column.type\":\"LONGTEXT\",\"__debezium.source.column.name\":\"_longtext\"}}],\"default\":null},{\"name\":\"_bin\",\"type\":[\"null\",{\"type\":\"bytes\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BINARY\",\"__debezium.source.column.length\":\"10\",\"__debezium.source.column.name\":\"_bin\"}}],\"default\":null},{\"name\":\"_varbin\",\"type\":[\"null\",{\"type\":\"bytes\",\"connect.parameters\":{\"__debezium.source.column.type\":\"VARBINARY\",\"__debezium.source.column.length\":\"20\",\"__debezium.source.column.name\":\"_varbin\"}}],\"default\":null},{\"name\":\"_tinyblob\",\"type\":[\"null\",{\"type\":\"bytes\",\"connect.parameters\":{\"__debezium.source.column.type\":\"TINYBLOB\",\"__debezium.source.column.name\":\"_tinyblob\"}}],\"default\":null},{\"name\":\"_blob\",\"type\":[\"null\",{\"type\":\"bytes\",\"connect.parameters\":{\"__debezium.source.column.type\":\"BLOB\",\"__debezium.source.column.name\":\"_blob\"}}],\"default\":null},{\"name\":\"_mediumblob\",\"type\":[\"null\",{\"type\":\"bytes\",\"connect.parameters\":{\"__debezium.source.column.type\":\"MEDIUMBLOB\",\"__debezium.source.column.name\":\"_mediumblob\"}}],\"default\":null},{\"name\":\"_longblob\",\"type\":[\"null\",{\"type\":\"bytes\",\"connect.parameters\":{\"__debezium.source.column.type\":\"LONGBLOB\",\"__debezium.source.column.name\":\"_longblob\"}}],\"default\":null},{\"name\":\"_json\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"JSON\",\"__debezium.source.column.name\":\"_json\"},\"connect.name\":\"io.debezium.data.Json\"}],\"default\":null},{\"name\":\"_enum\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"value1,value2,value3\",\"__debezium.source.column.type\":\"ENUM\",\"__debezium.source.column.length\":\"1\",\"__debezium.source.column.name\":\"_enum\"},\"connect.name\":\"io.
debezium.data.Enum\"}],\"default\":null},{\"name\":\"_year\",\"type\":[\"null\",{\"type\":\"int\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"YEAR\",\"__debezium.source.column.name\":\"_year\"},\"connect.name\":\"io.debezium.time.Year\"}],\"default\":null},{\"name\":\"_time\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"TIME\",\"__debezium.source.column.name\":\"_time\"},\"connect.name\":\"io.debezium.time.MicroTime\"}],\"default\":null},{\"name\":\"_point\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Point\",\"namespace\":\"io.debezium.data.geometry\",\"fields\":[{\"name\":\"x\",\"type\":\"double\"},{\"name\":\"y\",\"type\":\"double\"},{\"name\":\"wkb\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"srid\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.doc\":\"Geometry (POINT)\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"POINT\",\"__debezium.source.column.name\":\"_point\"},\"connect.name\":\"io.debezium.data.geometry.Point\"}],\"default\":null},{\"name\":\"_geometry\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Geometry\",\"namespace\":\"io.debezium.data.geometry\",\"fields\":[{\"name\":\"wkb\",\"type\":\"bytes\"},{\"name\":\"srid\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.doc\":\"Geometry\",\"connect.version\":1,\"connect.parameters\":{\"__debezium.source.column.type\":\"GEOMETRY\",\"__debezium.source.column.name\":\"_geometry\"},\"connect.name\":\"io.debezium.data.geometry.Geometry\"}],\"default\":null},{\"name\":\"_linestring\",\"type\":[\"null\",\"io.debezium.data.geometry.Geometry\"],\"default\":null},{\"name\":\"_polygon\",\"type\":[\"null\",\"io.debezium.data.geometry.Geometry\"],\"default\":null},{\"name\":\"_multipoint\",\"type\":[\"null\",\"io.debezium.data.geometry.Geometry\"],\"default\":null},{\"name\":\"_multiline\",\"type\":[\"null\",\"io.debezium.
data.geometry.Geometry\"],\"default\":null},{\"name\":\"_multipolygon\",\"type\":[\"null\",\"io.debezium.data.geometry.Geometry\"],\"default\":null},{\"name\":\"_geometrycollection\",\"type\":[\"null\",\"io.debezium.data.geometry.Geometry\"],\"default\":null},{\"name\":\"_set\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"a,b,c,d\",\"__debezium.source.column.type\":\"SET\",\"__debezium.source.column.length\":\"7\",\"__debezium.source.column.name\":\"_set\"},\"connect.name\":\"io.debezium.data.EnumSet\"}],\"default\":null}],\"connect.name\":\"test_avro.workdb.all_types_table.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"tran
saction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"block\",\"namespace\":\"event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}],\"connect.version\":1,\"connect.name\":\"event.block\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"test_avro.workdb.all_types_table.Envelope\"}";
    /** Parsed form of {@code ALL_TYPES_TABLE_KEY_SCHEMA}; assigned in {@link #setup()}. */
    private Schema allTypesTableKeySchema;
    /** Parsed form of {@code ALL_TYPES_TABLE_VALUE_SCHEMA}; assigned in {@link #setup()}. */
    private Schema allTypesTableValueSchema;
    /** Schema of the {@code source} field of the value schema; assigned in {@link #setup()}. */
    private Schema debeziumSourceSchema;

    /**
     * Prepares the per-test fixtures.
     *
     * <p>Builds one configuration map (schema registry URL plus automatic schema
     * registration) and uses it to configure a fresh key serializer and a fresh value
     * serializer. Then parses the key and value schema constants with a single parser and
     * caches the schema of the value schema's {@code source} field.
     */
    @BeforeEach
    public void setup() {
        Map<String, Object> serializerConfig = new HashMap<>();
        serializerConfig.put("schema.registry.url", getSchemaRegistryUrl());
        serializerConfig.put("auto.register.schemas", true);

        // Second argument of configure(...) distinguishes the key serializer from the value one.
        KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
        kafkaKeyAvroSerializer = keySerializer;
        keySerializer.configure(serializerConfig, true);

        KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
        kafkaValueAvroSerializer = valueSerializer;
        valueSerializer.configure(serializerConfig, false);

        // One parser handles both schemas, key first, exactly as before.
        Schema.Parser schemaParser = new Schema.Parser();
        allTypesTableKeySchema = schemaParser.parse(ALL_TYPES_TABLE_KEY_SCHEMA);
        Schema valueSchema = schemaParser.parse(ALL_TYPES_TABLE_VALUE_SCHEMA);
        allTypesTableValueSchema = valueSchema;
        debeziumSourceSchema = valueSchema.getField("source").schema();
    }

    /**
     * Runs the all-types synchronization scenario twice in succession, sleeping three
     * seconds after each run. The whole test is bounded by a JUnit timeout of 60.
     *
     * @throws Exception if either run fails or the sleep is interrupted
     */
    @Test
    @Timeout(value=60L)
    public void testAllTypes() throws Exception {
        int completedRuns = 0;
        while (completedRuns < 2) {
            testAllTypesOnce();
            Thread.sleep(3000L);
            completedRuns++;
        }
    }

    /**
     * Executes one end-to-end pass of the all-types scenario.
     *
     * <p>Creates a uniquely named topic, publishes the Debezium Avro fixture file to it,
     * starts a {@link KafkaSyncTableAction} with partition key {@code pt} and primary keys
     * {@code pt}, {@code _id}, verifies the outcome through {@code testAllTypesImpl()} and
     * cancels the job.
     *
     * <p>The job is cancelled on the failure path as well, so a failed verification does not
     * leave the job running into the next pass. A cancellation error that occurs after a
     * verification failure is attached as a suppressed exception, keeping the verification
     * failure as the reported error.
     *
     * @throws Exception if topic setup, job submission, verification or cancellation fails
     */
    private void testAllTypesOnce() throws Exception {
        // Random suffix keeps the two passes of testAllTypes() on separate topics.
        String topic = "all_type" + UUID.randomUUID();
        this.createTestTopic(topic, 1, 1);
        this.writeRecordsToKafka(topic, "kafka/debezium-avro/table/schema/alltype/debezium-avro-data-1.txt", new Object[0]);

        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put("value.format", "debezium-avro");
        kafkaConfig.put("topic", topic);
        kafkaConfig.put("schema.registry.url", this.getSchemaRegistryUrl());

        KafkaSyncTableAction action = (KafkaSyncTableAction)this.syncTableActionBuilder(kafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build();
        JobClient client = this.runActionWithDefaultEnv((ActionBase)action);
        try {
            this.testAllTypesImpl();
        } catch (Throwable verificationFailure) {
            // Best-effort cancel: the verification failure must stay the primary error.
            try {
                client.cancel().get();
            } catch (Exception cancelFailure) {
                if (cancelFailure instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                verificationFailure.addSuppressed(cancelFailure);
            }
            throw verificationFailure;
        }
        // Success path: cancellation errors propagate exactly as they did before.
        client.cancel().get();
    }

    @Override
    protected void writeRecordsToKafka(String topic, String resourceDirFormat, Object ... args) throws Exception {
        Properties producerProperties = KafkaDebeziumAvroSyncTableActionITCase.getStandardProps();
        producerProperties.setProperty("retries", "0");
        producerProperties.put("key.serializer", ByteArraySerializer.class.getName());
        producerProperties.put("value.serializer", ByteArraySerializer.class.getName());
        URL url = KafkaActionITCaseBase.class.getClassLoader().getResource(String.format(resourceDirFormat, args));
        List lines = Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).collect(Collectors.toList());
        try (KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);){
            for (int i = 0; i < lines.size(); i += 2) {
                JsonNode geometrycollectionJsonNode;
                JsonNode multipolygonJsonNode;
                JsonNode multilineJsonNode;
                JsonNode multipointJsonNode;
                JsonNode polygonJsonNode;
                JsonNode linestringJsonNode;
                JsonNode pointJsonValue;
                JsonNode key = this.objectMapper.readTree((String)lines.get(i));
                JsonNode value = this.objectMapper.readTree((String)lines.get(i + 1));
                JsonNode keyPayload = key.get("payload");
                JsonNode valuePayload = value.get("payload");
                JsonNode source = valuePayload.get("source");
                JsonNode after = valuePayload.get("after");
                GenericData.Record avroKey = new GenericData.Record(this.allTypesTableKeySchema);
                avroKey.put("_id", (Object)keyPayload.get("_id").asInt());
                GenericData.Record avroValue = new GenericData.Record(this.allTypesTableValueSchema);
                Schema beforeSchema = this.allTypesTableValueSchema.getField("before").schema();
                GenericData.Record afterAvroValue = new GenericData.Record(this.sanitizedSchema(beforeSchema));
                afterAvroValue.put("_id", (Object)after.get("_id").asInt());
                afterAvroValue.put("pt", (Object)after.get("pt").asText());
                if (this.nonNullNode(after.get("_tinyint1"))) {
                    afterAvroValue.put("_tinyint1", (Object)after.get("_tinyint1").asInt());
                }
                if (this.nonNullNode(after.get("_boolean"))) {
                    afterAvroValue.put("_boolean", (Object)after.get("_boolean").asBoolean());
                }
                if (this.nonNullNode(after.get("_bool"))) {
                    afterAvroValue.put("_bool", (Object)after.get("_bool").asBoolean());
                }
                if (this.nonNullNode(after.get("_tinyint"))) {
                    afterAvroValue.put("_tinyint", (Object)after.get("_tinyint").asInt());
                }
                if (this.nonNullNode(after.get("_tinyint_unsigned"))) {
                    afterAvroValue.put("_tinyint_unsigned", (Object)after.get("_tinyint_unsigned").asInt());
                }
                if (this.nonNullNode(after.get("_tinyint_unsigned_zerofill"))) {
                    afterAvroValue.put("_tinyint_unsigned_zerofill", (Object)after.get("_tinyint_unsigned_zerofill").asInt());
                }
                if (this.nonNullNode(after.get("_smallint"))) {
                    afterAvroValue.put("_smallint", (Object)after.get("_smallint").asInt());
                }
                if (this.nonNullNode(after.get("_smallint_unsigned"))) {
                    afterAvroValue.put("_smallint_unsigned", (Object)after.get("_smallint_unsigned").asInt());
                }
                if (this.nonNullNode(after.get("_smallint_unsigned_zerofill"))) {
                    afterAvroValue.put("_smallint_unsigned_zerofill", (Object)after.get("_smallint_unsigned_zerofill").asInt());
                }
                if (this.nonNullNode(after.get("_mediumint"))) {
                    afterAvroValue.put("_mediumint", (Object)after.get("_mediumint").asInt());
                }
                if (this.nonNullNode(after.get("_mediumint_unsigned"))) {
                    afterAvroValue.put("_mediumint_unsigned", (Object)after.get("_mediumint_unsigned").asInt());
                }
                if (this.nonNullNode(after.get("_mediumint_unsigned_zerofill"))) {
                    afterAvroValue.put("_mediumint_unsigned_zerofill", (Object)after.get("_mediumint_unsigned_zerofill").asInt());
                }
                if (this.nonNullNode(after.get("_int"))) {
                    afterAvroValue.put("_int", (Object)after.get("_int").asInt());
                }
                if (this.nonNullNode(after.get("_int_unsigned"))) {
                    afterAvroValue.put("_int_unsigned", (Object)after.get("_int_unsigned").asLong());
                }
                if (this.nonNullNode(after.get("_int_unsigned_zerofill"))) {
                    afterAvroValue.put("_int_unsigned_zerofill", (Object)after.get("_int_unsigned_zerofill").asLong());
                }
                if (this.nonNullNode(after.get("_bigint"))) {
                    afterAvroValue.put("_bigint", (Object)after.get("_bigint").asLong());
                }
                if (this.nonNullNode(after.get("_bigint_unsigned"))) {
                    afterAvroValue.put("_bigint_unsigned", (Object)after.get("_bigint_unsigned").asLong());
                }
                if (this.nonNullNode(after.get("_bigint_unsigned_zerofill"))) {
                    afterAvroValue.put("_bigint_unsigned_zerofill", (Object)after.get("_bigint_unsigned_zerofill").asLong());
                }
                afterAvroValue.put("_serial", (Object)after.get("_serial").asLong());
                if (this.nonNullNode(after.get("_float"))) {
                    afterAvroValue.put("_float", (Object)Float.valueOf(after.get("_float").floatValue()));
                }
                if (this.nonNullNode(after.get("_float_unsigned"))) {
                    afterAvroValue.put("_float_unsigned", (Object)Float.valueOf(after.get("_float_unsigned").floatValue()));
                }
                if (this.nonNullNode(after.get("_float_unsigned_zerofill"))) {
                    afterAvroValue.put("_float_unsigned_zerofill", (Object)Float.valueOf(after.get("_float_unsigned_zerofill").floatValue()));
                }
                if (this.nonNullNode(after.get("_real"))) {
                    afterAvroValue.put("_real", (Object)Float.valueOf(after.get("_real").floatValue()));
                }
                if (this.nonNullNode(after.get("_real_unsigned"))) {
                    afterAvroValue.put("_real_unsigned", (Object)Float.valueOf(after.get("_real_unsigned").floatValue()));
                }
                if (this.nonNullNode(after.get("_real_unsigned_zerofill"))) {
                    afterAvroValue.put("_real_unsigned_zerofill", (Object)Float.valueOf(after.get("_real_unsigned_zerofill").floatValue()));
                }
                if (this.nonNullNode(after.get("_double"))) {
                    afterAvroValue.put("_double", (Object)after.get("_double").asDouble());
                }
                if (this.nonNullNode(after.get("_double_unsigned"))) {
                    afterAvroValue.put("_double_unsigned", (Object)after.get("_double_unsigned").asDouble());
                }
                if (this.nonNullNode(after.get("_double_unsigned_zerofill"))) {
                    afterAvroValue.put("_double_unsigned_zerofill", (Object)after.get("_double_unsigned_zerofill").asDouble());
                }
                if (this.nonNullNode(after.get("_double_precision"))) {
                    afterAvroValue.put("_double_precision", (Object)after.get("_double_precision").asDouble());
                }
                if (this.nonNullNode(after.get("_double_precision_unsigned"))) {
                    afterAvroValue.put("_double_precision_unsigned", (Object)after.get("_double_precision_unsigned").asDouble());
                }
                if (this.nonNullNode(after.get("_double_precision_unsigned_zerofill"))) {
                    afterAvroValue.put("_double_precision_unsigned_zerofill", (Object)after.get("_double_precision_unsigned_zerofill").asDouble());
                }
                if (this.nonNullNode(after.get("_numeric"))) {
                    afterAvroValue.put("_numeric", (Object)after.get("_numeric").asText());
                }
                if (this.nonNullNode(after.get("_numeric_unsigned"))) {
                    afterAvroValue.put("_numeric_unsigned", (Object)after.get("_numeric_unsigned").asText());
                }
                if (this.nonNullNode(after.get("_numeric_unsigned_zerofill"))) {
                    afterAvroValue.put("_numeric_unsigned_zerofill", (Object)after.get("_numeric_unsigned_zerofill").asText());
                }
                if (this.nonNullNode(after.get("_fixed"))) {
                    afterAvroValue.put("_fixed", (Object)after.get("_fixed").asText());
                }
                if (this.nonNullNode(after.get("_fixed_unsigned"))) {
                    afterAvroValue.put("_fixed_unsigned", (Object)after.get("_fixed_unsigned").asText());
                }
                if (this.nonNullNode(after.get("_fixed_unsigned_zerofill"))) {
                    afterAvroValue.put("_fixed_unsigned_zerofill", (Object)after.get("_fixed_unsigned_zerofill").asText());
                }
                if (this.nonNullNode(after.get("_decimal"))) {
                    afterAvroValue.put("_decimal", (Object)after.get("_decimal").asText());
                }
                if (this.nonNullNode(after.get("_decimal_unsigned"))) {
                    afterAvroValue.put("_decimal_unsigned", (Object)after.get("_decimal_unsigned").asText());
                }
                if (this.nonNullNode(after.get("_decimal_unsigned_zerofill"))) {
                    afterAvroValue.put("_decimal_unsigned_zerofill", (Object)after.get("_decimal_unsigned_zerofill").asText());
                }
                if (this.nonNullNode(after.get("_date"))) {
                    afterAvroValue.put("_date", (Object)after.get("_date").asInt());
                }
                if (this.nonNullNode(after.get("_datetime"))) {
                    afterAvroValue.put("_datetime", (Object)after.get("_datetime").asLong());
                }
                if (this.nonNullNode(after.get("_datetime3"))) {
                    afterAvroValue.put("_datetime3", (Object)after.get("_datetime3").asLong());
                }
                if (this.nonNullNode(after.get("_datetime6"))) {
                    afterAvroValue.put("_datetime6", (Object)after.get("_datetime6").asLong());
                }
                if (this.nonNullNode(after.get("_datetime_p"))) {
                    afterAvroValue.put("_datetime_p", (Object)after.get("_datetime_p").asLong());
                }
                if (this.nonNullNode(after.get("_datetime_p2"))) {
                    afterAvroValue.put("_datetime_p2", (Object)after.get("_datetime_p2").asLong());
                }
                if (this.nonNullNode(after.get("_timestamp"))) {
                    afterAvroValue.put("_timestamp", (Object)after.get("_timestamp").asText());
                }
                if (this.nonNullNode(after.get("_timestamp0"))) {
                    afterAvroValue.put("_timestamp0", (Object)after.get("_timestamp0").asText());
                }
                if (this.nonNullNode(after.get("_char"))) {
                    afterAvroValue.put("_char", (Object)after.get("_char").asText());
                }
                if (this.nonNullNode(after.get("_varchar"))) {
                    afterAvroValue.put("_varchar", (Object)after.get("_varchar").asText());
                }
                if (this.nonNullNode(after.get("_tinytext"))) {
                    afterAvroValue.put("_tinytext", (Object)after.get("_tinytext").asText());
                }
                if (this.nonNullNode(after.get("_text"))) {
                    afterAvroValue.put("_text", (Object)after.get("_text").asText());
                }
                if (this.nonNullNode(after.get("_mediumtext"))) {
                    afterAvroValue.put("_mediumtext", (Object)after.get("_mediumtext").asText());
                }
                if (this.nonNullNode(after.get("_longtext"))) {
                    afterAvroValue.put("_longtext", (Object)after.get("_longtext").asText());
                }
                if (this.nonNullNode(after.get("_bin"))) {
                    afterAvroValue.put("_bin", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(after.get("_bin").asText())));
                }
                if (this.nonNullNode(after.get("_varbin"))) {
                    afterAvroValue.put("_varbin", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(after.get("_varbin").asText())));
                }
                if (this.nonNullNode(after.get("_tinyblob"))) {
                    afterAvroValue.put("_tinyblob", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(after.get("_tinyblob").asText())));
                }
                if (this.nonNullNode(after.get("_blob"))) {
                    afterAvroValue.put("_blob", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(after.get("_blob").asText())));
                }
                if (this.nonNullNode(after.get("_mediumblob"))) {
                    afterAvroValue.put("_mediumblob", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(after.get("_mediumblob").asText())));
                }
                if (this.nonNullNode(after.get("_longblob"))) {
                    afterAvroValue.put("_longblob", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(after.get("_longblob").asText())));
                }
                if (this.nonNullNode(after.get("_json"))) {
                    afterAvroValue.put("_json", (Object)after.get("_json").asText());
                }
                if (this.nonNullNode(after.get("_enum"))) {
                    afterAvroValue.put("_enum", (Object)after.get("_enum").asText());
                }
                if (this.nonNullNode(after.get("_year"))) {
                    afterAvroValue.put("_year", (Object)after.get("_year").asInt());
                }
                if (this.nonNullNode(after.get("_time"))) {
                    afterAvroValue.put("_time", (Object)after.get("_time").asLong());
                }
                if (this.nonNullNode(pointJsonValue = after.get("_point"))) {
                    Schema pointSchema = this.sanitizedSchema(this.sanitizedSchema(beforeSchema).getField("_point").schema());
                    afterAvroValue.put("_point", (Object)this.buildPointRecord(pointJsonValue, pointSchema));
                }
                JsonNode geometryJsonValue = after.get("_geometry");
                Schema geometrySchema = this.sanitizedSchema(this.sanitizedSchema(beforeSchema).getField("_geometry").schema());
                if (this.nonNullNode(geometryJsonValue)) {
                    afterAvroValue.put("_geometry", (Object)this.buildGeometryRecord(geometryJsonValue, geometrySchema));
                }
                if (this.nonNullNode(linestringJsonNode = after.get("_linestring"))) {
                    afterAvroValue.put("_linestring", (Object)this.buildGeometryRecord(linestringJsonNode, geometrySchema));
                }
                if (this.nonNullNode(polygonJsonNode = after.get("_polygon"))) {
                    afterAvroValue.put("_polygon", (Object)this.buildGeometryRecord(polygonJsonNode, geometrySchema));
                }
                if (this.nonNullNode(multipointJsonNode = after.get("_multipoint"))) {
                    afterAvroValue.put("_multipoint", (Object)this.buildGeometryRecord(multipointJsonNode, geometrySchema));
                }
                if (this.nonNullNode(multilineJsonNode = after.get("_multiline"))) {
                    afterAvroValue.put("_multiline", (Object)this.buildGeometryRecord(multilineJsonNode, geometrySchema));
                }
                if (this.nonNullNode(multipolygonJsonNode = after.get("_multipolygon"))) {
                    afterAvroValue.put("_multipolygon", (Object)this.buildGeometryRecord(multipolygonJsonNode, geometrySchema));
                }
                if (this.nonNullNode(geometrycollectionJsonNode = after.get("_geometrycollection"))) {
                    afterAvroValue.put("_geometrycollection", (Object)this.buildGeometryRecord(geometrycollectionJsonNode, geometrySchema));
                }
                if (this.nonNullNode(after.get("_set"))) {
                    afterAvroValue.put("_set", (Object)after.get("_set").asText());
                }
                avroValue.put("after", (Object)afterAvroValue);
                avroValue.put("source", (Object)this.buildDebeziumSourceProperty(this.debeziumSourceSchema, source));
                avroValue.put("op", (Object)valuePayload.get("op").asText());
                avroValue.put("ts_ms", (Object)valuePayload.get("ts_ms").asLong());
                kafkaProducer.send(new ProducerRecord(topic, (Object)this.kafkaKeyAvroSerializer.serialize(topic, (Object)avroKey), (Object)this.kafkaValueAvroSerializer.serialize(topic, (Object)avroValue)));
            }
        }
    }

    /**
     * Tells whether a JSON node carries an actual value.
     *
     * @param jsonNode node to inspect; may be {@code null} (e.g. the result of a lookup for an absent field)
     * @return {@code false} when the reference is {@code null} or the node's type is
     *     {@link JsonNodeType#NULL}; {@code true} otherwise
     */
    private boolean nonNullNode(JsonNode jsonNode) {
        if (jsonNode == null) {
            return false;
        }
        return JsonNodeType.NULL != jsonNode.getNodeType();
    }

    /**
     * Builds the Avro record for a point value from its JSON representation.
     *
     * <p>The {@code x} and {@code y} fields are copied as doubles and {@code wkb} is Base64-decoded
     * into a {@link ByteBuffer}. {@code srid} is optional: it is written as {@code null} both when
     * the field is absent and when it is an explicit JSON {@code null}. A plain {@code != null}
     * check is not enough here, because an explicit JSON null is a non-null node object whose
     * {@code asInt()} yields {@code 0}.
     *
     * @param pointJsonValue JSON object holding {@code x}, {@code y}, {@code wkb} and optionally {@code srid}
     * @param debeziumPointSchema Avro record schema used to create the result
     * @return the populated Avro record
     */
    private GenericRecord buildPointRecord(JsonNode pointJsonValue, Schema debeziumPointSchema) {
        GenericData.Record pointAvroValue = new GenericData.Record(debeziumPointSchema);
        pointAvroValue.put("x", (Object)pointJsonValue.get("x").asDouble());
        pointAvroValue.put("y", (Object)pointJsonValue.get("y").asDouble());
        pointAvroValue.put("wkb", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(pointJsonValue.get("wkb").asText())));
        // Treat a missing field and an explicit JSON null identically: no SRID.
        JsonNode sridNode = pointJsonValue.get("srid");
        pointAvroValue.put("srid", this.nonNullNode(sridNode) ? Integer.valueOf(sridNode.asInt()) : null);
        return pointAvroValue;
    }

    /**
     * Builds the Avro record for a geometry value from its JSON representation.
     *
     * <p>{@code wkb} is Base64-decoded into a {@link ByteBuffer}. {@code srid} is optional: it is
     * written as {@code null} both when the field is absent and when it is an explicit JSON
     * {@code null}. A plain {@code != null} check is not enough here, because an explicit JSON null
     * is a non-null node object whose {@code asInt()} yields {@code 0}.
     *
     * @param geometryJsonValue JSON object holding {@code wkb} and optionally {@code srid}
     * @param debeziumGeometrySchema Avro record schema used to create the result
     * @return the populated Avro record
     */
    private GenericRecord buildGeometryRecord(JsonNode geometryJsonValue, Schema debeziumGeometrySchema) {
        GenericData.Record geometryAvroValue = new GenericData.Record(debeziumGeometrySchema);
        geometryAvroValue.put("wkb", (Object)ByteBuffer.wrap(Base64Variants.getDefaultVariant().decode(geometryJsonValue.get("wkb").asText())));
        // Treat a missing field and an explicit JSON null identically: no SRID.
        JsonNode sridNode = geometryJsonValue.get("srid");
        geometryAvroValue.put("srid", this.nonNullNode(sridNode) ? Integer.valueOf(sridNode.asInt()) : null);
        return geometryAvroValue;
    }

    protected void testAllTypesImpl() throws Exception {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL((int)2, (int)1).notNull(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.DECIMAL((int)20, (int)0), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DATE(), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)3), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)0), DataTypes.TIMESTAMP((int)2), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)0), DataTypes.CHAR((int)10), DataTypes.VARCHAR((int)20), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.VARBINARY((int)10), DataTypes.VARBINARY((int)20), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.ARRAY((DataType)DataTypes.STRING())}, (String[])new String[]{"_id", "pt", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", 
"_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"});
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        List<String> expected = Arrays.asList("+I[1, 1.1, true, true, false, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.110, 123456789876543212345678987654321.220, 123456789876543212345678987654321.330, 11111, 22222, 33333, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T07:00:10.123456, 2023-03-22T16:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115, 0, 0, 0, 0, 0], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, 
{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, [a, b]]", "+I[2, 2.2, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 50000000000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]");
        this.waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
    }

    /**
     * Unwraps a two-member union that contains the Avro null schema.
     *
     * @param schema schema to inspect
     * @return the member that is not the null schema when {@code schema} is a union of exactly two
     *     types, one of which is null; otherwise {@code schema} itself
     */
    private Schema sanitizedSchema(Schema schema) {
        // Only unions expose member types, so check the type before touching getTypes().
        if (schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        List<Schema> members = schema.getTypes();
        if (members.size() != 2 || !members.contains(NULL_AVRO_SCHEMA)) {
            return schema;
        }
        return members.stream()
                .filter(member -> !member.equals((Object)NULL_AVRO_SCHEMA))
                .findFirst()
                .orElse(schema);
    }

    /**
     * Builds the Avro {@code source} record of a Debezium change event from its JSON form.
     *
     * <p>All fields except {@code sequence}, {@code gtid} and {@code query} are read
     * unconditionally, so a missing one fails with a {@link NullPointerException}. The three
     * optional fields are written as {@code null} both when they are absent and when they are an
     * explicit JSON {@code null}. A plain {@code == null} check would let an explicit JSON null
     * through, and its {@code asText()} is the literal string {@code "null"}.
     *
     * @param sourceSchema Avro record schema used to create the result
     * @param sourceValue JSON object holding the source metadata
     * @return the populated Avro record
     */
    private GenericRecord buildDebeziumSourceProperty(Schema sourceSchema, JsonNode sourceValue) {
        GenericData.Record source = new GenericData.Record(sourceSchema);
        source.put("version", (Object)sourceValue.get("version").asText());
        source.put("connector", (Object)sourceValue.get("connector").asText());
        source.put("name", (Object)sourceValue.get("name").asText());
        source.put("ts_ms", (Object)sourceValue.get("ts_ms").asLong());
        source.put("snapshot", (Object)sourceValue.get("snapshot").asText());
        source.put("db", (Object)sourceValue.get("db").asText());
        // Optional field: absent or JSON null both map to an Avro null.
        JsonNode sequenceNode = sourceValue.get("sequence");
        source.put("sequence", this.nonNullNode(sequenceNode) ? sequenceNode.asText() : null);
        source.put("table", (Object)sourceValue.get("table").asText());
        source.put("server_id", (Object)sourceValue.get("server_id").asLong());
        // Optional field: absent or JSON null both map to an Avro null.
        JsonNode gtidNode = sourceValue.get("gtid");
        source.put("gtid", this.nonNullNode(gtidNode) ? gtidNode.asText() : null);
        source.put("file", (Object)sourceValue.get("file").asText());
        source.put("pos", (Object)sourceValue.get("pos").asLong());
        source.put("row", (Object)sourceValue.get("row").asInt());
        source.put("thread", (Object)sourceValue.get("thread").asLong());
        // Optional field: absent or JSON null both map to an Avro null.
        JsonNode queryNode = sourceValue.get("query");
        source.put("query", this.nonNullNode(queryNode) ? queryNode.asText() : null);
        return source;
    }
}

