/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source;

import java.util.Optional;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.api.destination.ConsumerType;
import org.mule.jms.commons.api.destination.TopicConsumer;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.JmsXaTransactionalConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.publish.JmsMessageProducer;
import org.mule.jms.commons.internal.source.JmsResponseMessageBuilder;
import org.mule.jms.commons.internal.source.MessageConsumerDelegate;
import org.mule.jms.commons.internal.source.polling.JmsXaPollingMessageConsumerDelegate;
import org.mule.jms.commons.internal.source.push.JmsMessageListenerDelegate;
import org.mule.jms.commons.internal.source.push.JmsMessageListenerFactory;
import org.mule.jms.commons.internal.support.Jms102bSupport;
import org.mule.jms.commons.internal.support.JmsSupport;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.StringMessageUtils;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    private static final String THREAD_NAME = "JMS-CLIENT-LISTENER";
    private Scheduler scheduler;
    private JmsSessionManager sessionManager;
    private JmsConfig config;
    private ConnectionProvider<JmsTransactionalConnection> connectionProvider;
    private JmsTransactionalConnection connection;
    private JmsSupport jmsSupport;
    private String destination;
    private ConsumerType consumerType;
    private JmsAckMode ackMode;
    private String selector;
    private String inboundContentType;
    private String inboundEncoding;
    private int numberOfConsumers;
    private SourceTransactionalAction transactionalAction;
    private MessageConsumerDelegate messageConsumerDelegate;
    private SchedulerService schedulerService;

    static void notifyIfConnectionProblem(SourceCallbackContext callbackContext, Exception e) {
        JmsListener.notifyIfConnectionProblem(callbackContext.getSourceCallback(), e);
    }

    public static void notifyIfConnectionProblem(SourceCallback callback, Exception e) {
        ExceptionUtils.extractConnectionException((Throwable)e).ifPresent(ce -> callback.onConnectionException(ce));
    }

    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceTransactionalAction transactionalAction, SchedulerService schedulerService) {
        this.sessionManager = sessionManager;
        this.config = config;
        this.connectionProvider = connectionProvider;
        this.destination = destination;
        this.consumerType = consumerType;
        this.ackMode = ackMode;
        this.selector = selector;
        this.inboundContentType = inboundContentType;
        this.inboundEncoding = inboundEncoding;
        this.numberOfConsumers = numberOfConsumers;
        this.transactionalAction = transactionalAction;
        this.schedulerService = schedulerService;
    }

    public void onStart(SourceCallback sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting JMS Message Listener");
        }
        this.scheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName(THREAD_NAME));
        Object consumerConfig = this.config.getConsumerConfig();
        InternalAckMode resolvedAckMode = this.transactionalAction.equals((Object)SourceTransactionalAction.ALWAYS_BEGIN) ? InternalAckMode.TRANSACTED : JmsCommons.resolveOverride(JmsCommons.toInternalAckMode(consumerConfig.getAckMode()), JmsCommons.toInternalAckMode(this.ackMode));
        this.connection = (JmsTransactionalConnection)this.connectionProvider.connect();
        this.jmsSupport = this.connection.getJmsSupport();
        this.connection.registerExceptionListener(e -> sourceCallback.onConnectionException(new ConnectionException((Throwable)e, (Object)this.connection)));
        this.validateNumberOfConsumers(this.numberOfConsumers);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Starting JMS Listener with [%s] consumers on destination [%s] of type [%s] with AckMode [%s]", this.numberOfConsumers, this.destination, JmsCommons.getDestinationType(this.consumerType), resolvedAckMode.name()));
        }
        this.messageConsumerDelegate = this.connection instanceof JmsXaTransactionalConnection ? new JmsXaPollingMessageConsumerDelegate((JmsXaTransactionalConnection)this.connection, this.jmsSupport, this.destination, this.consumerType, this.config, this.selector, this.sessionManager, this.connectionProvider, this.scheduler, this.inboundContentType, this.inboundEncoding, sourceCallback) : new JmsMessageListenerDelegate(new JmsMessageListenerFactory(resolvedAckMode, this.inboundEncoding, this.inboundContentType, this.config, this.sessionManager, this.jmsSupport, sourceCallback, this.connectionProvider), this.connection, this.jmsSupport, this.consumerType, this.destination, this.config, resolvedAckMode, this.selector);
        this.messageConsumerDelegate.createConsumers(this.numberOfConsumers);
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Stopping JMS Listener on destination [%s:%s]", JmsCommons.getDestinationType(this.consumerType), this.destination));
        }
        try {
            if (this.connection != null) {
                this.connectionProvider.disconnect((Object)this.connection);
                this.messageConsumerDelegate.stop();
            }
        }
        finally {
            if (this.scheduler != null) {
                this.scheduler.stop();
            }
        }
    }

    public void onSuccess(JmsResponseMessageBuilder messageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext callbackContext) {
        this.messageConsumerDelegate.onSuccess(callbackContext);
        callbackContext.getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(replyTo -> this.doReply(messageBuilder, callbackContext, (Destination)replyTo, correlationInfo));
    }

    public void onError(Error error, SourceCallbackContext callbackContext) {
        this.messageConsumerDelegate.onError(callbackContext, error);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doReply(JmsResponseMessageBuilder messageBuilder, SourceCallbackContext callbackContext, Destination replyTo, CorrelationInfo correlationInfo) {
        block9: {
            String destinationName;
            boolean replyToTopic = this.replyDestinationIsTopic(replyTo);
            try {
                destinationName = replyToTopic ? ((Topic)replyTo).getTopicName() : ((Queue)replyTo).getQueueName();
            }
            catch (JMSException e) {
                LOGGER.error(String.format("An error occurred during reply. Failed to obtain the destination name: %s", e.getMessage()));
                JmsListener.notifyIfConnectionProblem(callbackContext, (Exception)((Object)e));
                return;
            }
            JmsMessageProducer producer = null;
            JmsSession session = null;
            try {
                session = this.getSession(this.connection, replyToTopic);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Begin reply to destination [%s] of type [%s]", destinationName, replyToTopic ? "TOPIC" : "QUEUE"));
                }
                Message message = messageBuilder.build(this.connection.getJmsSupport(), OutboundCorrelationStrategy.AUTO, correlationInfo, session.get(), this.config);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Message built, sending message to " + destinationName);
                }
                JmsSession replySession = this.connection.createSession(InternalAckMode.AUTO, replyToTopic);
                producer = this.connection.createProducer(replySession, replyTo, replyToTopic);
                producer.publish(message, messageBuilder);
                JmsCommons.closeQuietly(producer);
            }
            catch (Exception e) {
                LOGGER.error(String.format("An error occurred during reply to destination [%s] of type [%s]: %s", destinationName, replyToTopic ? "TOPIC" : "QUEUE", e.getMessage()), (Throwable)e);
                JmsListener.notifyIfConnectionProblem(callbackContext, e);
                break block9;
            }
            finally {
                JmsCommons.closeQuietly(producer);
                JmsCommons.closeQuietly(session);
            }
            JmsCommons.closeQuietly(session);
        }
    }

    private JmsSession getSession(JmsTransactionalConnection connection, boolean isTopic) throws JMSException {
        Optional<JmsSession> transactedSession = this.sessionManager.getTransactedSession();
        if (transactedSession.isPresent()) {
            return transactedSession.get();
        }
        return connection.createSession(InternalAckMode.AUTO, isTopic);
    }

    private boolean replyDestinationIsTopic(Destination destination) {
        if (destination instanceof Topic && destination instanceof Queue && this.jmsSupport instanceof Jms102bSupport) {
            LOGGER.error(StringMessageUtils.getBoilerPlate((String)"Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to http://www.mulesoft.org/jira"));
        }
        return destination instanceof Topic;
    }

    private void validateNumberOfConsumers(int numberOfConsumers) {
        TopicConsumer topicConsumer;
        if (numberOfConsumers < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + numberOfConsumers + "]. The number should be 1 or greater.");
        }
        if (numberOfConsumers > 1 && this.consumerType.topic() && !this.isCapableOfMultiConsumersOnTopic(topicConsumer = (TopicConsumer)this.consumerType)) {
            throw new IllegalArgumentException("Destination [" + this.destination + "] is a topic, but [" + numberOfConsumers + "] receivers have been requested. This is only possible for 'shared' topic consumers, otherwise use 1.");
        }
    }

    private boolean isCapableOfMultiConsumersOnTopic(TopicConsumer topicConsumer) {
        return this.jmsSupport.getSpecification().equals((Object)JmsSpecification.JMS_2_0) && topicConsumer.isShared();
    }
}

