/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import io.opentelemetry.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.commit.KafkaIgnoreCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.vertx.core.AsyncResult;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
import java.lang.annotation.Annotation;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Provider;

/**
 * Incoming side of the Kafka connector for one channel.
 * <p>
 * The constructor creates the Vert.x Kafka consumer, selects the commit and failure handlers,
 * and assembles the {@link Multi} of {@link IncomingKafkaRecord} returned by {@code getStream()}.
 * The class also keeps a short history of reported failures, which {@code isAlive} reads, and
 * an optional admin client, which {@code isReady} uses to list the broker's topics.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class KafkaSource<K, V> {
    // Stream of received records, assembled in the constructor.
    private final Multi<IncomingKafkaRecord<K, V>> stream;
    // Underlying Vert.x (Mutiny flavour) Kafka consumer.
    private final io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<K, V> consumer;
    // Handler selected from the configured failure strategy.
    private final KafkaFailureHandler failureHandler;
    // Handler selected from the configured (or derived) commit strategy.
    private final KafkaCommitHandler commitHandler;
    // Channel configuration this source was created from.
    private final KafkaConnectorIncomingConfiguration configuration;
    // Admin client used by the readiness check; null when it is not created in the constructor.
    private final KafkaAdminClient admin;
    // Most recent failures; reportFailure() caps this list at 10 entries and guards it with the instance lock.
    private final List<Throwable> failures = new ArrayList<Throwable>();
    // Topic names to subscribe to when no pattern is used.
    private final Set<String> topics;
    // Compiled topic pattern, or null when subscribing by explicit topic names.
    private final Pattern pattern;
    // Flags cached from the configuration at construction time.
    private final boolean isTracingEnabled;
    private final boolean isHealthEnabled;
    private final boolean isReadinessEnabled;
    private final boolean isCloudEventEnabled;
    // Name of the channel, used as the key in health reports.
    private final String channel;

    /**
     * Builds the Kafka consumer for an incoming channel and assembles the record stream.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>resolve the topics (or the topic pattern) to subscribe to;</li>
     * <li>copy the channel configuration into a plain Kafka configuration map, forcing
     * {@code group.id} and defaulting {@code bootstrap.servers}, {@code key.deserializer} and
     * {@code enable.auto.commit} when they are absent;</li>
     * <li>choose the commit strategy (the configured one, otherwise {@code IGNORE} when auto-commit
     * is enabled and {@code LATEST} when it is not);</li>
     * <li>strip the connector-level attributes from the map before creating the consumer;</li>
     * <li>install the partition assigned/revoked handlers, using a rebalance listener looked up by
     * its configured name or, failing that, by the group id;</li>
     * <li>create the failure handler and, when health and readiness checks are both enabled, the admin client;</li>
     * <li>build the stream: optional retry with back-off, subscription on first subscribe,
     * mapping to {@link IncomingKafkaRecord}, optional tracing and failure reporting.</li>
     * </ol>
     *
     * @param vertx the Vert.x instance used to create the consumer and the handlers
     * @param group the consumer group id, always written to {@code group.id}
     * @param config the incoming channel configuration
     * @param consumerRebalanceListeners the available rebalance listeners, selected by {@code @Named} qualifier
     * @param kafkaCDIEvents used to fire an event carrying the unwrapped consumer
     * @throws IllegalArgumentException if {@code pattern} is enabled without a {@code topic}, or if the
     *         topic attributes are combined in an unsupported way
     */
    public KafkaSource(Vertx vertx, String group, KafkaConnectorIncomingConfiguration config, Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners, KafkaCDIEvents kafkaCDIEvents) {
        this.topics = this.getTopics(config);
        if (config.getPattern().booleanValue()) {
            this.pattern = Pattern.compile(config.getTopic().orElseThrow(() -> new IllegalArgumentException("Invalid Kafka incoming configuration for channel `" + config.getChannel() + "`, `pattern` must be used with the `topic` attribute")));
            KafkaLogging.log.configuredPattern(config.getChannel(), this.pattern.toString());
        } else {
            KafkaLogging.log.configuredTopics(config.getChannel(), this.topics);
            this.pattern = null;
        }
        HashMap<String, String> kafkaConfiguration = new HashMap<String, String>();
        this.configuration = config;
        this.isTracingEnabled = this.configuration.getTracingEnabled();
        this.isHealthEnabled = this.configuration.getHealthEnabled();
        this.isReadinessEnabled = this.configuration.getHealthReadinessEnabled();
        this.isCloudEventEnabled = this.configuration.getCloudEvents();
        this.channel = this.configuration.getChannel();

        // Flatten the channel configuration into String/String pairs for the Kafka client.
        JsonHelper.asJsonObject(config.config()).forEach(e -> kafkaConfiguration.put((String)e.getKey(), e.getValue().toString()));
        kafkaConfiguration.put("group.id", group);
        String servers = config.getBootstrapServers();
        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", servers);
            kafkaConfiguration.put("bootstrap.servers", servers);
        }
        if (!kafkaConfiguration.containsKey("key.deserializer")) {
            KafkaLogging.log.keyDeserializerOmitted();
            kafkaConfiguration.put("key.deserializer", config.getKeyDeserializer());
        }
        if (!kafkaConfiguration.containsKey("enable.auto.commit")) {
            KafkaLogging.log.disableAutoCommit(config.getChannel());
            kafkaConfiguration.put("enable.auto.commit", "false");
        }

        // Must be computed before "commit-strategy" is stripped from the map below.
        String commitStrategy = config.getCommitStrategy().orElse(Boolean.parseBoolean((String)kafkaConfiguration.get("enable.auto.commit")) ? KafkaCommitHandler.Strategy.IGNORE.name() : KafkaCommitHandler.Strategy.LATEST.name());

        // Connector-level attributes are removed so they are not handed to the Kafka client.
        kafkaConfiguration.remove("channel-name");
        kafkaConfiguration.remove("connector");
        kafkaConfiguration.remove("health-enabled");
        kafkaConfiguration.remove("health-readiness-enabled");
        kafkaConfiguration.remove("tracing-enabled");
        kafkaConfiguration.remove("topic");
        kafkaConfiguration.remove("topics");
        kafkaConfiguration.remove("pattern");
        kafkaConfiguration.remove("retry");
        kafkaConfiguration.remove("retry-attempts");
        kafkaConfiguration.remove("retry-max-wait");
        kafkaConfiguration.remove("broadcast");
        kafkaConfiguration.remove("partitions");
        kafkaConfiguration.remove("failure-strategy");
        kafkaConfiguration.remove("commit-strategy");
        kafkaConfiguration.remove("throttled.unprocessed-record-max-age.ms");
        kafkaConfiguration.remove("dead-letter-queue.topic");
        kafkaConfiguration.remove("dead-letter-queue.key.serializer");
        kafkaConfiguration.remove("dead-letter-queue.value.serializer");
        kafkaConfiguration.remove("cloud-events");
        kafkaConfiguration.remove("consumer-rebalance-listener.name");

        io.vertx.mutiny.kafka.client.consumer.KafkaConsumer kafkaConsumer = io.vertx.mutiny.kafka.client.consumer.KafkaConsumer.create((Vertx)vertx, kafkaConfiguration);
        kafkaCDIEvents.consumer().fire((Object)kafkaConsumer.getDelegate().unwrap());
        this.commitHandler = this.createCommitHandler(vertx, kafkaConsumer, group, config, commitStrategy);

        // Listener lookup: explicitly configured name first, then a bean named after the group id.
        Optional rebalanceListener = config.getConsumerRebalanceListenerName().map(name -> {
            KafkaLogging.log.loadingConsumerRebalanceListenerFromConfiguredName((String)name);
            return NamedLiteral.of((String)name);
        }).map(qualifier -> consumerRebalanceListeners.select(new Annotation[]{qualifier})).map(Provider::get).map(Optional::of).orElseGet(() -> {
            Instance rebalanceFromGroupListeners = consumerRebalanceListeners.select(new Annotation[]{NamedLiteral.of((String)group)});
            if (!rebalanceFromGroupListeners.isUnsatisfied()) {
                KafkaLogging.log.loadingConsumerRebalanceListenerFromGroupId(group);
                return Optional.of(rebalanceFromGroupListeners.get());
            }
            return Optional.empty();
        });
        if (rebalanceListener.isPresent()) {
            KafkaConsumerRebalanceListener listener = (KafkaConsumerRebalanceListener)rebalanceListener.get();
            // Upper bound (ms) on how long the listener may keep retrying before consumption is re-enabled anyway.
            long consumerReEnableWaitTime = Long.parseLong(kafkaConfiguration.getOrDefault("max.poll.interval.ms", "300000")) + (kafkaConfiguration.get("group.instance.id") == null ? 0L : Long.parseLong(kafkaConfiguration.getOrDefault("session.timeout.ms", "10000"))) + 11000L;
            kafkaConsumer.partitionsAssignedHandler(set -> {
                // Pause while the listener runs; the saved demand is requested again once it is done.
                long currentDemand = kafkaConsumer.demand();
                kafkaConsumer.pause();
                this.commitHandler.partitionsAssigned((Set<TopicPartition>)set);
                KafkaLogging.log.executingConsumerAssignedRebalanceListener(group);
                listener.onPartitionsAssigned((io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<?, ?>)kafkaConsumer, (Set<TopicPartition>)set).onFailure().invoke(t -> KafkaLogging.log.unableToExecuteConsumerAssignedRebalanceListener(group, (Throwable)t)).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(10L)).expireIn(consumerReEnableWaitTime).subscribe().with(a -> {
                    KafkaLogging.log.executedConsumerAssignedRebalanceListener(group);
                    kafkaConsumer.fetch(currentDemand);
                }, t -> {
                    KafkaLogging.log.reEnablingConsumerforGroup(group);
                    kafkaConsumer.fetch(currentDemand);
                });
            });
            kafkaConsumer.partitionsRevokedHandler(set -> {
                KafkaLogging.log.executingConsumerRevokedRebalanceListener(group);
                listener.onPartitionsRevoked((io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<?, ?>)kafkaConsumer, (Set<TopicPartition>)set).subscribe().with(a -> KafkaLogging.log.executedConsumerRevokedRebalanceListener(group), t -> KafkaLogging.log.unableToExecuteConsumerRevokedRebalanceListener(group, (Throwable)t));
            });
        } else {
            kafkaConsumer.partitionsAssignedHandler(this.commitHandler::partitionsAssigned);
        }

        this.failureHandler = this.createFailureHandler(config, vertx, kafkaConfiguration, kafkaCDIEvents);
        HashMap<String, Object> adminConfiguration = new HashMap<String, Object>(kafkaConfiguration);
        // The admin client is only needed by the readiness check.
        this.admin = this.isHealthEnabled && this.isReadinessEnabled ? KafkaAdminHelper.createAdminClient(vertx, adminConfiguration) : null;
        this.consumer = kafkaConsumer;

        Multi multi = this.consumer.toMulti().onFailure().invoke(t -> {
            KafkaLogging.log.unableToReadRecord(this.topics, (Throwable)t);
            this.reportFailure((Throwable)t);
        });
        if (config.getRetry()) {
            int max = config.getRetryAttempts();
            int maxWait = config.getRetryMaxWait();
            // -1 means "retry forever". The retried stream must be re-assigned in both cases:
            // Mutiny operators return a new Multi and leave the receiver untouched.
            long attempts = max == -1 ? Long.MAX_VALUE : (long)max;
            multi = multi.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(maxWait)).atMost(attempts);
        }

        // The actual Kafka subscription happens when the stream is subscribed to.
        Multi incomingMulti = multi.onSubscribe().call(s -> {
            this.consumer.exceptionHandler(this::reportFailure);
            if (this.pattern != null) {
                BiConsumer<UniEmitter, AsyncResult> completionHandler = (e, ar) -> {
                    if (ar.failed()) {
                        e.fail(ar.cause());
                    } else {
                        e.complete(null);
                    }
                };
                return Uni.createFrom().emitter(e -> {
                    // Pattern subscription is done through the delegate consumer.
                    KafkaConsumer delegate = this.consumer.getDelegate();
                    delegate.subscribe(this.pattern, ar -> completionHandler.accept((UniEmitter)e, (AsyncResult)ar));
                });
            }
            return this.consumer.subscribe(this.topics);
        }).map(rec -> this.commitHandler.received(new IncomingKafkaRecord(rec, this.commitHandler, this.failureHandler, this.isCloudEventEnabled, this.isTracingEnabled)));
        if (this.isTracingEnabled) {
            incomingMulti = incomingMulti.onItem().invoke(this::incomingTrace);
        }
        this.stream = incomingMulti.onFailure().invoke(this::reportFailure);
    }

    /**
     * Resolves the topic names for the channel from the {@code topics}, {@code topic} and
     * {@code pattern} attributes.
     *
     * @param config the incoming channel configuration
     * @return the trimmed entries of the comma-separated {@code topics} attribute when it is set;
     *         otherwise the single {@code topic}; otherwise the channel name
     * @throws IllegalArgumentException if {@code topics} is combined with {@code topic} or with {@code pattern}
     */
    private Set<String> getTopics(KafkaConnectorIncomingConfiguration config) {
        final String topicList = config.getTopics().orElse(null);
        final String singleTopic = config.getTopic().orElse(null);
        final String channelName = config.getChannel();
        final boolean patternEnabled = config.getPattern();

        if (topicList == null) {
            // No list configured: use the single topic, defaulting to the channel name.
            return Collections.singleton(singleTopic != null ? singleTopic : channelName);
        }
        if (singleTopic != null) {
            throw new IllegalArgumentException("The Kafka incoming configuration for channel `" + channelName + "` cannot use `topics` and `topic` at the same time");
        }
        if (patternEnabled) {
            throw new IllegalArgumentException("The Kafka incoming configuration for channel `" + channelName + "` cannot use `topics` and `pattern` at the same time");
        }
        return Arrays.stream(topicList.split(","))
                .map(name -> name.trim())
                .collect(Collectors.toSet());
    }

    /**
     * Logs a failure and records it in the failure history.
     * <p>
     * When the history already holds 10 entries, the oldest one is dropped before the new
     * failure is appended. The method is synchronized on this instance.
     *
     * @param failure the failure to record
     */
    public synchronized void reportFailure(Throwable failure) {
        KafkaLogging.log.failureReported(this.topics, failure);
        final boolean historyFull = this.failures.size() == 10;
        if (historyFull) {
            // Evict the oldest entry to make room for the new one.
            this.failures.remove(0);
        }
        this.failures.add(failure);
    }

    /**
     * Creates a consumer span named {@code "<topic> receive"} for a received record, starts and
     * ends it, and attaches it to the record's tracing metadata. Does nothing when tracing is disabled.
     *
     * @param kafkaRecord the record that was received
     */
    public void incomingTrace(IncomingKafkaRecord<K, V> kafkaRecord) {
        if (!this.isTracingEnabled) {
            return;
        }

        TracingMetadata tracingMetadata = TracingMetadata.fromMessage(kafkaRecord).orElse(TracingMetadata.empty());
        Span.Builder spanBuilder = KafkaConnector.TRACER.spanBuilder(kafkaRecord.getTopic() + " receive").setSpanKind(Span.Kind.CONSUMER);

        // Link to the previous span context when the metadata carries a valid one.
        SpanContext parentSpan = tracingMetadata.getPreviousSpanContext();
        boolean hasValidParent = parentSpan != null && parentSpan.isValid();
        if (hasValidParent) {
            spanBuilder.setParent(parentSpan);
        } else {
            spanBuilder.setNoParent();
        }

        Span span = spanBuilder.startSpan();
        span.setAttribute("partition", (long)kafkaRecord.getPartition());
        span.setAttribute("offset", kafkaRecord.getOffset());
        SemanticAttributes.MESSAGING_SYSTEM.set(span, "kafka");
        SemanticAttributes.MESSAGING_DESTINATION.set(span, kafkaRecord.getTopic());
        SemanticAttributes.MESSAGING_DESTINATION_KIND.set(span, "topic");

        kafkaRecord.injectTracingMetadata(tracingMetadata.withSpan(span));
        span.end();
    }

    /**
     * Instantiates the failure handler matching the configured failure strategy.
     *
     * @param config the incoming channel configuration (provides the strategy and the channel name)
     * @param vertx the Vert.x instance, passed to the dead-letter-queue handler
     * @param kafkaConfiguration the consumer configuration, passed to the dead-letter-queue handler
     * @param kafkaCDIEvents the CDI events holder, passed to the dead-letter-queue handler
     * @return the handler for the {@code FAIL}, {@code IGNORE} or {@code DEAD_LETTER_QUEUE} strategy
     */
    private KafkaFailureHandler createFailureHandler(KafkaConnectorIncomingConfiguration config, Vertx vertx, Map<String, String> kafkaConfiguration, KafkaCDIEvents kafkaCDIEvents) {
        final String configured = config.getFailureStrategy();
        final KafkaFailureHandler handler;
        switch (KafkaFailureHandler.Strategy.from(configured)) {
            case FAIL:
                handler = new KafkaFailStop(config.getChannel(), this);
                break;
            case IGNORE:
                handler = new KafkaIgnoreFailure(config.getChannel());
                break;
            case DEAD_LETTER_QUEUE:
                handler = KafkaDeadLetterQueue.create(vertx, kafkaConfiguration, config, this, kafkaCDIEvents);
                break;
            default:
                // Any strategy without a dedicated handler is rejected.
                throw KafkaExceptions.ex.illegalArgumentInvalidFailureStrategy(configured);
        }
        return handler;
    }

    /**
     * Instantiates the commit handler matching the given commit strategy name.
     *
     * @param vertx the Vert.x instance
     * @param consumer the Kafka consumer the handler commits through
     * @param group the consumer group id, passed to the throttled handler
     * @param config the incoming channel configuration, passed to the throttled handler
     * @param strategy the name of the commit strategy
     * @return the handler for the {@code LATEST}, {@code IGNORE} or {@code THROTTLED} strategy
     */
    private KafkaCommitHandler createCommitHandler(Vertx vertx, io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<K, V> consumer, String group, KafkaConnectorIncomingConfiguration config, String strategy) {
        final KafkaCommitHandler handler;
        switch (KafkaCommitHandler.Strategy.from(strategy)) {
            case LATEST:
                handler = new KafkaLatestCommit(vertx, consumer);
                break;
            case IGNORE:
                handler = new KafkaIgnoreCommit();
                break;
            case THROTTLED:
                handler = KafkaThrottledLatestProcessedCommit.create(vertx, consumer, group, config, this);
                break;
            default:
                // Any strategy without a dedicated handler is rejected.
                throw KafkaExceptions.ex.illegalArgumentInvalidCommitStrategy(strategy);
        }
        return handler;
    }

    /**
     * @return the stream of incoming records assembled by the constructor
     */
    public Multi<IncomingKafkaRecord<K, V>> getStream() {
        return stream;
    }

    /**
     * Terminates the commit and failure handlers and closes the consumer and, if present, the
     * admin client. Never throws: every failure is logged and the remaining steps still run.
     * <p>
     * Each step has its own try/catch so that a handler failing to terminate cannot prevent
     * the Kafka consumer from being closed.
     */
    public void closeQuietly() {
        try {
            this.commitHandler.terminate();
        }
        catch (Throwable e) {
            KafkaLogging.log.exceptionOnClose(e);
        }
        try {
            this.failureHandler.terminate();
        }
        catch (Throwable e) {
            KafkaLogging.log.exceptionOnClose(e);
        }
        try {
            this.consumer.closeAndAwait();
        }
        catch (Throwable e) {
            KafkaLogging.log.exceptionOnClose(e);
        }
        if (this.admin != null) {
            try {
                this.admin.closeAndAwait();
            }
            catch (Throwable e) {
                KafkaLogging.log.exceptionOnClose(e);
            }
        }
    }

    /**
     * Adds this channel's liveness status to the report. Does nothing when health checks are disabled.
     * <p>
     * The channel is reported as alive when no failure has been recorded. Otherwise it is
     * reported as not alive, with the recorded failure messages joined by {@code ", "}.
     *
     * @param builder the health report builder to contribute to
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (!this.isHealthEnabled) {
            return;
        }
        // Copy under the instance lock (the one reportFailure holds) so a stable snapshot is read.
        List<Throwable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.failures);
        }
        if (snapshot.isEmpty()) {
            builder.add(this.channel, true);
            return;
        }
        String details = snapshot.stream()
                // Exceptions without a message would otherwise show up as the literal "null".
                .map(t -> t.getMessage() != null ? t.getMessage() : t.toString())
                .collect(Collectors.joining(", "));
        builder.add(this.channel, false, details);
    }

    /**
     * Adds this channel's readiness status to the report. Does nothing unless both health and
     * readiness checks are enabled.
     * <p>
     * The broker's topics are listed through the admin client, waiting at most the configured
     * readiness timeout. With a pattern, the channel is ready when at least one existing topic
     * matches it; without one, it is ready when every configured topic exists. Any exception
     * raised while querying the broker marks the channel as not ready.
     *
     * @param builder the health report builder to contribute to
     */
    public void isReady(HealthReport.HealthReportBuilder builder) {
        if (!this.isHealthEnabled || !this.isReadinessEnabled) {
            return;
        }
        try {
            Set<String> existingTopics = this.admin.listTopics().await().atMost(Duration.ofMillis(this.configuration.getHealthReadinessTimeout()));
            if (this.pattern != null) {
                boolean matched = existingTopics.stream().anyMatch(name -> this.pattern.matcher(name).matches());
                if (matched) {
                    builder.add(this.channel, true);
                } else {
                    builder.add(this.channel, false, "Unable to find a topic matching the given pattern: " + this.pattern);
                }
                return;
            }
            if (existingTopics.containsAll(this.topics)) {
                builder.add(this.channel, true);
                return;
            }
            // Separate the names so the message stays readable when several topics are missing.
            String missing = this.topics.stream()
                    .filter(name -> !existingTopics.contains(name))
                    .collect(Collectors.joining(", "));
            builder.add(this.channel, false, "Unable to find topic(s): " + missing);
        }
        catch (Exception failed) {
            builder.add(this.channel, false, "No response from broker for channel " + this.channel + " : " + failed);
        }
    }

    /**
     * @return the underlying Mutiny Kafka consumer created by the constructor
     */
    public io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<K, V> getConsumer() {
        return consumer;
    }
}

