/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.lang.annotation.Annotation;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Provider;

public class KafkaSource<K, V> {
    private final Multi<IncomingKafkaRecord<K, V>> stream;
    private final KafkaConsumer<K, V> consumer;
    private final KafkaFailureHandler failureHandler;

    public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config, Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners) {
        HashMap<String, String> kafkaConfiguration = new HashMap<String, String>();
        String group = config.getGroupId().orElseGet(() -> {
            String s = UUID.randomUUID().toString();
            KafkaLogging.log.noGroupId(s);
            return s;
        });
        JsonHelper.asJsonObject(config.config()).forEach(e -> kafkaConfiguration.put((String)e.getKey(), e.getValue().toString()));
        kafkaConfiguration.put("group.id", group);
        String servers = config.getBootstrapServers();
        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", servers);
            kafkaConfiguration.put("bootstrap.servers", servers);
        }
        if (!kafkaConfiguration.containsKey("key.deserializer")) {
            KafkaLogging.log.keyDeserializerOmitted();
            kafkaConfiguration.put("key.deserializer", config.getKeyDeserializer());
        }
        kafkaConfiguration.remove("channel-name");
        kafkaConfiguration.remove("topic");
        kafkaConfiguration.remove("connector");
        kafkaConfiguration.remove("retry");
        kafkaConfiguration.remove("retry-attempts");
        kafkaConfiguration.remove("broadcast");
        kafkaConfiguration.remove("partitions");
        kafkaConfiguration.remove("consumer-rebalance-listener.name");
        KafkaConsumer kafkaConsumer = KafkaConsumer.create((Vertx)vertx, kafkaConfiguration);
        config.getConsumerRebalanceListenerName().map(name -> {
            KafkaLogging.log.loadingConsumerRebalanceListenerFromConfiguredName((String)name);
            return NamedLiteral.of((String)name);
        }).map(xva$0 -> consumerRebalanceListeners.select(new Annotation[]{xva$0})).map(Provider::get).map(Optional::of).orElseGet(() -> {
            Instance rebalanceFromGroupListeners = consumerRebalanceListeners.select(new Annotation[]{NamedLiteral.of((String)group)});
            if (!rebalanceFromGroupListeners.isUnsatisfied()) {
                KafkaLogging.log.loadingConsumerRebalanceListenerFromGroupId(group);
                return Optional.of(rebalanceFromGroupListeners.get());
            }
            return Optional.empty();
        }).ifPresent(listener -> {
            long consumerReEnableWaitTime = Long.parseLong(kafkaConfiguration.getOrDefault("max.poll.interval.ms", "300000")) + (kafkaConfiguration.get("group.instance.id") == null ? 0L : Long.parseLong(kafkaConfiguration.getOrDefault("session.timeout.ms", "10000")));
            long consumerReEnableRetryMaxAttempts = 1L + Math.max(0L, 3L + (consumerReEnableWaitTime - 14000L) / 10000L);
            kafkaConsumer.partitionsAssignedHandler(set -> {
                kafkaConsumer.pause();
                KafkaLogging.log.executingConsumerAssignedRebalanceListener(group);
                listener.onPartitionsAssigned((KafkaConsumer<?, ?>)kafkaConsumer, (Set<TopicPartition>)set).onFailure().invoke(t -> KafkaLogging.log.unableToExecuteConsumerAssignedRebalanceListener(group, (Throwable)t)).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(10L)).withJitter(0.0).atMost(consumerReEnableRetryMaxAttempts).subscribe().with(a -> {
                    KafkaLogging.log.executedConsumerAssignedRebalanceListener(group);
                    kafkaConsumer.resume();
                }, t -> {
                    KafkaLogging.log.reEnablingConsumerforGroup(group);
                    kafkaConsumer.resume();
                });
            });
            kafkaConsumer.partitionsRevokedHandler(set -> {
                KafkaLogging.log.executingConsumerRevokedRebalanceListener(group);
                listener.onPartitionsRevoked((KafkaConsumer<?, ?>)kafkaConsumer, (Set<TopicPartition>)set).subscribe().with(a -> KafkaLogging.log.executedConsumerRevokedRebalanceListener(group), t -> KafkaLogging.log.unableToExecuteConsumerRevokedRebalanceListener(group, (Throwable)t));
            });
        });
        this.consumer = kafkaConsumer;
        String topic = config.getTopic().orElseGet(config::getChannel);
        this.failureHandler = this.createFailureHandler(config, vertx, kafkaConfiguration);
        Multi multi = this.consumer.toMulti().onFailure().invoke(t -> KafkaLogging.log.unableToReadRecord(topic, (Throwable)t));
        boolean retry = config.getRetry();
        if (retry) {
            int max = config.getRetryAttempts();
            int maxWait = config.getRetryMaxWait();
            if (max == -1) {
                multi.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(maxWait)).atMost(Long.MAX_VALUE);
            } else {
                multi = multi.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(maxWait)).atMost((long)max);
            }
        }
        this.stream = multi.on().subscribed(s -> this.consumer.subscribeAndAwait(topic)).map(rec -> new IncomingKafkaRecord<K, V>(this.consumer, rec, this.failureHandler));
    }

    private KafkaFailureHandler createFailureHandler(KafkaConnectorIncomingConfiguration config, Vertx vertx, Map<String, String> kafkaConfiguration) {
        String strategy = config.getFailureStrategy();
        KafkaFailureHandler.Strategy actualStrategy = KafkaFailureHandler.Strategy.from(strategy);
        switch (actualStrategy) {
            case FAIL: {
                return new KafkaFailStop(config.getChannel());
            }
            case IGNORE: {
                return new KafkaIgnoreFailure(config.getChannel());
            }
            case DEAD_LETTER_QUEUE: {
                return KafkaDeadLetterQueue.create(vertx, kafkaConfiguration, config);
            }
        }
        throw KafkaExceptions.ex.illegalArgumentInvalidStrategy(strategy);
    }

    public Multi<IncomingKafkaRecord<K, V>> getStream() {
        return this.stream;
    }

    public void closeQuietly() {
        try {
            this.consumer.closeAndAwait();
        }
        catch (Throwable e) {
            KafkaLogging.log.exceptionOnClose(e);
        }
    }
}

