/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes Reactive Messaging {@link Message}s to Kafka through a Vert.x {@link KafkaWriteStream}.
 *
 * <p>The sink is exposed as a {@link SubscriberBuilder} (see {@link #getSink()}). For every
 * incoming message a {@link ProducerRecord} is built from the message payload, the optional
 * {@link OutgoingKafkaRecordMetadata} attached to the message, and the channel-level defaults
 * (topic, key, partition) read from the configuration in the constructor.
 */
public class KafkaSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
    private final KafkaWriteStream<?, ?> stream;
    private final int partition;
    private final String key;
    private final String topic;
    private final boolean waitForWriteCompletion;
    private final SubscriberBuilder<? extends Message<?>, Void> subscriber;

    /**
     * Creates the Kafka write stream and the subscriber that feeds it.
     *
     * @param vertx the Mutiny Vert.x instance; its delegate is used to create the write stream
     * @param config the outgoing channel configuration (topic, key, partition, acks, ...)
     */
    public KafkaSink(io.vertx.mutiny.core.Vertx vertx, KafkaConnectorOutgoingConfiguration config) {
        JsonObject kafkaConfiguration = this.extractProducerConfiguration(config);
        this.stream = KafkaWriteStream.create((Vertx) vertx.getDelegate(), kafkaConfiguration.getMap());
        this.stream.exceptionHandler(t -> LOGGER.error("Unable to write to Kafka", t));
        this.partition = config.getPartition();
        this.key = config.getKey().orElse(null);
        // Fall back to the channel name when no explicit topic is configured.
        this.topic = config.getTopic().orElseGet(config::getChannel);
        this.waitForWriteCompletion = config.getWaitForWriteCompletion();
        this.subscriber = ReactiveStreams.<Message<?>>builder()
                .flatMapCompletionStage(this::send)
                .onError(t -> LOGGER.error("Unable to dispatch message to Kafka", t))
                .ignore();
    }

    /**
     * Sends a single message to Kafka.
     *
     * <p>The message is acknowledged once the write has succeeded. When
     * {@code waitForWriteCompletion} is {@code true} the returned stage completes only after
     * that acknowledgement (and fails if the write fails); otherwise an already-completed stage
     * is returned right after the write has been handed to the stream. If no topic can be
     * resolved, or a {@link RuntimeException} is thrown while sending, the problem is logged
     * and an already-completed stage is returned without acknowledging the message.
     *
     * @param message the message to send
     * @return a stage emitting the same message
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private CompletionStage<Message<?>> send(Message<?> message) {
        try {
            OutgoingKafkaRecordMetadata<?> metadata =
                    message.getMetadata(OutgoingKafkaRecordMetadata.class).orElse(null);
            // A topic set on the message metadata wins over the channel-level topic.
            String actualTopic = metadata == null || metadata.getTopic() == null ? this.topic : metadata.getTopic();
            if (actualTopic == null) {
                LOGGER.error("Ignoring message - no topic set");
                return CompletableFuture.completedFuture(message);
            }

            ProducerRecord record = this.getProducerRecord(message, metadata, actualTopic);
            LOGGER.debug("Sending message {} to Kafka topic '{}'", message, record.topic());

            CompletableFuture<Message<?>> written = new CompletableFuture<>();
            // Build the ack chain before issuing the write so it is in place when the callback fires.
            CompletionStage<Message<?>> result = written.thenCompose(x -> message.ack()).thenApply(x -> message);
            this.stream.write(record, ar -> {
                if (ar.succeeded()) {
                    LOGGER.debug("Message {} sent successfully to Kafka topic '{}'", message, record.topic());
                    written.complete(message);
                } else {
                    LOGGER.error("Message {} was not sent to Kafka topic '{}'", message, record.topic(), ar.cause());
                    written.completeExceptionally(ar.cause());
                }
            });

            if (this.waitForWriteCompletion) {
                return result;
            }
            return CompletableFuture.completedFuture(message);
        } catch (RuntimeException e) {
            LOGGER.error("Unable to send a record to Kafka ", e);
            return CompletableFuture.completedFuture(message);
        }
    }

    /**
     * Builds the record to write, preferring values from the message metadata over the
     * channel-level defaults.
     *
     * @param message the message whose payload becomes the record value
     * @param om the outgoing metadata attached to the message, or {@code null} if there is none
     * @param actualTopic the already-resolved target topic
     * @return the record to hand to the write stream
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ProducerRecord<?, ?> getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> om, String actualTopic) {
        // A metadata partition of -1 or lower means "not set": use the configured partition.
        int actualPartition = om == null || om.getPartition() <= -1 ? this.partition : om.getPartition();
        Object actualKey = om == null || om.getKey() == null ? this.key : om.getKey();
        // -1 is the "no timestamp" sentinel. The timestamp is taken from the metadata whenever
        // one is present, independently of whether the metadata also carries a key.
        long actualTimestamp = om == null || om.getTimestamp() == null ? -1L : om.getTimestamp().toEpochMilli();
        Iterable<?> kafkaHeaders = om == null || om.getHeaders() == null ? Collections.emptyList() : om.getHeaders();
        return new ProducerRecord(
                actualTopic,
                actualPartition == -1 ? null : Integer.valueOf(actualPartition),
                actualTimestamp == -1L ? null : Long.valueOf(actualTimestamp),
                actualKey,
                message.getPayload(),
                (Iterable) kafkaHeaders);
    }

    /**
     * Derives the configuration passed to the write stream from the channel configuration.
     *
     * <p>{@code acks} is always overwritten with the configured value; {@code bootstrap.servers}
     * and {@code key.serializer} are filled in only when absent. The {@code channel-name},
     * {@code topic}, {@code connector}, {@code partition} and {@code key} entries are removed
     * before the configuration is returned.
     *
     * @param config the outgoing channel configuration
     * @return the configuration used to create the write stream
     */
    private JsonObject extractProducerConfiguration(KafkaConnectorOutgoingConfiguration config) {
        JsonObject kafkaConfiguration = JsonHelper.asJsonObject(config.config());
        kafkaConfiguration.put("acks", config.getAcks());
        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            LOGGER.info("Setting {} to {}", "bootstrap.servers", config.getBootstrapServers());
            kafkaConfiguration.put("bootstrap.servers", config.getBootstrapServers());
        }
        if (!kafkaConfiguration.containsKey("key.serializer")) {
            LOGGER.info("Key serializer omitted, using String as default");
            kafkaConfiguration.put("key.serializer", config.getKeySerializer());
        }
        kafkaConfiguration.remove("channel-name");
        kafkaConfiguration.remove("topic");
        kafkaConfiguration.remove("connector");
        kafkaConfiguration.remove("partition");
        kafkaConfiguration.remove("key");
        return kafkaConfiguration;
    }

    /**
     * @return the subscriber builder that sends every received message to Kafka
     */
    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.subscriber;
    }

    /**
     * Closes the write stream and blocks the calling thread until the close callback has run.
     *
     * <p>Failures reported by the callback or thrown by {@code close} are logged at debug level
     * and otherwise ignored. If the wait is interrupted, the thread's interrupt flag is restored
     * and the method returns.
     */
    public void closeQuietly() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            this.stream.close(ar -> {
                if (ar.failed()) {
                    LOGGER.debug("An error has been caught while closing the Kafka Write Stream", ar.cause());
                }
                latch.countDown();
            });
        } catch (Throwable e) {
            LOGGER.debug("An error has been caught while closing the Kafka Write Stream", e);
            // The callback will never fire in this case; release the latch ourselves.
            latch.countDown();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

