/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.kafka.compat;

import java.util.Base64;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerInterceptorWrapper<K, V>
implements org.apache.pulsar.client.api.ProducerInterceptor<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerInterceptorWrapper.class);
    private final ProducerInterceptor<K, V> kafkaProducerInterceptor;
    private final Schema<K> keySchema;
    private final Schema<V> valueSchema;
    private final String topic;
    private Schema<byte[]> scheme;
    private long eventTime;
    private String partitionID;

    public KafkaProducerInterceptorWrapper(ProducerInterceptor<K, V> kafkaProducerInterceptor, Schema<K> keySchema, Schema<V> valueSchema, String topic) {
        this.kafkaProducerInterceptor = kafkaProducerInterceptor;
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
        this.topic = topic;
    }

    public void close() {
        this.kafkaProducerInterceptor.close();
    }

    public Message<byte[]> beforeSend(Producer<byte[]> producer, Message<byte[]> message) {
        return this.toPulsarMessage(this.kafkaProducerInterceptor.onSend(this.toKafkaRecord(message)));
    }

    public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId msgId, Throwable exception) {
        try {
            MessageMetadata messageMetadataBuilder = ((MessageImpl)message).getMessageBuilder();
            this.partitionID = this.getPartitionID(messageMetadataBuilder);
            TopicPartition topicPartition = new TopicPartition(this.topic, Integer.parseInt(this.partitionID));
            this.kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition, -1L, -1L, messageMetadataBuilder.getEventTime(), Long.valueOf(-1L), message.getKeyBytes().length, ((byte[])message.getValue()).length), new Exception(exception));
        }
        catch (NumberFormatException e) {
            String errorMessage = "Unable to convert partitionID to integer: " + e.getMessage();
            log.error(errorMessage);
            throw new RuntimeException(errorMessage);
        }
    }

    private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
        TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, this.scheme);
        typedMessageBuilder.key(this.serializeKey(this.topic, producerRecord.key()));
        if (this.valueSchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema)this.valueSchema).setTopic(this.topic);
        }
        typedMessageBuilder.value((Object)this.valueSchema.encode(producerRecord.value()));
        typedMessageBuilder.eventTime(this.eventTime);
        typedMessageBuilder.property("pulsar.partition.id", this.partitionID);
        return typedMessageBuilder.getMessage();
    }

    private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
        Object value;
        if (this.valueSchema instanceof PulsarKafkaSchema) {
            PulsarKafkaSchema pulsarKeyKafkaSchema = (PulsarKafkaSchema)this.valueSchema;
            Deserializer valueDeserializer = KafkaProducerInterceptorWrapper.getDeserializer(pulsarKeyKafkaSchema.getKafkaSerializer());
            value = valueDeserializer.deserialize(this.topic, (byte[])message.getValue());
        } else {
            value = this.valueSchema.decode((byte[])message.getValue());
        }
        try {
            this.scheme = (Schema)FieldUtils.readField(message, (String)"schema", (boolean)true);
            MessageMetadata messageMetadataBuilder = ((MessageImpl)message).getMessageBuilder();
            this.partitionID = this.getPartitionID(messageMetadataBuilder);
            this.eventTime = message.getEventTime();
            return new ProducerRecord(this.topic, Integer.valueOf(Integer.parseInt(this.partitionID)), Long.valueOf(this.eventTime), this.deserializeKey(this.topic, message.getKey()), value);
        }
        catch (NumberFormatException e) {
            return new ProducerRecord(this.topic, this.deserializeKey(this.topic, message.getKey()), value);
        }
        catch (IllegalAccessException e) {
            String errorMessage = "Unable to get the schema of message due to " + e.getMessage();
            log.error(errorMessage);
            throw new RuntimeException(errorMessage);
        }
    }

    private String serializeKey(String topic, K key) {
        if (key instanceof String) {
            return (String)key;
        }
        if (this.keySchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema)this.keySchema).setTopic(topic);
        }
        byte[] keyBytes = this.keySchema.encode(key);
        return Base64.getEncoder().encodeToString(keyBytes);
    }

    private K deserializeKey(String topic, String key) {
        if (this.keySchema instanceof PulsarKafkaSchema) {
            PulsarKafkaSchema pulsarKeyKafkaSchema = (PulsarKafkaSchema)this.keySchema;
            if (pulsarKeyKafkaSchema.getKafkaSerializer() instanceof StringSerializer) {
                return (K)key;
            }
            Deserializer keyDeserializer = KafkaProducerInterceptorWrapper.getDeserializer(pulsarKeyKafkaSchema.getKafkaSerializer());
            return (K)keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
        }
        return (K)this.keySchema.decode(Base64.getDecoder().decode(key));
    }

    private String getPartitionID(MessageMetadata messageMetadataBuilder) {
        return messageMetadataBuilder.getPropertiesList().stream().filter(keyValue -> keyValue.getKey().equals("pulsar.partition.id")).findFirst().get().getValue();
    }

    static Deserializer getDeserializer(Serializer serializer) {
        if (serializer instanceof StringSerializer) {
            return new StringDeserializer();
        }
        if (serializer instanceof LongSerializer) {
            return new LongDeserializer();
        }
        if (serializer instanceof IntegerSerializer) {
            return new IntegerDeserializer();
        }
        if (serializer instanceof DoubleSerializer) {
            return new DoubleDeserializer();
        }
        if (serializer instanceof BytesSerializer) {
            return new BytesDeserializer();
        }
        if (serializer instanceof ByteBufferSerializer) {
            return new ByteBufferDeserializer();
        }
        if (serializer instanceof ByteArraySerializer) {
            return new ByteArrayDeserializer();
        }
        throw new IllegalArgumentException(serializer.getClass().getName() + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
    }
}

