/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka;

import io.smallrye.reactive.messaging.kafka.JsonHelper;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that writes reactive-messaging {@link Message}s to Kafka through a Vert.x
 * {@link KafkaWriteStream}.
 *
 * <p>The constructor reads the following optional configuration keys:
 * <ul>
 *   <li>{@code topic} (falling back to {@code channel-name}) - default topic;</li>
 *   <li>{@code partition} - default partition, {@code -1} meaning "not set";</li>
 *   <li>{@code key} - default record key;</li>
 *   <li>{@code waitForWriteCompletion} - whether the stage returned for each message completes
 *       only once the write succeeded and the message was acknowledged (default {@code true}).</li>
 * </ul>
 * A {@link KafkaMessage} may override topic, partition and key on a per-message basis.
 */
class KafkaSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);

    /** Value of {@link #partition} when no default partition has been configured. */
    private static final int NO_PARTITION = -1;

    private final KafkaWriteStream<Object, Object> stream;
    private final int partition;
    private final String key;
    private final String topic;
    private final boolean waitForWriteCompletion;
    private final SubscriberBuilder<? extends Message, Void> subscriber;

    /**
     * Creates the Kafka write stream and the subscriber that feeds it.
     *
     * @param vertx   the RX-ified Vert.x instance; its delegate is used to create the write stream
     * @param config  the sink configuration, also converted to the Kafka producer configuration
     * @param servers value used for {@code bootstrap.servers} when the configuration does not set it
     */
    KafkaSink(io.vertx.reactivex.core.Vertx vertx, Config config, String servers) {
        JsonObject kafkaConfiguration = JsonHelper.asJsonObject(config);

        // Store "acks" in its string form (presumably because the producer expects a string
        // even for numeric values such as 1 - confirm against the Kafka producer configuration).
        if (kafkaConfiguration.containsKey("acks")) {
            kafkaConfiguration.put("acks", kafkaConfiguration.getValue("acks").toString());
        }
        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            LOGGER.info("Setting {} to {}", "bootstrap.servers", servers);
            kafkaConfiguration.put("bootstrap.servers", servers);
        }
        if (!kafkaConfiguration.containsKey("key.serializer")) {
            // This is the producer side: the defaulted setting is a serializer.
            LOGGER.info("Key serializer omitted, using String as default");
            kafkaConfiguration.put("key.serializer", StringSerializer.class.getName());
        }

        this.stream = KafkaWriteStream.create(vertx.getDelegate(), kafkaConfiguration.getMap());
        this.stream.exceptionHandler(t -> LOGGER.error("Unable to write to Kafka", t));

        this.partition = config.getOptionalValue("partition", Integer.class).orElse(NO_PARTITION);
        this.key = config.getOptionalValue("key", String.class).orElse(null);
        this.topic = this.getTopicOrNull(config);
        this.waitForWriteCompletion = config.getOptionalValue("waitForWriteCompletion", Boolean.class).orElse(true);
        if (this.topic == null) {
            LOGGER.warn("No default topic configured, only sending messages with an explicit topic set");
        }

        // The explicit type witness is required: without it the element type is inferred as Object.
        this.subscriber = ReactiveStreams.<Message<?>>builder()
                .flatMapCompletionStage(this::send)
                .onError(t -> LOGGER.error("Unable to dispatch message to Kafka", t))
                .ignore();
    }

    /**
     * @return the subscriber builder consuming the messages to be written to Kafka
     */
    SubscriberBuilder<? extends Message, Void> getSink() {
        return this.subscriber;
    }

    /** Closes the underlying Kafka write stream. */
    void close() {
        this.stream.close();
    }

    /**
     * Writes one message to Kafka.
     *
     * <p>Messages for which no topic can be determined, and messages whose handling throws a
     * {@link RuntimeException}, are logged and passed through without being written.
     *
     * @param message the message to write
     * @return a stage yielding {@code message}; when {@code waitForWriteCompletion} is enabled it
     *         completes after the write succeeded and the message was acknowledged, and completes
     *         exceptionally when the write failed; otherwise it is already completed
     */
    private CompletionStage<Message<?>> send(Message<?> message) {
        try {
            ProducerRecord<Object, Object> record = this.toRecord(message);
            if (record == null) {
                LOGGER.error("Ignoring message - no topic set");
                return CompletableFuture.completedFuture(message);
            }

            CompletableFuture<Message<?>> written = new CompletableFuture<>();
            // Acknowledge the message only once the write has been confirmed.
            CompletionStage<Message<?>> acknowledged = written
                    .thenCompose(sent -> message.ack())
                    .thenApply(acked -> message);

            this.stream.write(record, ar -> {
                if (ar.succeeded()) {
                    LOGGER.info("Message {} sent successfully to Kafka topic '{}'", message, record.topic());
                    written.complete(message);
                } else {
                    // The trailing Throwable is logged by SLF4J as the exception.
                    LOGGER.error("Message {} was not sent to Kafka topic '{}'", message, record.topic(), ar.cause());
                    written.completeExceptionally(ar.cause());
                }
            });

            if (this.waitForWriteCompletion) {
                return acknowledged;
            }
            // Fire-and-forget: the acknowledgement still happens once the write succeeds.
            return CompletableFuture.completedFuture(message);
        } catch (RuntimeException e) {
            LOGGER.error("Unable to send a record to Kafka ", e);
            return CompletableFuture.completedFuture(message);
        }
    }

    /**
     * Builds the Kafka record for the given message.
     *
     * @param message the message to convert
     * @return the record, or {@code null} when neither the message nor the configuration provides a topic
     */
    private ProducerRecord<Object, Object> toRecord(Message<?> message) {
        if (message instanceof KafkaMessage) {
            // KafkaMessage is declared outside this file; it is kept raw here.
            return this.fromKafkaMessage((KafkaMessage) message);
        }
        return this.fromPlainMessage(message);
    }

    /**
     * Builds a record from a {@link KafkaMessage}; topic, partition and key set on the message
     * take precedence over the configured defaults.
     */
    private ProducerRecord<Object, Object> fromKafkaMessage(KafkaMessage km) {
        String targetTopic = km.getTopic() != null ? km.getTopic() : this.topic;
        if (targetTopic == null) {
            return null;
        }

        Integer targetPartition = this.defaultPartitionOrNull();
        if (km.getPartition() != null) {
            targetPartition = km.getPartition();
        }
        Object recordKey = km.getKey() == null ? this.key : km.getKey();

        ProducerRecord<Object, Object> record = new ProducerRecord<>(targetTopic, targetPartition,
                km.getTimestamp(), recordKey, km.getPayload(), km.getHeaders().unwrap());
        LOGGER.info("Sending message {} to Kafka topic '{}'", km, record.topic());
        return record;
    }

    /**
     * Builds a record from a plain message using only the configured topic, partition and key;
     * no timestamp is set.
     */
    private ProducerRecord<Object, Object> fromPlainMessage(Message<?> message) {
        if (this.topic == null) {
            return null;
        }
        return new ProducerRecord<>(this.topic, this.defaultPartitionOrNull(), null, this.key, message.getPayload());
    }

    /**
     * @return the configured default partition, or {@code null} when none is configured
     */
    private Integer defaultPartitionOrNull() {
        return this.partition == NO_PARTITION ? null : Integer.valueOf(this.partition);
    }

    /**
     * Reads the default topic from the {@code topic} key, falling back to {@code channel-name}.
     *
     * @param config the sink configuration
     * @return the default topic, or {@code null} when neither key is set
     */
    private String getTopicOrNull(Config config) {
        return config.getOptionalValue("topic", String.class)
                .orElseGet(() -> config.getOptionalValue("channel-name", String.class).orElse(null));
    }
}

