/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of the Kafka {@link Producer} interface that publishes records through a
 * Pulsar client.
 *
 * <p>One Pulsar producer is created lazily per topic and cached for the lifetime of this
 * instance. Transactional operations and {@link #partitionsFor(String)} are not supported and
 * throw {@link UnsupportedOperationException}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);

    private final PulsarClient client;
    /** Template builder; cloned for every topic in {@link #createNewProducer(String)}. */
    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
    /** Per-topic cache of Pulsar producers, populated on first send to a topic. */
    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
    private final Schema<K> keySchema;
    private final Schema<V> valueSchema;
    private final Partitioner partitioner;
    /** Partition view handed to the Kafka partitioner; extended whenever a new topic is first used. */
    private volatile Cluster cluster = Cluster.empty();
    private final List<ProducerInterceptor<K, V>> interceptors;
    private final Properties properties;

    /**
     * Creates a producer whose key/value serializers are taken from {@code configs}.
     *
     * @param configs Kafka producer configuration
     */
    public PulsarKafkaProducer(Map<String, Object> configs) {
        this(new ProducerConfig(configs), null, null);
    }

    /**
     * Creates a producer that uses the given Kafka serializers.
     *
     * @param configs Kafka producer configuration
     * @param keySerializer serializer wrapped into the key schema
     * @param valueSerializer serializer wrapped into the value schema
     */
    public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(configs), new PulsarKafkaSchema<K>(keySerializer), new PulsarKafkaSchema<V>(valueSerializer));
    }

    /**
     * Creates a producer that uses the given Pulsar schemas.
     *
     * @param configs Kafka producer configuration
     * @param keySchema schema used to encode keys; when {@code null} the configured
     *     {@code key.serializer} is used
     * @param valueSchema schema used to encode values; when {@code null} the configured
     *     {@code value.serializer} is used
     */
    public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
        this(new ProducerConfig(configs), keySchema, valueSchema);
    }

    /**
     * Creates a producer whose key/value serializers are taken from {@code properties}.
     *
     * @param properties Kafka producer configuration
     */
    public PulsarKafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null, null);
    }

    /**
     * Creates a producer that uses the given Kafka serializers.
     *
     * @param properties Kafka producer configuration
     * @param keySerializer serializer wrapped into the key schema
     * @param valueSerializer serializer wrapped into the value schema
     */
    public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(properties), new PulsarKafkaSchema<K>(keySerializer), new PulsarKafkaSchema<V>(valueSerializer));
    }

    /**
     * Creates a producer that uses the given Pulsar schemas.
     *
     * @param properties Kafka producer configuration
     * @param keySchema schema used to encode keys; when {@code null} the configured
     *     {@code key.serializer} is used
     * @param valueSchema schema used to encode values; when {@code null} the configured
     *     {@code value.serializer} is used
     */
    public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
        this(new ProducerConfig(properties), keySchema, valueSchema);
    }

    /**
     * Shared constructor: resolves schemas and partitioner, builds the Pulsar client and
     * prepares the producer builder template from the Kafka configuration.
     *
     * @throws IllegalArgumentException if {@code connections.max.idle.ms} does not fit into an
     *     {@code int} once converted to seconds
     * @throws RuntimeException wrapping {@link PulsarClientException} if the client cannot be built
     */
    // Raw types are unavoidable here: the config API is handed raw Class tokens.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
        if (keySchema == null) {
            Serializer kafkaKeySerializer = producerConfig.getConfiguredInstance("key.serializer", Serializer.class);
            kafkaKeySerializer.configure(producerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema<K>(kafkaKeySerializer);
        } else {
            this.keySchema = keySchema;
            producerConfig.ignore("key.serializer");
        }
        if (valueSchema == null) {
            Serializer kafkaValueSerializer = producerConfig.getConfiguredInstance("value.serializer", Serializer.class);
            kafkaValueSerializer.configure(producerConfig.originals(), false);
            this.valueSchema = new PulsarKafkaSchema<V>(kafkaValueSerializer);
        } else {
            this.valueSchema = valueSchema;
            producerConfig.ignore("value.serializer");
        }

        this.partitioner = producerConfig.getConfiguredInstance("partitioner.class", Partitioner.class);
        this.partitioner.configure(producerConfig.originals());

        // NOTE: values are copied as-is; Properties.getProperty only returns String values,
        // so non-String entries fall back to the defaults used below.
        this.properties = new Properties();
        this.properties.putAll(producerConfig.originals());

        long keepAliveIntervalMs = Long.parseLong(this.properties.getProperty("connections.max.idle.ms", "30000"));
        // Only the first bootstrap server is used as the Pulsar service URL.
        String serviceUrl = producerConfig.getList("bootstrap.servers").get(0);
        try {
            int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000L);
            this.client = PulsarClientKafkaConfig.getClientBuilder(this.properties)
                    .serviceUrl(serviceUrl)
                    .keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS)
                    .build();
        } catch (ArithmeticException e) {
            String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
            logger.error(errorMessage);
            throw new IllegalArgumentException(errorMessage, e);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }

        this.pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(this.client, this.properties);

        long lingerMs = Long.parseLong(this.properties.getProperty("linger.ms", "1"));
        this.pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);

        // Only gzip and lz4 are mapped; any other value leaves the builder's compression untouched.
        String compressionType = this.properties.getProperty("compression.type");
        if ("gzip".equals(compressionType)) {
            this.pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
        } else if ("lz4".equals(compressionType)) {
            this.pulsarProducerBuilder.compressionType(CompressionType.LZ4);
        }

        this.pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));

        int sendTimeoutMillis = Integer.parseInt(this.properties.getProperty("request.timeout.ms", "30000"));
        this.pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);

        // Default to blocking when a send timeout is configured. Boolean.parseBoolean is
        // required here: Boolean.getBoolean would look up a JVM system property named after
        // the value instead of parsing it, silently yielding false.
        String blockDefault = String.valueOf(sendTimeoutMillis > 0);
        boolean shouldBlockPulsarProducer = Boolean.parseBoolean(
                this.properties.getProperty("pulsar.block.if.producer.queue.full", blockDefault));
        this.pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);

        // The config API returns a list of the raw interceptor type; the raw cast bridges it
        // to the parameterized field type.
        this.interceptors = (List) producerConfig.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public void initTransactions() {
        throw new UnsupportedOperationException();
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public void beginTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public void commitTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public void abortTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    /**
     * Sends a record asynchronously without a completion callback.
     *
     * @param record record to publish
     * @return future completed with the record metadata, or exceptionally on failure
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, null);
    }

    /**
     * Sends a record asynchronously, creating the Pulsar producer for the record's topic on
     * first use.
     *
     * @param record record to publish
     * @param callback invoked once with either the metadata or an exception; may be {@code null}
     * @return future completed with the record metadata, or exceptionally on failure
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        org.apache.pulsar.client.api.Producer<byte[]> producer;
        try {
            producer = this.producers.computeIfAbsent(record.topic(), this::createNewProducer);
        } catch (Exception e) {
            // Producer creation failed: report through both the callback and the returned future.
            if (callback != null) {
                callback.onCompletion(null, e);
            }
            CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }

        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
        int messageSize = this.buildMessage(messageBuilder, record);

        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        messageBuilder.sendAsync()
                .thenAccept(messageId -> future.complete(
                        this.getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize)))
                .exceptionally(ex -> {
                    future.completeExceptionally(ex);
                    return null;
                });

        future.handle((recordMetadata, throwable) -> {
            if (callback != null) {
                // Kafka callbacks take an Exception, so arbitrary Throwables are wrapped.
                Exception exception = throwable != null ? new Exception(throwable) : null;
                callback.onCompletion(recordMetadata, exception);
            }
            return null;
        });
        return future;
    }

    /** Flushes every cached Pulsar producer and waits for all flushes to complete. */
    public void flush() {
        // Start all flushes first, then join, so the producers flush concurrently.
        List<CompletableFuture<Void>> pendingFlushes = this.producers.values().stream()
                .map(p -> p.flushAsync())
                .collect(Collectors.toList());
        pendingFlushes.forEach(CompletableFuture::join);
    }

    /** Not supported. @throws UnsupportedOperationException always */
    public List<PartitionInfo> partitionsFor(String topic) {
        throw new UnsupportedOperationException();
    }

    /**
     * Metrics are not exposed by this implementation.
     *
     * @return an empty map
     */
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.emptyMap();
    }

    /** Closes the Pulsar client, waiting indefinitely, and then closes the partitioner. */
    public void close() {
        this.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the Pulsar client, waiting at most the given time, and closes the partitioner
     * regardless of whether the client closed successfully.
     *
     * @param timeout maximum time to wait for the client to close
     * @param unit unit of {@code timeout}
     * @throws RuntimeException wrapping the interruption, execution failure or timeout
     */
    public void close(long timeout, TimeUnit unit) {
        try {
            this.client.closeAsync().get(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            this.partitioner.close();
        }
    }

    /**
     * Closes the producer, waiting at most {@code duration} for the client to close.
     *
     * @param duration maximum time to wait
     */
    public void close(Duration duration) {
        this.close(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the Pulsar producer for {@code topic}, after adding the topic's partitions to the
     * cluster view used by the Kafka partitioner.
     *
     * @param topic topic to create a producer for
     * @return the newly created Pulsar producer
     * @throws RuntimeException wrapping {@link PulsarClientException} if creation fails
     */
    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
        try {
            // The read-modify-write of the cluster snapshot is serialized on this instance.
            synchronized (this) {
                this.cluster = this.cluster.withPartitions(this.readPartitionsInfo(topic));
            }
            List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = this.interceptors.stream().map(interceptor -> new KafkaProducerInterceptorWrapper<K, V>(interceptor, this.keySchema, this.valueSchema, topic)).collect(Collectors.toList());
            return this.pulsarProducerBuilder.clone()
                    .topic(topic)
                    .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up the partitions of {@code topic} (blocking) and describes them as Kafka
     * {@link PartitionInfo} entries without leader/replica information.
     *
     * @param topic topic to inspect
     * @return one entry per partition index, keyed by {@link TopicPartition}
     */
    private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic) {
        int partitionCount = this.client.getPartitionsForTopic(topic).join().size();
        Map<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<>();
        for (int i = 0; i < partitionCount; ++i) {
            partitionsInfo.put(new TopicPartition(topic, i), new PartitionInfo(topic, i, null, null, null));
        }
        return partitionsInfo;
    }

    /**
     * Populates {@code builder} with the key, event time, encoded value and target partition of
     * {@code record}.
     *
     * @param builder Pulsar message builder to fill in
     * @param record Kafka record being sent
     * @return the length in bytes of the encoded value
     */
    private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K, V> record) {
        byte[] keyBytes = null;
        if (record.key() != null) {
            String key = this.getKey(record.topic(), record.key());
            keyBytes = key.getBytes(StandardCharsets.UTF_8);
            builder.key(key);
        }
        if (record.timestamp() != null) {
            builder.eventTime(record.timestamp());
        }
        if (this.valueSchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema<?>) this.valueSchema).setTopic(record.topic());
        }
        byte[] value = this.valueSchema.encode(record.value());
        builder.value(value);

        // An explicit partition on the record wins; otherwise the Kafka partitioner decides.
        // The choice is recorded as a message property (presumably read by KafkaMessageRouter).
        if (record.partition() != null) {
            builder.property("pulsar.partition.id", record.partition().toString());
        } else {
            int partition = this.partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, this.cluster);
            builder.property("pulsar.partition.id", Integer.toString(partition));
        }
        return value.length;
    }

    /**
     * Converts a record key to the string form used as the Pulsar message key.
     *
     * @param topic topic the record is sent to
     * @param key non-null record key
     * @return the key itself when it is a {@link String}, otherwise the Base64 encoding of the
     *     key bytes produced by the key schema
     */
    private String getKey(String topic, K key) {
        if (key instanceof String) {
            return (String) key;
        }
        if (this.keySchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema<?>) this.keySchema).setTopic(topic);
        }
        byte[] keyBytes = this.keySchema.encode(key);
        return Base64.getEncoder().encodeToString(keyBytes);
    }

    /**
     * Builds the Kafka {@link RecordMetadata} for a message acknowledged by Pulsar.
     *
     * @param topic topic the record was sent to
     * @param msgBuilder builder the message was sent with
     * @param messageId id returned by the send; expected to be a {@link MessageIdImpl}
     * @param size encoded value size in bytes
     * @return metadata carrying the partition, derived offset, publish time and key/value sizes
     */
    private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId, int size) {
        MessageIdImpl msgId = (MessageIdImpl) messageId;
        long offset = MessageIdUtils.getOffset(msgId);
        TopicPartition tp = new TopicPartition(topic, msgId.getPartitionIndex());

        TypedMessageBuilderImpl<?> mb = (TypedMessageBuilderImpl<?>) msgBuilder;
        long publishTime = 0L;
        try {
            publishTime = mb.getPublishTime();
        } catch (IllegalStateException ise) {
            // Best effort: fall back to 0 when the publish time is unavailable.
            logger.debug("could not get publish time");
        }
        int keySize = mb.hasKey() ? mb.getKey().length() : 0;
        return new RecordMetadata(tp, offset, 0L, publishTime, Long.valueOf(0L), keySize, size);
    }
}

