/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for {@link KafkaSyncTableAction} reading Kafka records with the
 * {@code debezium-json} value format and {@code value.debezium-json.schema-include} enabled.
 */
public class KafkaDebeziumWithSchemaSyncTableActionITCase
        extends KafkaActionITCaseBase {

    /** Message used when publishing fixture records to Kafka fails. */
    private static final String WRITE_FAILURE_MESSAGE = "Failed to write debezium data to Kafka.";

    /** Classpath pattern of the schema-evolution fixtures: source directory, then file index. */
    private static final String SCHEMA_EVOLUTION_DATA_PATTERN =
            "kafka/debezium/table/schema/%s/debezium-data-%d.txt";

    /**
     * Verifies that columns appearing in later batches of records are added to the synchronized
     * table.
     *
     * @throws Exception if the fixtures cannot be published or the expected rows never show up
     */
    @Test
    @Timeout(value = 300L)
    public void testSchemaEvolution() throws Exception {
        this.runSingleTableSchemaEvolution("schemaevolution");
    }

    /**
     * Publishes the first fixture batch, starts the sync action and then runs the
     * schema-evolution assertions.
     *
     * @param sourceDir fixture directory name below {@code kafka/debezium/table/schema/}
     * @throws Exception if publishing, starting the action or any assertion fails
     */
    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
        String topic = "schema_evolution";
        this.createTestTopic(topic, 1, 1);
        this.writeDebeziumRecords(topic, String.format(SCHEMA_EVOLUTION_DATA_PATTERN, sourceDir, 1));

        Map<String, String> kafkaConfig = this.debeziumWithSchemaKafkaConfig(topic);
        KafkaSyncTableAction action =
                (KafkaSyncTableAction)
                        this.syncTableActionBuilder(kafkaConfig)
                                .withPrimaryKeys("id")
                                .withTableConfig(this.getBasicTableConfig())
                                .build();
        this.runActionWithDefaultEnv((ActionBase) action);

        this.testSchemaEvolutionImpl(topic, sourceDir);
    }

    /**
     * Waits for the rows of the first batch, then publishes the second and third batches and
     * checks the table contents and row type after each one.
     *
     * @param topic Kafka topic the sync action is consuming
     * @param sourceDir fixture directory name below {@code kafka/debezium/table/schema/}
     * @throws Exception if publishing fails or the expected rows never show up
     */
    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        List<String> primaryKeys = Collections.singletonList("id");

        // Batch 1 (already published by the caller): the four initial columns.
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table, rowType, primaryKeys);

        // Batch 2: an "age" column appears; rows written earlier read NULL for it.
        this.writeDebeziumRecords(topic, String.format(SCHEMA_EVOLUTION_DATA_PATTERN, sourceDir, 2));
        rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight", "age"});
        expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
        this.waitForResult(expected, table, rowType, primaryKeys);

        // Batch 3: an "address" column appears; id 101 is no longer expected in the table.
        this.writeDebeziumRecords(topic, String.format(SCHEMA_EVOLUTION_DATA_PATTERN, sourceDir, 3));
        rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight", "age", "address"});
        expected =
                Arrays.asList(
                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    /**
     * Verifies that the computed column {@code _year=year(date)} is materialized as an
     * {@code INT} column next to the source columns.
     *
     * @throws Exception if the fixture cannot be published or the expected row never shows up
     */
    @Test
    @Timeout(value = 60L)
    public void testComputedColumn() throws Exception {
        String topic = "computed_column";
        this.createTestTopic(topic, 1, 1);
        this.writeDebeziumRecords(
                topic, "kafka/debezium/table/schema/computedcolumn/debezium-data-1.txt");

        Map<String, String> kafkaConfig = this.debeziumWithSchemaKafkaConfig(topic);
        KafkaSyncTableAction action =
                (KafkaSyncTableAction)
                        this.syncTableActionBuilder(kafkaConfig)
                                .withPrimaryKeys("id")
                                .withComputedColumnArgs("_year=year(date)")
                                .withTableConfig(this.getBasicTableConfig())
                                .build();
        this.runActionWithDefaultEnv((ActionBase) action);

        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT()
                        },
                        new String[] {"id", "date", "_year"});
        this.waitForResult(
                Collections.singletonList("+I[101, 2023-03-23, 2023]"),
                this.getFileStoreTable(this.tableName),
                rowType,
                Collections.singletonList("id"));
    }

    /**
     * Returns the basic Kafka config extended with the {@code debezium-json} value format,
     * schema inclusion enabled, and the given topic.
     *
     * @param topic Kafka topic to read from
     * @return the map obtained from {@code getBasicKafkaConfig()} with the three entries added
     */
    private Map<String, String> debeziumWithSchemaKafkaConfig(String topic) {
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "debezium-json");
        kafkaConfig.put("value.debezium-json.schema-include", "true");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        return kafkaConfig;
    }

    /**
     * Reads the fixture at {@code resourcePath} and publishes its lines to {@code topic}.
     *
     * <p>Only the publish step is wrapped, so a fixture that cannot be read is not reported
     * as a Kafka write failure.
     *
     * @param topic Kafka topic to publish to
     * @param resourcePath fixture path handed to {@code readLines}
     * @throws Exception wrapping the original cause if publishing fails, or whatever
     *     {@code readLines} throws if the fixture cannot be read
     */
    private void writeDebeziumRecords(String topic, String resourcePath) throws Exception {
        List<String> lines = KafkaDebeziumWithSchemaSyncTableActionITCase.readLines(resourcePath);
        try {
            this.writeRecordsToKafka(topic, lines);
        } catch (Exception e) {
            throw new Exception(WRITE_FAILURE_MESSAGE, e);
        }
    }
}

