/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.debezium;

import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumBsonDataFormatFactory;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumBsonRecordParser;
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class DebeziumBsonRecordParserTest {
    private static final List<CdcSourceRecord> insertList = new ArrayList<CdcSourceRecord>();
    private static final List<CdcSourceRecord> updateList = new ArrayList<CdcSourceRecord>();
    private static final List<CdcSourceRecord> deleteList = new ArrayList<CdcSourceRecord>();
    private static final ArrayList<CdcSourceRecord> bsonRecords = new ArrayList();
    private static final ArrayList<CdcSourceRecord> jsonRecords = new ArrayList();
    private static final Map<String, String> keyEvent = new HashMap<String, String>();
    private static KafkaDeserializationSchema<CdcSourceRecord> kafkaDeserializationSchema = null;
    private static final Map<String, String> beforeEvent = new HashMap<String, String>();
    private static final Map<String, String> afterEvent = new HashMap<String, String>();

    @BeforeAll
    public static void beforeAll() throws Exception {
        DataFormat dataFormat = new DebeziumBsonDataFormatFactory().create();
        kafkaDeserializationSchema = dataFormat.createKafkaDeserializer(null);
        keyEvent.put("_id", "67ab25755c0d5ac87eb8c632");
        beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632");
        beforeEvent.put("created_at", "1736207571013");
        beforeEvent.put("created_by", "peter");
        beforeEvent.put("tags", "[\"pending\"]");
        beforeEvent.put("updated_at", "1739455297970");
        afterEvent.put("_id", "67ab25755c0d5ac87eb8c632");
        afterEvent.put("created_at", "1736207571013");
        afterEvent.put("created_by", "peter");
        afterEvent.put("tags", "[\"succeed\"]");
        afterEvent.put("updated_at", "1739455397970");
        String insertRes = "kafka/debezium-bson/table/event/event-insert.txt";
        String updateRes = "kafka/debezium-bson/table/event/event-update.txt";
        String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt";
        String bsonPth = "kafka/debezium-bson/table/event/event-bson.txt";
        String jsonPath = "kafka/debezium-bson/table/event/event-json.txt";
        DebeziumBsonRecordParserTest.parseCdcSourceRecords(insertRes, insertList);
        DebeziumBsonRecordParserTest.parseCdcSourceRecords(updateRes, updateList);
        DebeziumBsonRecordParserTest.parseCdcSourceRecords(deleteRes, deleteList);
        DebeziumBsonRecordParserTest.parseCdcSourceRecords(bsonPth, bsonRecords);
        DebeziumBsonRecordParserTest.parseCdcSourceRecords(jsonPath, jsonRecords);
    }

    @AfterAll
    public static void afterAll() {
        insertList.clear();
        updateList.clear();
        deleteList.clear();
        bsonRecords.clear();
        jsonRecords.clear();
    }

    private static void parseCdcSourceRecords(String resourcePath, List<CdcSourceRecord> records) throws Exception {
        URL url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(resourcePath);
        List<String> line = Files.readAllLines(Paths.get(url.toURI()));
        String key = null;
        for (String json : line) {
            if (StringUtils.isNullOrWhitespaceOnly((String)json) || !json.startsWith("{")) continue;
            if (key == null) {
                key = json;
                continue;
            }
            records.add(DebeziumBsonRecordParserTest.deserializeKafkaSchema(key, json));
            key = null;
        }
    }

    @Test
    public void extractInsertRecord() throws Exception {
        DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        Assertions.assertFalse((boolean)insertList.isEmpty());
        for (CdcSourceRecord cdcRecord : insertList) {
            Schema schema = parser.buildSchema(cdcRecord);
            Assertions.assertEquals((Object)schema.primaryKeys(), Arrays.asList("_id"));
            List records = parser.extractRecords();
            Assertions.assertEquals((int)records.size(), (int)1);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)result.kind(), (Object)RowKind.INSERT);
            Assertions.assertEquals(beforeEvent, (Object)result.data());
            String dbName = parser.getDatabaseName();
            Assertions.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assertions.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assertions.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void extractUpdateRecord() throws Exception {
        DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        Assertions.assertFalse((boolean)updateList.isEmpty());
        for (CdcSourceRecord cdcRecord : updateList) {
            Schema schema = parser.buildSchema(cdcRecord);
            Assertions.assertEquals((Object)schema.primaryKeys(), Arrays.asList("_id"));
            List records = parser.extractRecords();
            Assertions.assertEquals((int)records.size(), (int)2);
            CdcRecord updateBefore = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)updateBefore.kind(), (Object)RowKind.DELETE);
            if (parser.checkBeforeExists()) {
                Assertions.assertEquals(beforeEvent, (Object)updateBefore.data());
            } else {
                Assertions.assertEquals(keyEvent, (Object)updateBefore.data());
            }
            CdcRecord updateAfter = ((RichCdcMultiplexRecord)records.get(1)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)updateAfter.kind(), (Object)RowKind.INSERT);
            Assertions.assertEquals(afterEvent, (Object)updateAfter.data());
            String dbName = parser.getDatabaseName();
            Assertions.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assertions.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assertions.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void extractDeleteRecord() throws Exception {
        DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        Assertions.assertFalse((boolean)deleteList.isEmpty());
        for (CdcSourceRecord cdcRecord : deleteList) {
            Schema schema = parser.buildSchema(cdcRecord);
            Assertions.assertEquals((Object)schema.primaryKeys(), Arrays.asList("_id"));
            List records = parser.extractRecords();
            Assertions.assertEquals((int)records.size(), (int)1);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)result.kind(), (Object)RowKind.DELETE);
            if (parser.checkBeforeExists()) {
                Assertions.assertEquals(beforeEvent, (Object)result.data());
            } else {
                Assertions.assertEquals(keyEvent, (Object)result.data());
            }
            String dbName = parser.getDatabaseName();
            Assertions.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assertions.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assertions.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void bsonConvertJsonTest() throws Exception {
        DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        Assertions.assertFalse((boolean)jsonRecords.isEmpty());
        for (int i = 0; i < jsonRecords.size(); ++i) {
            CdcSourceRecord bsonRecord = bsonRecords.get(i);
            CdcSourceRecord jsonRecord = jsonRecords.get(i);
            TextNode bsonTextNode = new TextNode(JsonSerdeUtil.writeValueAsString((Object)bsonRecord.getValue()));
            Map resultMap = parser.extractRowData((JsonNode)bsonTextNode, CdcSchema.newBuilder());
            ObjectNode expectNode = (ObjectNode)jsonRecord.getValue();
            expectNode.fields().forEachRemaining(entry -> {
                String key = (String)entry.getKey();
                String expectValue = null;
                if (!JsonSerdeUtil.isNull((JsonNode)((JsonNode)entry.getValue()))) {
                    expectValue = ((JsonNode)entry.getValue()).asText();
                }
                Assertions.assertEquals((Object)expectValue, resultMap.get(key));
            });
        }
    }

    private static CdcSourceRecord deserializeKafkaSchema(String key, String value) throws Exception {
        return (CdcSourceRecord)kafkaDeserializationSchema.deserialize(new ConsumerRecord("topic", 0, 0L, (Object)key.getBytes(), (Object)value.getBytes()));
    }
}

