/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableActionITCase;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests that run {@link KafkaSyncTableAction} against Kafka topics whose value
 * format is configured as {@code json}.
 */
public class KafkaJsonSyncTableActionITCase extends KafkaSyncTableActionITCase {

    /**
     * Writes a first batch of records and checks the synced table against a three-column row
     * type, then writes a second batch and checks the table again against a row type that has
     * the additional column {@code c}.
     */
    @Test
    @Timeout(60L)
    public void testSchemaEvolution() throws Exception {
        final String topicName = "schema-evolution";
        // Intentionally empty; it is still handed to the builder after the basic table config.
        Map<String, String> extraTableConfig = new HashMap<>();

        createTestTopic(topicName, 1, 1);
        writeRecordsToKafka(
                topicName, "kafka/json/table/schemaevolution/json-data-1.txt", new Object[0]);

        Map<String, String> sourceConfig = jsonKafkaConfigForTopic(topicName);
        KafkaSyncTableAction syncAction =
                (KafkaSyncTableAction)
                        syncTableActionBuilder(sourceConfig)
                                .withTableConfig(getBasicTableConfig())
                                .withTableConfig(extraTableConfig)
                                .build();
        runActionWithDefaultEnv((ActionBase) syncAction);

        // State expected after the first batch: columns a, b, event_tm.
        RowType initialRowType = allStringRowType("a", "b", "event_tm");
        waitForResult(
                Arrays.asList(
                        "+I[a1, b1, 2024-05-22 09:50:40]",
                        "+I[a2, b2, 2024-05-23 10:20:56]"),
                getFileStoreTable(tableName),
                initialRowType,
                Collections.emptyList());

        writeRecordsToKafka(
                topicName, "kafka/json/table/schemaevolution/json-data-2.txt", "schemaevolution");

        // State expected after the second batch: column c is appended; rows without it read NULL.
        RowType evolvedRowType = allStringRowType("a", "b", "event_tm", "c");
        waitForResult(
                Arrays.asList(
                        "+I[a1, b1, 2024-05-22 09:50:40, NULL]",
                        "+I[a2, b2, 2024-05-23 10:20:56, NULL]",
                        "+I[a3, b3, 2024-05-22 19:50:40, NULL]",
                        "+I[a4, b4, 2024-05-23 15:20:56, c4]"),
                getFileStoreTable(tableName),
                evolvedRowType,
                Collections.emptyList());
    }

    /**
     * Runs the sync action with partition key {@code pt} and the computed column argument
     * {@code pt=date_format(event_tm, yyyyMMdd)}, then checks the synced rows include the
     * expected {@code pt} values.
     */
    @Test
    @Timeout(60L)
    public void testComputedColumn() throws Exception {
        final String topicName = "computed_column";
        // Intentionally empty; it is still handed to the builder alongside the basic table config.
        Map<String, String> extraTableConfig = new HashMap<>();

        createTestTopic(topicName, 1, 1);
        writeRecordsToKafka(
                topicName, "kafka/json/table/computedcolumn/json-data-1.txt", new Object[0]);

        Map<String, String> sourceConfig = jsonKafkaConfigForTopic(topicName);
        KafkaSyncTableAction syncAction =
                (KafkaSyncTableAction)
                        syncTableActionBuilder(sourceConfig)
                                .withTableConfig(getBasicTableConfig())
                                .withPartitionKeys("pt")
                                .withTableConfig(extraTableConfig)
                                .withComputedColumnArgs("pt=date_format(event_tm, yyyyMMdd)")
                                .build();
        runActionWithDefaultEnv((ActionBase) syncAction);

        RowType expectedRowType = allStringRowType("a", "b", "event_tm", "pt");
        waitForResult(
                Arrays.asList(
                        "+I[a1, b1, 2024-05-20 20:50:30, 20240520]",
                        "+I[a2, b2, 2024-05-21 18:10:46, 20240521]"),
                getFileStoreTable(tableName),
                expectedRowType,
                Collections.emptyList());
    }

    /**
     * Returns the basic Kafka config with the value format set to {@code json} and the topic
     * option set to {@code topicName}.
     */
    private Map<String, String> jsonKafkaConfigForTopic(String topicName) {
        Map<String, String> config = getBasicKafkaConfig();
        config.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "json");
        config.put(KafkaConnectorOptions.TOPIC.key(), topicName);
        return config;
    }

    /** Builds a row type whose fields carry the given names and are all {@code STRING}. */
    private static RowType allStringRowType(String... fieldNames) {
        DataType[] fieldTypes = new DataType[fieldNames.length];
        for (int i = 0; i < fieldTypes.length; i++) {
            fieldTypes[i] = DataTypes.STRING();
        }
        return RowType.of(fieldTypes, fieldNames);
    }
}

