/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.rabbitmq.intercept;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.RecoverableChannel;
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.exceptions.MessageAcknowledgementException;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitConnection;
import io.micronaut.rabbitmq.annotation.RabbitListener;
import io.micronaut.rabbitmq.annotation.RabbitProperty;
import io.micronaut.rabbitmq.bind.RabbitBinderRegistry;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.bind.RabbitMessageCloseable;
import io.micronaut.rabbitmq.connect.ChannelPool;
import io.micronaut.rabbitmq.exception.RabbitListenerException;
import io.micronaut.rabbitmq.exception.RabbitListenerExceptionHandler;
import io.micronaut.rabbitmq.intercept.DefaultConsumer;
import io.micronaut.rabbitmq.intercept.MutableBasicProperties;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDes;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDesRegistry;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class RabbitMQConsumerAdvice
implements ExecutableMethodProcessor<RabbitListener>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConsumerAdvice.class);
    private final BeanContext beanContext;
    private final RabbitBinderRegistry binderRegistry;
    private final RabbitListenerExceptionHandler exceptionHandler;
    private final RabbitMessageSerDesRegistry serDesRegistry;
    private final ConversionService conversionService;
    private final Map<Channel, ConsumerState> consumerChannels = new ConcurrentHashMap<Channel, ConsumerState>();

    public RabbitMQConsumerAdvice(BeanContext beanContext, RabbitBinderRegistry binderRegistry, RabbitListenerExceptionHandler exceptionHandler, RabbitMessageSerDesRegistry serDesRegistry, ConversionService conversionService) {
        this.beanContext = beanContext;
        this.binderRegistry = binderRegistry;
        this.exceptionHandler = exceptionHandler;
        this.serDesRegistry = serDesRegistry;
        this.conversionService = conversionService;
    }

    public void process(BeanDefinition<?> beanDefinition, final ExecutableMethod<?, ?> method) {
        AnnotationValue queueAnn = method.getAnnotation(Queue.class);
        if (queueAnn != null) {
            ChannelPool channelPool;
            String queue = (String)queueAnn.getRequiredValue(String.class);
            final String clientTag = method.getDeclaringType().getSimpleName() + '#' + method.toString();
            final boolean reQueue = (Boolean)queueAnn.getRequiredValue("reQueue", Boolean.TYPE);
            boolean exclusive = (Boolean)queueAnn.getRequiredValue("exclusive", Boolean.TYPE);
            final boolean hasAckArg = Arrays.stream(method.getArguments()).anyMatch(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()));
            String connection = method.findAnnotation(RabbitConnection.class).flatMap(conn -> conn.get((CharSequence)"connection", String.class)).orElse("default");
            try {
                channelPool = (ChannelPool)this.beanContext.getBean(ChannelPool.class, Qualifiers.byName((String)connection));
            }
            catch (Throwable e) {
                throw new MessageListenerException(String.format("Failed to retrieve a channel pool named [%s] to register a listener", connection), e);
            }
            final Channel channel = this.getChannel(channelPool);
            Integer prefetch = queueAnn.get((CharSequence)"prefetch", Integer.class).orElse(null);
            try {
                if (prefetch != null) {
                    channel.basicQos(prefetch.intValue());
                }
            }
            catch (IOException e) {
                throw new MessageListenerException(String.format("Failed to set a prefetch count of [%s] on the channel", prefetch), (Throwable)e);
            }
            ConsumerState state = new ConsumerState();
            state.channelPool = channelPool;
            state.consumerTag = clientTag;
            this.consumerChannels.put(channel, state);
            HashMap arguments = new HashMap();
            List propertyAnnotations = method.getAnnotationValuesByType(RabbitProperty.class);
            Collections.reverse(propertyAnnotations);
            propertyAnnotations.forEach(prop -> {
                String name = (String)prop.getRequiredValue("name", String.class);
                String value = prop.getValue(String.class).orElse(null);
                Class type = prop.get((CharSequence)"type", Class.class).orElse(null);
                if (!StringUtils.isNotEmpty((CharSequence)name) || !StringUtils.isNotEmpty((CharSequence)value)) return;
                if (type != null && type != Void.class) {
                    Optional converted = this.conversionService.convert((Object)value, type);
                    if (!converted.isPresent()) throw new MessageListenerException(String.format("Could not convert the argument [%s] to the required type [%s]", name, type));
                    arguments.put(name, converted.get());
                    return;
                } else {
                    arguments.put(name, value);
                }
            });
            Qualifier qualifer = beanDefinition.getAnnotationTypeByStereotype(javax.inject.Qualifier.class).map(type -> Qualifiers.byAnnotation((AnnotationMetadata)beanDefinition, (Class)type)).orElse(null);
            Class beanType = beanDefinition.getBeanType();
            Class returnTypeClass = method.getReturnType().getType();
            final boolean isVoid = returnTypeClass == Void.class || returnTypeClass == Void.TYPE;
            final Object bean = this.beanContext.findBean(beanType, qualifer).orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + method));
            try {
                final DefaultExecutableBinder binder = new DefaultExecutableBinder();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registering a consumer to queue [{}] with client tag [{}]", (Object)queue, (Object)clientTag);
                }
                Optional executor = method.findAnnotation(RabbitConnection.class).flatMap(conn -> conn.get((CharSequence)"executor", String.class));
                final ExecutorService executorService = executor.flatMap(exec -> this.beanContext.findBean(ExecutorService.class, Qualifiers.byName((String)exec))).orElse(null);
                if (executor.isPresent() && executorService == null) {
                    throw new MessageListenerException(String.format("Could not find the executor service [%s] specified for the method [%s]", executor.get(), method));
                }
                channel.basicConsume(queue, false, clientTag, false, exclusive, arguments, (Consumer)new DefaultConsumer(){

                    @Override
                    public void handleTerminate(String consumerTag) {
                        if (channel instanceof RecoverableChannel) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("The channel was been terminated.  Automatic recovery attempt is underway for consumer [{}]", (Object)clientTag);
                            }
                        } else {
                            ConsumerState state = (ConsumerState)RabbitMQConsumerAdvice.this.consumerChannels.remove(channel);
                            if (state != null) {
                                state.channelPool.returnChannel(channel);
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("The channel was terminated. The consumer [{}] will no longer receive messages", (Object)clientTag);
                                }
                            }
                        }
                    }

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                        block31: {
                            RabbitConsumerState state = new RabbitConsumerState(envelope, properties, body, channel);
                            BoundExecutable boundExecutable = null;
                            try {
                                boundExecutable = binder.bind((Executable)method, (ArgumentBinderRegistry)RabbitMQConsumerAdvice.this.binderRegistry, (Object)state);
                            }
                            catch (Throwable e) {
                                RabbitMQConsumerAdvice.this.handleException(new RabbitListenerException("An error occurred binding the message to the method", e, bean, state));
                            }
                            try {
                                if (boundExecutable != null) {
                                    try (RabbitMessageCloseable closeable = new RabbitMessageCloseable(state, false, reQueue).withAcknowledge(hasAckArg ? null : Boolean.valueOf(false));){
                                        Object returnedValue = boundExecutable.invoke(bean);
                                        String replyTo = properties.getReplyTo();
                                        if (!isVoid && StringUtils.isNotEmpty((CharSequence)replyTo)) {
                                            MutableBasicProperties replyProps = new MutableBasicProperties();
                                            replyProps.setCorrelationId(properties.getCorrelationId());
                                            byte[] converted = null;
                                            if (returnedValue != null) {
                                                RabbitMessageSerDes serDes = RabbitMQConsumerAdvice.this.serDesRegistry.findSerdes(method.getReturnType().asArgument()).map(RabbitMessageSerDes.class::cast).orElseThrow(() -> new RabbitListenerException(String.format("Could not find a serializer for the body argument of type [%s]", returnedValue.getClass().getName()), bean, state));
                                                converted = serDes.serialize(returnedValue, replyProps);
                                            }
                                            channel.basicPublish("", replyTo, replyProps.toBasicProperties(), converted);
                                        }
                                        if (!hasAckArg) {
                                            closeable.withAcknowledge(true);
                                        }
                                        break block31;
                                    }
                                    catch (MessageAcknowledgementException e) {
                                        throw e;
                                    }
                                    catch (Throwable e) {
                                        if (e instanceof RabbitListenerException) {
                                            RabbitMQConsumerAdvice.this.handleException((RabbitListenerException)((Object)e));
                                        } else {
                                            RabbitMQConsumerAdvice.this.handleException(new RabbitListenerException("An error occurred executing the listener", e, bean, state));
                                        }
                                        break block31;
                                    }
                                }
                                new RabbitMessageCloseable(state, false, reQueue).withAcknowledge(false).close();
                            }
                            catch (MessageAcknowledgementException e) {
                                if (!channel.isOpen()) {
                                    ConsumerState consumerState = (ConsumerState)RabbitMQConsumerAdvice.this.consumerChannels.remove(channel);
                                    if (consumerState != null) {
                                        consumerState.channelPool.returnChannel(channel);
                                    }
                                    if (LOG.isErrorEnabled()) {
                                        LOG.error("The channel was closed due to an exception. The consumer [{}] will no longer receive messages", (Object)clientTag);
                                    }
                                }
                                RabbitMQConsumerAdvice.this.handleException(new RabbitListenerException(e.getMessage(), e, bean, null));
                            }
                            finally {
                                ((ConsumerState)RabbitMQConsumerAdvice.this.consumerChannels.get(channel)).inProgress = false;
                            }
                        }
                    }

                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        ((ConsumerState)RabbitMQConsumerAdvice.this.consumerChannels.get(channel)).inProgress = true;
                        if (executorService != null) {
                            executorService.submit(() -> this.doHandleDelivery(consumerTag, envelope, properties, body));
                        } else {
                            this.doHandleDelivery(consumerTag, envelope, properties, body);
                        }
                    }
                });
            }
            catch (Throwable e) {
                if (!channel.isOpen()) {
                    channelPool.returnChannel(channel);
                    this.consumerChannels.remove(channel);
                    if (LOG.isErrorEnabled()) {
                        LOG.error("The channel was closed due to an exception. The consumer [{}] will no longer receive messages", (Object)clientTag);
                    }
                }
                this.handleException(new RabbitListenerException("An error occurred subscribing to a queue", e, bean, null));
            }
        }
    }

    @Override
    @PreDestroy
    public void close() throws Exception {
        while (!this.consumerChannels.entrySet().isEmpty()) {
            Iterator<Map.Entry<Channel, ConsumerState>> it = this.consumerChannels.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Channel, ConsumerState> entry = it.next();
                Channel channel = entry.getKey();
                ConsumerState state = entry.getValue();
                try {
                    channel.basicCancel(state.consumerTag);
                }
                catch (AlreadyClosedException | IOException throwable) {
                    // empty catch block
                }
                if (state.inProgress) continue;
                state.channelPool.returnChannel(channel);
                it.remove();
            }
        }
    }

    protected Channel getChannel(ChannelPool channelPool) {
        try {
            return channelPool.getChannel();
        }
        catch (IOException e) {
            throw new MessageListenerException("Could not retrieve a channel", (Throwable)e);
        }
    }

    private void handleException(RabbitListenerException exception) {
        Object bean = exception.getListener();
        if (bean instanceof RabbitListenerExceptionHandler) {
            ((RabbitListenerExceptionHandler)bean).handle((Throwable)((Object)exception));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)exception));
        }
    }

    private static class ConsumerState {
        private String consumerTag;
        private ChannelPool channelPool;
        private volatile boolean inProgress;

        private ConsumerState() {
        }
    }
}

