/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Metadata;

public class KafkaDeadLetterQueue
implements KafkaFailureHandler {
    public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
    public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";
    public static final String DEAD_LETTER_REASON = "dead-letter-reason";
    public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
    public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
    public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
    public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";
    private final String channel;
    private final KafkaProducer producer;
    private final String topic;
    private final BiConsumer<Throwable, Boolean> reportFailure;

    public KafkaDeadLetterQueue(String channel, String topic, KafkaProducer producer, BiConsumer<Throwable, Boolean> reportFailure) {
        this.channel = channel;
        this.topic = topic;
        this.producer = producer;
        this.reportFailure = reportFailure;
    }

    private static String getMirrorSerializer(String deserializer) {
        if (deserializer == null) {
            return StringSerializer.class.getName();
        }
        return deserializer.replace("Deserializer", "Serializer");
    }

    private String getThrowableMessage(Throwable throwable) {
        String text = throwable.getMessage();
        if (text == null) {
            text = throwable.toString();
        }
        return text;
    }

    @Override
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
        OutgoingKafkaRecordMetadata outgoing = metadata != null ? (OutgoingKafkaRecordMetadata)metadata.get(OutgoingKafkaRecordMetadata.class).orElse(null) : null;
        String topic = this.topic;
        if (outgoing != null && outgoing.getTopic() != null) {
            topic = outgoing.getTopic();
        }
        Object key = record.getKey();
        if (outgoing != null && outgoing.getKey() != null) {
            key = outgoing.getKey();
        }
        Integer partition = null;
        if (outgoing != null && outgoing.getPartition() >= 0) {
            partition = outgoing.getPartition();
        }
        ProducerRecord dead = new ProducerRecord(topic, partition, key, record.getPayload());
        this.addHeader(dead, DEAD_LETTER_EXCEPTION_CLASS_NAME, reason.getClass().getName());
        this.addHeader(dead, DEAD_LETTER_REASON, this.getThrowableMessage(reason));
        if (reason.getCause() != null) {
            this.addHeader(dead, DEAD_LETTER_CAUSE_CLASS_NAME, reason.getCause().getClass().getName());
            this.addHeader(dead, DEAD_LETTER_CAUSE, this.getThrowableMessage(reason.getCause()));
        }
        this.addHeader(dead, DEAD_LETTER_TOPIC, record.getTopic());
        this.addHeader(dead, DEAD_LETTER_PARTITION, Integer.toString(record.getPartition()));
        this.addHeader(dead, DEAD_LETTER_OFFSET, Long.toString(record.getOffset()));
        record.getHeaders().forEach(header -> dead.headers().add(header));
        if (outgoing != null && outgoing.getHeaders() != null) {
            outgoing.getHeaders().forEach(header -> dead.headers().add(header));
        }
        dead.headers().remove("deserialization-failure-dlq");
        KafkaLogging.log.messageNackedDeadLetter(this.channel, topic);
        return this.producer.send(dead).onFailure().invoke(t -> this.reportFailure.accept((Throwable)t, true)).onItem().ignore().andContinueWithNull().chain(() -> Uni.createFrom().completionStage(record.ack())).emitOn(arg_0 -> record.runOnMessageContext(arg_0));
    }

    void addHeader(ProducerRecord<?, ?> record, String key, String value) {
        record.headers().add(key, value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void terminate() {
        this.producer.close();
    }

    @ApplicationScoped
    @Identifier(value="dead-letter-queue")
    public static class Factory
    implements KafkaFailureHandler.Factory {
        @Inject
        KafkaCDIEvents kafkaCDIEvents;
        @Inject
        @Any
        Instance<SerializationFailureHandler<?>> serializationFailureHandlers;
        @Inject
        @Any
        Instance<ProducerInterceptor<?, ?>> producerInterceptors;
        @Inject
        Instance<Config> rootConfig;

        @Override
        public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, Vertx vertx, KafkaConsumer<?, ?> consumer, BiConsumer<Throwable, Boolean> reportFailure) {
            HashMap deadQueueProducerConfig = new HashMap(consumer.configuration());
            String keyDeserializer = (String)deadQueueProducerConfig.remove("key.deserializer");
            String valueDeserializer = (String)deadQueueProducerConfig.remove("value.deserializer");
            String consumerClientId = (String)consumer.configuration().get("client.id");
            OverrideConnectorConfig connectorConfig = new OverrideConnectorConfig("mp.messaging.incoming.", (Config)this.rootConfig.get(), config.getChannel(), "dead-letter-queue", Map.of("key.serializer", c -> KafkaDeadLetterQueue.getMirrorSerializer(keyDeserializer), "value.serializer", c -> KafkaDeadLetterQueue.getMirrorSerializer(valueDeserializer), "client.id", c -> config.getDeadLetterQueueProducerClientId().orElse("kafka-dead-letter-topic-producer-" + consumerClientId), "topic", c -> "dead-letter-topic-" + config.getChannel(), "key-serialization-failure-handler", c -> "dlq-serialization", "value-serialization-failure-handler", c -> "dlq-serialization", "interceptor.classes", c -> ""));
            KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration((Config)connectorConfig);
            String deadQueueTopic = config.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + config.getChannel());
            KafkaLogging.log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(), producerConfig.getValueSerializer());
            ReactiveKafkaProducer producer = new ReactiveKafkaProducer(producerConfig, this.serializationFailureHandlers, this.producerInterceptors, null, (p, c) -> this.kafkaCDIEvents.producer().fire(p));
            return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
        }
    }
}

