/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.watermark;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.utils.JsonSerdeUtil;

/**
 * {@link CdcTimestampExtractor} that reads an event timestamp out of a JSON CDC record coming
 * from a message queue.
 *
 * <p>The JSON layout is detected by probing for marker fields, in a fixed order; the first
 * layout that matches decides which field the timestamp is read from. Records matching none of
 * the known layouts are rejected with a {@link RuntimeException}.
 */
public class MessageQueueCdcTimestampExtractor
implements CdcTimestampExtractor {
    private static final long serialVersionUID = 1L;

    /**
     * Formatter for the {@code op_ts} field (microsecond precision, no zone information).
     * {@link DateTimeFormatter} is immutable and thread-safe, so it is built once here instead of
     * once per record.
     */
    private static final DateTimeFormatter OP_TS_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    /**
     * Extracts the timestamp carried by the given record.
     *
     * @param cdcSourceRecord record whose value is cast to a {@link JsonNode}
     * @return the extracted timestamp; the {@code op_ts} branch yields epoch milliseconds, the
     *     other branches return the stored numeric value (multiplied by 1000 in the {@code xid}
     *     branch)
     * @throws JsonProcessingException as declared; propagated from the {@link JsonSerdeUtil}
     *     helpers
     * @throws java.time.format.DateTimeParseException if {@code op_ts} does not match
     *     {@code yyyy-MM-dd HH:mm:ss.SSSSSS}
     * @throws RuntimeException if the record matches none of the recognized JSON layouts
     */
    @Override
    public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcessingException {
        JsonNode record = (JsonNode)cdcSourceRecord.getValue();

        // Layout marked by a top-level "mysqlType" field (presumably Canal JSON - TODO confirm):
        // "ts" is returned unchanged.
        if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
            return JsonSerdeUtil.extractValue(record, Long.class, "ts");
        }

        // Layout marked by a top-level "pos" field (presumably OGG JSON - TODO confirm):
        // "op_ts" is a zone-less date-time string.
        if (JsonSerdeUtil.isNodeExists(record, "pos")) {
            String opTs = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
            LocalDateTime parsed = LocalDateTime.parse(opTs, OP_TS_FORMATTER);
            // NOTE(review): the string is interpreted in the JVM's default time zone, so the
            // result depends on where this code runs - confirm this matches the producer's zone.
            return parsed.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        }

        // Layout marked by a top-level "xid" field (presumably Maxwell JSON - TODO confirm):
        // "ts" is scaled by 1000 (presumably seconds -> milliseconds).
        if (JsonSerdeUtil.isNodeExists(record, "xid")) {
            return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000L;
        }

        // Layout with "payload.source.connector" (presumably Debezium JSON wrapped in a
        // schema/payload envelope - TODO confirm). Must be probed before the unwrapped variant.
        if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
            return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
        }

        // Layout with a top-level "source.connector": same fields without the "payload" wrapper.
        if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
            return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
        }

        // Layout with "payload.timestamp": the value is read from its "systemTime" child.
        if (JsonSerdeUtil.isNodeExists(record, "payload", "timestamp")) {
            return JsonSerdeUtil.extractValue(record, Long.class, "payload", "timestamp", "systemTime");
        }

        throw new RuntimeException(String.format("Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s", record));
    }
}

