/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.jms.commons.api.RequestReplyPattern;
import org.mule.jms.commons.api.connection.DefaultReconnectionManagerProvider;
import org.mule.jms.commons.api.connection.JmsReconnectionManager;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.api.destination.ConsumerType;
import org.mule.jms.commons.api.destination.TopicConsumer;
import org.mule.jms.commons.api.lock.JmsListenerLockFactory;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsConnection;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.XaJmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.publish.JmsMessageProducer;
import org.mule.jms.commons.internal.source.DefaultJmsConnectionExceptionResolver;
import org.mule.jms.commons.internal.source.DefaultJmsResourceReleaser;
import org.mule.jms.commons.internal.source.JmsConnectionExceptionResolver;
import org.mule.jms.commons.internal.source.JmsResourceReleaser;
import org.mule.jms.commons.internal.source.JmsResponseMessageBuilder;
import org.mule.jms.commons.internal.source.MessageConsumerDelegate;
import org.mule.jms.commons.internal.source.SourceConfiguration;
import org.mule.jms.commons.internal.source.polling.JmsXaPollingMessageConsumerDelegate;
import org.mule.jms.commons.internal.source.push.JmsMessageListenerDelegate;
import org.mule.jms.commons.internal.source.push.JmsMessageListenerFactory;
import org.mule.jms.commons.internal.support.Jms102bSupport;
import org.mule.jms.commons.internal.support.JmsSupport;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.connection.ConnectionValidationResult;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.util.StringMessageUtils;
import org.mule.runtime.extension.api.connectivity.XATransactionalConnection;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    static final String CORRELATION_ID_VAR = "CORRELATION_ID";
    static final String MESSAGE_ID_VAR = "MESSAGE_ID";
    static final boolean DEFAULT_IGNORE_REPLY_TO = false;
    static final String JMS_ACK_ID = "JMS_ACK_ID";
    private static final String THREAD_NAME = "JMS-CLIENT-LISTENER";
    private static final String TRACE_PENDING_ACK_CRON_EXPRESSION = "0/30 0/1 * 1/1 * ? *";
    private JmsConnectionExceptionResolver exceptionResolver;
    private Scheduler scheduler;
    private JmsSessionManager sessionManager;
    private JmsConfig config;
    private ConnectionProvider<JmsTransactionalConnection> connectionProvider;
    private JmsTransactionalConnection connection;
    private JmsSupport jmsSupport;
    private String destination;
    private ConsumerType consumerType;
    private JmsAckMode ackMode;
    private String selector;
    private String inboundContentType;
    private String inboundEncoding;
    private int numberOfConsumers;
    private MessageConsumerDelegate messageConsumerDelegate;
    private SourceConfiguration sourceConfiguration;
    private SchedulerService schedulerService;
    private JmsResourceReleaser resourceCleaner;
    private final JmsListenerLockFactory lockFactory;
    private JmsReconnectionManager reconnectionManager;
    private AtomicBoolean exceptionListenerRegistered = new AtomicBoolean(false);
    private SourceCallback sourceCallback;
    private Scheduler traceScheduler;
    private ExceptionListener exceptionListener;

    static void notifyIfConnectionProblem(SourceCallbackContext callbackContext, Exception e, JmsConnectionExceptionResolver exceptionResolver) {
        JmsListener.notifyIfConnectionProblem(callbackContext.getSourceCallback(), e, exceptionResolver);
    }

    public static void notifyIfConnectionProblem(SourceCallback callback, Exception e, JmsConnectionExceptionResolver exceptionResolver) {
        exceptionResolver.resolveException(e).ifPresent(ce -> callback.onConnectionException(ce));
    }

    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceConfiguration sourceConfiguration, SchedulerService schedulerService, JmsConnectionExceptionResolver exceptionResolver, JmsResourceReleaser resourceCleaner) {
        this(sessionManager, config, connectionProvider, destination, consumerType, ackMode, selector, inboundContentType, inboundEncoding, numberOfConsumers, sourceConfiguration, schedulerService, exceptionResolver, resourceCleaner, JmsListenerLockFactory.newDefault());
    }

    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceConfiguration sourceConfiguration, SchedulerService schedulerService, JmsConnectionExceptionResolver exceptionResolver, JmsResourceReleaser resourceCleaner, JmsListenerLockFactory lockFactory) {
        this(sessionManager, config, connectionProvider, destination, consumerType, ackMode, selector, inboundContentType, inboundEncoding, numberOfConsumers, sourceConfiguration, schedulerService, exceptionResolver, resourceCleaner, lockFactory, new DefaultReconnectionManagerProvider());
    }

    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceConfiguration sourceConfiguration, SchedulerService schedulerService) {
        this(sessionManager, config, connectionProvider, destination, consumerType, ackMode, selector, inboundContentType, inboundEncoding, numberOfConsumers, sourceConfiguration, schedulerService, new DefaultJmsConnectionExceptionResolver(), new DefaultJmsResourceReleaser());
    }

    public JmsListener(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceConfiguration sourceConfiguration, SchedulerService schedulerService, JmsConnectionExceptionResolver exceptionResolver, JmsResourceReleaser resourceCleaner, JmsListenerLockFactory lockFactory, JmsReconnectionManager reconnectionManager) {
        this.sessionManager = sessionManager;
        this.config = config;
        this.connectionProvider = connectionProvider;
        this.destination = destination;
        this.consumerType = consumerType;
        this.ackMode = ackMode;
        this.selector = selector;
        this.inboundContentType = inboundContentType;
        this.inboundEncoding = inboundEncoding;
        this.numberOfConsumers = numberOfConsumers;
        this.sourceConfiguration = sourceConfiguration;
        this.schedulerService = schedulerService;
        this.exceptionResolver = exceptionResolver;
        this.resourceCleaner = resourceCleaner;
        this.lockFactory = lockFactory;
        this.reconnectionManager = reconnectionManager;
        if (LOGGER.isTraceEnabled() && sessionManager != null) {
            LOGGER.trace("Scheduling dump of pending Acknowledgements");
            this.traceScheduler = schedulerService.ioScheduler(SchedulerConfig.config().withName("JMS-CLIENT-LISTENER_TRACING"));
            this.traceScheduler.scheduleWithCronExpression(() -> {
                Set<String> pendingAcks = sessionManager.getPendingAcknowledgements();
                LOGGER.trace("Pending Acknowledgements for Flow [{}]: [{}]", (Object)sourceConfiguration.getFlowName(), (Object)pendingAcks.size());
                LOGGER.trace("Pending AckIds: [{}]", (Object)pendingAcks.stream().collect(Collectors.joining(",")));
            }, TRACE_PENDING_ACK_CRON_EXPRESSION);
        }
    }

    public void onStart(SourceCallback sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting JMS Message Listener");
        }
        this.sourceCallback = sourceCallback;
        this.scheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName(THREAD_NAME));
        Object consumerConfig = this.config.getConsumerConfig();
        InternalAckMode resolvedAckMode = this.sourceConfiguration.getTransactionalAction().equals((Object)SourceTransactionalAction.ALWAYS_BEGIN) ? InternalAckMode.TRANSACTED : JmsCommons.resolveOverride(JmsCommons.toInternalAckMode(consumerConfig.getAckMode()), JmsCommons.toInternalAckMode(this.ackMode));
        this.connection = (JmsTransactionalConnection)this.connectionProvider.connect();
        ConnectionValidationResult connectionValidationResult = this.connectionProvider.validate((Object)this.connection);
        if (!connectionValidationResult.isValid()) {
            throw new ConnectionException((Throwable)connectionValidationResult.getException(), (Object)this.connection);
        }
        this.validateTransactionType(this.connection);
        this.jmsSupport = this.connection.getJmsSupport();
        if (!this.exceptionListenerRegistered.getAndSet(true)) {
            this.exceptionListener = e -> sourceCallback.onConnectionException(new ConnectionException((Throwable)e, (Object)this.connection));
            this.connection.registerExceptionListener(this.exceptionListener);
        }
        this.validateNumberOfConsumers(this.numberOfConsumers);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Starting JMS Listener with [%s] consumers on destination [%s] of type [%s] with AckMode [%s]", this.numberOfConsumers, this.destination, JmsCommons.getDestinationType(this.consumerType), resolvedAckMode.name()));
        }
        this.messageConsumerDelegate = this.isXa(this.connection) && this.sourceConfiguration.getTransactionalAction() == SourceTransactionalAction.ALWAYS_BEGIN ? new JmsXaPollingMessageConsumerDelegate(this.connection, this.jmsSupport, this.destination, this.consumerType, this.config, this.selector, this.sessionManager, this.connectionProvider, this.scheduler, this.inboundContentType, this.inboundEncoding, sourceCallback, this.exceptionResolver) : new JmsMessageListenerDelegate(new JmsMessageListenerFactory(resolvedAckMode, this.inboundEncoding, this.inboundContentType, this.config, this.sessionManager, this.jmsSupport, sourceCallback, this.connectionProvider, this.exceptionResolver), this.connection, this.jmsSupport, this.consumerType, this.destination, this.config, resolvedAckMode, this.selector, this.lockFactory, this.resourceCleaner);
        this.messageConsumerDelegate.createConsumers(this.numberOfConsumers);
    }

    private boolean isXa(JmsConnection connection) {
        return connection instanceof XaJmsTransactionalConnection;
    }

    private void validateTransactionType(JmsTransactionalConnection connection) throws ConnectionException {
        switch (this.sourceConfiguration.getTransactionalAction()) {
            case ALWAYS_BEGIN: {
                switch (this.sourceConfiguration.getTransactionType()) {
                    case XA: {
                        if (!(connection instanceof XATransactionalConnection)) {
                            throw new ConnectionException(String.format("Invalid configuration, The message listener on the flow '%s' has been configured to work with XA Transactions, but the given connection from the config '%s' doesn't support it.\nThis can be fixed doing one of the following:\n - To work with Local transactions, select the 'LOCAL' Transaction Type on the Advanced Source Configuration \n - To work with XA Transactions, enable XA in the connection configuration", this.sourceConfiguration.getFlowName(), this.sourceConfiguration.getConfigName()));
                        }
                        return;
                    }
                    case LOCAL: {
                        if (!(connection instanceof XATransactionalConnection)) break;
                        throw new ConnectionException(String.format("Invalid configuration: The message listener on the flow '%s' has been configured to work with Local Transactions, but the given connection from the config '%s' requires XA Transactions. \nThis can be fixed doing one of the following:\n - To work with XA Transactions, select the 'XA' Transaction Type on the Advanced Source Configuration\n - To work with Local transactions, disable XA in the connection configuration", this.sourceConfiguration.getFlowName(), this.sourceConfiguration.getConfigName()));
                    }
                }
                break;
            }
            case NONE: {
                if (!(connection instanceof XATransactionalConnection)) break;
                LOGGER.info("A XA Connection is being used in a non transactional context, this could led to unexpected behaviour");
            }
        }
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Stopping JMS Listener on destination [%s:%s]", JmsCommons.getDestinationType(this.consumerType), this.destination));
        }
        try {
            if (this.messageConsumerDelegate != null) {
                this.messageConsumerDelegate.stop();
                this.messageConsumerDelegate = null;
            }
            if (this.connection != null && this.exceptionListenerRegistered.getAndSet(false)) {
                this.connection.removeExceptionListener(this.exceptionListener);
                this.exceptionListener = null;
            }
            if (this.connection != null && !(this.connection instanceof XaJmsTransactionalConnection)) {
                this.resourceCleaner.releaseConnection(this.connection.get());
                this.connectionProvider.disconnect((Object)this.connection);
            }
        }
        finally {
            if (this.scheduler != null) {
                this.scheduler.stop();
            }
        }
    }

    public void disableConsumers() {
        if (this.messageConsumerDelegate != null) {
            this.messageConsumerDelegate.disableConsumers();
        }
    }

    public void restart() throws MuleException {
        this.reconnectionManager.blockOperations();
        this.onStop();
        this.startUsingCurrentSourceCallback();
        this.reconnectionManager.unblockOperations();
    }

    public void startUsingCurrentSourceCallback() throws MuleException {
        this.onStart(this.sourceCallback);
    }

    public void onSuccess(JmsResponseMessageBuilder messageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext callbackContext) {
        if (this.ackMode != null && this.ackMode.getInternalAckMode() != null && this.ackMode.getInternalAckMode().equals((Object)InternalAckMode.MANUAL)) {
            callbackContext.getVariable(JMS_ACK_ID).ifPresent(ackId -> {
                if (this.sessionManager.isPendingAck((String)ackId)) {
                    LOGGER.warn("The flow is using Manual Acknowledgment. The execution ended without message acknowledgment or recover, this might lead to unpredictable behaviors");
                }
            });
        }
        if (this.messageConsumerDelegate != null) {
            this.messageConsumerDelegate.onSuccess(callbackContext);
        }
        if (!messageBuilder.isReplyToIgnored()) {
            callbackContext.getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(replyTo -> this.doReply(messageBuilder, callbackContext, (Destination)replyTo, correlationInfo, new RequestReplyContext(messageBuilder.getRequestReplyPattern(), callbackContext.getVariable(MESSAGE_ID_VAR).orElse(null), callbackContext.getVariable(CORRELATION_ID_VAR).orElse(null))));
        }
    }

    public void onError(Error error, SourceCallbackContext callbackContext) {
        if (this.ackMode != null && this.ackMode.getInternalAckMode() != null && this.ackMode.getInternalAckMode().equals((Object)InternalAckMode.MANUAL)) {
            callbackContext.getVariable(JMS_ACK_ID).ifPresent(ackId -> {
                if (this.sessionManager.isPendingAck((String)ackId)) {
                    LOGGER.warn("The flow is using Manual Acknowledgment. The execution ended without message acknowledgment or recover, this might lead to unpredictable behaviors");
                }
            });
        }
        if (this.messageConsumerDelegate != null) {
            this.messageConsumerDelegate.onError(callbackContext, error);
        } else {
            LOGGER.debug("A error occurred after the Source being stopped. {}", (Object)error);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doReply(JmsResponseMessageBuilder messageBuilder, SourceCallbackContext callbackContext, Destination replyTo, CorrelationInfo correlationInfo, RequestReplyContext requestReplyPattern) {
        String destinationName;
        boolean replyToTopic = this.replyDestinationIsTopic(replyTo);
        try {
            destinationName = replyToTopic ? ((Topic)replyTo).getTopicName() : ((Queue)replyTo).getQueueName();
        }
        catch (JMSException e) {
            LOGGER.error(String.format("An error occurred during reply. Failed to obtain the destination name: %s", e.getMessage()));
            JmsListener.notifyIfConnectionProblem(callbackContext, (Exception)((Object)e), this.exceptionResolver);
            return;
        }
        JmsMessageProducer producer = null;
        JmsSession session = null;
        try {
            session = this.getSession(this.connection, replyToTopic);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Begin reply to destination [%s] of type [%s]", destinationName, replyToTopic ? "TOPIC" : "QUEUE"));
            }
            Message message = messageBuilder.build(this.connection.getJmsSupport(), messageBuilder.getSendCorrelationId(), correlationInfo, session.get(), this.config);
            this.applyRequestResponsePattern(messageBuilder, requestReplyPattern, message);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message built, sending message to " + destinationName);
            }
            producer = this.connection.createProducer(session, replyTo, replyToTopic);
            producer.publish(message, messageBuilder);
        }
        catch (Exception e) {
            try {
                LOGGER.error(String.format("An error occurred during reply to destination [%s] of type [%s]: %s", destinationName, replyToTopic ? "TOPIC" : "QUEUE", e.getMessage()), (Throwable)e);
                this.rollbackIfInTransaction(callbackContext);
                JmsListener.notifyIfConnectionProblem(callbackContext, e, this.exceptionResolver);
            }
            catch (Throwable throwable) {
                JmsCommons.releaseResources(session, JmsCommons.isPartOfCurrentTx(session, this.connection, this.sessionManager), producer);
                throw throwable;
            }
            JmsCommons.releaseResources(session, JmsCommons.isPartOfCurrentTx(session, this.connection, this.sessionManager), producer);
        }
        JmsCommons.releaseResources(session, JmsCommons.isPartOfCurrentTx(session, this.connection, this.sessionManager), producer);
    }

    private void rollbackIfInTransaction(SourceCallbackContext callbackContext) {
        if (callbackContext.getTransactionHandle().isTransacted()) {
            try {
                callbackContext.getTransactionHandle().rollback();
            }
            catch (TransactionException e) {
                LOGGER.error("Unable to rollback transaction after the error occurred during reply.", (Throwable)e);
            }
        }
    }

    private void applyRequestResponsePattern(JmsResponseMessageBuilder messageBuilder, RequestReplyContext requestReplyContext, Message message) throws JMSException {
        if (messageBuilder.getCorrelationId() == null) {
            switch (requestReplyContext.getPattern()) {
                case CORRELATION_ID: {
                    message.setJMSCorrelationID(requestReplyContext.getCorrelationId());
                    break;
                }
                case MESSAGE_ID: {
                    message.setJMSCorrelationID(requestReplyContext.getMessageId());
                    break;
                }
            }
        }
    }

    private JmsSession getSession(JmsTransactionalConnection connection, boolean isTopic) throws JMSException {
        Optional<JmsSession> transactedSession = this.sessionManager.getTransactedSession(connection);
        if (transactedSession.isPresent()) {
            return transactedSession.get();
        }
        return connection.createSession(InternalAckMode.AUTO, isTopic);
    }

    private boolean replyDestinationIsTopic(Destination destination) {
        if (destination instanceof Topic && destination instanceof Queue && this.jmsSupport instanceof Jms102bSupport) {
            LOGGER.error(StringMessageUtils.getBoilerPlate((String)"Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to https://www.mulesoft.org/jira"));
        }
        return destination instanceof Topic;
    }

    private void validateNumberOfConsumers(int numberOfConsumers) {
        TopicConsumer topicConsumer;
        if (numberOfConsumers < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + numberOfConsumers + "]. The number should be 1 or greater.");
        }
        if (numberOfConsumers > 1 && this.consumerType.topic() && !this.isCapableOfMultiConsumersOnTopic(topicConsumer = (TopicConsumer)this.consumerType)) {
            throw new IllegalArgumentException("Destination [" + this.destination + "] is a topic, but [" + numberOfConsumers + "] receivers have been requested. This is only possible for 'shared' topic consumers, otherwise use 1.");
        }
    }

    private boolean isCapableOfMultiConsumersOnTopic(TopicConsumer topicConsumer) {
        return this.jmsSupport.getSpecification().equals((Object)JmsSpecification.JMS_2_0) && topicConsumer.isShared();
    }

    private class RequestReplyContext {
        RequestReplyPattern pattern;
        String messageId;
        String correlationId;

        public RequestReplyContext(RequestReplyPattern pattern, String messageId, String correlationId) {
            this.pattern = pattern;
            this.messageId = messageId;
            this.correlationId = correlationId;
        }

        public RequestReplyPattern getPattern() {
            return this.pattern;
        }

        public String getMessageId() {
            return this.messageId;
        }

        public String getCorrelationId() {
            return this.correlationId;
        }

        public String toString() {
            return "RequestReplyContext{pattern=" + (Object)((Object)this.pattern) + ", messageId='" + this.messageId + '\'' + ", correlationId='" + this.correlationId + '\'' + '}';
        }
    }

    public static class Builder {
        JmsSessionManager sessionManager;
        JmsConfig config;
        ConnectionProvider<JmsTransactionalConnection> connectionProvider;
        String destination;
        ConsumerType consumerType;
        JmsAckMode ackMode;
        String selector;
        String inboundContentType;
        String inboundEncoding;
        int numberOfConsumers;
        SourceConfiguration sourceConfiguration;
        SchedulerService schedulerService;
        Boolean ignoreReplyTo = false;
        Optional<JmsConnectionExceptionResolver> exceptionResolver = Optional.empty();
        Optional<JmsResourceReleaser> resourceCleaner = Optional.empty();
        Optional<JmsListenerLockFactory> lockFactory = Optional.empty();
        Optional<Integer> jmsMaxIdleConnectionTimeout = Optional.empty();
        Optional<JmsReconnectionManager> reconnectionManager = Optional.empty();

        public Builder(JmsSessionManager sessionManager, JmsConfig config, ConnectionProvider<JmsTransactionalConnection> connectionProvider, String destination, ConsumerType consumerType, JmsAckMode ackMode, String selector, String inboundContentType, String inboundEncoding, int numberOfConsumers, SourceConfiguration sourceConfiguration, SchedulerService schedulerService) {
            this.sessionManager = sessionManager;
            this.config = config;
            this.connectionProvider = connectionProvider;
            this.destination = destination;
            this.consumerType = consumerType;
            this.ackMode = ackMode;
            this.selector = selector;
            this.inboundContentType = inboundContentType;
            this.inboundEncoding = inboundEncoding;
            this.numberOfConsumers = numberOfConsumers;
            this.sourceConfiguration = sourceConfiguration;
            this.schedulerService = schedulerService;
        }

        public Builder setExceptionResolver(JmsConnectionExceptionResolver resolver) {
            this.exceptionResolver = Optional.of(resolver);
            return this;
        }

        public Builder setResourceReleaser(JmsResourceReleaser resourceCleaner) {
            this.resourceCleaner = Optional.of(resourceCleaner);
            return this;
        }

        public Builder setListenerLockFactory(JmsListenerLockFactory lockFactory) {
            this.lockFactory = Optional.of(lockFactory);
            return this;
        }

        public Builder setMaxIdleConnectionTimeout(Integer jmsMaxIdleConnectionTimeout) {
            this.jmsMaxIdleConnectionTimeout = Optional.of(jmsMaxIdleConnectionTimeout);
            return this;
        }

        public Builder setReconnectionManager(JmsReconnectionManager reconnectionManager) {
            this.reconnectionManager = Optional.of(reconnectionManager);
            return this;
        }

        public JmsListener build() {
            return new JmsListener(this.sessionManager, this.config, this.connectionProvider, this.destination, this.consumerType, this.ackMode, this.selector, this.inboundContentType, this.inboundEncoding, this.numberOfConsumers, this.sourceConfiguration, this.schedulerService, this.exceptionResolver.orElseGet(DefaultJmsConnectionExceptionResolver::new), this.resourceCleaner.orElseGet(DefaultJmsResourceReleaser::new), this.lockFactory.orElseGet(JmsListenerLockFactory::newDefault), this.reconnectionManager.orElseGet(DefaultReconnectionManagerProvider::new));
        }
    }
}

