/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source;

import java.util.ArrayList;
import java.util.List;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.api.destination.ConsumerType;
import org.mule.jms.commons.api.destination.TopicConsumer;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.consume.JmsMessageConsumer;
import org.mule.jms.commons.internal.source.DefaultJmsListenerLock;
import org.mule.jms.commons.internal.source.JmsListenerLock;
import org.mule.jms.commons.internal.source.JmsMessageListenerFactory;
import org.mule.jms.commons.internal.source.JmsResponseMessageBuilder;
import org.mule.jms.commons.internal.source.NullJmsListenerLock;
import org.mule.jms.commons.internal.support.Jms102bSupport;
import org.mule.jms.commons.internal.support.JmsSupport;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.StringMessageUtils;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    static final String JMS_LOCK_VAR = "JMS_LOCK";
    static final String JMS_SESSION_VAR = "JMS_SESSION";
    private JmsSessionManager sessionManager;
    private JmsConfig config;
    private ConnectionProvider<JmsTransactionalConnection> connectionProvider;
    private JmsTransactionalConnection connection;
    private JmsSupport jmsSupport;
    private InternalAckMode resolvedAckMode;
    private final List<MessageListenerInfo> createdListeners = new ArrayList<MessageListenerInfo>();
    @Parameter
    @ParameterDsl(allowReferences=false)
    private String destination;
    @Parameter
    @ConfigOverride
    private ConsumerType consumerType;
    @Parameter
    @Optional
    private JmsAckMode ackMode;
    @Parameter
    @ConfigOverride
    private String selector;
    @Parameter
    @Optional
    @Example(value="application/json")
    private String inboundContentType;
    @Parameter
    @Optional
    @Example(value="UTF-8")
    private String inboundEncoding;
    @Parameter
    @Optional(defaultValue="4")
    private int numberOfConsumers;
    private SourceTransactionalAction transactionalAction;

    static void notifyIfConnectionProblem(SourceCallbackContext callbackContext, Exception e) {
        JmsListener.notifyIfConnectionProblem(callbackContext.getSourceCallback(), e);
    }

    static void notifyIfConnectionProblem(SourceCallback callback, Exception e) {
        ExceptionUtils.extractConnectionException((Throwable)e).ifPresent(ce -> callback.onConnectionException(ce));
    }

    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceTransactionalAction transactionalAction) {
        this.sessionManager = sessionManager;
        this.config = config;
        this.connectionProvider = connectionProvider;
        this.destination = destination;
        this.consumerType = consumerType;
        this.ackMode = ackMode;
        this.selector = selector;
        this.inboundContentType = inboundContentType;
        this.inboundEncoding = inboundEncoding;
        this.numberOfConsumers = numberOfConsumers;
        this.transactionalAction = transactionalAction;
    }

    public void onStart(SourceCallback sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting JMS Message Listener");
        }
        Object consumerConfig = this.config.getConsumerConfig();
        this.resolvedAckMode = this.transactionalAction.equals((Object)SourceTransactionalAction.ALWAYS_BEGIN) ? InternalAckMode.TRANSACTED : JmsCommons.resolveOverride(JmsCommons.toInternalAckMode(consumerConfig.getAckMode()), JmsCommons.toInternalAckMode(this.ackMode));
        this.connection = (JmsTransactionalConnection)this.connectionProvider.connect();
        this.jmsSupport = this.connection.getJmsSupport();
        this.connection.registerExceptionListener(e -> sourceCallback.onConnectionException(new ConnectionException((Throwable)e, (Object)this.connection)));
        JmsMessageListenerFactory messageListenerFactory = new JmsMessageListenerFactory(this.resolvedAckMode, this.inboundEncoding, this.inboundContentType, this.config, this.sessionManager, this.jmsSupport, sourceCallback, this.connectionProvider);
        this.validateNumberOfConsumers(this.numberOfConsumers);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Starting JMS Listener with [%s] consumers on destination [%s] of type [%s] with AckMode [%s]", this.numberOfConsumers, this.destination, JmsCommons.getDestinationType(this.consumerType), this.resolvedAckMode.name()));
        }
        try {
            for (int i = 0; i < this.numberOfConsumers; ++i) {
                JmsSession session = this.connection.createSession(this.resolvedAckMode, this.consumerType.topic());
                Destination jmsDestination = this.jmsSupport.createDestination(session.get(), this.destination, this.consumerType.topic(), this.config);
                JmsMessageConsumer consumer = this.connection.createConsumer(session, jmsDestination, this.selector, this.consumerType);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Creating Message Listener on Session [%s] for destination [%s]", session.get(), this.destination));
                }
                JmsListenerLock jmsLock = this.createJmsLock();
                this.createdListeners.add(new MessageListenerInfo(session, jmsLock, consumer));
                consumer.listen(messageListenerFactory.createMessageListener(session, jmsLock));
            }
        }
        catch (Exception e2) {
            String msg = String.format("An error occurred while creating the consumers for destination [%s:%s]: %s", JmsCommons.getDestinationType(this.consumerType), this.destination, e2.getMessage());
            LOGGER.error(msg, (Throwable)e2);
            this.releaseListeners();
            throw new ConnectionException(msg, (Throwable)e2, null, (Object)this.connection);
        }
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Stopping JMS Listener on destination [%s:%s]", JmsCommons.getDestinationType(this.consumerType), this.destination));
        }
        this.releaseListeners();
        if (this.connection != null) {
            this.connectionProvider.disconnect((Object)this.connection);
        }
    }

    public void onSuccess(JmsResponseMessageBuilder messageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext callbackContext) {
        callbackContext.getVariable(JMS_LOCK_VAR).ifPresent(JmsListenerLock::unlock);
        callbackContext.getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(replyTo -> callbackContext.getVariable(JMS_SESSION_VAR).ifPresent(session -> this.doReply(messageBuilder, callbackContext, (Destination)replyTo, correlationInfo, (JmsSession)session)));
    }

    public void onError(Error error, SourceCallbackContext callbackContext) {
        callbackContext.getVariable(JMS_LOCK_VAR).ifPresent(jmsLock -> {
            if (this.resolvedAckMode.equals((Object)InternalAckMode.AUTO) || this.resolvedAckMode.equals((Object)InternalAckMode.DUPS_OK)) {
                jmsLock.unlockWithFailure(error);
            } else {
                jmsLock.unlock();
            }
        });
    }

    private void doReply(JmsResponseMessageBuilder messageBuilder, SourceCallbackContext callbackContext, Destination replyTo, CorrelationInfo correlationInfo, JmsSession session) {
        String destinationName;
        boolean replyToTopic = this.replyDestinationIsTopic(replyTo);
        try {
            destinationName = replyToTopic ? ((Topic)replyTo).getTopicName() : ((Queue)replyTo).getQueueName();
        }
        catch (JMSException e) {
            LOGGER.error(String.format("An error occurred during reply. Failed to obtain the destination name: %s", e.getMessage()));
            JmsListener.notifyIfConnectionProblem(callbackContext, (Exception)((Object)e));
            return;
        }
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Begin reply to destination [%s] of type [%s]", destinationName, replyToTopic ? "TOPIC" : "QUEUE"));
            }
            Message message = messageBuilder.build(this.connection.getJmsSupport(), OutboundCorrelationStrategy.AUTO, correlationInfo, session.get(), this.config);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message built, sending message to " + destinationName);
            }
            JmsSession replySession = this.connection.createSession(InternalAckMode.AUTO, replyToTopic);
            this.connection.createProducer(replySession, replyTo, replyToTopic).publish(message, messageBuilder);
        }
        catch (Exception e) {
            LOGGER.error(String.format("An error occurred during reply to destination [%s] of type [%s]: %s", destinationName, replyToTopic ? "TOPIC" : "QUEUE", e.getMessage()), (Throwable)e);
            JmsListener.notifyIfConnectionProblem(callbackContext, e);
        }
    }

    private boolean replyDestinationIsTopic(Destination destination) {
        if (destination instanceof Topic && destination instanceof Queue && this.jmsSupport instanceof Jms102bSupport) {
            LOGGER.error(StringMessageUtils.getBoilerPlate((String)"Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to http://www.mulesoft.org/jira"));
        }
        return destination instanceof Topic;
    }

    private JmsListenerLock createJmsLock() {
        if (this.resolvedAckMode.equals((Object)InternalAckMode.IMMEDIATE) || this.resolvedAckMode.equals((Object)InternalAckMode.TRANSACTED)) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(String.format("Session lock skipped for ACK mode [%s].", this.resolvedAckMode.name()));
            }
            return new NullJmsListenerLock();
        }
        return new DefaultJmsListenerLock();
    }

    private void validateNumberOfConsumers(int numberOfConsumers) {
        TopicConsumer topicConsumer;
        if (numberOfConsumers < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + numberOfConsumers + "]. The number should be 1 or greater.");
        }
        if (numberOfConsumers > 1 && this.consumerType.topic() && !this.isCapableOfMultiConsumersOnTopic(topicConsumer = (TopicConsumer)this.consumerType)) {
            throw new IllegalArgumentException("Destination [" + this.destination + "] is a topic, but [" + numberOfConsumers + "] receivers have been requested. This is only possible for 'shared' topic consumers, otherwise use 1.");
        }
    }

    private boolean isCapableOfMultiConsumersOnTopic(TopicConsumer topicConsumer) {
        return this.jmsSupport.getSpecification().equals((Object)JmsSpecification.JMS_2_0) && topicConsumer.isShared();
    }

    private void releaseListeners() {
        try {
            this.createdListeners.forEach(info -> {
                info.getLock().unlockWithFailure();
                this.closeConsumer(info.getConsumer());
                JmsCommons.closeQuietly(info.getSession());
            });
        }
        finally {
            this.createdListeners.clear();
        }
    }

    private void closeConsumer(JmsMessageConsumer consumer) {
        try {
            consumer.listen(null);
        }
        catch (JMSException e) {
            LOGGER.error(String.format("An unexpected error occurred trying to turn off a MessageListener [%s].", consumer), (Throwable)e);
        }
        JmsCommons.closeQuietly(consumer);
    }

    public static class MessageListenerInfo {
        private JmsSession session;
        private JmsListenerLock jmsListenerLock;
        private JmsMessageConsumer messageConsumer;

        MessageListenerInfo(JmsSession session, JmsListenerLock jmsListenerLock, JmsMessageConsumer messageConsumer) {
            this.session = session;
            this.jmsListenerLock = jmsListenerLock;
            this.messageConsumer = messageConsumer;
        }

        public JmsSession getSession() {
            return this.session;
        }

        public JmsListenerLock getLock() {
            return this.jmsListenerLock;
        }

        public JmsMessageConsumer getConsumer() {
            return this.messageConsumer;
        }
    }
}

