/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.reply;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.kafka.reply.CorrelationId;
import io.smallrye.reactive.messaging.kafka.reply.CorrelationIdHandler;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;
import io.smallrye.reactive.messaging.kafka.reply.PendingReply;
import io.smallrye.reactive.messaging.kafka.reply.ReplyFailureHandler;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;

@Experimental(value="Experimental API")
public class KafkaRequestReplyImpl<Req, Rep>
extends MutinyEmitterImpl<Req>
implements KafkaRequestReply<Req, Rep>,
MultiSubscriber<KafkaRecord<?, Rep>> {
    private final Map<CorrelationId, PendingReplyImpl<Rep>> pendingReplies = new ConcurrentHashMap<CorrelationId, PendingReplyImpl<Rep>>();
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference();
    private final String channel;
    private final String replyTopic;
    private final int replyPartition;
    private final Duration replyTimeout;
    private final String replyCorrelationIdHeader;
    private final String replyTopicHeader;
    private final String replyPartitionHeader;
    private final CorrelationIdHandler correlationIdHandler;
    private final ReplyFailureHandler replyFailureHandler;
    private final String autoOffsetReset;
    private final KafkaSource<Object, Rep> replySource;
    private final Set<TopicPartition> waitForPartitions;
    private final boolean gracefulShutdown;
    private final Duration initialAssignmentTimeout;
    private Function<Message<Rep>, Message<Rep>> replyConverter;

    public KafkaRequestReplyImpl(EmitterConfiguration config, long defaultBufferSize, Config channelConfiguration, Instance<Map<String, Object>> configurations, Vertx vertx, KafkaCDIEvents kafkaCDIEvents, Instance<OpenTelemetry> openTelemetryInstance, Instance<KafkaCommitHandler.Factory> commitHandlerFactory, Instance<KafkaFailureHandler.Factory> failureHandlerFactories, Instance<ClientCustomizer<Map<String, Object>>> configCustomizers, Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers, Instance<CorrelationIdHandler> correlationIdHandlers, Instance<ReplyFailureHandler> replyFailureHandlers, Instance<KafkaConsumerRebalanceListener> rebalanceListeners) {
        super(config, defaultBufferSize);
        this.channel = config.name();
        OverrideConnectorConfig connectorConfig = new OverrideConnectorConfig("mp.messaging.outgoing.", channelConfiguration, "smallrye-kafka", this.channel, "reply", Map.of("topic", c -> c.getOriginalValue("topic", String.class).orElse(this.channel) + "-replies", "assign-seek", c -> c.getOriginalValue("reply.partition", Integer.class).map(String::valueOf).orElse(null)));
        Config replyKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, (Config)connectorConfig);
        KafkaConnectorIncomingConfiguration consumerConfig = new KafkaConnectorIncomingConfiguration(replyKafkaConfig);
        this.replyTopic = consumerConfig.getTopic().orElse(null);
        this.replyPartition = connectorConfig.getOptionalValue("reply.partition", Integer.class).orElse(-1);
        this.replyTimeout = Duration.ofMillis(connectorConfig.getOptionalValue("reply.timeout", Integer.class).orElse(5000).intValue());
        int initialAssignmentTimeoutMillis = connectorConfig.getOptionalValue("reply.initial-assignment-timeout", Integer.class).orElse((int)this.replyTimeout.toMillis());
        this.initialAssignmentTimeout = initialAssignmentTimeoutMillis < 0 ? null : Duration.ofMillis(initialAssignmentTimeoutMillis);
        this.autoOffsetReset = consumerConfig.getAutoOffsetReset();
        this.replyCorrelationIdHeader = connectorConfig.getOptionalValue("reply.correlation-id.header", String.class).orElse("REPLY_CORRELATION_ID");
        this.replyTopicHeader = connectorConfig.getOptionalValue("reply.topic.header", String.class).orElse("REPLY_TOPIC");
        this.replyPartitionHeader = connectorConfig.getOptionalValue("reply.partition.header", String.class).orElse("REPLY_PARTITION");
        String correlationIdHandlerIdentifier = connectorConfig.getOptionalValue("reply.correlation-id.handler", String.class).orElse("uuid");
        this.correlationIdHandler = (CorrelationIdHandler)CDIUtils.getInstanceById(correlationIdHandlers, (String)correlationIdHandlerIdentifier).get();
        this.replyFailureHandler = connectorConfig.getOptionalValue("reply.failure.handler", String.class).map(id -> (ReplyFailureHandler)CDIUtils.getInstanceById((Instance)replyFailureHandlers, (String)id, () -> null)).orElse(null);
        String consumerGroup = consumerConfig.getGroupId().orElseGet(() -> UUID.randomUUID().toString());
        this.waitForPartitions = this.getWaitForPartitions(consumerConfig);
        this.gracefulShutdown = consumerConfig.getGracefulShutdown();
        this.replySource = new KafkaSource(vertx, consumerGroup, consumerConfig, openTelemetryInstance, commitHandlerFactory, failureHandlerFactories, rebalanceListeners, kafkaCDIEvents, configCustomizers, deserializationFailureHandlers, -1);
        if (consumerConfig.getBatch().booleanValue()) {
            this.replySource.getBatchStream().call(record -> Uni.createFrom().completionStage(() -> ((IncomingKafkaRecordBatch)record).ack())).flatMap(record -> Multi.createFrom().iterable(record.getRecords())).subscribe((Flow.Subscriber)((Object)this));
        } else {
            this.replySource.getStream().call(record -> Uni.createFrom().completionStage(() -> ((IncomingKafkaRecord)record).ack())).subscribe((Flow.Subscriber)((Object)this));
        }
    }

    private Set<TopicPartition> getWaitForPartitions(KafkaConnectorIncomingConfiguration consumerConfig) {
        Set<String> topics = KafkaSource.getTopics(consumerConfig);
        String seekToOffset = consumerConfig.getAssignSeek().orElse(null);
        Map<TopicPartition, Optional<Long>> offsetSeeks = KafkaSource.getOffsetSeeks(seekToOffset, this.channel, topics);
        if (offsetSeeks.isEmpty()) {
            return topics.stream().map(t -> TopicPartitions.getTopicPartition(t, -1)).collect(Collectors.toSet());
        }
        return offsetSeeks.keySet();
    }

    public Flow.Publisher<Message<? extends Req>> getPublisher() {
        return this.publisher.plug(m -> this.initialAssignmentTimeout != null && "latest".equals(this.autoOffsetReset) ? m.onSubscription().call(() -> this.waitForAssignments().ifNoItem().after(this.initialAssignmentTimeout).fail()) : m).onTermination().invoke(this::complete);
    }

    @Override
    public void complete() {
        super.complete();
        Subscriptions.cancel(this.subscription);
        if (this.gracefulShutdown) {
            for (int waitIteration = 0; !this.pendingReplies.isEmpty() && waitIteration < 10; ++waitIteration) {
                this.grace(this.replyTimeout.dividedBy(10L));
            }
            if (!this.pendingReplies.isEmpty()) {
                KafkaLogging.log.warnf("There are still %d pending replies after the closing timeout: %s", this.pendingReplies.size(), this.pendingReplies.keySet());
            }
        }
        this.replySource.closeQuietly();
    }

    private void grace(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public Uni<Rep> request(Req request) {
        return this.request((Message<Req>)ContextAwareMessage.of(request)).map(Message::getPayload);
    }

    @Override
    public Uni<Message<Rep>> request(Message<Req> request) {
        OutgoingKafkaRecordMetadata.OutgoingKafkaRecordMetadataBuilder builder = request.getMetadata(OutgoingKafkaRecordMetadata.class).map(metadata -> OutgoingKafkaRecordMetadata.from((OutgoingKafkaRecordMetadata)metadata)).orElseGet(OutgoingKafkaRecordMetadata::builder);
        CorrelationId correlationId = this.correlationIdHandler.generate(request);
        builder.addHeaders(new RecordHeader[]{new RecordHeader(this.replyCorrelationIdHeader, correlationId.toBytes()), new RecordHeader(this.replyTopicHeader, this.replyTopic.getBytes())});
        if (this.replyPartition != -1) {
            byte[] partition = KafkaRequestReply.replyPartitionToBytes(this.replyPartition);
            builder.addHeaders(new RecordHeader[]{new RecordHeader(this.replyPartitionHeader, partition)});
        }
        OutgoingMessageMetadata outMetadata = new OutgoingMessageMetadata();
        return this.sendMessage(request.addMetadata((Object)builder.build()).addMetadata((Object)outMetadata)).invoke(() -> this.subscription.get().request(1L)).chain(unused -> Uni.createFrom().emitter(emitter -> this.pendingReplies.put(correlationId, new PendingReplyImpl((RecordMetadata)outMetadata.getResult(), this.replyTopic, this.replyPartition, emitter))).ifNoItem().after(this.replyTimeout).fail()).onItemOrFailure().invoke(() -> this.pendingReplies.remove(correlationId)).plug(uni -> this.replyFailureHandler != null ? uni.onItem().transformToUni(f -> {
            Throwable failure = this.replyFailureHandler.handleReply((KafkaRecord)f);
            return failure != null ? Uni.createFrom().failure(failure) : Uni.createFrom().item(f);
        }) : uni).plug(uni -> this.replyConverter != null ? uni.map(f -> this.replyConverter.apply((Message<Rep>)f)) : uni);
    }

    @Override
    public Uni<Set<TopicPartition>> waitForAssignments() {
        return this.replySource.getConsumer().runOnPollingThread(c -> this.waitForPartitions.stream().flatMap(tp -> tp.partition() == -1 ? c.partitionsFor(tp.topic()).stream().map(pi -> TopicPartitions.getTopicPartition(tp.topic(), pi.partition())) : Stream.of(tp)).collect(Collectors.toSet())).chain(waitFor -> this.waitForAssignments((Collection<TopicPartition>)waitFor));
    }

    @Override
    public Uni<Set<TopicPartition>> waitForAssignments(Collection<TopicPartition> topicPartitions) {
        return this.replySource.getConsumer().getAssignments().repeat().whilst(tp -> !tp.containsAll(topicPartitions)).skip().where(Set::isEmpty).toUni();
    }

    void setReplyConverter(Function<Message<Rep>, Message<Rep>> converterFunction) {
        this.replyConverter = converterFunction;
    }

    @Override
    public Map<CorrelationId, PendingReply> getPendingReplies() {
        return new HashMap<CorrelationId, PendingReply>(this.pendingReplies);
    }

    @Override
    public KafkaConsumer<?, Rep> getConsumer() {
        return this.replySource.getConsumer();
    }

    public void onSubscribe(Flow.Subscription subscription) {
        if (Subscriptions.setIfEmpty(this.subscription, (Flow.Subscription)subscription)) {
            subscription.request(1L);
        }
    }

    public void onItem(KafkaRecord<?, Rep> record) {
        Header header = record.getHeaders().lastHeader(this.replyCorrelationIdHeader);
        if (header != null && record.getHeaders().lastHeader(this.replyTopicHeader) == null) {
            CorrelationId correlationId = this.correlationIdHandler.parse(header.value());
            PendingReplyImpl<Rep> reply = this.pendingReplies.remove(correlationId);
            if (reply != null) {
                reply.getEmitter().complete(record);
                return;
            }
            KafkaLogging.log.requestReplyRecordIgnored(this.channel, record.getTopic(), correlationId.toString());
        }
        this.subscription.get().request(1L);
    }

    public void onFailure(Throwable failure) {
        KafkaLogging.log.requestReplyConsumerFailure(this.channel, this.replyTopic, failure);
    }

    public void onCompletion() {
    }

    public static class PendingReplyImpl<Rep>
    implements PendingReply {
        private final RecordMetadata metadata;
        private final String replyTopic;
        private final int replyPartition;
        private final UniEmitter<Message<Rep>> emitter;

        public PendingReplyImpl(RecordMetadata metadata, String replyTopic, int replyPartition, UniEmitter<Message<Rep>> emitter) {
            this.replyTopic = replyTopic;
            this.replyPartition = replyPartition;
            this.metadata = metadata;
            this.emitter = emitter;
        }

        @Override
        public String replyTopic() {
            return this.replyTopic;
        }

        @Override
        public int replyPartition() {
            return this.replyPartition;
        }

        @Override
        public RecordMetadata recordMetadata() {
            return this.metadata;
        }

        public UniEmitter<Message<Rep>> getEmitter() {
            return this.emitter;
        }

        public String toString() {
            return "PendingReply{metadata=" + String.valueOf(this.metadata) + ", replyTopic='" + this.replyTopic + "', replyPartition=" + this.replyPartition + "}";
        }
    }
}

