/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.nats.jetstream.intercept;

import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.nats.exception.NatsClientException;
import io.micronaut.nats.intercept.AbstractIntroductionAdvice;
import io.micronaut.nats.intercept.StaticPublisherState;
import io.micronaut.nats.jetstream.annotation.JetStreamClient;
import io.micronaut.nats.jetstream.intercept.JetStreamPublisherState;
import io.micronaut.nats.jetstream.reactive.ReactivePublisher;
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.nats.client.Message;
import io.nats.client.PublishOptions;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

@Singleton
@InterceptorBean(value={JetStreamClient.class})
public class JetStreamIntroductionAdvice
extends AbstractIntroductionAdvice
implements MethodInterceptor<Object, Object> {
    private static final Logger LOG = LoggerFactory.getLogger(JetStreamIntroductionAdvice.class);
    private final Map<ExecutableMethod<?, ?>, JetStreamPublisherState> publisherCache = new ConcurrentHashMap();

    public JetStreamIntroductionAdvice(BeanContext beanContext, ConversionService conversionService, NatsMessageSerDesRegistry serDesRegistry, @Named(value="blocking") ExecutorService executorService) {
        super(beanContext, executorService, conversionService, serDesRegistry);
    }

    public Object intercept(MethodInvocationContext<Object, Object> context) {
        if (context.hasAnnotation(JetStreamClient.class)) {
            JetStreamPublisherState publisherState = this.publisherCache.computeIfAbsent(context.getExecutableMethod(), method -> {
                ReactivePublisher reactivePublisher;
                StaticPublisherState staticPublisherState = this.buildPublisherState((ExecutableMethod<?, ?>)method);
                try {
                    reactivePublisher = (ReactivePublisher)this.beanContext.getBean(ReactivePublisher.class, Qualifiers.byName((String)staticPublisherState.getConnection()));
                }
                catch (Exception e) {
                    throw new NatsClientException(String.format("Failed to retrieve a publisher named [%s] to publish messages", staticPublisherState.getConnection()), e);
                }
                Optional<Argument<?>> publishOptions = Arrays.stream(method.getArguments()).filter(arg -> PublishOptions.class.isAssignableFrom(arg.getType())).findFirst();
                return new JetStreamPublisherState(staticPublisherState, reactivePublisher, publishOptions);
            });
            Message message = this.buildNatsMessage(context, publisherState.getHeaders(), publisherState.getBodyArgument(), publisherState.getSerDes(), publisherState.getSubject());
            ReactivePublisher reactivePublisher = publisherState.getJetstreanReactivePublisher();
            InterceptedMethod interceptedMethod = InterceptedMethod.of(context, (ConversionService)this.conversionService);
            try {
                PublishOptions publishOptions;
                Optional<Argument<?>> publishOptionsOptional;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending the message from context {}.", context);
                }
                Mono reactive = (publishOptionsOptional = publisherState.getPublishOptions()).isPresent() ? ((publishOptions = (PublishOptions)context.getParameterValueMap().get(publishOptionsOptional.get().getName())) != null ? Mono.from(reactivePublisher.publish(message, publishOptions)) : Mono.from(reactivePublisher.publish(message))) : Mono.from(reactivePublisher.publish(message));
                reactive = reactive.onErrorMap(throwable -> new NatsClientException(String.format("Failed to publish a message with subject: [%s]", message.getSubject()), (Throwable)throwable, Collections.singletonList(message)));
                if (interceptedMethod.resultType() != InterceptedMethod.ResultType.SYNCHRONOUS) {
                    reactive = reactive.subscribeOn(this.scheduler);
                }
                return JetStreamIntroductionAdvice.handleResult(interceptedMethod, reactive);
            }
            catch (Exception e) {
                return interceptedMethod.handleException(e);
            }
        }
        return context.proceed();
    }
}

