/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.fault;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

public class KafkaDeadLetterQueue
implements KafkaFailureHandler {
    public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
    public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";
    public static final String DEAD_LETTER_REASON = "dead-letter-reason";
    public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
    public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
    public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
    public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";
    public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue";
    private final String channel;
    private final KafkaSink dlqSink;
    private final UnicastProcessor<Message<?>> dlqSource;
    private final String topic;

    public KafkaDeadLetterQueue(String channel, String topic, KafkaSink dlqSink, UnicastProcessor<Message<?>> dlqSource) {
        this.channel = channel;
        this.topic = topic;
        this.dlqSink = dlqSink;
        this.dlqSource = dlqSource;
    }

    private static String getMirrorSerializer(String deserializer) {
        if (deserializer == null) {
            return StringSerializer.class.getName();
        }
        return deserializer.replace("Deserializer", "Serializer");
    }

    private String getThrowableMessage(Throwable throwable) {
        String text = throwable.getMessage();
        if (text == null) {
            text = throwable.toString();
        }
        return text;
    }

    @Override
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
        OutgoingKafkaRecordMetadata outgoing = metadata != null ? (OutgoingKafkaRecordMetadata)metadata.get(OutgoingKafkaRecordMetadata.class).orElse(null) : null;
        String topic = this.topic;
        if (outgoing != null && outgoing.getTopic() != null) {
            topic = outgoing.getTopic();
        }
        Object key = record.getKey();
        if (outgoing != null && outgoing.getKey() != null) {
            key = outgoing.getKey();
        }
        Integer partition = null;
        if (outgoing != null && outgoing.getPartition() >= 0) {
            partition = outgoing.getPartition();
        }
        ProducerRecord dead = new ProducerRecord(topic, partition, key, record.getPayload());
        this.addHeader(dead, DEAD_LETTER_EXCEPTION_CLASS_NAME, reason.getClass().getName());
        this.addHeader(dead, DEAD_LETTER_REASON, this.getThrowableMessage(reason));
        if (reason.getCause() != null) {
            this.addHeader(dead, DEAD_LETTER_CAUSE_CLASS_NAME, reason.getCause().getClass().getName());
            this.addHeader(dead, DEAD_LETTER_CAUSE, this.getThrowableMessage(reason.getCause()));
        }
        this.addHeader(dead, DEAD_LETTER_TOPIC, record.getTopic());
        this.addHeader(dead, DEAD_LETTER_PARTITION, Integer.toString(record.getPartition()));
        this.addHeader(dead, DEAD_LETTER_OFFSET, Long.toString(record.getOffset()));
        record.getHeaders().forEach(header -> dead.headers().add(header));
        if (outgoing != null && outgoing.getHeaders() != null) {
            outgoing.getHeaders().forEach(header -> dead.headers().add(header));
        }
        dead.headers().remove("deserialization-failure-dlq");
        KafkaLogging.log.messageNackedDeadLetter(this.channel, topic);
        CompletableFuture future = new CompletableFuture();
        this.dlqSource.onNext((Object)record.withPayload(dead).withAck(() -> record.ack().thenAccept(__ -> future.complete(null))).withNack(throwable -> {
            future.completeExceptionally((Throwable)throwable);
            return future;
        }));
        return Uni.createFrom().completionStage(future).emitOn(arg_0 -> record.runOnMessageContext(arg_0));
    }

    void addHeader(ProducerRecord<?, ?> record, String key, String value) {
        record.headers().add(key, value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void terminate() {
        this.dlqSink.closeQuietly();
    }

    @ApplicationScoped
    @Identifier(value="dead-letter-queue")
    public static class Factory
    implements KafkaFailureHandler.Factory {
        @Inject
        KafkaCDIEvents kafkaCDIEvents;
        @Inject
        @Any
        Instance<SerializationFailureHandler<?>> serializationFailureHandlers;
        @Inject
        @Any
        Instance<ClientCustomizer<Map<String, Object>>> configCustomizers;
        @Inject
        @Any
        Instance<ProducerInterceptor<?, ?>> producerInterceptors;
        @Inject
        Instance<Config> rootConfig;
        @Inject
        @Any
        Instance<Map<String, Object>> configurations;
        @Inject
        Instance<OpenTelemetry> openTelemetryInstance;
        @Inject
        Instance<SubscriberDecorator> subscriberDecorators;

        @Override
        public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, Vertx vertx, KafkaConsumer<?, ?> consumer, BiConsumer<Throwable, Boolean> reportFailure) {
            String keyDeserializer = (String)consumer.configuration().get("key.deserializer");
            String valueDeserializer = (String)consumer.configuration().get("value.deserializer");
            String consumerClientId = (String)consumer.configuration().get("client.id");
            OverrideConnectorConfig connectorConfig = new OverrideConnectorConfig("mp.messaging.incoming.", config.config(), "smallrye-kafka", config.getChannel(), KafkaDeadLetterQueue.CHANNEL_DLQ_SUFFIX, Map.of("key.serializer", c -> KafkaDeadLetterQueue.getMirrorSerializer(keyDeserializer), "value.serializer", c -> KafkaDeadLetterQueue.getMirrorSerializer(valueDeserializer), "client.id", c -> config.getDeadLetterQueueProducerClientId().orElse("kafka-dead-letter-topic-producer-" + consumerClientId), "topic", c -> "dead-letter-topic-" + config.getChannel(), "key-serialization-failure-handler", c -> "dlq-serialization", "value-serialization-failure-handler", c -> "dlq-serialization", "interceptor.classes", c -> ""));
            Config kafkaConfig = ConfigHelper.retrieveChannelConfiguration(this.configurations, (Config)connectorConfig);
            KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(kafkaConfig);
            String deadQueueTopic = config.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + config.getChannel());
            KafkaLogging.log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(), producerConfig.getValueSerializer());
            UnicastProcessor processor = UnicastProcessor.create();
            KafkaSink kafkaSink = new KafkaSink(producerConfig, this.kafkaCDIEvents, this.openTelemetryInstance, this.configCustomizers, this.serializationFailureHandlers, this.producerInterceptors);
            Wiring.wireOutgoingConnectorToUpstream((Multi)processor, kafkaSink.getSink(), this.subscriberDecorators, (String)(producerConfig.getChannel() + "-dead-letter-queue"));
            return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, kafkaSink, processor);
        }
    }
}

