/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.rabbitmq.intercept;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.RecoverableChannel;
import com.rabbitmq.client.ShutdownSignalException;
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.bind.ArgumentBinderRegistry;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.Executable;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.exceptions.MessageAcknowledgementException;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitConnection;
import io.micronaut.rabbitmq.annotation.RabbitProperty;
import io.micronaut.rabbitmq.bind.AcknowledgmentAction;
import io.micronaut.rabbitmq.bind.RabbitBinderRegistry;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.bind.RabbitMessageCloseable;
import io.micronaut.rabbitmq.connect.ChannelPool;
import io.micronaut.rabbitmq.connect.recovery.TemporarilyDownConnection;
import io.micronaut.rabbitmq.connect.recovery.TemporarilyDownException;
import io.micronaut.rabbitmq.event.RabbitConsumerStarted;
import io.micronaut.rabbitmq.event.RabbitConsumerStarting;
import io.micronaut.rabbitmq.exception.RabbitListenerException;
import io.micronaut.rabbitmq.exception.RabbitListenerExceptionHandler;
import io.micronaut.rabbitmq.intercept.MutableBasicProperties;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDes;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDesRegistry;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class RabbitMQConsumerAdvice
implements ExecutableMethodProcessor<Queue>,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConsumerAdvice.class);
    private final BeanContext beanContext;
    private final ApplicationEventPublisher<RabbitConsumerStarting> startingEventPublisher;
    private final ApplicationEventPublisher<RabbitConsumerStarted> startedEventPublisher;
    private final RabbitBinderRegistry binderRegistry;
    private final RabbitListenerExceptionHandler exceptionHandler;
    private final RabbitMessageSerDesRegistry serDesRegistry;
    private final ConversionService conversionService;
    private final Map<String, ChannelPool> channelPools;
    private final List<RecoverableConsumerWrapper> consumers = new CopyOnWriteArrayList<RecoverableConsumerWrapper>();

    @Inject
    public RabbitMQConsumerAdvice(BeanContext beanContext, ApplicationEventPublisher<RabbitConsumerStarting> startingPublisher, ApplicationEventPublisher<RabbitConsumerStarted> startedPublisher, RabbitBinderRegistry binderRegistry, RabbitListenerExceptionHandler exceptionHandler, RabbitMessageSerDesRegistry serDesRegistry, ConversionService conversionService, List<ChannelPool> channelPools) {
        this.beanContext = beanContext;
        this.startingEventPublisher = startingPublisher;
        this.startedEventPublisher = startedPublisher;
        this.binderRegistry = binderRegistry;
        this.exceptionHandler = exceptionHandler;
        this.serDesRegistry = serDesRegistry;
        this.conversionService = conversionService;
        this.channelPools = new HashMap<String, ChannelPool>(channelPools.size());
        for (ChannelPool cp : channelPools) {
            this.channelPools.put(cp.getName(), cp);
        }
    }

    @Deprecated(since="4.1.0", forRemoval=true)
    public RabbitMQConsumerAdvice(BeanContext beanContext, RabbitBinderRegistry binderRegistry, RabbitListenerExceptionHandler exceptionHandler, RabbitMessageSerDesRegistry serDesRegistry, ConversionService conversionService, List<ChannelPool> channelPools) {
        this(beanContext, (ApplicationEventPublisher<RabbitConsumerStarting>)((ApplicationEventPublisher)beanContext.getBean(Argument.of(ApplicationEventPublisher.class, (Class[])new Class[]{RabbitConsumerStarting.class}))), (ApplicationEventPublisher<RabbitConsumerStarted>)((ApplicationEventPublisher)beanContext.getBean(Argument.of(ApplicationEventPublisher.class, (Class[])new Class[]{RabbitConsumerStarted.class}))), binderRegistry, exceptionHandler, serDesRegistry, conversionService, channelPools);
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        block3: {
            AnnotationValue queueAnn = method.getAnnotation(Queue.class);
            if (queueAnn != null) {
                String queue = (String)queueAnn.getRequiredValue(String.class);
                String methodTag = method.getDeclaringType().getSimpleName() + "#" + String.valueOf(method);
                boolean reQueue = (Boolean)queueAnn.getRequiredValue("reQueue", Boolean.TYPE);
                boolean exclusive = (Boolean)queueAnn.getRequiredValue("exclusive", Boolean.TYPE);
                boolean hasAcknowledgementArg = Arrays.stream(method.getArguments()).anyMatch(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()));
                boolean autoAcknowledgment = !hasAcknowledgementArg && (Boolean)queueAnn.getRequiredValue("autoAcknowledgment", Boolean.TYPE) != false;
                boolean skipAckActions = autoAcknowledgment || hasAcknowledgementArg;
                OptionalInt optionalInt = queueAnn.intValue("prefetch");
                Integer prefetch = optionalInt.isPresent() ? Integer.valueOf(optionalInt.getAsInt()) : null;
                int numberOfConsumers = queueAnn.intValue("numberOfConsumers").orElse(1);
                ChannelPool channelPool = this.getChannelPool(method);
                ExecutorService executorService = this.getExecutorService(method);
                Map<String, Object> arguments = this.retrieveArguments(method);
                Object bean = this.getExecutableMethodBean(beanDefinition, method);
                boolean isVoid = method.getReturnType().isVoid();
                DefaultExecutableBinder binder = new DefaultExecutableBinder();
                DeliverCallback deliverCallback = (channel, message) -> {
                    block18: {
                        RabbitConsumerState state = new RabbitConsumerState(message.getEnvelope(), message.getProperties(), message.getBody(), channel);
                        BoundExecutable boundExecutable = null;
                        try {
                            boundExecutable = binder.bind((Executable)method, (ArgumentBinderRegistry)this.binderRegistry, (Object)state);
                        }
                        catch (Throwable e) {
                            this.handleException(new RabbitListenerException("An error occurred binding the message to the method", e, bean, state));
                        }
                        try {
                            if (boundExecutable != null) {
                                try (RabbitMessageCloseable closeable = new RabbitMessageCloseable(state, false, reQueue).withAcknowledgmentAction(skipAckActions ? AcknowledgmentAction.NONE : AcknowledgmentAction.NACK);){
                                    Object returnedValue = boundExecutable.invoke(bean);
                                    String replyTo = message.getProperties().getReplyTo();
                                    if (!isVoid && StringUtils.isNotEmpty((CharSequence)replyTo)) {
                                        MutableBasicProperties replyProps = new MutableBasicProperties();
                                        replyProps.setCorrelationId(message.getProperties().getCorrelationId());
                                        byte[] converted = null;
                                        if (returnedValue != null) {
                                            RabbitMessageSerDes serDes = this.serDesRegistry.findSerdes(method.getReturnType().asArgument()).map(RabbitMessageSerDes.class::cast).orElseThrow(() -> new RabbitListenerException(String.format("Could not find a serializer for the body argument of type [%s]", returnedValue.getClass().getName()), bean, state));
                                            converted = serDes.serialize(returnedValue, replyProps);
                                        }
                                        channel.basicPublish("", replyTo, replyProps.toBasicProperties(), converted);
                                    }
                                    if (!skipAckActions) {
                                        closeable.withAcknowledgmentAction(AcknowledgmentAction.ACK);
                                    }
                                    break block18;
                                }
                                catch (MessageAcknowledgementException e) {
                                    throw e;
                                }
                                catch (RabbitListenerException e) {
                                    this.handleException(e);
                                    break block18;
                                }
                                catch (Throwable e) {
                                    this.handleException(new RabbitListenerException("An error occurred executing the listener", e, bean, state));
                                    break block18;
                                }
                            }
                            new RabbitMessageCloseable(state, false, reQueue).withAcknowledgmentAction(AcknowledgmentAction.NACK).close();
                        }
                        catch (MessageAcknowledgementException e) {
                            this.handleException(new RabbitListenerException(e.getMessage(), e, bean, state));
                        }
                    }
                };
                TemporarilyDownConnection.EventuallyUpListener registerConsumersAndPublishEvent = c -> {
                    for (int idx = 0; idx < numberOfConsumers; ++idx) {
                        String consumerTag = methodTag + "[" + idx + "]";
                        LOG.debug("Registering a consumer to queue [{}] with client tag [{}]", (Object)queue, (Object)consumerTag);
                        this.consumers.add(new RecoverableConsumerWrapper(queue, consumerTag, executorService, exclusive, arguments, channelPool, prefetch, deliverCallback, autoAcknowledgment));
                    }
                    this.startedEventPublisher.publishEvent((Object)new RabbitConsumerStarted(bean, method.getMethodName(), queue));
                };
                try {
                    this.startingEventPublisher.publishEvent((Object)new RabbitConsumerStarting(bean, method.getMethodName(), queue));
                    registerConsumersAndPublishEvent.onConnectionInitialized(null);
                }
                catch (Throwable e) {
                    this.handleException(new RabbitListenerException("An error occurred subscribing to a queue", e, bean, null));
                    if (!(e instanceof TemporarilyDownException)) break block3;
                    TemporarilyDownException temp = (TemporarilyDownException)((Object)e);
                    temp.getConnection().addEventuallyUpListener(registerConsumersAndPublishEvent);
                }
            }
        }
    }

    private ChannelPool getChannelPool(ExecutableMethod<?, ?> method) {
        String connection = method.stringValue(RabbitConnection.class, "connection").orElse("default");
        return Optional.ofNullable(this.channelPools.get(connection)).orElseThrow(() -> new MessageListenerException(String.format("Failed to find a channel pool named [%s] to register a listener", connection)));
    }

    private static void setChannelPrefetch(Integer prefetch, Channel channel) {
        try {
            if (prefetch != null) {
                channel.basicQos(prefetch.intValue());
            }
        }
        catch (IOException e) {
            throw new MessageListenerException(String.format("Failed to set a prefetch count of [%s] on the channel", prefetch), (Throwable)e);
        }
    }

    private Map<String, Object> retrieveArguments(ExecutableMethod<?, ?> method) {
        HashMap<String, Object> arguments = new HashMap<String, Object>();
        List propertyAnnotations = method.getAnnotationValuesByType(RabbitProperty.class);
        Collections.reverse(propertyAnnotations);
        propertyAnnotations.forEach(prop -> {
            String name = (String)prop.getRequiredValue("name", String.class);
            String value = prop.stringValue().orElse(null);
            Class type = prop.get((CharSequence)"type", Class.class).orElse(null);
            if (!StringUtils.isNotEmpty((CharSequence)name) || !StringUtils.isNotEmpty((CharSequence)value)) return;
            if (type != null && type != Void.class) {
                Optional converted = this.conversionService.convert((Object)value, type);
                if (!converted.isPresent()) throw new MessageListenerException(String.format("Could not convert the argument [%s] to the required type [%s]", name, type));
                arguments.put(name, converted.get());
                return;
            } else {
                arguments.put(name, value);
            }
        });
        return arguments;
    }

    private Object getExecutableMethodBean(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
        Qualifier qualifier = beanDefinition.getAnnotationNameByStereotype("javax.inject.Qualifier").map(type -> Qualifiers.byAnnotation((AnnotationMetadata)beanDefinition, (String)type)).orElse(null);
        Class beanType = beanDefinition.getBeanType();
        Object bean = this.beanContext.findBean(beanType, qualifier).orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + String.valueOf(method)));
        return bean;
    }

    private ExecutorService getExecutorService(ExecutableMethod<?, ?> method) {
        String executor = method.stringValue(RabbitConnection.class, "executor").orElse(null);
        if (executor != null) {
            return (ExecutorService)this.beanContext.findBean(ExecutorService.class, Qualifiers.byName((String)executor)).orElseThrow(() -> new MessageListenerException(String.format("Could not find the executor service [%s] specified for the method [%s]", executor, method)));
        }
        return null;
    }

    @Override
    @PreDestroy
    public void close() throws Exception {
        this.consumers.forEach(RecoverableConsumerWrapper::cancel);
    }

    private void handleException(RabbitListenerException exception) {
        Object bean = exception.getListener();
        if (bean instanceof RabbitListenerExceptionHandler) {
            ((RabbitListenerExceptionHandler)bean).handle((Throwable)((Object)exception));
        } else {
            this.exceptionHandler.handle((Throwable)((Object)exception));
        }
    }

    @FunctionalInterface
    private static interface DeliverCallback {
        public void handle(Channel var1, Delivery var2);
    }

    private class RecoverableConsumerWrapper {
        final String consumerTag;
        private final ExecutorService executorService;
        private final String queue;
        private final boolean exclusive;
        private final Map<String, Object> arguments;
        private final ChannelPool channelPool;
        private final Integer prefetch;
        private final DeliverCallback deliverCallback;
        private final AtomicInteger handlingDeliveryCount = new AtomicInteger();
        private final boolean autoAcknowledgment;
        private DefaultConsumer consumer;
        private boolean canceled = false;

        RecoverableConsumerWrapper(String queue, String consumerTag, ExecutorService executorService, boolean exclusive, Map<String, Object> arguments, ChannelPool channelPool, Integer prefetch, DeliverCallback deliverCallback, boolean autoAcknowledgment) throws IOException {
            this.queue = queue;
            this.consumerTag = consumerTag;
            this.executorService = executorService;
            this.exclusive = exclusive;
            this.arguments = arguments;
            this.channelPool = channelPool;
            this.prefetch = prefetch;
            this.deliverCallback = deliverCallback;
            this.autoAcknowledgment = autoAcknowledgment;
            Channel channel = null;
            try {
                channel = channelPool.getChannel();
                this.consumer = this.createConsumer(channel);
            }
            catch (IOException e) {
                if (channel != null) {
                    channelPool.returnChannel(channel);
                }
                throw e;
            }
        }

        public synchronized void cancel() {
            this.canceled = true;
            if (this.consumer == null) {
                return;
            }
            Channel channel = this.consumer.getChannel();
            try {
                channel.basicCancel(this.consumerTag);
            }
            catch (AlreadyClosedException | IOException throwable) {
                // empty catch block
            }
            try {
                while (this.handlingDeliveryCount.get() > 0) {
                    this.wait(500L);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                this.consumer = null;
                RabbitMQConsumerAdvice.this.consumers.remove(this);
                this.channelPool.returnChannel(channel);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void performConsumerRecovery() {
            DefaultConsumer recoveredConsumer = null;
            int recoveryAttempts = 0;
            while (recoveredConsumer == null) {
                Channel channel = null;
                try {
                    RecoverableConsumerWrapper recoverableConsumerWrapper = this;
                    synchronized (recoverableConsumerWrapper) {
                        if (this.canceled) {
                            return;
                        }
                        LOG.debug("consumer [{}] recovery attempt: {}", (Object)this.consumerTag, (Object)(recoveryAttempts + 1));
                        channel = this.channelPool.getChannelWithRecoveringDelay(recoveryAttempts++);
                        this.consumer = recoveredConsumer = this.createConsumer(channel);
                    }
                }
                catch (IOException e) {
                    if (channel != null) {
                        this.channelPool.returnChannel(channel);
                    }
                    LOG.warn("Recovery attempt {} for consumer [{}] failed, will retry.", new Object[]{recoveryAttempts, this.consumerTag, e});
                }
                catch (InterruptedException e) {
                    LOG.warn("The consumer [{}] recovery was interrupted. The consumer will not recover.", (Object)this.consumerTag, (Object)e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            LOG.info("consumer [{}] recovered", (Object)this.consumerTag);
        }

        private DefaultConsumer createConsumer(Channel channel) throws IOException {
            RabbitMQConsumerAdvice.setChannelPrefetch(this.prefetch, channel);
            DefaultConsumer consumer = new DefaultConsumer(channel){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void handleCancel(String consumerTag) throws IOException {
                    RecoverableConsumerWrapper recoverableConsumerWrapper = RecoverableConsumerWrapper.this;
                    synchronized (recoverableConsumerWrapper) {
                        RecoverableConsumerWrapper.this.consumer = null;
                        RecoverableConsumerWrapper.this.channelPool.returnChannel(this.getChannel());
                    }
                    if (RecoverableConsumerWrapper.this.channelPool.isTopologyRecoveryEnabled() && this.getChannel() instanceof RecoverableChannel) {
                        LOG.warn("The consumer [{}] subscription was canceled, a recovery will be tried.", (Object)consumerTag);
                        RecoverableConsumerWrapper.this.performConsumerRecovery();
                    } else {
                        LOG.warn("The RabbitMQ consumer [{}] was canceled. Recovery is not enabled. It will no longer receive messages", (Object)consumerTag);
                        RecoverableConsumerWrapper.this.cancel();
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                    if (this.getChannel() instanceof RecoverableChannel && sig.isHardError()) {
                        LOG.info("The underlying connection was terminated. Automatic recovery attempt is underway for consumer [{}]", (Object)consumerTag);
                    } else if (RecoverableConsumerWrapper.this.channelPool.isTopologyRecoveryEnabled() && this.getChannel() instanceof RecoverableChannel) {
                        LOG.info("The channel of this consumer was terminated. Automatic recovery attempt is underway for consumer [{}]", (Object)consumerTag, (Object)sig);
                        RecoverableConsumerWrapper recoverableConsumerWrapper = RecoverableConsumerWrapper.this;
                        synchronized (recoverableConsumerWrapper) {
                            RecoverableConsumerWrapper.this.consumer = null;
                            RecoverableConsumerWrapper.this.channelPool.returnChannel(this.getChannel());
                        }
                        RecoverableConsumerWrapper.this.performConsumerRecovery();
                    } else {
                        LOG.error("The channel was closed. Recovery is not enabled. The consumer [{}] will no longer receive messages", (Object)consumerTag, (Object)sig);
                        RecoverableConsumerWrapper.this.cancel();
                    }
                }

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (RecoverableConsumerWrapper.this.canceled || !this.getChannel().isOpen()) {
                        return;
                    }
                    RecoverableConsumerWrapper.this.handlingDeliveryCount.incrementAndGet();
                    if (RecoverableConsumerWrapper.this.executorService != null) {
                        RecoverableConsumerWrapper.this.executorService.submit(() -> this.callbackHandle(envelope, properties, body));
                    } else {
                        this.callbackHandle(envelope, properties, body);
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                private void callbackHandle(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    try {
                        RecoverableConsumerWrapper.this.deliverCallback.handle(this.getChannel(), new Delivery(envelope, properties, body));
                    }
                    finally {
                        RecoverableConsumerWrapper.this.handlingDeliveryCount.decrementAndGet();
                    }
                }
            };
            channel.basicConsume(this.queue, this.autoAcknowledgment, this.consumerTag, false, this.exclusive, this.arguments, (Consumer)consumer);
            return consumer;
        }
    }
}

