/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import io.confluent.kafka.serializers.GenericContainerWithVersion;
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.serialization.ConfluentAvroDeserializationSchema;

public class KafkaDebeziumAvroDeserializationSchema
implements KafkaDeserializationSchema<CdcSourceRecord> {
    private static final long serialVersionUID = 1L;
    private final String topic;
    private final String schemaRegistryUrl;
    private ConfluentAvroDeserializationSchema avroDeserializer;

    public KafkaDebeziumAvroDeserializationSchema(Configuration cdcSourceConfig) {
        this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig);
        this.schemaRegistryUrl = (String)cdcSourceConfig.get(MessageQueueSchemaUtils.SCHEMA_REGISTRY_URL);
    }

    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        this.initAvroDeserializer();
    }

    public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) throws IOException {
        if (message.value() == null) {
            return null;
        }
        if (this.avroDeserializer == null) {
            this.initAvroDeserializer();
        }
        GenericContainerWithVersion keyContainerWithVersion = this.avroDeserializer.deserialize(this.topic, true, (byte[])message.key());
        GenericContainerWithVersion valueContainerWithVersion = this.avroDeserializer.deserialize(this.topic, false, (byte[])message.value());
        GenericRecord key = null;
        if (keyContainerWithVersion != null) {
            key = (GenericRecord)keyContainerWithVersion.container();
        }
        GenericRecord value = (GenericRecord)valueContainerWithVersion.container();
        return new CdcSourceRecord(this.topic, key, value);
    }

    public boolean isEndOfStream(CdcSourceRecord nextElement) {
        return false;
    }

    public TypeInformation<CdcSourceRecord> getProducedType() {
        return TypeExtractor.getForClass(CdcSourceRecord.class);
    }

    private void initAvroDeserializer() {
        this.avroDeserializer = ConfluentAvroDeserializationSchema.create(this.schemaRegistryUrl);
    }
}

