/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.nats.intercept;

import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.caffeine.cache.Cache;
import io.micronaut.caffeine.cache.Caffeine;
import io.micronaut.context.BeanContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.NatsConnection;
import io.micronaut.nats.annotation.Subject;
import io.micronaut.nats.exception.NatsClientException;
import io.micronaut.nats.intercept.StaticPublisherState;
import io.micronaut.nats.reactive.PublishState;
import io.micronaut.nats.reactive.ReactivePublisher;
import io.micronaut.nats.serdes.NatsMessageSerDes;
import io.micronaut.nats.serdes.NatsMessageSerDesRegistry;
import io.nats.client.Message;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Singleton
public class NatsIntroductionAdvice
implements MethodInterceptor<Object, Object> {
    private static final Logger LOG = LoggerFactory.getLogger(NatsIntroductionAdvice.class);
    private final BeanContext beanContext;
    private final Scheduler scheduler;
    private final ConversionService<?> conversionService;
    private final NatsMessageSerDesRegistry serDesRegistry;
    private final Cache<ExecutableMethod, StaticPublisherState> publisherCache = Caffeine.newBuilder().build();

    public NatsIntroductionAdvice(BeanContext beanContext, ConversionService<?> conversionService, NatsMessageSerDesRegistry serDesRegistry, @Named(value="consumer") ExecutorService executorService) {
        this.beanContext = beanContext;
        this.conversionService = conversionService;
        this.serDesRegistry = serDesRegistry;
        this.scheduler = Schedulers.fromExecutorService((ExecutorService)executorService);
    }

    public Object intercept(MethodInvocationContext<Object, Object> context) {
        if (context.hasAnnotation(NatsClient.class)) {
            StaticPublisherState publisherState = (StaticPublisherState)this.publisherCache.get((Object)context.getExecutableMethod(), method -> {
                ReactivePublisher reactivePublisher;
                if (!method.findAnnotation(NatsClient.class).isPresent()) {
                    throw new IllegalStateException("No @NatsClient annotation present on method: " + method);
                }
                Optional subjectAnn = method.findAnnotation(Subject.class);
                Optional subject = subjectAnn.flatMap(s -> s.getValue(String.class));
                String connection = method.findAnnotation(NatsConnection.class).flatMap(conn -> conn.get((CharSequence)"connection", String.class)).orElse("default");
                Argument<?> bodyArgument = this.findBodyArgument((ExecutableMethod<?, ?>)method).orElseThrow(() -> new NatsClientException("No valid message body argument found for method: " + method));
                NatsMessageSerDes<?> serDes = this.serDesRegistry.findSerdes(bodyArgument).orElseThrow(() -> new NatsClientException(String.format("Could not find a serializer for the body argument of type [%s]", bodyArgument.getType().getName())));
                try {
                    reactivePublisher = (ReactivePublisher)this.beanContext.getBean(ReactivePublisher.class, Qualifiers.byName((String)connection));
                }
                catch (Throwable e) {
                    throw new NatsClientException(String.format("Failed to retrieve a publisher named [%s] to publish messages", connection), e);
                }
                return new StaticPublisherState(subject.orElse(null), bodyArgument, method.getReturnType(), connection, serDes, reactivePublisher);
            });
            Map parameterValues = context.getParameterValueMap();
            Object body = parameterValues.get(publisherState.getBodyArgument().getName());
            byte[] converted = publisherState.getSerDes().serialize(body);
            String subject = publisherState.getSubject().orElse(this.findSubjectKey(context).orElse(null));
            if (subject == null) {
                throw new IllegalStateException("No @Subject annotation present on method: " + context.getExecutableMethod());
            }
            PublishState publishState = new PublishState(subject, converted);
            ReactivePublisher reactivePublisher = publisherState.getReactivePublisher();
            InterceptedMethod interceptedMethod = InterceptedMethod.of(context);
            try {
                Mono reactive;
                boolean rpc;
                boolean bl = rpc = !interceptedMethod.returnTypeValue().isVoid();
                if (rpc) {
                    reactive = Mono.from(reactivePublisher.publishAndReply(publishState)).flatMap(message -> {
                        Object deserialized = this.deserialize((Message)message, (Argument)publisherState.getDataType(), (Argument)publisherState.getDataType());
                        if (deserialized == null) {
                            return Mono.empty();
                        }
                        return Mono.just((Object)deserialized);
                    });
                    if (interceptedMethod.resultType() == InterceptedMethod.ResultType.SYNCHRONOUS) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Publish is an RPC call. Blocking until a response is received.", context);
                        }
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Publish is an RPC call. Publisher will complete when a response is received.", context);
                        }
                        reactive = reactive.subscribeOn(this.scheduler);
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sending the message.", context);
                    }
                    reactive = Mono.from(reactivePublisher.publish(publishState)).onErrorMap(throwable -> new NatsClientException(String.format("Failed to publish a message with subject: [%s]", subject), (Throwable)throwable, Collections.singletonList(publishState)));
                }
                switch (interceptedMethod.resultType()) {
                    case PUBLISHER: {
                        return interceptedMethod.handleResult((Object)reactive);
                    }
                    case COMPLETION_STAGE: {
                        final CompletableFuture future = new CompletableFuture();
                        reactive.subscribe((Subscriber)new Subscriber<Object>(){
                            Object value = null;

                            public void onSubscribe(Subscription s) {
                                s.request(1L);
                            }

                            public void onNext(Object o) {
                                this.value = o;
                            }

                            public void onError(Throwable t) {
                                future.completeExceptionally(t);
                            }

                            public void onComplete() {
                                future.complete(this.value);
                            }
                        });
                        return interceptedMethod.handleResult(future);
                    }
                    case SYNCHRONOUS: {
                        return interceptedMethod.handleResult(reactive.block());
                    }
                }
                return interceptedMethod.unsupported();
            }
            catch (Exception e) {
                return interceptedMethod.handleException(e);
            }
        }
        return context.proceed();
    }

    private Object deserialize(Message message, Argument dataType, Argument returnType) {
        Optional serDes = this.serDesRegistry.findSerdes(dataType);
        if (serDes.isPresent()) {
            return serDes.get().deserialize(message, returnType);
        }
        throw new NatsClientException(String.format("Could not find a deserializer for [%s]", dataType.getName()));
    }

    private Optional<Argument<?>> findBodyArgument(ExecutableMethod<?, ?> method) {
        return Optional.ofNullable(Arrays.stream(method.getArguments()).filter(arg -> arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)).findFirst().orElseGet(() -> Arrays.stream(method.getArguments()).filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Subject.class)).findFirst().orElse(null)));
    }

    private Optional<String> findSubjectKey(MethodInvocationContext<Object, Object> method) {
        Map argumentValues = method.getParameterValueMap();
        return Arrays.stream(method.getArguments()).filter(arg -> arg.getAnnotationMetadata().hasAnnotation(Subject.class)).map(Argument::getName).map(argumentValues::get).filter(Objects::nonNull).map(Object::toString).findFirst();
    }
}

