/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.nats.jetstream.intercept;

import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.nats.annotation.NatsConnection;
import io.micronaut.nats.annotation.Subject;
import io.micronaut.nats.bind.NatsBinderRegistry;
import io.micronaut.nats.intercept.StaticConsumerState;
import io.micronaut.nats.jetstream.PushConsumerRegistry;
import io.micronaut.nats.jetstream.annotation.JetStreamListener;
import io.micronaut.nats.jetstream.annotation.PushConsumer;
import io.micronaut.nats.jetstream.exception.JetStreamListenerException;
import io.micronaut.nats.jetstream.exception.JetStreamListenerExceptionHandler;
import io.micronaut.runtime.ApplicationConfiguration;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.MessageHandler;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.Subscription;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.ReplayPolicy;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Singleton
@Bean(preDestroy="close")
public class JetStreamPushConsumerAdvice
implements ExecutableMethodProcessor<PushConsumer>,
AutoCloseable,
PushConsumerRegistry {
    private final BeanContext beanContext;
    private final NatsBinderRegistry binderRegistry;
    private final JetStreamListenerExceptionHandler exceptionHandler;
    private final ApplicationConfiguration applicationConfiguration;
    private final Map<String, StaticConsumerState> consumers = new ConcurrentHashMap<String, StaticConsumerState>();
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    public JetStreamPushConsumerAdvice(BeanContext beanContext, NatsBinderRegistry binderRegistry, JetStreamListenerExceptionHandler exceptionHandler, ApplicationConfiguration applicationConfiguration) {
        this.beanContext = beanContext;
        this.binderRegistry = binderRegistry;
        this.exceptionHandler = exceptionHandler;
        this.applicationConfiguration = applicationConfiguration;
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        Optional listenerAnnotation = method.findAnnotation(JetStreamListener.class);
        if (!listenerAnnotation.isPresent()) {
            return;
        }
        Optional pushConsumerAnnotation = method.findAnnotation(PushConsumer.class);
        Optional subjectAnnotation = method.findAnnotation(Subject.class);
        if (!pushConsumerAnnotation.isPresent()) {
            return;
        }
        AnnotationValue pushConsumer = (AnnotationValue)pushConsumerAnnotation.get();
        String subject = subjectAnnotation.flatMap(a -> a.getValue(String.class)).filter(StringUtils::isNotEmpty).orElseThrow(() -> new MessageListenerException("In the @PushConsumer Annotation is the subject attribute missing for the method " + method));
        String streamName = pushConsumer.getValue(String.class).filter(StringUtils::isNotEmpty).orElseThrow(() -> new MessageListenerException("In the @PushConsumer Annotation is the value attribute missing for the method " + method));
        Optional ackPolicy = pushConsumer.enumValue("ackPolicy", AckPolicy.class);
        boolean autoAck = this.determineAckMode(method, ackPolicy);
        String connectionName = method.stringValue(NatsConnection.class, "connection").orElse("default");
        Qualifier qualifier = beanDefinition.getAnnotationTypeByStereotype("javax.inject.Qualifier").map(type -> Qualifiers.byAnnotation((AnnotationMetadata)beanDefinition, (Class)type)).orElse(null);
        Class beanType = beanDefinition.getBeanType();
        Object bean = this.beanContext.findBean(beanType, qualifier).orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + method));
        String clientId = listenerAnnotation.flatMap(a -> a.stringValue("clientId")).filter(StringUtils::isNotEmpty).orElseGet(() -> this.applicationConfiguration.getName().map(s -> s + "-" + NameUtils.hyphenate((String)beanType.getSimpleName())).orElse("jetstream-consumer-" + this.clientIdGenerator.incrementAndGet()));
        JetStream jetStream = (JetStream)this.beanContext.getBean(JetStream.class, Qualifiers.byName((String)connectionName));
        Connection connection = (Connection)this.beanContext.getBean(Connection.class, Qualifiers.byName((String)connectionName));
        DefaultExecutableBinder binder = new DefaultExecutableBinder();
        Dispatcher ds = connection.createDispatcher();
        MessageHandler messageHandler = msg -> {
            try {
                BoundExecutable boundExecutable = binder.bind((Executable)method, (ArgumentBinderRegistry)this.binderRegistry, (Object)msg);
                if (boundExecutable != null) {
                    boundExecutable.invoke(bean);
                }
            }
            catch (Exception e) {
                this.handleException(new JetStreamListenerException("An error occurred binding the message to the method", e, bean, msg));
            }
        };
        HashSet<Subscription> subscriptions = new HashSet<Subscription>();
        try {
            PushSubscribeOptions options = this.buildConsumerConfiguration(streamName, (AnnotationValue<PushConsumer>)pushConsumer);
            Optional queueOptional = pushConsumer.get((CharSequence)"queue", String.class);
            if (queueOptional.isPresent() && !((String)queueOptional.get()).isEmpty()) {
                subscriptions.add((Subscription)jetStream.subscribe(subject, (String)queueOptional.get(), ds, messageHandler, autoAck, options));
            } else {
                subscriptions.add((Subscription)jetStream.subscribe(subject, ds, messageHandler, autoAck, options));
            }
        }
        catch (JetStreamApiException | IOException e) {
            this.handleException(new JetStreamListenerException("An error occurred binding the message to the method", e, bean));
        }
        this.consumers.put(clientId, new StaticConsumerState(clientId, subscriptions, ds, connection));
    }

    private boolean determineAckMode(ExecutableMethod<?, ?> method, Optional<AckPolicy> ackPolicy) {
        boolean autoAck = true;
        if (ackPolicy.isPresent()) {
            AckPolicy policy = ackPolicy.get();
            if (policy != AckPolicy.All) {
                autoAck = false;
            }
            if (policy == AckPolicy.Explicit && Arrays.stream(method.getArguments()).noneMatch(a -> Acknowledgement.class.isAssignableFrom(a.getType()))) {
                throw new MessageListenerException("The ackPolicy for method " + method + " is explicit. The method must have a argument of type Achnowledgement");
            }
        }
        return autoAck;
    }

    private PushSubscribeOptions buildConsumerConfiguration(String streamName, @NonNull AnnotationValue<PushConsumer> annotationValue) {
        ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder();
        builder = annotationValue.stringValue("durable").filter(StringUtils::isNotEmpty).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).durable(arg_0)).orElse(builder);
        builder = annotationValue.enumValue("deliverPolicy", DeliverPolicy.class).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).deliverPolicy(arg_0)).orElse(builder);
        builder = annotationValue.stringValue("deliverSubject").filter(StringUtils::isNotEmpty).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).deliverSubject(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"startSequence", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).startSequence(arg_0)).orElse(builder);
        builder = annotationValue.enumValue("ackPolicy", AckPolicy.class).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).ackPolicy(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"ackWait", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).ackWait(arg_0)).orElse(builder);
        builder = annotationValue.enumValue("replayPolicy", ReplayPolicy.class).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).replayPolicy(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"maxDeliver", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).maxDeliver(arg_0)).orElse(builder);
        builder = annotationValue.stringValue("filterSubject").filter(StringUtils::isNotEmpty).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).filterSubject(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"rateLimit", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).rateLimit(arg_0)).orElse(builder);
        builder = annotationValue.stringValue("sampleFrequency").filter(StringUtils::isNotEmpty).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).sampleFrequency(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"idleHeartbeat", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).idleHeartbeat(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"flowControl", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).flowControl(arg_0)).orElse(builder);
        long[] backoff = annotationValue.longValues("backoff");
        if (backoff.length > 0 && backoff[0] != Long.MIN_VALUE) {
            builder = builder.backoff(backoff);
        }
        builder = annotationValue.booleanValue("headersOnly").map(arg_0 -> ((ConsumerConfiguration.Builder)builder).headersOnly(arg_0)).orElse(builder);
        builder = annotationValue.get((CharSequence)"maxAckPending", Long.class).filter(value -> value != Long.MIN_VALUE).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).maxAckPending(arg_0)).orElse(builder);
        builder = annotationValue.stringValue("deliverGroup").filter(StringUtils::isNotEmpty).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).deliverGroup(arg_0)).orElse(builder);
        builder = annotationValue.stringValue("description").filter(StringUtils::isNotEmpty).map(arg_0 -> ((ConsumerConfiguration.Builder)builder).description(arg_0)).orElse(builder);
        ConsumerConfiguration cc = builder.build();
        return ((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().stream(streamName)).durable(cc.getDurable())).configuration(cc)).ordered(annotationValue.booleanValue("ordered").orElse(false).booleanValue()).build();
    }

    @Override
    @PreDestroy
    public void close() {
        for (StaticConsumerState consumerState : this.consumers.values()) {
            if (consumerState.getConnection().getStatus() == Connection.Status.CLOSED) continue;
            consumerState.getConnection().closeDispatcher(consumerState.getDispatcher());
        }
        this.consumers.clear();
    }

    private void handleException(JetStreamListenerException exception) {
        Object bean = exception.getListener();
        if (bean instanceof JetStreamListenerExceptionHandler) {
            ((JetStreamListenerExceptionHandler)bean).handle((Throwable)((Object)exception));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)exception));
        }
    }

    @Override
    public JetStreamSubscription newSubscription(String subject, PushSubscribeOptions pushSubscribeOptions, String queue) throws JetStreamApiException, IOException {
        JetStream jetStream = (JetStream)this.beanContext.getBean(JetStream.class, Qualifiers.byName((String)"default"));
        if (queue == null) {
            return jetStream.subscribe(subject, pushSubscribeOptions);
        }
        return jetStream.subscribe(subject, queue, pushSubscribeOptions);
    }

    @Override
    public JetStreamSubscription newSubscription(String connectionName, String subject, PushSubscribeOptions pushSubscribeOptions, String queue) throws JetStreamApiException, IOException {
        JetStream jetStream = (JetStream)this.beanContext.getBean(JetStream.class, Qualifiers.byName((String)connectionName));
        if (queue == null) {
            return jetStream.subscribe(subject, pushSubscribeOptions);
        }
        return jetStream.subscribe(subject, queue, pushSubscribeOptions);
    }
}

