/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for {@link KafkaSyncTableAction} consuming Kafka topics whose records use the
 * {@code ogg-json} value format.
 *
 * <p>Every test creates its own topic, publishes one or more fixture files from
 * {@code kafka/ogg/table/...}, builds a sync action and then either checks the rows that end up
 * in the target table or asserts that running the action fails.
 */
public class KafkaOggSyncTableActionITCase extends KafkaActionITCaseBase {

    /** Value format configured on the Kafka source by every test except the negative one. */
    private static final String OGG_FORMAT = "ogg-json";

    /** Kafka option key that selects where the source starts reading. */
    private static final String SCAN_STARTUP_MODE = "scan.startup.mode";

    /** Runs the schema evolution scenario against the {@code schemaevolution} fixtures. */
    @Test
    @Timeout(60)
    public void testSchemaEvolution() throws Exception {
        runSingleTableSchemaEvolution("schemaevolution");
    }

    /**
     * Publishes the first fixture file, starts an {@code id}-keyed sync action and then delegates
     * to {@link #testSchemaEvolutionImpl(String, String)} for the follow-up batches.
     *
     * @param sourceDir fixture directory name below {@code kafka/ogg/table/}
     * @throws Exception if publishing, building or running the action fails
     */
    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
        String topic = "schema_evolution";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, String.format("kafka/ogg/table/%s/ogg-data-1.txt", sourceDir));

        runActionWithDefaultEnv(buildIdKeyedAction(oggKafkaConfig(topic, OGG_FORMAT)));

        testSchemaEvolutionImpl(topic, sourceDir);
    }

    /**
     * Verifies the table content after each of the three fixture batches. The expected row type
     * grows from four columns to five ({@code age}) and then six ({@code address}).
     *
     * @param topic topic the running action is reading from
     * @param sourceDir fixture directory name below {@code kafka/ogg/table/}
     * @throws Exception if publishing fails or the expected rows do not show up
     */
    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
        FileStoreTable table = getFileStoreTable(tableName);
        List<String> primaryKeys = Collections.singletonList("id");

        // Batch 1 was already published by the caller.
        RowType rowType = stringRowType("id", "name", "description", "weight");
        List<String> expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727]");
        waitForResult(expected, table, rowType, primaryKeys);

        // Batch 2: rows 103 and 104 carry an extra "age" column.
        writeOggRecords(topic, String.format("kafka/ogg/table/%s/ogg-data-2.txt", sourceDir));
        rowType = stringRowType("id", "name", "description", "weight", "age");
        expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175, NULL]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727, NULL]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929, 18]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
        waitForResult(expected, table, rowType, primaryKeys);

        // Batch 3: an "address" column appears and row 101 is no longer expected.
        writeOggRecords(topic, String.format("kafka/ogg/table/%s/ogg-data-3.txt", sourceDir));
        rowType = stringRowType("id", "name", "description", "weight", "age", "address");
        expected =
                Arrays.asList(
                        "+I[102, car battery, 12V car battery, 8.100000381469727, NULL, NULL]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929, 18, NULL]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
                        "+I[107, rocks, box of assorted rocks, 5.300000190734863, NULL, NULL]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    /** Running the action with the unknown value format {@code togg-json} must fail. */
    @Test
    @Timeout(60)
    public void testNotSupportFormat() throws Exception {
        String topic = "not_support";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/schemaevolution/ogg-data-1.txt");

        KafkaSyncTableAction action = buildIdKeyedAction(oggKafkaConfig(topic, "togg-json"));

        Assertions.assertThatThrownBy(() -> action.run())
                .satisfies(
                        AssertionUtils.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "This format: togg-json is not supported."));
    }

    /**
     * Running the action against a pre-created table whose fields ({@code k}, {@code v1}) do not
     * match the source fields must fail with a schema incompatibility error.
     */
    @Test
    @Timeout(60)
    public void testAssertSchemaCompatible() throws Exception {
        String topic = "assert_schema_compatible";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/schemaevolution/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);

        // Create the target table up front with a schema the source cannot be mapped onto.
        createFileStoreTable(
                RowType.of(
                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
                        new String[] {"k", "v1"}),
                Collections.emptyList(),
                Collections.singletonList("k"),
                Collections.emptyMap());

        KafkaSyncTableAction action = buildIdKeyedAction(kafkaConfig);

        Assertions.assertThatThrownBy(() -> action.run())
                .satisfies(
                        AssertionUtils.anyCauseMatches(
                                IllegalArgumentException.class,
                                "Paimon schema and source table schema are not compatible.\n"
                                        + "Paimon fields are: [`k` STRING NOT NULL, `v1` STRING].\n"
                                        + "Source table fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]"));
    }

    /**
     * Startup mode {@code specific-offsets} with {@code partition:0,offset:1}: only row 102 of the
     * first fixture file is expected in the table.
     */
    @Test
    @Timeout(60)
    public void testStarUpOptionSpecific() throws Exception {
        String topic = "start_up_specific";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);
        kafkaConfig.put(SCAN_STARTUP_MODE, "specific-offsets");
        kafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
        runActionWithDefaultEnv(buildIdKeyedAction(kafkaConfig));

        FileStoreTable table = getFileStoreTable(tableName);
        RowType rowType = stringRowType("id", "name", "description", "weight");
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected =
                Collections.singletonList(
                        "+I[102, car battery, 12V car battery, 8.100000381469727]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    /**
     * Startup mode {@code latest-offset}: only the rows of the second fixture file, published
     * after the action was started, are expected in the table.
     */
    @Test
    @Timeout(60)
    public void testStarUpOptionLatest() throws Exception {
        String topic = "start_up_latest";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);
        kafkaConfig.put(SCAN_STARTUP_MODE, "latest-offset");
        runActionWithDefaultEnv(buildIdKeyedAction(kafkaConfig));

        // NOTE(review): presumably gives the job time to start reading before the second batch
        // is published, so those records count as "latest" - confirm before shortening.
        Thread.sleep(5000L);
        FileStoreTable table = getFileStoreTable(tableName);

        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-2.txt");

        RowType rowType = stringRowType("id", "name", "description", "weight");
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected =
                Arrays.asList(
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    /**
     * Startup mode {@code timestamp}, using the wall-clock time taken after the first fixture file
     * was published: only the rows of the second fixture file are expected in the table.
     */
    @Test
    @Timeout(60)
    public void testStarUpOptionTimestamp() throws Exception {
        String topic = "start_up_timestamp";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);
        kafkaConfig.put(SCAN_STARTUP_MODE, "timestamp");
        // Captured after the first batch was written so that batch lies before the start point.
        kafkaConfig.put(
                "scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
        runActionWithDefaultEnv(buildIdKeyedAction(kafkaConfig));

        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-2.txt");

        FileStoreTable table = getFileStoreTable(tableName);
        RowType rowType = stringRowType("id", "name", "description", "weight");
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected =
                Arrays.asList(
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    /** Startup mode {@code earliest-offset}: the rows of both fixture files are expected. */
    @Test
    @Timeout(60)
    public void testStarUpOptionEarliest() throws Exception {
        String topic = "start_up_earliest";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);
        kafkaConfig.put(SCAN_STARTUP_MODE, "earliest-offset");
        runActionWithDefaultEnv(buildIdKeyedAction(kafkaConfig));

        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-2.txt");

        FileStoreTable table = getFileStoreTable(tableName);
        RowType rowType = stringRowType("id", "name", "description", "weight");
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    /** Startup mode {@code group-offsets}: the rows of both fixture files are expected. */
    @Test
    @Timeout(60)
    public void testStarUpOptionGroup() throws Exception {
        String topic = "start_up_group";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);
        kafkaConfig.put(SCAN_STARTUP_MODE, "group-offsets");
        runActionWithDefaultEnv(buildIdKeyedAction(kafkaConfig));

        writeOggRecords(topic, "kafka/ogg/table/startupmode/ogg-data-2.txt");

        FileStoreTable table = getFileStoreTable(tableName);
        RowType rowType = stringRowType("id", "name", "description", "weight");
        List<String> primaryKeys = Collections.singletonList("id");
        List<String> expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800000011920929]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    /**
     * Syncs with the computed column {@code _year=year(_date)}, which is used both as partition
     * key and as part of the primary key, and checks the resulting row.
     */
    @Test
    @Timeout(60)
    public void testComputedColumn() throws Exception {
        String topic = "computed_column";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/computedcolumn/ogg-data-1.txt");

        Map<String, String> kafkaConfig = oggKafkaConfig(topic, OGG_FORMAT);
        KafkaSyncTableAction action =
                (KafkaSyncTableAction)
                        syncTableActionBuilder(kafkaConfig)
                                .withPartitionKeys("_year")
                                .withPrimaryKeys("_id", "_year")
                                .withComputedColumnArgs("_year=year(_date)")
                                .withTableConfig(getBasicTableConfig())
                                .build();
        runActionWithDefaultEnv(action);

        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.INT().notNull()
                        },
                        new String[] {"_id", "_date", "_year"});
        waitForResult(
                Collections.singletonList("+I[101, 2023-03-23, 2023]"),
                getFileStoreTable(tableName),
                rowType,
                Arrays.asList("_id", "_year"));
    }

    /**
     * Publishes insert, update and delete fixture files in turn and checks the table content
     * after each step.
     */
    @Test
    @Timeout(60)
    public void testCDCOperations() throws Exception {
        String topic = "event";
        createTestTopic(topic, 1, 1);
        writeOggRecords(topic, "kafka/ogg/table/event/event-insert.txt");

        runActionWithDefaultEnv(buildIdKeyedAction(oggKafkaConfig(topic, OGG_FORMAT)));

        FileStoreTable table = getFileStoreTable(tableName);
        List<String> primaryKeys = Collections.singletonList("id");
        RowType rowType = stringRowType("id", "name", "description", "weight");

        List<String> expectedInsert =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727]",
                        "+I[103, scooter, Big 2-wheel scooter , 5.179999828338623]");
        waitForResult(expectedInsert, table, rowType, primaryKeys);

        // The update changes the weight of row 103.
        writeOggRecords(topic, "kafka/ogg/table/event/event-update.txt");
        List<String> expectedUpdate =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727]",
                        "+I[103, scooter, Big 2-wheel scooter , 8.170000076293945]");
        waitForResult(expectedUpdate, table, rowType, primaryKeys);

        // After the delete batch row 103 is no longer expected.
        writeOggRecords(topic, "kafka/ogg/table/event/event-delete.txt");
        List<String> expectedDelete =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
                        "+I[102, car battery, 12V car battery, 8.100000381469727]");
        waitForResult(expectedDelete, table, rowType, primaryKeys);
    }

    /**
     * Reads the given fixture resource and publishes its lines to the topic.
     *
     * <p>The resource is read outside the {@code try} so that a missing or unreadable fixture is
     * not reported as a Kafka write failure.
     *
     * @param topic topic to publish to
     * @param resourcePath fixture path passed to {@code readLines}
     * @throws Exception if reading the fixture fails, or wrapping the cause if publishing fails
     */
    private void writeOggRecords(String topic, String resourcePath) throws Exception {
        List<String> lines = readLines(resourcePath);
        try {
            writeRecordsToKafka(topic, lines);
        } catch (Exception e) {
            throw new Exception("Failed to write ogg data to Kafka.", e);
        }
    }

    /**
     * Returns the basic Kafka config extended with the given value format and topic.
     *
     * @param topic topic the source should read
     * @param valueFormat value of the {@code VALUE_FORMAT} option
     * @return the map returned by {@code getBasicKafkaConfig()} with both options put into it
     */
    private Map<String, String> oggKafkaConfig(String topic, String valueFormat)
            throws Exception {
        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), valueFormat);
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        return kafkaConfig;
    }

    /**
     * Builds (but does not run) a sync action with primary key {@code id} and the basic table
     * config.
     *
     * @param kafkaConfig Kafka source options for the action
     * @return the built action
     */
    private KafkaSyncTableAction buildIdKeyedAction(Map<String, String> kafkaConfig)
            throws Exception {
        return (KafkaSyncTableAction)
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
    }

    /**
     * Builds a row type whose fields are all {@code STRING}, with the first field additionally
     * marked {@code NOT NULL}.
     *
     * @param fieldNames field names in order; must contain at least one name
     * @return the row type with one {@code STRING} field per name
     */
    private static RowType stringRowType(String... fieldNames) {
        DataType[] fieldTypes = new DataType[fieldNames.length];
        for (int i = 0; i < fieldTypes.length; i++) {
            fieldTypes[i] = i == 0 ? DataTypes.STRING().notNull() : DataTypes.STRING();
        }
        return RowType.of(fieldTypes, fieldNames);
    }
}

