/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class KafkaMaxwellSyncDatabaseActionITCase
extends KafkaActionITCaseBase {
    @Test
    @Timeout(value=60L)
    public void testSchemaEvolutionMultiTopic() throws Exception {
        String topic1 = "schema_evolution_0";
        String topic2 = "schema_evolution_1";
        boolean writeOne = false;
        int fileCount = 2;
        List<String> topics = Arrays.asList("schema_evolution_0", "schema_evolution_1");
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(topics.get(i), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/schemaevolution/topic" + i + "/maxwell-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl(topics, writeOne, fileCount);
    }

    @Test
    @Timeout(value=60L)
    public void testSchemaEvolutionOneTopic() throws Exception {
        String topic = "schema_evolution";
        boolean writeOne = true;
        int fileCount = 2;
        List<String> topics = Collections.singletonList("schema_evolution");
        topics.forEach(t -> this.createTestTopic((String)t, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(topics.get(0), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/schemaevolution/topic" + i + "/maxwell-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl(topics, writeOne, fileCount);
    }

    private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount) throws Exception {
        this.waitingTables("t1", "t2");
        FileStoreTable table1 = this.getFileStoreTable("t1");
        FileStoreTable table2 = this.getFileStoreTable("t2");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys1 = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys2 = Collections.singletonList("id");
        List<String> expected2 = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected2, table2, rowType2, primaryKeys2);
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/schemaevolution/topic" + i + "/maxwell-data-2.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[102, car battery, 12V car battery, 8.1, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "address"});
        expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, Beijing]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, Shanghai]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    @Test
    public void testTopicIsEmpty() {
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        KafkaSyncDatabaseAction action = this.syncDatabaseActionBuilder(kafkaConfig).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncDatabaseAction)action).run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(IllegalArgumentException.class, (String)"kafka-conf [topic] must be specified.")});
    }

    @Test
    @Timeout(value=60L)
    public void testTableAffixMultiTopic() throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"}), Collections.emptyList(), Collections.singletonList("id"), Collections.emptyMap());
        String topic1 = "prefix_suffix_0";
        String topic2 = "prefix_suffix_1";
        boolean writeOne = false;
        int fileCount = 2;
        List<String> topics = Arrays.asList("prefix_suffix_0", "prefix_suffix_1");
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < topics.size(); ++i) {
            try {
                this.writeRecordsToKafka(topics.get(i), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/prefixsuffix/topic" + i + "/maxwell-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(this.getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testTableAffixImpl(topics, writeOne, fileCount);
    }

    @Test
    @Timeout(value=60L)
    public void testTableAffixOneTopic() throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"}), Collections.emptyList(), Collections.singletonList("id"), Collections.emptyMap());
        String topic1 = "prefix_suffix";
        List<String> topics = Collections.singletonList("prefix_suffix");
        boolean writeOne = true;
        int fileCount = 2;
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(topics.get(0), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/prefixsuffix/topic" + i + "/maxwell-data-1.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(this.getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testTableAffixImpl(topics, writeOne, fileCount);
    }

    private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileCount) throws Exception {
        this.waitingTables("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix");
        FileStoreTable table1 = this.getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable table2 = this.getFileStoreTable("test_prefix_t2_test_suffix");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys1 = Collections.singletonList("id");
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        RowType rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> primaryKeys2 = Collections.singletonList("id");
        expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
        for (int i = 0; i < fileCount; ++i) {
            try {
                this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/prefixsuffix/topic" + i + "/maxwell-data-2.txt"));
                continue;
            }
            catch (Exception e) {
                throw new Exception("Failed to write maxwell data to Kafka.", e);
            }
        }
        rowType1 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "address"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, Beijing]", "+I[102, car battery, 12V car battery, 8.1, Shanghai]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);
        rowType2 = RowType.of((DataType[])new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    @Test
    @Timeout(value=60L)
    public void testIncludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignore"));
    }

    @Test
    @Timeout(value=60L)
    public void testExcludingTables() throws Exception {
        this.includingAndExcludingTablesImpl(null, "flink|paimon.+", Collections.singletonList("ignore"), Arrays.asList("flink", "paimon_1", "paimon_2"));
    }

    @Test
    @Timeout(value=60L)
    public void testIncludingAndExcludingTables() throws Exception {
        this.includingAndExcludingTablesImpl("flink|paimon.+", "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "ignore"));
    }

    private void includingAndExcludingTablesImpl(@Nullable String includingTables, @Nullable String excludingTables, List<String> existedTables, List<String> notExistedTables) throws Exception {
        String topic1 = "include_exclude" + UUID.randomUUID();
        List<String> topics = Collections.singletonList(topic1);
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        try {
            this.writeRecordsToKafka(topics.get(0), KafkaMaxwellSyncDatabaseActionITCase.readLines("kafka/maxwell/database/include/topic0/maxwell-data-1.txt"));
        }
        catch (Exception e) {
            throw new Exception("Failed to write maxwell data to Kafka.", e);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "maxwell-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).includingTables(includingTables).excludingTables(excludingTables).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables(existedTables);
        this.assertTableNotExists(notExistedTables);
    }
}

