/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.pulsar;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionITCaseBase;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for {@link PulsarSyncDatabaseAction}: canal-json messages are written to
 * Pulsar topics, the action is started, and the contents of tables {@code t1} and {@code t2}
 * are checked after each batch of messages while their row types change.
 */
public class PulsarSyncDatabaseActionITCase extends PulsarActionITCaseBase {

    /** Three topics, one data directory per topic; the source is selected by list or by pattern. */
    @Test
    @Timeout(value = 60L)
    public void testSchemaEvolutionMultiTopic() throws Exception {
        final String topic1 = "schema_evolution_0";
        final String topic2 = "schema_evolution_1";
        final String topic3 = "schema_evolution_2";
        final boolean writeOne = false;
        final int fileCount = 3;

        // Work on a typed local list and publish the same instance through the inherited field.
        final List<String> topicNames = Arrays.asList(topic1, topic2, topic3);
        this.topics = topicNames;
        for (String name : topicNames) {
            this.createTopic(name, 1);
        }
        this.sendCanalDataBatch(topicNames, writeOne, fileCount, 1);

        Map<String, String> pulsarConfig = this.canalJsonSourceConfig();
        // Randomly exercise either the explicit topic list or the topic pattern option.
        if (ThreadLocalRandom.current().nextBoolean()) {
            pulsarConfig.put(PulsarActionUtils.TOPIC.key(), String.join(";", topicNames));
        } else {
            pulsarConfig.put(PulsarActionUtils.TOPIC_PATTERN.key(), "schema_evolution_.+");
        }

        this.startSyncDatabaseAction(pulsarConfig);
        this.testSchemaEvolutionImpl(topicNames, writeOne, fileCount);
    }

    /** A single topic that receives the data files of all three data directories. */
    @Test
    @Timeout(value = 60L)
    public void testSchemaEvolutionOneTopic() throws Exception {
        final String topic = "schema_evolution";
        final boolean writeOne = true;
        final int fileCount = 3;

        final List<String> topicNames = Collections.singletonList(topic);
        this.topics = topicNames;
        for (String name : topicNames) {
            this.createTopic(name, 1);
        }
        this.sendCanalDataBatch(topicNames, writeOne, fileCount, 1);

        Map<String, String> pulsarConfig = this.canalJsonSourceConfig();
        pulsarConfig.put(PulsarActionUtils.TOPIC.key(), String.join(";", topicNames));

        this.startSyncDatabaseAction(pulsarConfig);
        this.testSchemaEvolutionImpl(topicNames, writeOne, fileCount);
    }

