/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.nats.intercept;

import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.exception.NatsClientException;
import io.micronaut.nats.intercept.AbstractIntroductionAdvice;
import io.micronaut.nats.intercept.StaticPublisherState;
import io.micronaut.nats.reactive.ReactivePublisher;
import io.micronaut.nats.serdes.NatsMessageSerDes;
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.nats.client.Message;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

@Singleton
@InterceptorBean(value={NatsClient.class})
public class NatsIntroductionAdvice
extends AbstractIntroductionAdvice
implements MethodInterceptor<Object, Object> {
    private static final Logger LOG = LoggerFactory.getLogger(NatsIntroductionAdvice.class);
    private final Map<ExecutableMethod, StaticPublisherState> publisherCache = new ConcurrentHashMap<ExecutableMethod, StaticPublisherState>();

    public NatsIntroductionAdvice(BeanContext beanContext, ConversionService conversionService, NatsMessageSerDesRegistry serDesRegistry, @Named(value="blocking") ExecutorService executorService) {
        super(beanContext, executorService, conversionService, serDesRegistry);
    }

    public Object intercept(MethodInvocationContext<Object, Object> context) {
        if (context.hasAnnotation(NatsClient.class)) {
            StaticPublisherState publisherState = this.publisherCache.computeIfAbsent(context.getExecutableMethod(), this::buildPublisherState);
            Message message = this.buildNatsMessage(context, publisherState.getHeaders(), publisherState.getBodyArgument(), publisherState.getSerDes(), publisherState.getSubject());
            ReactivePublisher reactivePublisher = publisherState.getReactivePublisher();
            InterceptedMethod interceptedMethod = InterceptedMethod.of(context, (ConversionService)this.conversionService);
            try {
                Mono reactive;
                boolean rpc = false;
                if (interceptedMethod.resultType() == InterceptedMethod.ResultType.SYNCHRONOUS) {
                    rpc = !interceptedMethod.returnTypeValue().isVoid();
                } else {
                    boolean bl = rpc = !context.getReturnType().asArgument().isVoid();
                }
                if (rpc) {
                    reactive = Mono.from(reactivePublisher.publishAndReply(message)).flatMap(response -> {
                        Object deserialized = this.deserialize((Message)response, (Argument<Object>)publisherState.getDataType(), (Argument<Object>)publisherState.getDataType());
                        if (deserialized == null) {
                            return Mono.empty();
                        }
                        return Mono.just((Object)deserialized);
                    });
                    if (interceptedMethod.resultType() == InterceptedMethod.ResultType.SYNCHRONOUS) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Publish is an RPC call. Blocking until a response is received. {}", context);
                        }
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Publish is an RPC call. Publisher will complete when a response is received. {}", context);
                        }
                        reactive = reactive.subscribeOn(this.scheduler);
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sending the message. {}", context);
                    }
                    reactive = Mono.from(reactivePublisher.publish(message)).onErrorMap(throwable -> new NatsClientException(String.format("Failed to publish a message with subject: [%s]", message.getSubject()), (Throwable)throwable, Collections.singletonList(message)));
                }
                return NatsIntroductionAdvice.handleResult(interceptedMethod, reactive);
            }
            catch (Exception e) {
                return interceptedMethod.handleException(e);
            }
        }
        return context.proceed();
    }

    private Object deserialize(Message message, Argument<Object> dataType, Argument<Object> returnType) {
        Optional<NatsMessageSerDes<Object>> serDes = this.serDesRegistry.findSerdes(dataType);
        if (serDes.isPresent()) {
            return serDes.get().deserialize(message, returnType);
        }
        throw new NatsClientException(String.format("Could not find a deserializer for [%s]", dataType.getName()));
    }
}

