/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class KafkaCanalSyncDatabaseActionITCase
extends KafkaActionITCaseBase {
    @Test
    @Timeout(value=60L)
    public void testSchemaEvolutionMultiTopic() throws Exception {
        String topic1 = "schema_evolution_0";
        String topic2 = "schema_evolution_1";
        String topic3 = "schema_evolution_2";
        boolean writeOne = false;
        int fileCount = 3;
        List<String> topics = Arrays.asList("schema_evolution_0", "schema_evolution_1", "schema_evolution_2");
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(topics.get(i), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/schemaevolution/topic" + i + "/canal-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl(topics, writeOne, fileCount);
    }

    @Test
    @Timeout(value=60L)
    public void testSchemaEvolutionOneTopic() throws Exception {
        String topic = "schema_evolution";
        boolean writeOne = true;
        int fileCount = 3;
        List<String> topics = Collections.singletonList("schema_evolution");
        topics.forEach(t -> this.createTestTopic((String)t, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(topics.get(0), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/schemaevolution/topic" + i + "/canal-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl(topics, writeOne, fileCount);
    }

    private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount) throws Exception {
        int i;
        this.waitingTables("t1", "t2");
        FileStoreTable table1 = this.getFileStoreTable("t1");
        FileStoreTable table2 = this.getFileStoreTable("t2");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k", "v1"});
        List<String> primaryKeys1 = Collections.singletonList("k");
        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"k1", "k2", "v1", "v2"});
        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        for (i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/schemaevolution/topic" + i + "/canal-data-2.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"k", "v1", "v2"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "k2", "v1", "v2", "v3"});
        expected = Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        for (i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/schemaevolution/topic" + i + "/canal-data-3.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"k", "v1", "v2"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR((int)20)}, (String[])new String[]{"k1", "k2", "v1", "v2", "v3"});
        expected = Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]", "+I[10, ten, 100, 1000, long_long_string_10]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    @Test
    public void testTopicIsEmpty() {
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        KafkaSyncDatabaseAction action = this.syncDatabaseActionBuilder(kafkaConfig).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncDatabaseAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"kafka-conf [topic] must be specified.")});
    }

    @Test
    @Timeout(value=60L)
    public void testTableAffixMultiTopic() throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "v0"}), Collections.emptyList(), Collections.singletonList("k1"), Collections.emptyMap());
        String topic1 = "prefix_suffix_0";
        String topic2 = "prefix_suffix_1";
        boolean writeOne = false;
        int fileCount = 2;
        List<String> topics = Arrays.asList("prefix_suffix_0", "prefix_suffix_1");
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < topics.size(); ++i) {
            try {
                this.writeRecordsToKafka(topics.get(i), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/prefixsuffix/topic" + i + "/canal-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(this.getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testTableAffixImpl(topics, writeOne, fileCount);
    }

    @Test
    @Timeout(value=60L)
    public void testTableAffixOneTopic() throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "v0"}), Collections.emptyList(), Collections.singletonList("k1"), Collections.emptyMap());
        String topic1 = "prefix_suffix";
        List<String> topics = Collections.singletonList("prefix_suffix");
        boolean writeOne = true;
        int fileCount = 2;
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(topics.get(0), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/prefixsuffix/topic" + i + "/canal-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(this.getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testTableAffixImpl(topics, writeOne, fileCount);
    }

    private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileCount) throws Exception {
        int i;
        this.waitingTables("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix");
        FileStoreTable table1 = this.getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable table2 = this.getFileStoreTable("test_prefix_t2_test_suffix");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k1", "v0"});
        List<String> primaryKeys1 = Collections.singletonList("k1");
        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k2", "v0"});
        List<String> primaryKeys2 = Collections.singletonList("k2");
        expected = Arrays.asList("+I[2, two]", "+I[4, four]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        for (i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/prefixsuffix/topic" + i + "/canal-data-2.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"k1", "v0", "v1"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"k2", "v0", "v1"});
        expected = Arrays.asList("+I[2, two, NULL]", "+I[4, four, NULL]", "+I[6, six, s_6]", "+I[8, eight, s_8]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        for (i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/prefixsuffix/topic" + i + "/canal-data-3.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        }
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"k1", "v0", "v1"});
        expected = Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.VARCHAR((int)20)}, (String[])new String[]{"k2", "v0", "v1"});
        expected = Arrays.asList("+I[2, two, NULL]", "+I[4, four, NULL]", "+I[6, six, s_6]", "+I[8, eight, s_8]", "+I[10, ten, long_s_10]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    @Test
    @Timeout(value=60L)
    public void testIncludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignore"));
    }

    @Test
    @Timeout(value=60L)
    public void testExcludingTables() throws Exception {
        this.includingAndExcludingTablesImpl(null, "flink|paimon.+", Collections.singletonList("ignore"), Arrays.asList("flink", "paimon_1", "paimon_2"));
    }

    @Test
    @Timeout(value=60L)
    public void testIncludingAndExcludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("flink|paimon.+", "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "ignore"));
    }

    private void includingAndExcludingTablesImpl(@Nullable String includingTables, @Nullable String excludingTables, List<String> existedTables, List<String> notExistedTables) throws Exception {
        String topic1 = "include_exclude" + UUID.randomUUID();
        List<String> topics = Collections.singletonList(topic1);
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        try {
            this.writeRecordsToKafka(topics.get(0), KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/include/topic0/canal-data-1.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).includingTables(includingTables).excludingTables(excludingTables).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables(existedTables);
        this.assertTableNotExists(notExistedTables);
    }

    @Test
    @Timeout(value=60L)
    public void testTypeMappingToString() throws Exception {
        String topic = "map-to-string";
        this.createTestTopic("map-to-string", 1, 1);
        this.writeRecordsToKafka("map-to-string", KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/tostring/canal-data-1.txt"));
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "map-to-string");
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).withTypeMappingModes(TypeMapping.TypeMappingMode.TO_STRING.configString()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables("t1");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"k1", "v0", "v1"});
        this.waitForResult(Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"), this.getFileStoreTable("t1"), rowType, Collections.singletonList("k1"));
    }

    @Test
    public void testCatalogAndTableConfig() {
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(this.getBasicKafkaConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat((Map)action.catalogConfig()).containsEntry((Object)"catalog-key", (Object)"catalog-value");
        Assertions.assertThat((Map)action.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    @Test
    @Timeout(value=60L)
    public void testCaseInsensitive() throws Exception {
        String topic = "case-insensitive";
        this.createTestTopic("case-insensitive", 1, 1);
        this.writeRecordsToKafka("case-insensitive", KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "case-insensitive");
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).withCatalogConfig(Collections.singletonMap(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables("t1");
        FileStoreTable table = this.getFileStoreTable("t1");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"k1", "v0", "v1"});
        this.waitForResult(Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"), table, rowType, Collections.singletonList("k1"));
    }

    @Test
    @Timeout(value=60L)
    public void testCannotSynchronizeIncompleteJson() throws Exception {
        String topic = "incomplete";
        this.createTestTopic("incomplete", 1, 1);
        this.writeRecordsToKafka("incomplete", KafkaCanalSyncDatabaseActionITCase.readLines("kafka/canal/database/incomplete/canal-data-1.txt"));
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "incomplete");
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        action.withStreamExecutionEnvironment(this.env).build();
        Assertions.assertThatThrownBy(() -> this.env.execute()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"Cannot synchronize record when database name or table name is unknown. Invalid record is:\n{databaseName=null, tableName=null, fieldTypes={k=STRING, v0=STRING, v1=STRING}, primaryKeys=[], cdcRecord=+I {v0=five, k=5, v1=50}}")});
    }
}

