/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source;

import java.util.Optional;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.api.destination.ConsumerType;
import org.mule.jms.commons.api.destination.TopicConsumer;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.JmsXaTransactionalConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.publish.JmsMessageProducer;
import org.mule.jms.commons.internal.source.JmsResponseMessageBuilder;
import org.mule.jms.commons.internal.source.MessageConsumerDelegate;
import org.mule.jms.commons.internal.source.SourceConfiguration;
import org.mule.jms.commons.internal.source.polling.JmsXaPollingMessageConsumerDelegate;
import org.mule.jms.commons.internal.source.push.JmsMessageListenerDelegate;
import org.mule.jms.commons.internal.source.push.JmsMessageListenerFactory;
import org.mule.jms.commons.internal.support.Jms102bSupport;
import org.mule.jms.commons.internal.support.JmsSupport;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.StringMessageUtils;
import org.mule.runtime.extension.api.connectivity.XATransactionalConnection;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates the lifecycle of a JMS message listener: it opens the connection, creates the
 * consumers through a {@link MessageConsumerDelegate}, forwards flow completion callbacks to
 * that delegate and, when the callback context carries a reply destination, publishes the
 * response message to it.
 */
public class JmsListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);
    /** Name of the callback-context variable that holds the reply {@link Destination}. */
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    /** Name given to the IO scheduler requested in {@link #onStart(SourceCallback)}. */
    private static final String THREAD_NAME = "JMS-CLIENT-LISTENER";

    // Collaborators and settings received through the constructor; never reassigned.
    private final JmsSessionManager sessionManager;
    private final JmsConfig config;
    private final ConnectionProvider<JmsTransactionalConnection> connectionProvider;
    private final String destination;
    private final ConsumerType consumerType;
    private final JmsAckMode ackMode;
    private final String selector;
    private final String inboundContentType;
    private final String inboundEncoding;
    private final int numberOfConsumers;
    private final SourceConfiguration sourceConfiguration;
    private final SchedulerService schedulerService;

    // Runtime state created in onStart and released in onStop.
    private Scheduler scheduler;
    private JmsTransactionalConnection connection;
    private JmsSupport jmsSupport;
    private MessageConsumerDelegate messageConsumerDelegate;

    /**
     * Same as {@link #notifyIfConnectionProblem(SourceCallback, Exception)}, using the
     * callback obtained from the given context.
     *
     * @param callbackContext context whose source callback is notified
     * @param e               the exception to inspect
     */
    static void notifyIfConnectionProblem(SourceCallbackContext callbackContext, Exception e) {
        JmsListener.notifyIfConnectionProblem(callbackContext.getSourceCallback(), e);
    }

    /**
     * Notifies the callback when a {@link ConnectionException} can be extracted from the
     * given exception; does nothing otherwise.
     *
     * @param callback the callback to notify
     * @param e        the exception to inspect
     */
    public static void notifyIfConnectionProblem(SourceCallback callback, Exception e) {
        ExceptionUtils.extractConnectionException(e).ifPresent(ce -> callback.onConnectionException(ce));
    }

    /**
     * Stores the collaborators and settings; no connection is opened until
     * {@link #onStart(SourceCallback)} is invoked.
     */
    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceConfiguration sourceConfiguration, SchedulerService schedulerService) {
        this.sessionManager = sessionManager;
        this.config = config;
        this.connectionProvider = connectionProvider;
        this.destination = destination;
        this.consumerType = consumerType;
        this.ackMode = ackMode;
        this.selector = selector;
        this.inboundContentType = inboundContentType;
        this.inboundEncoding = inboundEncoding;
        this.numberOfConsumers = numberOfConsumers;
        this.sourceConfiguration = sourceConfiguration;
        this.schedulerService = schedulerService;
    }

    /**
     * Connects, validates the transactional configuration and the requested number of
     * consumers, and creates the consumers on the configured destination.
     *
     * @param sourceCallback callback handed to the consumer delegate and notified of
     *                       connection exceptions reported by the connection
     * @throws MuleException            if connecting or validating the transaction type fails
     * @throws IllegalArgumentException if the requested number of consumers is not valid
     */
    public void onStart(SourceCallback sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting JMS Message Listener");
        }
        this.scheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName(THREAD_NAME));

        // A source that always begins a transaction is forced to TRANSACTED; otherwise the
        // ack mode is resolved from the config-level value and the source-level value.
        InternalAckMode resolvedAckMode = this.sourceConfiguration.getTransactionalAction().equals(SourceTransactionalAction.ALWAYS_BEGIN)
                ? InternalAckMode.TRANSACTED
                : JmsCommons.resolveOverride(JmsCommons.toInternalAckMode(this.config.getConsumerConfig().getAckMode()), JmsCommons.toInternalAckMode(this.ackMode));

        this.connection = this.connectionProvider.connect();
        this.validateTransactionType(this.connection);
        this.jmsSupport = this.connection.getJmsSupport();
        this.connection.registerExceptionListener(e -> sourceCallback.onConnectionException(new ConnectionException(e, this.connection)));
        // Must run after jmsSupport is set: the topic check reads the JMS specification from it.
        this.validateNumberOfConsumers(this.numberOfConsumers);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Starting JMS Listener with [%s] consumers on destination [%s] of type [%s] with AckMode [%s]", this.numberOfConsumers, this.destination, JmsCommons.getDestinationType(this.consumerType), resolvedAckMode.name()));
        }

        // XA connections use the polling delegate; every other connection uses the listener delegate.
        if (this.connection instanceof JmsXaTransactionalConnection) {
            this.messageConsumerDelegate = new JmsXaPollingMessageConsumerDelegate((JmsXaTransactionalConnection) this.connection, this.jmsSupport, this.destination, this.consumerType, this.config, this.selector, this.sessionManager, this.connectionProvider, this.scheduler, this.inboundContentType, this.inboundEncoding, sourceCallback);
        } else {
            JmsMessageListenerFactory listenerFactory = new JmsMessageListenerFactory(resolvedAckMode, this.inboundEncoding, this.inboundContentType, this.config, this.sessionManager, this.jmsSupport, sourceCallback, this.connectionProvider);
            this.messageConsumerDelegate = new JmsMessageListenerDelegate(listenerFactory, this.connection, this.jmsSupport, this.consumerType, this.destination, this.config, resolvedAckMode, this.selector);
        }
        this.messageConsumerDelegate.createConsumers(this.numberOfConsumers);
    }

    /**
     * Checks that the configured transaction type is compatible with the kind of connection.
     *
     * @param connection the connection the listener will use
     * @throws ConnectionException if XA transactions are requested on a non-XA connection, or
     *                             local transactions are requested on an XA connection
     */
    private void validateTransactionType(JmsTransactionalConnection connection) throws ConnectionException {
        boolean xaConnection = connection instanceof XATransactionalConnection;
        switch (this.sourceConfiguration.getTransactionalAction()) {
            case ALWAYS_BEGIN:
                switch (this.sourceConfiguration.getTransactionType()) {
                    case XA:
                        if (!xaConnection) {
                            throw new ConnectionException(String.format("Invalid configuration, The message listener on the flow '%s' has been configured to work with XA Transactions, but the given connection from the config '%s' doesn't support it.\nThis can be fixed doing one of the following:\n - To work with Local transactions, select the 'LOCAL' Transaction Type on the Advanced Source Configuration \n - To work with XA Transactions, enable XA in the connection configuration", this.sourceConfiguration.getFlowName(), this.sourceConfiguration.getConfigName()));
                        }
                        break;
                    case LOCAL:
                        if (xaConnection) {
                            throw new ConnectionException(String.format("Invalid configuration: The message listener on the flow '%s' has been configured to work with Local Transactions, but the given connection from the config '%s' requires XA Transactions. \nThis can be fixed doing one of the following:\n - To work with XA Transactions, select the 'XA' Transaction Type on the Advanced Source Configuration\n - To work with Local transactions, disable XA in the connection configuration", this.sourceConfiguration.getFlowName(), this.sourceConfiguration.getConfigName()));
                        }
                        break;
                    default:
                        break;
                }
                break;
            case NONE:
                // Not an error, only worth a notice.
                if (xaConnection) {
                    LOGGER.info("A XA Connection is being used in a non transactional context, this could led to unexpected behaviour");
                }
                break;
            default:
                break;
        }
    }

    /**
     * Stops the consumer delegate, disconnects the connection and stops the scheduler.
     * Each step runs even when a previous one throws, so a failure while stopping the
     * consumers does not leave the connection open or the scheduler running.
     */
    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Stopping JMS Listener on destination [%s:%s]", JmsCommons.getDestinationType(this.consumerType), this.destination));
        }
        try {
            try {
                if (this.messageConsumerDelegate != null) {
                    this.messageConsumerDelegate.stop();
                    this.messageConsumerDelegate = null;
                }
            } finally {
                if (this.connection != null) {
                    this.connectionProvider.disconnect(this.connection);
                }
            }
        } finally {
            if (this.scheduler != null) {
                this.scheduler.stop();
            }
        }
    }

    /**
     * Forwards the successful completion to the consumer delegate (when the listener has not
     * been stopped) and, if the context holds a reply destination, sends the response to it.
     *
     * @param messageBuilder  builder of the response message
     * @param correlationInfo correlation information passed to the message builder
     * @param callbackContext context of the message being completed
     */
    public void onSuccess(JmsResponseMessageBuilder messageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext callbackContext) {
        if (this.messageConsumerDelegate != null) {
            this.messageConsumerDelegate.onSuccess(callbackContext);
        }
        callbackContext.getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(replyTo -> this.doReply(messageBuilder, callbackContext, (Destination) replyTo, correlationInfo));
    }

    /**
     * Forwards the error to the consumer delegate, or only logs it when the listener has
     * already been stopped.
     *
     * @param error           the error produced by the flow
     * @param callbackContext context of the message being completed
     */
    public void onError(Error error, SourceCallbackContext callbackContext) {
        if (this.messageConsumerDelegate != null) {
            this.messageConsumerDelegate.onError(callbackContext, error);
        } else {
            // The placeholder is required: without it SLF4J silently drops the argument.
            LOGGER.debug("A error occurred after the Source being stopped: {}", error);
        }
    }

    /**
     * Builds the response message and publishes it to the reply destination. Failures are
     * logged and reported through {@link #notifyIfConnectionProblem(SourceCallbackContext, Exception)}
     * instead of being propagated; the producer and sessions are closed exactly once.
     *
     * @param messageBuilder  builder of the response message
     * @param callbackContext context used to report connection problems
     * @param replyTo         destination the response is sent to
     * @param correlationInfo correlation information passed to the message builder
     */
    private void doReply(JmsResponseMessageBuilder messageBuilder, SourceCallbackContext callbackContext, Destination replyTo, CorrelationInfo correlationInfo) {
        boolean replyToTopic = this.replyDestinationIsTopic(replyTo);
        String destinationType = replyToTopic ? "TOPIC" : "QUEUE";
        String destinationName;
        try {
            destinationName = replyToTopic ? ((Topic) replyTo).getTopicName() : ((Queue) replyTo).getQueueName();
        } catch (JMSException e) {
            LOGGER.error(String.format("An error occurred during reply. Failed to obtain the destination name: %s", e.getMessage()), e);
            JmsListener.notifyIfConnectionProblem(callbackContext, e);
            return;
        }

        JmsMessageProducer producer = null;
        JmsSession session = null;
        JmsSession replySession = null;
        try {
            session = this.getSession(this.connection, replyToTopic);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Begin reply to destination [%s] of type [%s]", destinationName, destinationType));
            }
            Message message = messageBuilder.build(this.connection.getJmsSupport(), messageBuilder.getSendCorrelationId(), correlationInfo, session.get(), this.config);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message built, sending message to " + destinationName);
            }
            // The message is built with `session` but published through a dedicated AUTO-ack session.
            replySession = this.connection.createSession(InternalAckMode.AUTO, replyToTopic);
            producer = this.connection.createProducer(replySession, replyTo, replyToTopic);
            producer.publish(message, messageBuilder);
        } catch (Exception e) {
            LOGGER.error(String.format("An error occurred during reply to destination [%s] of type [%s]: %s", destinationName, destinationType, e.getMessage()), e);
            JmsListener.notifyIfConnectionProblem(callbackContext, e);
        } finally {
            JmsCommons.closeQuietly(producer);
            JmsCommons.closeQuietly(replySession);
            JmsCommons.closeQuietly(session);
        }
    }

    /**
     * Returns the transacted session the session manager holds for the connection, or a
     * newly created AUTO-ack session when there is none.
     *
     * @param connection the connection to obtain the session for
     * @param isTopic    whether the session targets a topic
     * @return the session to use
     * @throws JMSException if a new session cannot be created
     */
    private JmsSession getSession(JmsTransactionalConnection connection, boolean isTopic) throws JMSException {
        Optional<JmsSession> transactedSession = this.sessionManager.getTransactedSession(connection);
        if (transactedSession.isPresent()) {
            return transactedSession.get();
        }
        return connection.createSession(InternalAckMode.AUTO, isTopic);
    }

    /**
     * Tells whether the reply destination is a {@link Topic}. Logs an error when the
     * destination implements both {@link Queue} and {@link Topic} under JMS 1.0.2b support,
     * but still answers based on the {@link Topic} check.
     *
     * @param destination the reply destination
     * @return {@code true} if the destination is a {@link Topic}
     */
    private boolean replyDestinationIsTopic(Destination destination) {
        boolean isTopic = destination instanceof Topic;
        if (isTopic && destination instanceof Queue && this.jmsSupport instanceof Jms102bSupport) {
            LOGGER.error(StringMessageUtils.getBoilerPlate("Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to http://www.mulesoft.org/jira"));
        }
        return isTopic;
    }

    /**
     * Validates the requested number of consumers: at least one, and more than one on a
     * topic only when the topic consumer supports it.
     *
     * @param numberOfConsumers the requested number of consumers
     * @throws IllegalArgumentException if the number is not valid for the destination
     */
    private void validateNumberOfConsumers(int numberOfConsumers) {
        if (numberOfConsumers < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + numberOfConsumers + "]. The number should be 1 or greater.");
        }
        if (numberOfConsumers > 1 && this.consumerType.topic()) {
            TopicConsumer topicConsumer = (TopicConsumer) this.consumerType;
            if (!this.isCapableOfMultiConsumersOnTopic(topicConsumer)) {
                throw new IllegalArgumentException("Destination [" + this.destination + "] is a topic, but [" + numberOfConsumers + "] receivers have been requested. This is only possible for 'shared' topic consumers, otherwise use 1.");
            }
        }
    }

    /**
     * Multiple consumers on a topic require the JMS 2.0 specification and a shared consumer.
     *
     * @param topicConsumer the topic consumer configuration
     * @return {@code true} if several consumers may be created for the topic
     */
    private boolean isCapableOfMultiConsumersOnTopic(TopicConsumer topicConsumer) {
        return this.jmsSupport.getSpecification().equals(JmsSpecification.JMS_2_0) && topicConsumer.isShared();
    }
}

