/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.pulsar.intercept;

import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.pulsar.annotation.PulsarReader;
import io.micronaut.pulsar.annotation.PulsarReaderClient;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;

@InterceptorBean(value={PulsarReaderClient.class})
public class PulsarReaderAdvice
implements MethodInterceptor<Object, Object> {
    protected final BeanContext beanContext;
    protected final ConversionService conversionService;

    public PulsarReaderAdvice(BeanContext beanContext, ConversionService conversionService) {
        this.beanContext = beanContext;
        this.conversionService = conversionService;
    }

    public Object intercept(MethodInvocationContext<Object, Object> context) {
        if (!context.hasAnnotation(PulsarReader.class)) {
            return context.proceed();
        }
        if (!context.getExecutableMethod().isAbstract()) {
            throw new IllegalArgumentException(String.format("Non abstract method cannot be annotated as Readers: %s", context.getExecutableMethod().getDescription(false)));
        }
        ReturnType returnType = context.getExecutableMethod().getReturnType();
        Argument argumentReturnType = returnType.isAsyncOrReactive() ? (Argument)returnType.getFirstTypeVariable().orElseThrow(() -> new IllegalArgumentException("Could not extract return type for %s. Async / reactive ")) : returnType.asArgument();
        AnnotationValue annotationValue = context.getAnnotation(PulsarReader.class);
        Reader reader = (Reader)this.beanContext.createBean(Reader.class, new Object[]{annotationValue, argumentReturnType, context});
        try {
            return this.read(reader, returnType, (AnnotationValue<PulsarReader>)annotationValue);
        }
        catch (PulsarClientException e) {
            throw new MessageListenerException(String.format("Failed to read message on topic %s", reader.getTopic()), (Throwable)e);
        }
    }

    private Object read(Reader<?> reader, ReturnType<?> returnType, AnnotationValue<PulsarReader> annotationValue) throws PulsarClientException {
        if (returnType.isAsyncOrReactive()) {
            Argument wrapped = (Argument)returnType.getFirstTypeVariable().orElseThrow(() -> new IllegalStateException("Missing inner type for async reader."));
            if (Message.class.isAssignableFrom(wrapped.getType())) {
                return this.readAsync(returnType, reader.readNextAsync());
            }
            return this.readAsync(returnType, (CompletableFuture<?>)reader.readNextAsync().thenApply(Message::getValue));
        }
        if (Message.class.isAssignableFrom(returnType.getType())) {
            return PulsarReaderAdvice.readBlocking(reader, annotationValue);
        }
        Message<?> msg = PulsarReaderAdvice.readBlocking(reader, annotationValue);
        return msg.getValue();
    }

    private static Message<?> readBlocking(Reader<?> reader, AnnotationValue<PulsarReader> annotationValue) throws PulsarClientException {
        int timeout = Objects.requireNonNull(annotationValue).intValue("readTimeout").orElse(0);
        TimeUnit timeUnit = annotationValue.get((CharSequence)"timeoutUnit", TimeUnit.class).orElse(TimeUnit.SECONDS);
        if (timeout > 0) {
            return reader.readNext(timeout, timeUnit);
        }
        return reader.readNext();
    }

    private Object readAsync(ReturnType<?> returnType, CompletableFuture<?> reading) {
        if (CompletableFuture.class == returnType.getType()) {
            return reading;
        }
        return Publishers.convertPublisher((ConversionService)this.conversionService, reading, (Class)returnType.getType());
    }
}

