/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.watermark;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.utils.JsonSerdeUtil;

public class CdcTimestampExtractorFactory
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap = new HashMap();

    public static CdcTimestampExtractor createExtractor(Object source) {
        Supplier<CdcTimestampExtractor> extractorSupplier = extractorMap.get(source.getClass());
        if (extractorSupplier != null) {
            return extractorSupplier.get();
        }
        throw new IllegalArgumentException("Unsupported source type: " + source.getClass().getName());
    }

    static {
        extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
        extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
        extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
        extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
    }

    public static interface CdcTimestampExtractor
    extends Serializable {
        public long extractTimestamp(CdcSourceRecord var1) throws JsonProcessingException;
    }

    public static abstract class CdcDebeziumTimestampExtractor
    implements CdcTimestampExtractor {
        protected final ObjectMapper objectMapper = new ObjectMapper();

        public CdcDebeziumTimestampExtractor() {
            this.objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }
    }

    public static class MysqlCdcTimestampExtractor
    extends CdcDebeziumTimestampExtractor {
        @Override
        public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
            JsonNode json = JsonSerdeUtil.fromJson((String)record.getValue(), JsonNode.class);
            return JsonSerdeUtil.extractValueOrDefault(json, Long.class, Long.MIN_VALUE, "payload", "ts_ms");
        }
    }

    public static class MessageQueueCdcTimestampExtractor
    implements CdcTimestampExtractor {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcessingException {
            JsonNode record = (JsonNode)cdcSourceRecord.getValue();
            if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
                return JsonSerdeUtil.extractValue(record, Long.class, "ts");
            }
            if (JsonSerdeUtil.isNodeExists(record, "pos")) {
                String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
                LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
                return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            }
            if (JsonSerdeUtil.isNodeExists(record, "xid")) {
                return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000L;
            }
            if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
                return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
            }
            if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
                return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
            }
            throw new RuntimeException(String.format("Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s", record));
        }
    }

    public static class MongoDBCdcTimestampExtractor
    extends CdcDebeziumTimestampExtractor {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
            JsonNode json = JsonSerdeUtil.fromJson((String)record.getValue(), JsonNode.class);
            return JsonSerdeUtil.extractValueOrDefault(json, Long.class, Long.MIN_VALUE, "ts_ms");
        }
    }
}

