/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.nats.intercept;

import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.nats.ConsumerRegistry;
import io.micronaut.nats.annotation.NatsConnection;
import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import io.micronaut.nats.bind.NatsBinderRegistry;
import io.micronaut.nats.exception.NatsListenerException;
import io.micronaut.nats.exception.NatsListenerExceptionHandler;
import io.micronaut.nats.intercept.StaticConsumerState;
import io.micronaut.nats.serdes.NatsMessageSerDes;
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.micronaut.runtime.ApplicationConfiguration;
import io.nats.client.Connection;
import io.nats.client.Consumer;
import io.nats.client.Dispatcher;
import io.nats.client.MessageHandler;
import io.nats.client.Subscription;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Singleton
@Bean(preDestroy="close")
public class NatsConsumerAdvice
implements ExecutableMethodProcessor<Subject>,
AutoCloseable,
ConsumerRegistry {
    private final BeanContext beanContext;
    private final NatsBinderRegistry binderRegistry;
    private final NatsMessageSerDesRegistry serDesRegistry;
    private final NatsListenerExceptionHandler exceptionHandler;
    private final ApplicationConfiguration applicationConfiguration;
    private final Map<String, StaticConsumerState> consumers = new ConcurrentHashMap<String, StaticConsumerState>();
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    public NatsConsumerAdvice(BeanContext beanContext, NatsBinderRegistry binderRegistry, NatsMessageSerDesRegistry serDesRegistry, NatsListenerExceptionHandler exceptionHandler, ApplicationConfiguration applicationConfiguration) {
        this.beanContext = beanContext;
        this.binderRegistry = binderRegistry;
        this.serDesRegistry = serDesRegistry;
        this.exceptionHandler = exceptionHandler;
        this.applicationConfiguration = applicationConfiguration;
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        AnnotationValue listenerAnnotation = method.getAnnotation(NatsListener.class);
        List subjectAnnotations = method.getDeclaredAnnotationValuesByType(Subject.class);
        if (listenerAnnotation == null || subjectAnnotations.isEmpty()) {
            return;
        }
        String connectionName = method.stringValue(NatsConnection.class, "connection").orElse("default");
        Qualifier qualifier = beanDefinition.getAnnotationTypeByStereotype("javax.inject.Qualifier").map(type -> Qualifiers.byAnnotation((AnnotationMetadata)beanDefinition, (Class)type)).orElse(null);
        Class beanType = beanDefinition.getBeanType();
        Class returnTypeClass = method.getReturnType().getType();
        boolean isVoid = returnTypeClass == Void.class || returnTypeClass == Void.TYPE;
        Object bean = this.beanContext.findBean(beanType, qualifier).orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + method));
        Connection connection = (Connection)this.beanContext.getBean(Connection.class, Qualifiers.byName((String)connectionName));
        DefaultExecutableBinder binder = new DefaultExecutableBinder();
        Dispatcher ds = connection.createDispatcher();
        String clientId = listenerAnnotation.stringValue("clientId").filter(StringUtils::isNotEmpty).orElseGet(() -> this.applicationConfiguration.getName().map(s -> s + "-" + NameUtils.hyphenate((String)beanType.getSimpleName())).orElse("nats-consumer-" + this.clientIdGenerator.incrementAndGet()));
        MessageHandler messageHandler = msg -> {
            BoundExecutable boundExecutable = null;
            try {
                boundExecutable = binder.bind((Executable)method, (ArgumentBinderRegistry)this.binderRegistry, (Object)msg);
            }
            catch (Exception e) {
                this.handleException(new NatsListenerException("An error occurred binding the message to the method", e, bean, msg));
            }
            if (boundExecutable != null) {
                Object returnedValue = boundExecutable.invoke(bean);
                if (!isVoid && StringUtils.isNotEmpty((CharSequence)msg.getReplyTo())) {
                    byte[] converted = null;
                    if (returnedValue != null) {
                        NatsMessageSerDes serDes = this.serDesRegistry.findSerdes(method.getReturnType().asArgument()).map(NatsMessageSerDes.class::cast).orElseThrow(() -> new NatsListenerException(String.format("Could not find a serializer for the body argument of type [%s]", returnedValue.getClass().getName()), bean, msg));
                        converted = serDes.serialize(returnedValue);
                    }
                    connection.publish(msg.getReplyTo(), converted);
                }
            }
        };
        HashSet<Subscription> subscriptions = new HashSet<Subscription>();
        for (AnnotationValue subjectAnnotation : subjectAnnotations) {
            String subject = (String)subjectAnnotation.getRequiredValue(String.class);
            Optional queueOptional = subjectAnnotation.get((CharSequence)"queue", String.class);
            if (queueOptional.isPresent() && !((String)queueOptional.get()).isEmpty()) {
                subscriptions.add(ds.subscribe(subject, (String)queueOptional.get(), messageHandler));
                continue;
            }
            subscriptions.add(ds.subscribe(subject, messageHandler));
        }
        this.consumers.put(clientId, new StaticConsumerState(clientId, subscriptions, ds, connection));
    }

    @Override
    @PreDestroy
    public void close() {
        for (StaticConsumerState consumerState : this.consumers.values()) {
            consumerState.close();
        }
        this.consumers.clear();
    }

    private void handleException(NatsListenerException exception) {
        Object object = exception.getListener();
        if (object instanceof NatsListenerExceptionHandler) {
            NatsListenerExceptionHandler bean = (NatsListenerExceptionHandler)object;
            bean.handle((Throwable)((Object)exception));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)exception));
        }
    }

    @Override
    @NonNull
    public Consumer getConsumer(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Dispatcher dispatcher = this.getConsumerState(id).getDispatcher();
        if (dispatcher == null) {
            throw new IllegalArgumentException("No consumer found for ID:" + id);
        }
        return dispatcher;
    }

    @Override
    @NonNull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    @NonNull
    private StaticConsumerState getConsumerState(@NonNull String id) {
        StaticConsumerState consumerState = this.consumers.get(id);
        if (consumerState == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + id);
        }
        return consumerState;
    }

    @Override
    @NonNull
    public Set<Subscription> getConsumerSubscription(@NonNull String id) {
        ArgumentUtils.requireNonNull((String)"id", (Object)id);
        Set<Subscription> subscriptions = this.getConsumerState(id).getSubscriptions();
        if (subscriptions == null || subscriptions.isEmpty()) {
            throw new IllegalArgumentException("No consumer subscription found for ID: " + id);
        }
        return subscriptions;
    }

    @Override
    public Subscription newSubscription(@NonNull String subject, @Nullable String queue) {
        Connection connection = (Connection)this.beanContext.getBean(Connection.class, Qualifiers.byName((String)"default"));
        if (queue == null) {
            return connection.subscribe(subject);
        }
        return connection.subscribe(subject, queue);
    }

    @Override
    public Subscription newSubscription(@NonNull String connectionName, @NonNull String subject, @Nullable String queue) {
        Connection connection = (Connection)this.beanContext.getBean(Connection.class, Qualifiers.byName((String)connectionName));
        if (queue == null) {
            return connection.subscribe(subject);
        }
        return connection.subscribe(subject, queue);
    }
}