    /**
     * Verifies the tables after the initial batch, then sends batches 2 and 3 and verifies the
     * tables again after each of them.
     *
     * @param topics topics the follow-up batches are written to
     * @param writeOne when {@code true} every file goes to the first topic, otherwise file
     *     {@code i} goes to topic {@code i}
     * @param fileCount number of {@code topic<i>} data directories to send per batch
     * @throws Exception if sending a batch fails or a helper of the base class throws
     */
    private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount) throws Exception {
        this.waitingTables("t1", "t2");
        FileStoreTable table1 = this.getFileStoreTable("t1");
        FileStoreTable table2 = this.getFileStoreTable("t2");
        List<String> primaryKeys1 = Collections.singletonList("k");
        List<String> primaryKeys2 = Arrays.asList("k1", "k2");

        // Stage 1: rows of the batch that the calling test sent before starting the action.
        RowType rowType1 = RowType.of(
                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                new String[] {"k", "v1"});
        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);

        RowType rowType2 = RowType.of(
                new DataType[] {
                    DataTypes.INT().notNull(),
                    DataTypes.VARCHAR(10).notNull(),
                    DataTypes.INT(),
                    DataTypes.BIGINT()
                },
                new String[] {"k1", "k2", "v1", "v2"});
        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);

        // Stage 2: t1 is expected with an extra INT column v2, t2 with an extra VARCHAR(10) column v3.
        this.sendCanalDataBatch(topics, writeOne, fileCount, 2);

        rowType1 = RowType.of(
                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()},
                new String[] {"k", "v1", "v2"});
        expected = Arrays.asList(
                "+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);

        rowType2 = RowType.of(
                new DataType[] {
                    DataTypes.INT().notNull(),
                    DataTypes.VARCHAR(10).notNull(),
                    DataTypes.INT(),
                    DataTypes.BIGINT(),
                    DataTypes.VARCHAR(10)
                },
                new String[] {"k1", "k2", "v1", "v2", "v3"});
        expected = Arrays.asList(
                "+I[2, two, 20, 200, NULL]",
                "+I[4, four, 40, 400, NULL]",
                "+I[6, six, 60, 600, string_6]",
                "+I[8, eight, 80, 800, string_8]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);

        // Stage 3: t1.v2 is expected as BIGINT and t2.v3 as VARCHAR(20).
        this.sendCanalDataBatch(topics, writeOne, fileCount, 3);

        rowType1 = RowType.of(
                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()},
                new String[] {"k", "v1", "v2"});
        expected = Arrays.asList(
                "+I[1, one, NULL]",
                "+I[3, three, NULL]",
                "+I[5, five, 50]",
                "+I[7, seven, 70]",
                "+I[9, nine, 9000000000000]");
        this.waitForResult(expected, table1, rowType1, primaryKeys1);

        rowType2 = RowType.of(
                new DataType[] {
                    DataTypes.INT().notNull(),
                    DataTypes.VARCHAR(10).notNull(),
                    DataTypes.INT(),
                    DataTypes.BIGINT(),
                    DataTypes.VARCHAR(20)
                },
                new String[] {"k1", "k2", "v1", "v2", "v3"});
        expected = Arrays.asList(
                "+I[2, two, 20, 200, NULL]",
                "+I[4, four, 40, 400, NULL]",
                "+I[6, six, 60, 600, string_6]",
                "+I[8, eight, 80, 800, string_8]",
                "+I[10, ten, 100, 1000, long_long_string_10]");
        this.waitForResult(expected, table2, rowType2, primaryKeys2);
    }

    /**
     * Sends {@code canal-data-<batch>.txt} from each of the first {@code fileCount}
     * {@code topic<i>} resource directories.
     *
     * @param targets topics to write to
     * @param writeOne when {@code true} every file goes to {@code targets.get(0)}, otherwise
     *     file {@code i} goes to {@code targets.get(i)}
     * @param fileCount number of resource directories to read
     * @param batch number used in the data file name
     * @throws Exception wrapping the first failure raised while selecting the topic, reading the
     *     resource or sending its messages
     */
    private void sendCanalDataBatch(List<String> targets, boolean writeOne, int fileCount, int batch) throws Exception {
        for (int i = 0; i < fileCount; ++i) {
            try {
                String target = writeOne ? targets.get(0) : targets.get(i);
                this.sendMessages(
                        target,
                        this.getMessages(
                                "kafka/canal/database/schemaevolution/topic" + i + "/canal-data-" + batch + ".txt"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Pulsar.", e);
            }
        }
    }

    /**
     * Returns the basic Pulsar config with the partition discovery interval set to {@code -1}
     * and the value format set to {@code canal-json}; the caller adds the topic option.
     */
    private Map<String, String> canalJsonSourceConfig() {
        Map<String, String> config = this.getBasicPulsarConfig();
        config.put(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
        config.put(PulsarActionUtils.VALUE_FORMAT.key(), "canal-json");
        return config;
    }

    /** Builds the sync-database action with the basic table config and runs it in the default env. */
    private void startSyncDatabaseAction(Map<String, String> pulsarConfig) throws Exception {
        PulsarSyncDatabaseAction action =
                (PulsarSyncDatabaseAction) this.syncDatabaseActionBuilder(pulsarConfig)
                        .withTableConfig(this.getBasicTableConfig())
                        .build();
        this.runActionWithDefaultEnv((ActionBase) action);
    }
}

