/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.health.KafkaSinkHealth;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSenderProcessor;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSinkConfiguration;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import jakarta.enterprise.inject.Instance;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
 * Sink side of an outgoing Kafka channel.
 *
 * <p>The sink owns a {@link ReactiveKafkaProducer} and exposes a {@link Flow.Subscriber}
 * (see {@link #getSink()}). Every {@link Message} received by that subscriber is converted to a
 * {@link ProducerRecord} and sent with the producer; the message is acknowledged once the write
 * succeeds and nacked when it ultimately fails.
 *
 * <p>The most recent failures (at most {@value #MAX_RECORDED_FAILURES}) are retained and reported
 * by {@link #isAlive(HealthReport.HealthReportBuilder)}. Access to that list is guarded by the
 * monitor of this instance.
 */
public class KafkaSink {
    /** Maximum number of failures retained for the liveness report; the oldest one is evicted first. */
    private static final int MAX_RECORDED_FAILURES = 10;

    /** Exception classes for which a failed write is not retried (matched on the exact class). */
    private static final Set<Class<? extends Throwable>> NOT_RECOVERABLE = new HashSet<>(Arrays.asList(InvalidTopicException.class, OffsetMetadataTooLarge.class, RecordBatchTooLargeException.class, RecordTooLargeException.class, UnknownServerException.class, SerializationException.class, TransactionAbortedException.class));

    private final ReactiveKafkaProducer<?, ?> client;
    /** Partition used when the outgoing metadata does not select one. */
    private final int partition;
    /** Topic used when the outgoing metadata does not select one; falls back to the channel name. */
    private final String topic;
    /** Record key used when no other key source applies; may be {@code null}. */
    private final String key;
    private final Flow.Subscriber<? extends Message<?>> subscriber;
    private final long retries;
    private final int deliveryTimeoutMs;
    /** Most recent failures, oldest first. Guarded by {@code this}. */
    private final List<Throwable> failures = new ArrayList<>();
    private final KafkaSenderProcessor processor;
    private final boolean writeAsBinaryCloudEvent;
    private final boolean writeCloudEvents;
    /** {@code true} when both the Cloud Event type and source are present in the configuration. */
    private final boolean mandatoryCloudEventAttributeSet;
    private final boolean isTracingEnabled;
    /** {@code null} when health checks are disabled. */
    private final KafkaSinkHealth health;
    private final boolean isHealthEnabled;
    private final boolean isHealthReadinessEnabled;
    private final String channel;
    private final RuntimeKafkaSinkConfiguration runtimeConfiguration;
    /** {@code null} when tracing is disabled. */
    private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter;

    /**
     * Creates the sink for one outgoing channel.
     *
     * @param config the outgoing channel configuration
     * @param kafkaCDIEvents used to fire the producer event from the client callback
     * @param serializationFailureHandlers handlers passed through to the producer client
     * @param producerInterceptors interceptors passed through to the producer client
     * @throws IllegalStateException if structured Cloud Events are enabled and the value serializer
     *         is not {@link StringSerializer}
     */
    public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents, Instance<SerializationFailureHandler<?>> serializationFailureHandlers, Instance<ProducerInterceptor<?, ?>> producerInterceptors) {
        // Validate the configuration before the client is created, so that a rejected
        // configuration does not leave behind a client that nobody closes.
        if (config.getCloudEvents().booleanValue() && config.getCloudEventsMode().equalsIgnoreCase("structured") && !config.getValueSerializer().equalsIgnoreCase(StringSerializer.class.getName())) {
            KafkaLogging.log.invalidValueSerializerForStructuredCloudEvent(config.getValueSerializer());
            throw new IllegalStateException("Invalid value serializer to write a structured Cloud Event. " + StringSerializer.class.getName() + " must be used, found: " + config.getValueSerializer());
        }

        this.isTracingEnabled = config.getTracingEnabled();
        this.partition = config.getPartition();
        this.retries = config.getRetries();
        this.topic = config.getTopic().orElseGet(config::getChannel);
        this.key = config.getKey().orElse(null);
        this.channel = config.getChannel();
        this.client = new ReactiveKafkaProducer<>(config, serializationFailureHandlers, producerInterceptors, this::reportFailure, (p, c) -> {
            KafkaLogging.log.connectedToKafka(KafkaSink.getClientId(c), config.getBootstrapServers(), this.topic);
            kafkaCDIEvents.producer().fire(p);
        });
        this.writeCloudEvents = config.getCloudEvents();
        this.writeAsBinaryCloudEvent = config.getCloudEventsMode().equalsIgnoreCase("binary");
        boolean waitForWriteCompletion = config.getWaitForWriteCompletion();
        this.mandatoryCloudEventAttributeSet = config.getCloudEventsType().isPresent() && config.getCloudEventsSource().isPresent();
        this.deliveryTimeoutMs = KafkaSink.getDeliveryTimeoutMs(this.client.configuration());
        this.runtimeConfiguration = RuntimeKafkaSinkConfiguration.buildFromConfiguration(config);
        this.isHealthEnabled = config.getHealthEnabled();
        this.isHealthReadinessEnabled = config.getHealthReadinessEnabled();
        this.health = this.isHealthEnabled ? new KafkaSinkHealth(config, this.client.configuration(), this.client) : null;

        // A non-positive limit means "no limit" on in-flight messages.
        long requests = config.getMaxInflightMessages();
        if (requests <= 0L) {
            requests = Long.MAX_VALUE;
        }
        this.processor = new KafkaSenderProcessor(requests, waitForWriteCompletion, this.writeMessageToKafka());
        this.subscriber = MultiUtils.via((Flow.Processor)this.processor, m -> m.onFailure().invoke(f -> {
            KafkaLogging.log.unableToDispatch((Throwable)f);
            this.reportFailure((Throwable)f);
        }));
        this.kafkaInstrumenter = this.isTracingEnabled ? KafkaOpenTelemetryInstrumenter.createForSink() : null;
    }

    /** Returns the {@code client.id} entry of the given producer configuration. */
    private static String getClientId(Map<String, Object> config) {
        return (String)config.get("client.id");
    }

    /**
     * Resolves the {@code delivery.timeout.ms} value from the given producer configuration.
     *
     * <p>The configured value may be a {@link Number} or a (possibly whitespace-padded) numeric
     * string. When the entry is absent, the default declared by {@link ProducerConfig} is used.
     *
     * @param config the producer configuration
     * @return the delivery timeout in milliseconds
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    private static int getDeliveryTimeoutMs(Map<String, ?> config) {
        Object configured = config.get("delivery.timeout.ms");
        if (configured == null) {
            return (Integer)ProducerConfig.configDef().defaultValues().get("delivery.timeout.ms");
        }
        if (configured instanceof Number) {
            return ((Number)configured).intValue();
        }
        return Integer.parseInt(configured.toString().trim());
    }

    /** Records a failure for the liveness report, evicting the oldest entry when the list is full. */
    private synchronized void reportFailure(Throwable failure) {
        if (this.failures.size() >= MAX_RECORDED_FAILURES) {
            this.failures.remove(0);
        }
        this.failures.add(failure);
    }

    /**
     * Builds the function that writes a single message to Kafka.
     *
     * <p>The returned function builds the record (a {@link ProducerRecord} payload is sent as-is,
     * otherwise a Cloud Event record or a plain record is created), optionally traces it, sends it,
     * and acks the message on success. Recoverable failures are retried according to the
     * configured number of retries; any remaining failure nacks the message. A
     * {@link RuntimeException} thrown while preparing the record is logged and returned as a
     * failed {@link Uni}.
     */
    private Function<Message<?>, Uni<Void>> writeMessageToKafka() {
        return message -> {
            try {
                Optional<io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<?>> om = this.getOutgoingKafkaRecordMetadata(message);
                io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata outgoingMetadata = om.orElse(null);
                String actualTopic = outgoingMetadata == null || outgoingMetadata.getTopic() == null ? this.topic : outgoingMetadata.getTopic();
                OutgoingCloudEventMetadata ceMetadata = message.getMetadata(OutgoingCloudEventMetadata.class).orElse(null);
                io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata incomingMetadata = this.getIncomingKafkaRecordMetadata(message).orElse(null);

                ProducerRecord<?, ?> record;
                if (message.getPayload() instanceof ProducerRecord) {
                    // The payload already is a complete record: send it untouched.
                    record = (ProducerRecord<?, ?>)message.getPayload();
                } else if (this.writeCloudEvents && (ceMetadata != null || this.mandatoryCloudEventAttributeSet)) {
                    if (this.writeAsBinaryCloudEvent) {
                        record = KafkaCloudEventHelper.createBinaryRecord(message, actualTopic, outgoingMetadata, incomingMetadata, ceMetadata, this.runtimeConfiguration);
                    } else {
                        record = KafkaCloudEventHelper.createStructuredRecord(message, actualTopic, outgoingMetadata, incomingMetadata, ceMetadata, this.runtimeConfiguration);
                    }
                } else {
                    record = this.getProducerRecord(message, (io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<?>)outgoingMetadata, (io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<?, ?>)incomingMetadata, actualTopic);
                }

                if (this.isTracingEnabled) {
                    // -1 stands for "no partition selected" in the trace.
                    KafkaTrace kafkaTrace = new KafkaTrace.Builder().withPartition(record.partition() != null ? record.partition() : -1).withTopic(record.topic()).withHeaders(record.headers()).withGroupId(this.client.get("group.id")).withClientId(this.client.get("client.id")).build();
                    this.kafkaInstrumenter.traceOutgoing(message, kafkaTrace);
                }
                KafkaLogging.log.sendingMessageToTopic(message, actualTopic);
                Uni<RecordMetadata> sendUni = this.client.send(record);
                Uni<Void> uni = sendUni.onItem().transformToUni(recordMetadata -> {
                    OutgoingMessageMetadata.setResultOnMessage(message, recordMetadata);
                    KafkaLogging.log.successfullyToTopic(message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                    return Uni.createFrom().completionStage(message.ack());
                });
                if (this.retries == Integer.MAX_VALUE) {
                    // Integer.MAX_VALUE retries: keep retrying, bounded by the delivery timeout instead of a count.
                    uni = uni.onFailure(this::isRecoverable).retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(20L)).expireIn((long)this.deliveryTimeoutMs);
                } else if (this.retries > 0L) {
                    uni = uni.onFailure(this::isRecoverable).retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(20L)).atMost(this.retries);
                }
                return uni.onFailure().recoverWithUni(t -> {
                    KafkaLogging.log.nackingMessage(message, actualTopic, t);
                    return Uni.createFrom().completionStage(message.nack(t));
                });
            }
            catch (RuntimeException e) {
                KafkaLogging.log.unableToSendRecord(e);
                return Uni.createFrom().failure((Throwable)e);
            }
        };
    }

    /** Returns {@code true} unless the exact class of {@code f} is listed in {@link #NOT_RECOVERABLE}. */
    private boolean isRecoverable(Throwable f) {
        return !NOT_RECOVERABLE.contains(f.getClass());
    }

    /**
     * Looks up the outgoing Kafka metadata attached to the message, first using the {@code api}
     * metadata class and then the metadata class from the {@code kafka} package.
     */
    private Optional<io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<?>> getOutgoingKafkaRecordMetadata(Message<?> message) {
        Optional<io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<?>> metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class).map(x -> x);
        if (metadata.isPresent()) {
            return metadata;
        }
        metadata = message.getMetadata(OutgoingKafkaRecordMetadata.class).map(x -> x);
        return metadata;
    }

    /**
     * Looks up the incoming Kafka metadata attached to the message, first using the {@code api}
     * metadata class and then the metadata class from the {@code kafka} package.
     */
    private Optional<io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<?, ?>> getIncomingKafkaRecordMetadata(Message<?> message) {
        Optional<io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<?, ?>> metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata.class).map(x -> x);
        if (metadata.isPresent()) {
            return metadata;
        }
        metadata = message.getMetadata(IncomingKafkaRecordMetadata.class).map(x -> x);
        return metadata;
    }

    /**
     * Builds a plain producer record for the message.
     *
     * @param message the message to write; a {@link Record} payload contributes only its value
     * @param om the outgoing metadata, or {@code null}
     * @param im the incoming metadata, or {@code null}
     * @param actualTopic the topic to write to
     * @return the record; partition and timestamp are {@code null} when none is selected
     */
    private ProducerRecord<?, ?> getProducerRecord(Message<?> message, io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<?> om, io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<?, ?> im, String actualTopic) {
        // -1 is used as the "not set" marker for both partition and timestamp.
        int actualPartition = om == null || om.getPartition() <= -1 ? this.partition : om.getPartition();
        Object actualKey = this.getKey(message, om);
        long actualTimestamp = om == null || om.getTimestamp() == null ? -1L : om.getTimestamp().toEpochMilli();
        Headers kafkaHeaders = KafkaRecordHelper.getHeaders(om, im, this.runtimeConfiguration);
        Object payload = message.getPayload();
        if (payload instanceof Record) {
            payload = ((Record)payload).value();
        }
        return new ProducerRecord<>(actualTopic, actualPartition == -1 ? null : Integer.valueOf(actualPartition), actualTimestamp == -1L ? null : Long.valueOf(actualTimestamp), actualKey, payload, kafkaHeaders);
    }

    /**
     * Selects the record key. Precedence: key of the outgoing metadata, key of a {@link Record}
     * payload, key of the incoming record (only when key propagation is enabled), and finally the
     * configured default key.
     */
    private Object getKey(Message<?> message, io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<?> metadata) {
        if (metadata != null && metadata.getKey() != null) {
            return metadata.getKey();
        }
        if (message.getPayload() instanceof Record) {
            return ((Record)message.getPayload()).key();
        }
        if (this.runtimeConfiguration.getPropagateRecordKey().booleanValue()) {
            return message.getMetadata(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata.class).map(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata::getKey).orElse(this.key);
        }
        return this.key;
    }

    /** Returns the subscriber that consumes the messages to be written to Kafka. */
    public Flow.Subscriber<? extends Message<?>> getSink() {
        return this.subscriber;
    }

    /**
     * Contributes the liveness status of this channel. Does nothing when health checks are
     * disabled. The channel is reported as not alive when at least one failure has been recorded;
     * the messages of the recorded failures are joined with {@code ", "}.
     *
     * @param builder the report builder to add the status to
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (!this.isHealthEnabled) {
            return;
        }
        // Copy under the lock so the report is built from a consistent snapshot.
        List<Throwable> actualFailures;
        synchronized (this) {
            actualFailures = new ArrayList<>(this.failures);
        }
        if (actualFailures.isEmpty()) {
            builder.add(this.channel, true);
        } else {
            builder.add(this.channel, false, actualFailures.stream().map(Throwable::getMessage).collect(Collectors.joining(", ")));
        }
    }

    /** Contributes the readiness status when health checks and the readiness check are both enabled. */
    public void isReady(HealthReport.HealthReportBuilder builder) {
        if (this.health != null && this.isHealthReadinessEnabled) {
            this.health.isReady(builder);
        }
    }

    /** Contributes the startup status when health checks are enabled. */
    public void isStarted(HealthReport.HealthReportBuilder builder) {
        if (this.health != null) {
            this.health.isStarted(builder);
        }
    }

    /**
     * Cancels the sender processor, closes the producer client and the health helper. A failure
     * while closing the client is logged and does not propagate.
     */
    public void closeQuietly() {
        if (this.processor != null) {
            this.processor.cancel();
        }
        try {
            this.client.close();
        }
        catch (Throwable e) {
            KafkaLogging.log.errorWhileClosingWriteStream(e);
        }
        if (this.health != null) {
            this.health.close();
        }
    }

    /** Returns the name of the channel this sink belongs to. */
    public String getChannel() {
        return this.channel;
    }

    /** Returns the underlying producer client. */
    public KafkaProducer<?, ?> getProducer() {
        return this.client;
    }
}

