/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.aliyun;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.aliyun.AliyunRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.BinaryStringUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AliyunJsonRecordParserTest
extends KafkaActionITCaseBase {
    private static final Logger log = LoggerFactory.getLogger(AliyunJsonRecordParserTest.class);
    private static List<String> insertList = new ArrayList<String>();
    private static List<String> updateList = new ArrayList<String>();
    private static List<String> deleteList = new ArrayList<String>();
    private static List<ComputedColumn> computedColumns = new ArrayList<ComputedColumn>();
    private static ObjectMapper objMapper = new ObjectMapper();
    String dateTimeRegex = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";

    @Before
    public void setup() {
        String insertRes = "kafka/aliyun/table/event/event-insert.txt";
        String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
        String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
        String[] computedColumnArgs = new String[]{"etl_create_time=now()", "etl_update_time=now()"};
        try {
            URL url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
            Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).forEach(e -> insertList.add((String)e));
            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(updateRes);
            Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).forEach(e -> updateList.add((String)e));
            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(deleteRes);
            Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).forEach(e -> deleteList.add((String)e));
            computedColumns = ComputedColumnUtils.buildComputedColumns(Arrays.asList(computedColumnArgs), Collections.emptyList());
        }
        catch (Exception e2) {
            log.error("Fail to init aliyun-json cases", (Throwable)e2);
        }
    }

    @Test
    public void extractInsertRecord() throws Exception {
        AliyunRecordParser parser = new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
        for (String json : insertList) {
            JsonNode rootNode = (JsonNode)objMapper.readValue(json, JsonNode.class);
            CdcSourceRecord cdcRecord = new CdcSourceRecord((Object)rootNode);
            Schema schema = parser.buildSchema(cdcRecord);
            Assertions.assertEquals((Object)schema.primaryKeys(), Arrays.asList("id"));
            List records = parser.extractRecords();
            Assertions.assertEquals((int)records.size(), (int)1);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)result.kind(), (Object)RowKind.INSERT);
            String dbName = parser.getDatabaseName();
            Assertions.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assertions.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assertions.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
            Map data = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord().data();
            String createTime = (String)data.get("etl_create_time");
            String updateTime = (String)data.get("etl_update_time");
            createTime = BinaryStringUtils.toTimestamp((BinaryString)BinaryString.fromString((String)createTime), (int)6).toString();
            updateTime = BinaryStringUtils.toTimestamp((BinaryString)BinaryString.fromString((String)updateTime), (int)6).toString();
            Assertions.assertTrue((boolean)createTime.matches(this.dateTimeRegex));
            Assertions.assertTrue((boolean)updateTime.matches(this.dateTimeRegex));
        }
    }

    @Test
    public void extractUpdateRecord() throws Exception {
        AliyunRecordParser parser = new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
        for (String json : updateList) {
            JsonNode jsonNode = (JsonNode)objMapper.readValue(json, JsonNode.class);
            CdcSourceRecord cdcRecord = new CdcSourceRecord((Object)jsonNode);
            Schema schema = parser.buildSchema(cdcRecord);
            Assertions.assertEquals((Object)schema.primaryKeys(), Arrays.asList("id"));
            List records = parser.extractRecords();
            Assertions.assertEquals((int)records.size(), (int)1);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)result.kind(), (Object)RowKind.UPDATE_AFTER);
            String dbName = parser.getDatabaseName();
            Assertions.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assertions.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assertions.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
            Map data = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord().data();
            String createTime = (String)data.get("etl_create_time");
            String updateTime = (String)data.get("etl_update_time");
            Assertions.assertNotNull((Object)createTime);
            updateTime = BinaryStringUtils.toTimestamp((BinaryString)BinaryString.fromString((String)updateTime), (int)6).toString();
            Assertions.assertTrue((boolean)updateTime.matches(this.dateTimeRegex));
        }
    }

    @Test
    public void extractDeleteRecord() throws Exception {
        AliyunRecordParser parser = new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
        for (String json : deleteList) {
            JsonNode jsonNode = (JsonNode)objMapper.readValue(json, JsonNode.class);
            CdcSourceRecord cdcRecord = new CdcSourceRecord((Object)jsonNode);
            Schema schema = parser.buildSchema(cdcRecord);
            Assertions.assertEquals((Object)schema.primaryKeys(), Arrays.asList("id"));
            List records = parser.extractRecords();
            Assertions.assertEquals((int)records.size(), (int)1);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assertions.assertEquals((Object)result.kind(), (Object)RowKind.DELETE);
            String dbName = parser.getDatabaseName();
            Assertions.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assertions.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assertions.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
            Map data = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord().data();
            String createTime = (String)data.get("etl_create_time");
            String updateTime = (String)data.get("etl_update_time");
            Assertions.assertNotNull((Object)createTime);
            updateTime = BinaryStringUtils.toTimestamp((BinaryString)BinaryString.fromString((String)updateTime), (int)6).toString();
            Assertions.assertTrue((boolean)updateTime.matches(this.dateTimeRegex));
        }
    }
}

