/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests that read Canal-JSON records from a Kafka topic and verify the schema
 * derived from them, and that table options passed to the Kafka sync actions end up in the
 * options of the resulting tables.
 */
public class KafkaSchemaITCase extends KafkaActionITCaseBase {

    /**
     * Writes Canal-JSON records to a topic and checks the schema that
     * {@link MessageQueueSchemaUtils#getSchema} derives from them.
     *
     * @throws Exception if the records cannot be written to Kafka or the schema cannot be read
     */
    @Test
    @Timeout(60)
    public void testKafkaSchema() throws Exception {
        final String topic = "test_kafka_schema";
        createTestTopic(topic, 1, 1);

        List<String> lines = readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
        try {
            writeRecordsToKafka(topic, lines);
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }

        Configuration kafkaConfig = Configuration.fromMap(getBasicKafkaConfig());
        kafkaConfig.setString(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.setString(KafkaConnectorOptions.TOPIC.key(), topic);

        Schema kafkaSchema =
                MessageQueueSchemaUtils.getSchema(
                        KafkaActionUtils.getKafkaEarliestConsumer(kafkaConfig, topic),
                        topic,
                        KafkaActionUtils.getDataFormat(kafkaConfig),
                        TypeMapping.defaultMapping());

        List<DataField> fields = new ArrayList<>();
        fields.add(new DataField(0, "pt", DataTypes.INT()));
        fields.add(new DataField(1, "_id", DataTypes.INT().notNull()));
        fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
        Assertions.assertThat(kafkaSchema.fields()).isEqualTo(fields);
    }

    /**
     * Runs a table sync action, cancels it, then builds and runs a second table sync action
     * with additional table options and checks that the second action's table reports all of
     * them (plus a {@code path} entry) in its options.
     *
     * @throws Exception if any Kafka, job or table operation fails
     */
    @Test
    @Timeout(60)
    public void testTableOptionsChange() throws Exception {
        final String topic = "test_table_options_change";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, readLines("kafka/canal/table/optionschange/canal-data-1.txt"));

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("bucket", "1");
        tableConfig.put("sink.parallelism", "1");

        KafkaSyncTableAction action1 =
                (KafkaSyncTableAction)
                        syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
        JobClient jobClient = runActionWithDefaultEnv(action1);
        waitingTables(tableName);
        // cancel() only returns a future; wait on it so the first job has been cancelled
        // before the second action is started.
        jobClient.cancel().get();

        writeRecordsToKafka(topic, readLines("kafka/canal/table/optionschange/canal-data-2.txt"));

        putUpdatedTableOptions(tableConfig);

        KafkaSyncTableAction action2 =
                (KafkaSyncTableAction)
                        syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
        runActionWithDefaultEnv(action2);

        Map<String, String> dynamicOptions = action2.fileStoreTable().options();
        Assertions.assertThat(dynamicOptions).containsAllEntriesOf(tableConfig).containsKey("path");
    }

    /**
     * Runs a database sync action until table {@code t1} exists, cancels it, then runs a second
     * database sync action with additional table options and checks that table {@code t2}
     * reports all of the configured options.
     *
     * @throws Exception if any Kafka, job or table operation fails
     */
    @Test
    @Timeout(60)
    public void testNewlyAddedTablesOptionsChange() throws Exception {
        final String topic = "test_database_options_change";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(
                topic, readLines("kafka/canal/database/schemaevolution/topic0/canal-data-1.txt"));

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("bucket", "1");
        tableConfig.put("sink.parallelism", "1");

        KafkaSyncDatabaseAction action1 =
                (KafkaSyncDatabaseAction)
                        syncDatabaseActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
        JobClient jobClient = runActionWithDefaultEnv(action1);
        waitingTables("t1");
        // cancel() only returns a future; wait on it so the first job has been cancelled
        // before the second action is started.
        jobClient.cancel().get();

        putUpdatedTableOptions(tableConfig);

        // NOTE(review): presumably this second data file is what introduces table t2 - confirm
        // against the test resources.
        writeRecordsToKafka(
                topic, readLines("kafka/canal/database/schemaevolution/topic1/canal-data-1.txt"));

        KafkaSyncDatabaseAction action2 =
                (KafkaSyncDatabaseAction)
                        syncDatabaseActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
        runActionWithDefaultEnv(action2);
        waitingTables("t2");

        FileStoreTable table = getFileStoreTable("t2");
        Map<String, String> tableOptions = table.options();
        Assertions.assertThat(tableOptions).containsAllEntriesOf(tableConfig);
    }

    /**
     * Adds the extra table options that the second sync action of each options-change test is
     * configured with.
     *
     * @param tableConfig mutable table configuration to add the options to
     */
    private static void putUpdatedTableOptions(Map<String, String> tableConfig) {
        tableConfig.put("sink.savepoint.auto-tag", "true");
        tableConfig.put("tag.num-retained-max", "5");
        tableConfig.put("tag.automatic-creation", "process-time");
        tableConfig.put("tag.creation-period", "hourly");
        tableConfig.put("tag.creation-delay", "600000");
        tableConfig.put("snapshot.time-retained", "1h");
        tableConfig.put("snapshot.num-retained.min", "5");
        tableConfig.put("snapshot.num-retained.max", "10");
        tableConfig.put("changelog-producer", "input");
    }
}

