/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.gcp.pubsub.intercept;

import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.micronaut.context.BeanContext;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.StringUtils;
import io.micronaut.gcp.GoogleCloudConfiguration;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.bind.DefaultPubSubAcknowledgement;
import io.micronaut.gcp.pubsub.bind.PubSubBinderRegistry;
import io.micronaut.gcp.pubsub.bind.PubSubConsumerState;
import io.micronaut.gcp.pubsub.bind.SubscriberFactory;
import io.micronaut.gcp.pubsub.bind.SubscriberFactoryConfig;
import io.micronaut.gcp.pubsub.configuration.PubSubConfigurationProperties;
import io.micronaut.gcp.pubsub.exception.PubSubListenerException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDesRegistry;
import io.micronaut.gcp.pubsub.support.PubSubSubscriptionUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.exceptions.MessageListenerException;
import jakarta.inject.Qualifier;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Method processor that wires {@link Subscription}-annotated methods of
 * {@link PubSubListener} beans to Pub/Sub subscribers.
 *
 * <p>For every processed method a {@link MessageReceiver} is built that binds the incoming
 * message to the method arguments, invokes the method on the listener bean and, unless the
 * method declares an {@link Acknowledgement} argument, acknowledges the message afterwards.
 * Failures while binding or invoking are routed to a {@link PubSubMessageReceiverExceptionHandler}.
 */
@Singleton
public class PubSubConsumerAdvice
implements ExecutableMethodProcessor<Subscription> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubConsumerAdvice.class);

    private final BeanContext beanContext;
    private final SubscriberFactory subscriberFactory;
    private final GoogleCloudConfiguration googleCloudConfiguration;
    private final PubSubConfigurationProperties pubSubConfigurationProperties;
    private final PubSubBinderRegistry binderRegistry;
    private final PubSubMessageReceiverExceptionHandler exceptionHandler;

    /**
     * Creates the processor.
     *
     * @param beanContext                   context used to look up the listener bean instance
     * @param conversionService             not used by this class; kept so the injection signature stays stable
     * @param serDesRegistry                not used by this class; kept so the injection signature stays stable
     * @param subscriberFactory             factory that creates the subscriber for each receiver
     * @param googleCloudConfiguration      supplies the project id used to resolve subscription names
     * @param pubSubConfigurationProperties supplies the subscribing executor passed to the subscriber factory
     * @param binderRegistry                registry used to bind a consumer state to method arguments
     * @param exceptionHandler              fallback handler used when the listener bean is not itself a handler
     */
    public PubSubConsumerAdvice(BeanContext beanContext, ConversionService<?> conversionService, PubSubMessageSerDesRegistry serDesRegistry, SubscriberFactory subscriberFactory, GoogleCloudConfiguration googleCloudConfiguration, PubSubConfigurationProperties pubSubConfigurationProperties, PubSubBinderRegistry binderRegistry, PubSubMessageReceiverExceptionHandler exceptionHandler) {
        this.beanContext = beanContext;
        this.subscriberFactory = subscriberFactory;
        this.googleCloudConfiguration = googleCloudConfiguration;
        this.pubSubConfigurationProperties = pubSubConfigurationProperties;
        this.binderRegistry = binderRegistry;
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Registers a subscriber for the given method when its bean definition declares
     * {@link PubSubListener} and the method carries a {@link Subscription} annotation.
     *
     * @param beanDefinition definition of the bean that owns the method
     * @param method         the executable method to invoke for each received message
     * @throws MessageListenerException if the listener bean cannot be found in the bean context
     * @throws PubSubListenerException  if the subscriber cannot be created
     */
    // Raw BoundExecutable and the Class<Object> cast are needed because the bean and method
    // types are only known as wildcards at this point.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        if (!beanDefinition.hasDeclaredAnnotation(PubSubListener.class)) {
            return;
        }
        AnnotationValue<Subscription> subscriptionAnnotation = method.getAnnotation(Subscription.class);
        io.micronaut.context.Qualifier<Object> qualifier = beanDefinition
                .getAnnotationTypeByStereotype(Qualifier.class)
                .map(type -> Qualifiers.<Object>byAnnotation(beanDefinition, type))
                .orElse(null);
        boolean hasAckArg = Arrays.stream(method.getArguments())
                .anyMatch(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()));
        Class<Object> beanType = (Class<Object>) beanDefinition.getBeanType();
        Object bean = this.beanContext.findBean(beanType, qualifier)
                .orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + method));
        DefaultExecutableBinder<PubSubConsumerState> binder = new DefaultExecutableBinder<>();
        if (subscriptionAnnotation == null) {
            return;
        }

        String subscriptionName = subscriptionAnnotation.getRequiredValue(String.class);
        ProjectSubscriptionName projectSubscriptionName = PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.googleCloudConfiguration.getProjectId());
        String defaultContentType = subscriptionAnnotation.stringValue("contentType").orElse("application/json");
        String configuration = subscriptionAnnotation.stringValue("configuration").orElse("");

        MessageReceiver receiver = (message, ackReplyConsumer) -> {
            // A non-empty "Content-Type" message attribute wins over the annotation's default.
            String messageContentType = message.getAttributesMap().getOrDefault("Content-Type", "");
            String contentType = Optional.of(messageContentType).filter(StringUtils::isNotEmpty).orElse(defaultContentType);
            DefaultPubSubAcknowledgement pubSubAcknowledgement = new DefaultPubSubAcknowledgement(ackReplyConsumer);
            PubSubConsumerState consumerState = new PubSubConsumerState(message, ackReplyConsumer, projectSubscriptionName, contentType);
            try {
                BoundExecutable executable;
                try {
                    executable = binder.bind(method, this.binderRegistry, consumerState);
                }
                catch (Exception ex) {
                    this.handleException(new PubSubMessageReceiverException("Error binding message to the method", ex, bean, consumerState));
                    // Nothing can be invoked without a bound executable. Stop here so the
                    // failure is reported once instead of surfacing again as a NullPointerException.
                    return;
                }
                executable.invoke(bean);
                if (!hasAckArg) {
                    // No Acknowledgement argument declared: acknowledge automatically after a successful invocation.
                    pubSubAcknowledgement.ack();
                } else {
                    // Only the first bound acknowledgement is inspected.
                    Arrays.stream(executable.getBoundArguments())
                            .filter(DefaultPubSubAcknowledgement.class::isInstance)
                            .map(DefaultPubSubAcknowledgement.class::cast)
                            .findFirst()
                            .filter(manualAck -> !manualAck.isClientAck())
                            .ifPresent(manualAck -> LOG.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", method.getName()));
                }
            }
            catch (Exception e) {
                this.handleException(new PubSubMessageReceiverException("Error handling message", e, bean, consumerState));
            }
        };

        try {
            this.subscriberFactory.createSubscriber(new SubscriberFactoryConfig(projectSubscriptionName, receiver, configuration, this.pubSubConfigurationProperties.getSubscribingExecutor()));
        }
        catch (Exception e) {
            throw new PubSubListenerException("Failed to create subscriber", e);
        }
    }

    /**
     * Dispatches a receiver failure to the listener bean when it implements
     * {@link PubSubMessageReceiverExceptionHandler}, otherwise to the injected handler.
     *
     * @param ex the failure to report; its listener is the bean the message was destined for
     */
    private void handleException(PubSubMessageReceiverException ex) {
        Object bean = ex.getListener();
        if (bean instanceof PubSubMessageReceiverExceptionHandler) {
            ((PubSubMessageReceiverExceptionHandler) bean).handle(ex);
        } else {
            this.exceptionHandler.handle(ex);
        }
    }
}

