/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.gcp.pubsub.intercept;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.BeanContext;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.StringUtils;
import io.micronaut.gcp.GoogleCloudConfiguration;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.bind.DefaultPubSubAcknowledgement;
import io.micronaut.gcp.pubsub.bind.PubSubBinderRegistry;
import io.micronaut.gcp.pubsub.bind.PubSubConsumerState;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.support.PubSubSubscriptionUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.exceptions.MessageListenerException;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Qualifier;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/**
 * Base {@link ExecutableMethodProcessor} that wires methods of {@link PubSubListener}
 * beans to Pub/Sub {@link MessageReceiver}s.
 *
 * <p>For every processed method that carries the configured annotation type, a receiver is
 * built that binds the incoming {@link PubsubMessage} to the method arguments, invokes the
 * method, and then either acknowledges the message automatically or checks that the method
 * acknowledged it manually. How the receiver is attached to a subscription is left to
 * subclasses via {@link #addSubscriber(ProjectSubscriptionName, MessageReceiver, String)}.
 *
 * @param <A> the annotation type that marks a method as a subscriber
 */
@Internal
abstract class AbstractPubSubConsumerMethodProcessor<A extends Annotation>
        implements ExecutableMethodProcessor<A> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractPubSubConsumerMethodProcessor.class);

    /** Message attribute consulted for a per-message content type. */
    private static final String CONTENT_TYPE_ATTRIBUTE = "Content-Type";

    /** Content type used when the annotation does not declare a {@code contentType} member. */
    private static final String DEFAULT_CONTENT_TYPE = "application/json";

    protected final BeanContext beanContext;
    protected final ConversionService conversionService;
    protected final GoogleCloudConfiguration googleCloudConfiguration;
    protected final PubSubBinderRegistry binderRegistry;
    protected final PubSubMessageReceiverExceptionHandler exceptionHandler;

    /** Flipped to {@code true} by {@link #shutDown()}; never reset. */
    private final AtomicBoolean shutDownMode = new AtomicBoolean(false);
    private final Class<A> annotationType;

    /**
     * @param annotationType           annotation looked up on each processed method
     * @param beanContext              context used to resolve the listener bean instance
     * @param conversionService        used to convert method results into a {@link Publisher}
     * @param googleCloudConfiguration supplies the project id used to resolve subscription names
     * @param binderRegistry           registry used to bind message state to method arguments
     * @param exceptionHandler         fallback handler used when the listener bean does not
     *                                 implement {@link PubSubMessageReceiverExceptionHandler} itself
     */
    protected AbstractPubSubConsumerMethodProcessor(Class<A> annotationType, BeanContext beanContext, ConversionService conversionService, GoogleCloudConfiguration googleCloudConfiguration, PubSubBinderRegistry binderRegistry, PubSubMessageReceiverExceptionHandler exceptionHandler) {
        this.annotationType = annotationType;
        this.beanContext = beanContext;
        this.conversionService = conversionService;
        this.googleCloudConfiguration = googleCloudConfiguration;
        this.binderRegistry = binderRegistry;
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Registers a message receiver for {@code method} when its declaring bean is a
     * {@link PubSubListener} and the method carries the configured annotation.
     *
     * @param beanDefinition definition of the bean declaring the method
     * @param method         the executable method being processed
     * @throws MessageListenerException if the listener bean cannot be found in the bean context
     */
    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        if (!beanDefinition.hasDeclaredAnnotation(PubSubListener.class)) {
            return;
        }

        AnnotationValue<A> subscriptionAnnotation = method.getAnnotation(this.annotationType);
        io.micronaut.context.Qualifier<Object> qualifier = beanDefinition
                .getAnnotationTypeByStereotype(Qualifier.class)
                .map(type -> Qualifiers.<Object>byAnnotation(beanDefinition, type))
                .orElse(null);
        boolean hasAckArg = Arrays.stream(method.getArguments())
                .anyMatch(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()));

        // The definition is wildcard-typed; the bean is only ever handled as Object here.
        @SuppressWarnings("unchecked")
        Class<Object> beanType = (Class<Object>) beanDefinition.getBeanType();

        // NOTE(review): the bean is resolved before the annotation null-check below, so a
        // missing bean fails even for methods without the annotation. Order kept as-is to
        // preserve existing behaviour; confirm whether that is intended.
        Object bean = this.beanContext.findBean(beanType, qualifier)
                .orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + method));

        if (subscriptionAnnotation == null) {
            return;
        }

        DefaultExecutableBinder<PubSubConsumerState> binder = new DefaultExecutableBinder<>();
        String subscriptionName = subscriptionAnnotation.getRequiredValue(String.class);
        ProjectSubscriptionName projectSubscriptionName = PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.googleCloudConfiguration.getProjectId());
        String defaultContentType = subscriptionAnnotation.stringValue("contentType").orElse(DEFAULT_CONTENT_TYPE);
        String configuration = subscriptionAnnotation.stringValue("configuration").orElse("");
        MessageReceiver receiver = this.buildMessageReceiver(beanDefinition, method, defaultContentType, projectSubscriptionName, hasAckArg, binder, bean);
        this.addSubscriber(projectSubscriptionName, receiver, configuration);
    }

    /**
     * Creates the receiver that dispatches a single Pub/Sub message to {@code method}.
     *
     * <p>The message is acknowledged automatically on successful completion unless the method
     * declares an {@link Acknowledgement} argument ({@code hasAckArg}); in that case a warning
     * is logged if the method finished without acknowledging. Binding and invocation failures
     * are routed to {@link #handleException(PubSubMessageReceiverException)}.
     */
    private MessageReceiver buildMessageReceiver(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method, String defaultContentType, ProjectSubscriptionName projectSubscriptionName, boolean hasAckArg, DefaultExecutableBinder<PubSubConsumerState> binder, Object bean) {
        return (message, ackReplyConsumer) -> {
            if (!this.doBeforeSubscriber(message, ackReplyConsumer)) {
                return;
            }

            // A non-empty content type on the message wins over the annotation-level default.
            String messageContentType = message.getAttributesMap().getOrDefault(CONTENT_TYPE_ATTRIBUTE, "");
            String contentType = StringUtils.isNotEmpty(messageContentType) ? messageContentType : defaultContentType;

            DefaultPubSubAcknowledgement pubSubAcknowledgement = new DefaultPubSubAcknowledgement(ackReplyConsumer);
            PubSubConsumerState consumerState = new PubSubConsumerState(message, ackReplyConsumer, projectSubscriptionName, contentType);
            boolean autoAcknowledge = !hasAckArg;
            try {
                // The method is wildcard-typed; it is invoked reflectively with Object arguments.
                @SuppressWarnings("unchecked")
                BoundExecutable<Object, Object> executable =
                        (BoundExecutable<Object, Object>) binder.bind(method, this.binderRegistry, consumerState);

                Runnable onComplete = autoAcknowledge
                        ? pubSubAcknowledgement::ack
                        : () -> this.verifyManualAcknowledgment(executable, method.getName());

                this.executeSubscriberMethod(beanDefinition, method, executable, bean).subscribe(
                        data -> { },
                        ex -> this.handleException(new PubSubMessageReceiverException("Error handling message", ex, bean, consumerState, autoAcknowledge)),
                        onComplete);
            } catch (UnsatisfiedArgumentException e) {
                this.handleException(new PubSubMessageReceiverException("Error binding message to the method", e, bean, consumerState, autoAcknowledge));
            } catch (Exception e) {
                this.handleException(new PubSubMessageReceiverException("Error handling message", e, bean, consumerState, autoAcknowledge));
            }
        };
    }

    /**
     * Lifecycle callback (annotated {@code @PreDestroy}) that marks this processor as shutting
     * down; observable through {@link #isShutDownInitiated()}.
     */
    @PreDestroy
    public final void shutDown() {
        this.shutDownMode.set(true);
    }

    /**
     * Hook evaluated before each message is dispatched to the subscriber method.
     *
     * @param message          the received message
     * @param ackReplyConsumer the acknowledgement handle for the message
     * @return {@code true} to continue processing; {@code false} makes the receiver return
     *         without binding or invoking the method. This default always returns {@code true}.
     */
    protected boolean doBeforeSubscriber(PubsubMessage message, AckReplyConsumer ackReplyConsumer) {
        return true;
    }

    /**
     * Attaches {@code receiver} to the given subscription.
     *
     * @param projectSubscriptionName the resolved subscription name
     * @param receiver                the receiver built for the subscriber method
     * @param configuration           value of the annotation's {@code configuration} member;
     *                                {@link #process} passes an empty string when it is absent
     */
    protected abstract void addSubscriber(@NonNull ProjectSubscriptionName projectSubscriptionName, @NonNull MessageReceiver receiver, @Nullable String configuration);

    /**
     * @return {@code true} once {@link #shutDown()} has been called
     */
    protected boolean isShutDownInitiated() {
        return this.shutDownMode.get();
    }

    /**
     * Dispatches a receiver failure to the listener bean when it implements
     * {@link PubSubMessageReceiverExceptionHandler}, otherwise to the injected handler.
     *
     * @param ex the failure, carrying the listener bean that was being invoked
     */
    protected void handleException(PubSubMessageReceiverException ex) {
        Object listener = ex.getListener();
        if (listener instanceof PubSubMessageReceiverExceptionHandler) {
            ((PubSubMessageReceiverExceptionHandler) listener).handle(ex);
        } else {
            this.exceptionHandler.handle(ex);
        }
    }

    /**
     * Invokes the bound subscriber method on {@code bean} and exposes its result as a
     * {@link Flux}.
     *
     * @param beanDefinition definition of the listener bean (unused by this implementation)
     * @param method         the subscriber method (unused by this implementation)
     * @param executable     the method with its arguments already bound; must not be {@code null}
     * @param bean           the listener instance to invoke the method on
     * @return the method's result converted to a {@link Flux}, or an empty {@link Flux} when the
     *         result is not convertible to a publisher
     * @throws NullPointerException if {@code executable} is {@code null}
     */
    protected Flux<Object> executeSubscriberMethod(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method, BoundExecutable<Object, Object> executable, Object bean) {
        Object result = Objects.requireNonNull(executable).invoke(bean);
        if (!Publishers.isConvertibleToPublisher(result)) {
            return Flux.empty();
        }
        // Publisher.class carries no element type, so the conversion is inherently unchecked.
        @SuppressWarnings("unchecked")
        Publisher<Object> publisher = Publishers.convertPublisher(this.conversionService, result, Publisher.class);
        return Flux.from(publisher);
    }

    /**
     * Logs a warning when the first bound {@link DefaultPubSubAcknowledgement} argument reports
     * that the method did not acknowledge the message. Does nothing if no such argument is bound.
     */
    private void verifyManualAcknowledgment(BoundExecutable<Object, Object> executable, String methodName) {
        Arrays.stream(executable.getBoundArguments())
                .filter(DefaultPubSubAcknowledgement.class::isInstance)
                .map(DefaultPubSubAcknowledgement.class::cast)
                .findFirst()
                .filter(ack -> !ack.isClientAck())
                .ifPresent(ack -> LOG.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", methodName));
    }
}

