/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.io.Serializable;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQQueueBrowser;
import org.apache.qpid.client.AMQTemporaryQueue;
import org.apache.qpid.client.AMQTemporaryTopic;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.client.QueueReceiverAdaptor;
import org.apache.qpid.client.QueueSenderAdapter;
import org.apache.qpid.client.TemporaryDestination;
import org.apache.qpid.client.TopicPublisherAdapter;
import org.apache.qpid.client.TopicSubscriberAdaptor;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.JMSObjectMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer>
extends Closeable
implements Session,
QueueSession,
TopicSession {
    // Logger used for general session-level diagnostics.
    private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
    // Names and default values of the system properties consulted by the constructor.
    public static final String STRICT_AMQP = "STRICT_AMQP";
    public static final String STRICT_AMQP_DEFAULT = "false";
    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
    // Flow-control wait settings, overridable through the named system properties.
    // NOTE(review): units are not visible here; presumably milliseconds - confirm.
    private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period", 5000L);
    private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", 60000L);
    // Both flags default to true and are overridable through the named system properties.
    // NOTE(review): "_delareQueues" is misspelled ("declare"); left untouched because code
    // outside this view may reference the name.
    private final boolean _delareQueues = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
    private final boolean _declareExchanges = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
    // Set in the constructor: true unless the connection asks for the legacy map message format.
    private final boolean _useAMQPEncodedMapMessage;
    // NOTE(review): "DEAMON" is misspelled ("DAEMON"); protected, so subclasses may depend on the name.
    protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean("qpid.jms.daemon.dispatcher");
    // Owning connection and the basic session parameters supplied to the constructor.
    private AMQConnection _connection;
    private final boolean _transacted;
    private final int _acknowledgeMode;
    private int _channelId;
    private int _ticket;
    private int _prefetchHighMark;
    private int _prefetchLowMark;
    private MessageListener _messageListener = null;
    private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
    // Durable subscriptions keyed by subscription name, plus the reverse lookup from consumer to name.
    private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = new ConcurrentHashMap();
    private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap();
    // Fair locks taken (in this order) by createDurableSubscriber while it updates the maps above.
    private final Lock _subscriberDetails = new ReentrantLock(true);
    private final Lock _subscriberAccess = new ReentrantLock(true);
    // Incoming message queue; created in the constructor with the prefetch thresholds.
    private final FlowControllingBlockingQueue _queue;
    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1L);
    private final AtomicLong _rollbackMark = new AtomicLong(-1L);
    private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue();
    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue();
    private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue();
    // Dispatcher and the thread it runs on; volatile because they are read without locking
    // (e.g. in stopDispatcherThread and confirmConsumerCancelled).
    private volatile Dispatcher _dispatcher;
    private volatile Thread _dispatcherThread;
    private MessageFactoryRegistry _messageFactoryRegistry;
    private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
    private int _nextTag = 1;
    private final IdToConsumerMap<C> _consumers = new IdToConsumerMap();
    private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList();
    private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap();
    private long _nextProducerId;
    private volatile boolean _sessionInRecovery;
    private volatile boolean _usingDispatcherForCleanup;
    private boolean _connectionStopped;
    private boolean _hasMessageListeners;
    private boolean _suspended;
    private final Object _suspensionLock = new Object();
    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
    // Values derived from the STRICT_AMQP / STRICT_AMQP_FATAL / IMMEDIATE_PREFETCH system properties.
    private final boolean _immediatePrefetch;
    private final boolean _strictAMQP;
    private final boolean _strictAMQPFATAL;
    // Monitor held while the session is being closed (see close(long, boolean) and closed(Throwable)).
    private final Object _messageDeliveryLock = new Object();
    private boolean _dirty;
    // Checked by commit(): when set, the commit is refused and the session is rolled back instead.
    private boolean _failedOverDirty;
    private FlowControlIndicator _flowControl = new FlowControlIndicator();
    // Separate logger category for the dispatcher.
    private static final Logger _dispatcherLogger = LoggerFactory.getLogger((String)"org.apache.qpid.client.AMQSession.Dispatcher");

    /**
     * Gives subclasses access to the session's {@code _highestDeliveryTag} counter.
     *
     * @return the live counter instance held by this session (not a copy)
     */
    protected AtomicLong getHighestDeliveryTag() {
        return _highestDeliveryTag;
    }

    /**
     * Gives subclasses access to the queue of prefetched message tags.
     *
     * @return the live {@code _prefetchedMessageTags} queue (not a copy)
     */
    protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags() {
        return _prefetchedMessageTags;
    }

    /**
     * Gives subclasses access to the queue of unacknowledged message tags.
     *
     * @return the live {@code _unacknowledgedMessageTags} queue (not a copy)
     */
    protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags() {
        return _unacknowledgedMessageTags;
    }

    /**
     * Gives subclasses access to the queue of delivered message tags.
     *
     * @return the live {@code _deliveredMessageTags} queue (not a copy)
     */
    protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags() {
        return _deliveredMessageTags;
    }

    /**
     * Reads the current dispatcher reference.
     *
     * @return the value of the volatile {@code _dispatcher} field; {@code null} when none is set
     */
    protected Dispatcher getDispatcher() {
        return _dispatcher;
    }

    /**
     * Reads the current dispatcher thread reference.
     *
     * @return the value of the volatile {@code _dispatcherThread} field; {@code null} when none is set
     */
    protected Thread getDispatcherThread() {
        return _dispatcherThread;
    }

    /**
     * Gives subclasses access to the registry supplied when the session was constructed.
     *
     * @return the session's {@code _messageFactoryRegistry}
     */
    protected MessageFactoryRegistry getMessageFactoryRegistry() {
        return _messageFactoryRegistry;
    }

    /**
     * Gives subclasses access to the consumer lookup map.
     *
     * @return the live {@code _consumers} map (not a copy)
     */
    protected IdToConsumerMap<C> getConsumers() {
        return _consumers;
    }

    /**
     * Updates the volatile {@code _usingDispatcherForCleanup} flag.
     *
     * @param usingDispatcherForCleanup the new flag value
     */
    protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) {
        _usingDispatcherForCleanup = usingDispatcherForCleanup;
    }

    /**
     * Reports the immediate-prefetch setting computed in the constructor.
     *
     * @return {@code true} when strict AMQP mode is on or the {@code IMMEDIATE_PREFETCH}
     *         system property was {@code "true"} at construction time
     */
    protected boolean isImmediatePrefetch() {
        return _immediatePrefetch;
    }

    /**
     * Creates a session on the given connection and channel.
     *
     * <p>The {@code STRICT_AMQP}, {@code STRICT_AMQP_FATAL} and {@code IMMEDIATE_PREFETCH}
     * system properties are read once here. A transacted session always uses acknowledge
     * mode {@code 0} regardless of the mode passed in. When the effective acknowledge mode
     * is {@code 257} (presumably the no-acknowledge mode - confirm against the Session
     * interface) the incoming queue gets a threshold listener that starts a
     * {@code SuspenderRunner} thread whenever the prefetch high/low marks are crossed;
     * otherwise the queue is created with only the high mark and no listener.
     *
     * @param con                     owning connection; when {@code null} the AMQP-encoded map
     *                                message format is used
     * @param channelId               channel id for this session
     * @param transacted              whether the session is transacted
     * @param acknowledgeMode         requested acknowledge mode (ignored when transacted)
     * @param messageFactoryRegistry  registry stored for later message creation
     * @param defaultPrefetchHighMark initial prefetch high mark
     * @param defaultPrefetchLowMark  initial prefetch low mark
     */
    protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) {
        _useAMQPEncodedMapMessage = con == null || !con.isUseLegacyMapMessageFormat();
        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
        _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
        _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
        _connection = con;
        _transacted = transacted;
        _acknowledgeMode = transacted ? 0 : acknowledgeMode;
        _channelId = channelId;
        _messageFactoryRegistry = messageFactoryRegistry;
        _prefetchHighMark = defaultPrefetchHighMark;
        _prefetchLowMark = defaultPrefetchLowMark;
        if (_acknowledgeMode == 257) {
            _queue = new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener(){
                // true while a suspend has been requested and not yet reversed
                private final AtomicBoolean _suspendState = new AtomicBoolean();

                public void aboveThreshold(int currentValue) {
                    // Nothing to do for a closed/closing session or when already suspended.
                    if (AMQSession.this.isClosed() || AMQSession.this.isClosing() || _suspendState.getAndSet(true)) {
                        return;
                    }
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Above threshold(" + AMQSession.this._prefetchHighMark + ") so suspending channel. Current value is " + currentValue);
                    }
                    launchSuspender();
                }

                public void underThreshold(int currentValue) {
                    // Only act when the flag was previously set, i.e. a suspend is outstanding.
                    if (AMQSession.this.isClosed() || AMQSession.this.isClosing() || !_suspendState.getAndSet(false)) {
                        return;
                    }
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Below threshold(" + AMQSession.this._prefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
                    }
                    launchSuspender();
                }

                // Starts a SuspenderRunner on a fresh thread so the listener callback itself does not block.
                private void launchSuspender() {
                    try {
                        Threading.getThreadFactory().createThread((Runnable)new SuspenderRunner(_suspendState)).start();
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Failed to create thread", e);
                    }
                }
            });
        } else {
            _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Created session:" + this);
        }
    }

    /**
     * Convenience constructor that uses {@link MessageFactoryRegistry#newDefaultRegistry()}
     * as the message factory registry and otherwise delegates to the full constructor.
     *
     * @param con                 owning connection
     * @param channelId           channel id for this session
     * @param transacted          whether the session is transacted
     * @param acknowledgeMode     requested acknowledge mode
     * @param defaultPrefetchHigh initial prefetch high mark
     * @param defaultPrefetchLow  initial prefetch low mark
     */
    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) {
        this(con,
             channelId,
             transacted,
             acknowledgeMode,
             MessageFactoryRegistry.newDefaultRegistry(),
             defaultPrefetchHigh,
             defaultPrefetchLow);
    }

    /**
     * Closes the session by delegating to {@link #close(long)} with a timeout of {@code -1}.
     *
     * @throws JMSException if closing the session fails
     */
    @Override
    public void close() throws JMSException {
        close(-1L);
    }

    /**
     * Supplies the exception that {@link #checkNotClosed()} links to the
     * {@code IllegalStateException} it throws for a closed session.
     *
     * @return the exception to link, or {@code null} when there is none (callers in this
     *         class handle {@code null})
     */
    public abstract AMQException getLastException();

    /**
     * Verifies that the session is still open.
     *
     * <p>When the superclass check fails and {@link #getLastException()} returns an
     * exception, a new {@code IllegalStateException} is thrown that carries that
     * exception as both linked exception and cause, together with its error code.
     * When there is no last exception the original {@code IllegalStateException}
     * is rethrown unchanged.
     *
     * @throws JMSException (specifically {@code javax.jms.IllegalStateException}) if the
     *                      session is closed
     */
    @Override
    public void checkNotClosed() throws JMSException {
        try {
            super.checkNotClosed();
        }
        catch (javax.jms.IllegalStateException ise) {
            AMQException lastException = this.getLastException();
            if (lastException == null) {
                throw ise;
            }
            // The last exception may carry no error code (closed(Throwable) builds one from
            // just a message and a cause); guard so we report "session closed" rather than
            // failing with a NullPointerException here.
            AMQConstant errorCode = lastException.getErrorCode();
            javax.jms.IllegalStateException ssnClosed = new javax.jms.IllegalStateException("Session has been closed", errorCode == null ? null : errorCode.toString());
            ssnClosed.setLinkedException(lastException);
            ssnClosed.initCause(lastException);
            throw ssnClosed;
        }
    }

    /**
     * Creates an empty bytes message attached to this session.
     *
     * @return a new {@link JMSBytesMessage} whose AMQ session is this session
     * @throws JMSException if the session is closed
     */
    public BytesMessage createBytesMessage() throws JMSException {
        checkNotClosed();
        final JMSBytesMessage bytesMessage = new JMSBytesMessage(getMessageDelegateFactory());
        bytesMessage.setAMQSession(this);
        return bytesMessage;
    }

    /**
     * Acknowledges messages on this session via {@link #acknowledgeImpl()} and then marks
     * the session clean.
     *
     * <p>If the session reports {@code hasFailedOverDirty()}, {@code recover()} is invoked
     * and the call is rejected instead of acknowledging.
     *
     * @throws javax.jms.IllegalStateException if the session is closed or has failed over dirty
     * @throws JMSException                    if acknowledging fails, including transport errors
     */
    public void acknowledge() throws javax.jms.IllegalStateException, JMSException {
        if (isClosed()) {
            throw new javax.jms.IllegalStateException("Session is already closed");
        }
        if (hasFailedOverDirty()) {
            // Recover first so the session is usable again, then tell the caller the ack was refused.
            recover();
            throw new javax.jms.IllegalStateException("has failed over");
        }
        try {
            acknowledgeImpl();
            markClean();
        }
        catch (TransportException transportFailure) {
            throw toJMSException("Exception while acknowledging message(s):" + transportFailure.getMessage(), transportFailure);
        }
    }

    /**
     * Subclass hook that performs the actual acknowledgement; invoked by
     * {@link #acknowledge()} after its closed/failover checks have passed.
     *
     * @throws JMSException if the acknowledgement fails
     */
    protected abstract void acknowledgeImpl() throws JMSException;

    /**
     * Acknowledges a message.
     *
     * <p>NOTE(review): parameter names were lost in decompilation. {@code var1} is presumably
     * the delivery tag and {@code var3} presumably a "multiple" flag - confirm against the
     * concrete implementations.
     *
     * @param var1 see note above
     * @param var3 see note above
     */
    public abstract void acknowledgeMessage(long var1, boolean var3);

    /**
     * Looks up the method registry of this session's protocol handler.
     *
     * @return the registry returned by {@code getProtocolHandler().getMethodRegistry()}
     */
    public MethodRegistry getMethodRegistry() {
        return getProtocolHandler().getMethodRegistry();
    }

    /**
     * Binds a queue to an exchange, waiting for the broker's reply ({@code nowait = false}).
     *
     * @param queueName    queue to bind
     * @param routingKey   routing key for the binding
     * @param arguments    binding arguments
     * @param exchangeName exchange to bind to
     * @param destination  destination the binding is created for
     * @throws AMQException if the bind fails
     */
    public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException {
        final boolean nowait = false;
        bindQueue(queueName, routingKey, arguments, exchangeName, destination, nowait);
    }

    /**
     * Binds a queue to an exchange by running {@link #sendQueueBind} inside a
     * {@link FailoverNoopSupport} wrapper on this session's connection.
     *
     * @param queueName    queue to bind
     * @param routingKey   routing key for the binding
     * @param arguments    binding arguments
     * @param exchangeName exchange to bind to
     * @param destination  destination the binding is created for
     * @param nowait       passed through to {@code sendQueueBind}
     * @throws AMQException if the bind fails
     */
    public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException {
        final FailoverProtectedOperation<Object, AMQException> bindOperation = new FailoverProtectedOperation<Object, AMQException>(){

            @Override
            public Object execute() throws AMQException, FailoverException {
                AMQSession.this.sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
                return null;
            }
        };
        new FailoverNoopSupport<Object, AMQException>(bindOperation, _connection).execute();
    }

    /**
     * Adds a binding from the destination's exchange to the consumer's queue using the
     * given routing key. Does nothing when the consumer has no queue name.
     *
     * @param consumer   consumer whose queue is to be bound
     * @param amqd       destination supplying the exchange name
     * @param routingKey routing key for the new binding
     * @throws AMQException if the bind fails
     */
    public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException {
        if (consumer.getQueuename() == null) {
            return;
        }
        bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd);
    }

    /**
     * Subclass hook that sends the queue-bind request. Parameter names were lost in
     * decompilation; {@code bindQueue} passes, in order: queue name, routing key,
     * arguments, exchange name, destination, and the nowait flag.
     *
     * @param var1 queue name
     * @param var2 routing key
     * @param var3 binding arguments
     * @param var4 exchange name
     * @param var5 destination the binding is created for
     * @param var6 nowait flag
     * @throws AMQException      if the bind fails
     * @throws FailoverException if failover interrupts the operation
     */
    public abstract void sendQueueBind(AMQShortString var1, AMQShortString var2, FieldTable var3, AMQShortString var4, AMQDestination var5, boolean var6) throws AMQException, FailoverException;

    /**
     * Closes the session and sends a close to the broker when the connection allows it.
     *
     * @param timeout timeout value handed to {@link #sendClose(long)}
     * @throws JMSException if closing the session fails
     */
    public void close(long timeout) throws JMSException {
        final boolean sendClose = true;
        close(timeout, sendClose);
    }

    /**
     * Closes the session once: the first caller to flip the closed flag performs the work,
     * later callers return immediately.
     *
     * <p>While holding the failover mutex and then the message delivery lock, all producers
     * and consumers are closed (with no error cause) and, if requested and the connection is
     * either still open or in the middle of closing, a close is sent to the broker. The
     * session is always deregistered from the connection afterwards.
     *
     * @param timeout   timeout value handed to {@link #sendClose(long)}
     * @param sendClose whether a close should be sent to the broker
     * @throws JMSException if sending the close fails with an AMQ or transport error
     */
    private void close(long timeout, boolean sendClose) throws JMSException {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Closing session: " + this);
        }
        if (setClosed()) {
            // Already closed by someone else.
            return;
        }
        setClosing(true);
        synchronized (getFailoverMutex()) {
            synchronized (_messageDeliveryLock) {
                closeProducersAndConsumers(null);
                try {
                    // Send the close while the connection is open, or while it is itself closing.
                    final boolean connectionReachable = !_connection.isClosed() || _connection.isClosing();
                    if (connectionReachable && sendClose) {
                        sendClose(timeout);
                    }
                }
                catch (AMQException amqFailure) {
                    JMSException closeFailure = new JMSException("Error closing session: " + amqFailure);
                    closeFailure.setLinkedException(amqFailure);
                    closeFailure.initCause(amqFailure);
                    throw closeFailure;
                }
                catch (FailoverException ignored) {
                    _logger.debug("Got FailoverException during channel close, ignored as channel already marked as closed.");
                }
                catch (TransportException transportFailure) {
                    throw toJMSException("Error closing session:" + transportFailure.getMessage(), transportFailure);
                }
                finally {
                    _connection.deregisterSession(_channelId);
                }
            }
        }
    }

    /**
     * Subclass hook that sends the session close to the broker; invoked by
     * {@code close(long, boolean)}.
     *
     * @param var1 the timeout passed to {@code close} (name lost in decompilation)
     * @throws AMQException      if the close fails
     * @throws FailoverException if failover interrupts the operation
     */
    public abstract void sendClose(long var1) throws AMQException, FailoverException;

    /**
     * Marks the session closed in response to an external event, without sending a close
     * to the broker.
     *
     * <p>An {@link AMQDisconnectedException} additionally interrupts the dispatcher thread.
     * The closing flag is set to {@code true} only when no cause is given. The first caller
     * to flip the closed flag deregisters the session and closes all producers and consumers,
     * handing them the cause (wrapped in an {@link AMQException} if it is not one already).
     *
     * @param e the reason for the closure; may be {@code null}
     * @throws JMSException if closing producers or consumers fails
     */
    public void closed(Throwable e) throws JMSException {
        if (e instanceof AMQDisconnectedException) {
            stopDispatcherThread();
        }
        setClosing(e == null);
        if (setClosed()) {
            // Already closed by someone else.
            return;
        }
        synchronized (_messageDeliveryLock) {
            final AMQException closeCause;
            if (e instanceof AMQException) {
                closeCause = (AMQException)e;
            } else {
                closeCause = new AMQException("Closing session forcibly", e);
            }
            _connection.deregisterSession(_channelId);
            closeProducersAndConsumers(closeCause);
        }
    }

    /**
     * Interrupts the dispatcher thread, if one is currently set.
     *
     * <p>The volatile {@code _dispatcherThread} field is read once into a local so the null
     * check and the {@code interrupt()} call act on the same reference even if another
     * thread changes the field in between.
     */
    protected void stopDispatcherThread() {
        final Thread dispatcherThread = this._dispatcherThread;
        if (dispatcherThread != null) {
            dispatcherThread.interrupt();
        }
    }

    /**
     * Commits the current transaction via {@link #commitImpl()} and then marks the session clean.
     *
     * <p>If the session was dirty when a failover happened, nothing is committed: the
     * session is rolled back and a {@link TransactionRolledBackException} is thrown.
     *
     * @throws TransactionRolledBackException if the session was dirty during failover
     * @throws JMSException                   if the session is not transacted (as decided by
     *                                        {@code checkTransacted()}) or the commit fails
     */
    public void commit() throws JMSException {
        this.checkTransacted();
        if (this._failedOverDirty) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Session " + this._channelId + " was dirty whilst failing over. Rolling back.");
            }
            this.rollback();
            // Message fixed: the two sentences were previously fused ("activity.The") and "occurred" was misspelled.
            throw new TransactionRolledBackException("Connection failover has occurred with uncommitted transaction activity. The session transaction was rolled back.");
        }
        try {
            this.commitImpl();
            this.markClean();
        }
        catch (AMQException e) {
            throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
        }
        catch (FailoverException e) {
            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
        }
        catch (TransportException e) {
            throw this.toJMSException("Session exception occurred while trying to commit: " + e.getMessage(), e);
        }
    }

    /**
     * Subclass hook that performs the actual commit; invoked by {@link #commit()} once the
     * transacted and failover checks have passed.
     *
     * @throws AMQException       if the commit fails
     * @throws FailoverException  if failover interrupts the commit
     * @throws TransportException if the transport layer fails
     */
    protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;

    /**
     * Handles confirmation that the consumer with the given tag has been cancelled.
     *
     * <p>Unknown tags are ignored. For a regular (non-browse) consumer a dispatcher is
     * started if none exists and is asked to reject the consumer's pending messages. For a
     * browse-only consumer with auto-close enabled, the consumer is either deregistered
     * (already closed) or a {@link CloseConsumerMessage} is queued for it.
     *
     * @param consumerTag tag identifying the cancelled consumer
     */
    public void confirmConsumerCancelled(int consumerTag) {
        final C cancelled = _consumers.get(consumerTag);
        if (cancelled == null) {
            return;
        }

        if (cancelled.isBrowseOnly()) {
            if (!cancelled.isAutoClose()) {
                return;
            }
            if (cancelled.isClosed()) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Closing consumer:" + cancelled.debugIdentity());
                }
                deregisterConsumer(cancelled);
            } else {
                _queue.add(new CloseConsumerMessage((BasicMessageConsumer)cancelled));
            }
            return;
        }

        // Regular consumer: make sure a dispatcher exists, then flush what is pending for it.
        if (_dispatcher == null) {
            _logger.debug("Dispatcher is null so created stopped dispatcher");
            startDispatcherIfNecessary(true);
        } else {
            _logger.debug("Dispatcher is not null");
        }
        _dispatcher.rejectPending(cancelled);
    }

    /**
     * Creates a browser for the given queue without a message selector.
     *
     * @param queue queue to browse
     * @return a new queue browser
     * @throws UnsupportedOperationException when strict AMQP mode is enabled
     * @throws JMSException                  if the session is closed or the queue is invalid
     */
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        if (isStrictAMQP()) {
            throw new UnsupportedOperationException();
        }
        final String noSelector = null;
        return createBrowser(queue, noSelector);
    }

    /**
     * Creates a browser for the given queue, restricted by the given selector.
     *
     * @param queue           queue to browse
     * @param messageSelector selector expression; may be {@code null}
     * @return a new {@link AMQQueueBrowser} bound to this session
     * @throws UnsupportedOperationException when strict AMQP mode is enabled
     * @throws JMSException                  if the session is closed or the queue is invalid
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        if (isStrictAMQP()) {
            throw new UnsupportedOperationException();
        }
        checkNotClosed();
        checkValidQueue(queue);
        final AMQQueueBrowser browser = new AMQQueueBrowser(this, queue, messageSelector);
        return browser;
    }

    /**
     * Creates the consumer used for browsing: non-exclusive, with the session's default
     * prefetch marks and with the last two {@code createConsumerImpl} flags both {@code true}.
     *
     * @param destination     destination to browse
     * @param messageSelector selector expression; may be {@code null}
     * @param noLocal         no-local flag passed through to the consumer
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false, messageSelector, null, true, true);
    }

    /**
     * Creates a consumer with the session's default prefetch marks, no selector and
     * {@code noLocal = false}. The consumer is exclusive exactly when the destination is a
     * {@link Topic}.
     *
     * @param destination destination to consume from
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(
                destination,
                _prefetchHighMark,
                _prefetchLowMark,
                false,
                destination instanceof Topic,
                null,
                null,
                isBrowseOnlyDestination(destination),
                false);
    }

    /**
     * Creates a consumer with the session's default prefetch marks and the given selector,
     * with {@code noLocal = false}. The consumer is exclusive exactly when the destination
     * is a {@link Topic}.
     *
     * @param destination     destination to consume from
     * @param messageSelector selector expression; may be {@code null}
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(
                destination,
                _prefetchHighMark,
                _prefetchLowMark,
                false,
                destination instanceof Topic,
                messageSelector,
                null,
                isBrowseOnlyDestination(destination),
                false);
    }

    /**
     * Creates a consumer with the session's default prefetch marks, the given selector and
     * the given no-local flag. The consumer is exclusive exactly when the destination is a
     * {@link Topic}.
     *
     * @param destination     destination to consume from
     * @param messageSelector selector expression; may be {@code null}
     * @param noLocal         no-local flag passed through to the consumer
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(
                destination,
                _prefetchHighMark,
                _prefetchLowMark,
                noLocal,
                destination instanceof Topic,
                messageSelector,
                null,
                isBrowseOnlyDestination(destination),
                false);
    }

    /**
     * Creates a consumer with an explicit prefetch; the low mark is {@code prefetch / 2}
     * (integer division).
     *
     * @param destination destination to consume from
     * @param prefetch    prefetch high mark
     * @param noLocal     no-local flag passed through to the consumer
     * @param exclusive   whether the consumer is exclusive
     * @param selector    selector expression; may be {@code null}
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(
                destination,
                prefetch,
                prefetch / 2,
                noLocal,
                exclusive,
                selector,
                null,
                isBrowseOnlyDestination(destination),
                false);
    }

    /**
     * Creates a consumer with explicit prefetch high and low marks.
     *
     * @param destination  destination to consume from
     * @param prefetchHigh prefetch high mark
     * @param prefetchLow  prefetch low mark
     * @param noLocal      no-local flag passed through to the consumer
     * @param exclusive    whether the consumer is exclusive
     * @param selector     selector expression; may be {@code null}
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(
                destination,
                prefetchHigh,
                prefetchLow,
                noLocal,
                exclusive,
                selector,
                null,
                isBrowseOnlyDestination(destination),
                false);
    }

    /**
     * Creates a consumer with explicit prefetch marks and an additional raw selector table.
     *
     * @param destination  destination to consume from
     * @param prefetchHigh prefetch high mark
     * @param prefetchLow  prefetch low mark
     * @param noLocal      no-local flag passed through to the consumer
     * @param exclusive    whether the consumer is exclusive
     * @param selector     selector expression; may be {@code null}
     * @param rawSelector  raw selector table handed to {@code createConsumerImpl}
     * @return the created consumer
     * @throws JMSException if the destination is invalid or creation fails
     */
    public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable rawSelector) throws JMSException {
        checkValidDestination(destination);
        return createConsumerImpl(
                destination,
                prefetchHigh,
                prefetchLow,
                noLocal,
                exclusive,
                selector,
                rawSelector,
                isBrowseOnlyDestination(destination),
                false);
    }

    /**
     * Creates a durable subscriber with no selector and {@code noLocal = false}.
     *
     * @param topic topic to subscribe to
     * @param name  subscription name
     * @return the created subscriber
     * @throws JMSException if the subscription cannot be created
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        final String noSelector = null;
        return createDurableSubscriber(topic, name, noSelector, false);
    }

    /**
     * Creates (or re-creates) a durable subscriber for the given topic under the given
     * subscription name.
     *
     * <p>Steps, as implemented below:
     * <ol>
     *   <li>Validate the session and topic and derive the durable destination.</li>
     *   <li>For an unresolved ADDR-syntax destination, resolve it, require address type
     *       {@code 2} and mark its source node durable.</li>
     *   <li>Treat a {@code null} or blank selector as "no selector".</li>
     *   <li>While holding {@code _subscriberDetails}: if this session has no subscriber under
     *       {@code name}, clean up a stale broker-side queue where appropriate; if it has
     *       one, reject an identical re-subscription and otherwise unsubscribe the old one.</li>
     *   <li>While additionally holding {@code _subscriberAccess}: create the consumer and
     *       record it in both subscription maps.</li>
     * </ol>
     *
     * @param topic    topic to subscribe to
     * @param name     subscription name
     * @param selector message selector; {@code null} or blank means none
     * @param noLocal  no-local flag for the subscription
     * @return the subscriber registered under {@code name}
     * @throws UnsupportedOperationException   in strict AMQP mode when the fatal flag is set
     * @throws javax.jms.IllegalStateException if this session already holds an identical
     *                                         subscription (same topic, name and selector)
     * @throws JMSException                    if validation, destination resolution or
     *                                         consumer creation fails
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
        this.checkNotClosed();
        Topic origTopic = this.checkValidTopic(topic, true);
        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, this._connection);
        if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR && !dest.isAddressResolved()) {
            try {
                this.handleAddressBasedDestination(dest, false, noLocal, true);
                // NOTE(review): 2 is presumably the "topic" address type constant - confirm against AMQDestination.
                if (dest.getAddressType() != 2) {
                    throw new JMSException("Durable subscribers can only be created for Topics");
                }
                dest.getSourceNode().setDurable(true);
            }
            catch (AMQException e) {
                JMSException ex = new JMSException("Error when verifying destination");
                ex.initCause((Throwable)e);
                ex.setLinkedException((Exception)((Object)e));
                throw ex;
            }
            catch (TransportException e) {
                throw this.toJMSException("Error when verifying destination", e);
            }
        }
        // Normalise a null or blank selector to null.
        String messageSelector = selector == null || selector.trim().length() == 0 ? null : selector;
        this._subscriberDetails.lock();
        try {
            TopicSubscriberAdaptor<Object> subscriber = this._subscriptions.get(name);
            if (subscriber == null) {
                // No subscription with this name is known to this session.
                AMQShortString topicName = dest.getRoutingKey();
                if (this._strictAMQP) {
                    if (this._strictAMQPFATAL) {
                        throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
                    }
                    _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' for creation durableSubscriber. Requesting queue deletion regardless.");
                    this.deleteQueue(dest.getAMQQueueName());
                } else {
                    HashMap<String, Object> args = new HashMap<String, Object>();
                    args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
                    if (noLocal) {
                        args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true);
                    }
                    // Delete the queue when it is bound to the exchange but not with this
                    // exact topic and selector arguments.
                    boolean isQueueBound = this.isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
                    boolean isQueueBoundForTopicAndSelector = this.isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
                    if (isQueueBound && !isQueueBoundForTopicAndSelector) {
                        this.deleteQueue(dest.getAMQQueueName());
                    }
                }
            } else {
                // A subscription with this name already exists in this session: identical
                // topic and selector is an error, anything else replaces the old subscription.
                if (subscriber.getTopic().equals(topic) && (messageSelector == null && subscriber.getMessageSelector() == null || messageSelector != null && messageSelector.equals(subscriber.getMessageSelector()))) {
                    throw new javax.jms.IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name + (messageSelector != null ? " and selector " + messageSelector : ""));
                }
                this.unsubscribe(name, true);
            }
            this._subscriberAccess.lock();
            try {
                // Create the consumer and register it in both lookup directions.
                BasicMessageConsumer consumer = (BasicMessageConsumer)this.createConsumer(dest, messageSelector, noLocal);
                subscriber = new TopicSubscriberAdaptor<BasicMessageConsumer>(dest, consumer);
                this._subscriptions.put(name, subscriber);
                this._reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
            }
            finally {
                this._subscriberAccess.unlock();
            }
            TopicSubscriberAdaptor<Object> topicSubscriberAdaptor = subscriber;
            return topicSubscriberAdaptor;
        }
        catch (TransportException e) {
            throw this.toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
        }
        finally {
            this._subscriberDetails.unlock();
        }
    }

    /**
     * Creates an empty map message attached to this session. The concrete type depends on
     * the map message format chosen at construction time: {@link AMQPEncodedMapMessage}
     * when AMQP encoding is enabled, otherwise the legacy {@link JMSMapMessage}.
     *
     * @return a new map message whose AMQ session is this session
     * @throws JMSException if the session is closed
     */
    public MapMessage createMapMessage() throws JMSException {
        checkNotClosed();
        if (!_useAMQPEncodedMapMessage) {
            final JMSMapMessage legacyMessage = new JMSMapMessage(getMessageDelegateFactory());
            legacyMessage.setAMQSession(this);
            return legacyMessage;
        }
        final AMQPEncodedMapMessage encodedMessage = new AMQPEncodedMapMessage(getMessageDelegateFactory());
        encodedMessage.setAMQSession(this);
        return encodedMessage;
    }

    /**
     * Creates a generic message; this implementation hands out a bytes message.
     *
     * @return the result of {@link #createBytesMessage()}
     * @throws JMSException if the session is closed
     */
    public Message createMessage() throws JMSException {
        final BytesMessage genericMessage = createBytesMessage();
        return genericMessage;
    }

    /**
     * Creates an empty object message attached to this session.
     *
     * @return a new {@link JMSObjectMessage} whose AMQ session is this session
     * @throws JMSException if the session is closed
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        checkNotClosed();
        final JMSObjectMessage objectMessage = new JMSObjectMessage(getMessageDelegateFactory());
        objectMessage.setAMQSession(this);
        return objectMessage;
    }

    /**
     * Creates an object message attached to this session and initialised with the given payload.
     *
     * @param object payload to store in the message
     * @return the populated message
     * @throws JMSException if the session is closed or the payload cannot be set
     */
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        final ObjectMessage populated = createObjectMessage();
        populated.setObject(object);
        return populated;
    }

    /**
     * Creates a producer for the destination, leaving both the mandatory and immediate
     * settings unspecified ({@code null}).
     *
     * @param destination destination to publish to
     * @return the created producer
     * @throws JMSException if the producer cannot be created
     */
    public P createProducer(Destination destination) throws JMSException {
        return createProducerImpl(destination, null, null);
    }

    /**
     * Creates a producer for the destination with an explicit immediate flag, leaving the
     * mandatory setting unspecified ({@code null}).
     *
     * @param destination destination to publish to
     * @param immediate   immediate flag for the producer
     * @return the created producer
     * @throws JMSException if the producer cannot be created
     */
    public P createProducer(Destination destination, boolean immediate) throws JMSException {
        return createProducerImpl(destination, null, immediate);
    }

    /**
     * Creates a producer for the destination with explicit mandatory and immediate flags.
     *
     * @param destination destination to publish to
     * @param mandatory   mandatory flag for the producer
     * @param immediate   immediate flag for the producer
     * @return the created producer
     * @throws JMSException if the producer cannot be created
     */
    public P createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException {
        return createProducerImpl(destination, mandatory, immediate);
    }

    /**
     * Creates a topic publisher backed by a producer with both the mandatory and immediate
     * flags set to {@code false}.
     *
     * @param topic topic to publish to
     * @return a {@link TopicPublisherAdapter} wrapping the new producer
     * @throws JMSException if the session is closed or the producer cannot be created
     */
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        checkNotClosed();
        final BasicMessageProducer backingProducer = (BasicMessageProducer)createProducer((Destination)topic, false, false);
        return new TopicPublisherAdapter(backingProducer, topic);
    }

    /**
     * Creates a queue identity from a name or address string.
     *
     * <p>A string containing neither {@code '/'} nor {@code ';'} whose destination syntax
     * is BURL is treated as a plain queue name on the default queue exchange (after its
     * syntax prefix is stripped). Every other string is handed to {@link AMQQueue} as is.
     *
     * @param queueName queue name or address string
     * @return the queue identity
     * @throws JMSException if the session is closed or the string cannot be parsed
     */
    public Queue createQueue(String queueName) throws JMSException {
        checkNotClosed();
        try {
            final boolean plainName = queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1;
            if (plainName && AMQDestination.getDestType(queueName) == AMQDestination.DestSyntax.BURL) {
                return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
            }
            return new AMQQueue(queueName);
        }
        catch (URISyntaxException syntaxFailure) {
            _logger.error("", (Throwable)syntaxFailure);
            JMSException wrapped = new JMSException(syntaxFailure.getReason());
            wrapped.setLinkedException(syntaxFailure);
            wrapped.initCause(syntaxFailure);
            throw wrapped;
        }
    }

    /**
     * Declares a queue on the broker without extra arguments.
     *
     * @param name       queue name
     * @param autoDelete auto-delete flag
     * @param durable    durable flag
     * @param exclusive  exclusive flag
     * @throws AMQException if the declaration fails
     */
    public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException {
        final Map<String, Object> noArguments = null;
        createQueue(name, autoDelete, durable, exclusive, noArguments);
    }

    /**
     * Declares a queue on the broker by running {@link #sendCreateQueue} inside a
     * {@link FailoverRetrySupport} wrapper on this session's connection.
     *
     * @param name       queue name
     * @param autoDelete auto-delete flag
     * @param durable    durable flag
     * @param exclusive  exclusive flag
     * @param arguments  extra declaration arguments; may be {@code null}
     * @throws AMQException if the declaration fails
     */
    public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException {
        final FailoverProtectedOperation<Object, AMQException> declareOperation = new FailoverProtectedOperation<Object, AMQException>(){

            @Override
            public Object execute() throws AMQException, FailoverException {
                AMQSession.this.sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
                return null;
            }
        };
        new FailoverRetrySupport<Object, AMQException>(declareOperation, _connection).execute();
    }

    /**
     * Subclass hook that issues the actual queue-creation request.
     * Called by the five-argument {@code createQueue} from inside a failover-protected operation.
     *
     * @param var1 queue name
     * @param var2 auto-delete flag
     * @param var3 durable flag
     * @param var4 exclusive flag
     * @param var5 extra arguments; {@code null} when the four-argument {@code createQueue} is used
     * @throws AMQException      if the request fails
     * @throws FailoverException if fail-over interrupts the request
     */
    public abstract void sendCreateQueue(AMQShortString var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws AMQException, FailoverException;

    /**
     * Creates a queue receiver for a destination that must be a non-null queue of this provider.
     *
     * @param destination destination to receive from
     * @return receiver adaptor wrapping a newly created consumer
     * @throws JMSException if the destination is {@code null}, is not an acceptable queue,
     *                      or consumer creation fails
     */
    public QueueReceiver createQueueReceiver(Destination destination) throws JMSException {
        checkValidDestination(destination);
        final Queue queue = validateQueue(destination);
        final BasicMessageConsumer delegate = (BasicMessageConsumer) createConsumer((Destination) queue);
        return new QueueReceiverAdaptor(queue, delegate);
    }

    /**
     * Creates a queue receiver with a message selector for a destination that must be a non-null
     * queue of this provider.
     *
     * @param destination     destination to receive from
     * @param messageSelector selector passed to the underlying consumer
     * @return receiver adaptor wrapping a newly created consumer
     * @throws JMSException if the destination is {@code null}, is not an acceptable queue,
     *                      or consumer creation fails
     */
    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException {
        checkValidDestination(destination);
        final Queue queue = validateQueue(destination);
        final BasicMessageConsumer delegate = (BasicMessageConsumer) createConsumer((Destination) queue, messageSelector);
        return new QueueReceiverAdaptor(queue, delegate);
    }

    /**
     * Creates a queue receiver for the given queue after checking the session is open and the
     * queue is acceptable to {@code validateQueue}.
     *
     * @param queue queue to receive from
     * @return receiver adaptor wrapping a newly created consumer
     * @throws JMSException if the session is closed, the queue is rejected, or consumer creation fails
     */
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        checkNotClosed();
        final Queue validated = validateQueue((Destination) queue);
        final BasicMessageConsumer delegate = (BasicMessageConsumer) createConsumer((Destination) validated);
        return new QueueReceiverAdaptor(validated, delegate);
    }

    /**
     * Creates a queue receiver with a message selector after checking the session is open and the
     * queue is acceptable to {@code validateQueue}.
     *
     * @param queue           queue to receive from
     * @param messageSelector selector passed to the underlying consumer
     * @return receiver adaptor wrapping a newly created consumer
     * @throws JMSException if the session is closed, the queue is rejected, or consumer creation fails
     */
    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        checkNotClosed();
        final Queue validated = validateQueue((Destination) queue);
        final BasicMessageConsumer delegate = (BasicMessageConsumer) createConsumer((Destination) validated, messageSelector);
        return new QueueReceiverAdaptor(validated, delegate);
    }

    /**
     * Ensures a destination is both an {@code AMQDestination} and a {@link Queue}.
     *
     * @param dest destination to check
     * @return the same object, typed as a {@link Queue}
     * @throws InvalidDestinationException if either type check fails (including for {@code null})
     */
    private Queue validateQueue(Destination dest) throws InvalidDestinationException {
        final boolean acceptable = dest instanceof AMQDestination && dest instanceof Queue;
        if (!acceptable) {
            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
        }
        return (Queue) dest;
    }

    /**
     * Creates a queue sender by wrapping a newly created producer in a {@code QueueSenderAdapter}.
     *
     * @param queue queue to send to
     * @return adapter exposing the producer as a {@link QueueSender}
     * @throws JMSException if the session is closed or producer creation fails
     */
    public QueueSender createSender(Queue queue) throws JMSException {
        checkNotClosed();
        final BasicMessageProducer producer = (BasicMessageProducer) createProducer((Destination) queue);
        return new QueueSenderAdapter(producer, queue);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates an empty stream message bound to this session.
     * The closed-check and construction happen while holding the failover mutex.
     *
     * @return the new stream message
     * @throws JMSException if the session is closed
     */
    public StreamMessage createStreamMessage() throws JMSException {
        synchronized (getFailoverMutex()) {
            checkNotClosed();
            final JMSStreamMessage streamMessage = new JMSStreamMessage(getMessageDelegateFactory());
            streamMessage.setAMQSession(this);
            return streamMessage;
        }
    }

    /**
     * Creates a topic subscriber with no selector.
     * The underlying consumer uses this session's prefetch marks, is exclusive, and has
     * noLocal, noConsume and autoClose all set to {@code false}.
     *
     * @param topic topic to subscribe to
     * @return subscriber adaptor wrapping the new consumer
     * @throws JMSException if the session is closed, the topic is invalid, or consumer creation fails
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        checkNotClosed();
        checkValidTopic(topic);
        final C consumer = createConsumerImpl((Destination) topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false);
        return new TopicSubscriberAdaptor<C>(topic, consumer);
    }

    /**
     * Creates a topic subscriber with a message selector and a noLocal setting.
     * The underlying consumer uses this session's prefetch marks, is exclusive, and has
     * noConsume and autoClose set to {@code false}.
     *
     * @param topic           topic to subscribe to
     * @param messageSelector selector passed to the consumer
     * @param noLocal         noLocal flag passed to the consumer
     * @return subscriber adaptor wrapping the new consumer
     * @throws JMSException if the session is closed, the topic is invalid, or consumer creation fails
     */
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        checkNotClosed();
        checkValidTopic(topic);
        final C consumer = createConsumerImpl((Destination) topic, _prefetchHighMark, _prefetchLowMark, noLocal, true, messageSelector, null, false, false);
        return new TopicSubscriberAdaptor<C>(topic, consumer);
    }

    /**
     * Creates a temporary queue owned by this session: the queue object is built, its queue name is
     * set to its routing key, the queue is created and then bound to its exchange.
     *
     * @return the new temporary queue
     * @throws JMSException if the session is closed, or wrapping any exception raised during creation
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        checkNotClosed();
        try {
            final AMQTemporaryQueue tempQueue = new AMQTemporaryQueue(this);
            tempQueue.setQueueName(tempQueue.getRoutingKey());
            createQueue(tempQueue.getAMQQueueName(), tempQueue.isAutoDelete(), tempQueue.isDurable(), tempQueue.isExclusive());
            bindQueue(tempQueue.getAMQQueueName(), tempQueue.getRoutingKey(), new FieldTable(), tempQueue.getExchangeName(), tempQueue);
            return tempQueue;
        }
        catch (Exception cause) {
            // Any failure is reported as a generic JMSException carrying the original cause.
            final JMSException failure = new JMSException("Cannot create temporary queue");
            failure.setLinkedException(cause);
            failure.initCause((Throwable) cause);
            throw failure;
        }
    }

    /**
     * Creates a temporary topic tied to this session.
     *
     * @return a new {@code AMQTemporaryTopic}
     * @throws JMSException if the session is closed
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        checkNotClosed();
        final AMQTemporaryTopic tempTopic = new AMQTemporaryTopic(this);
        return tempTopic;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates an empty text message bound to this session.
     * The closed-check and construction happen while holding the failover mutex.
     *
     * @return the new text message
     * @throws JMSException if the session is closed
     */
    public TextMessage createTextMessage() throws JMSException {
        synchronized (getFailoverMutex()) {
            checkNotClosed();
            final JMSTextMessage textMessage = new JMSTextMessage(getMessageDelegateFactory());
            textMessage.setAMQSession(this);
            return textMessage;
        }
    }

    /**
     * Returns the failover mutex of the owning connection.
     *
     * @return the object returned by the connection's {@code getFailoverMutex()}
     */
    protected Object getFailoverMutex() {
        final Object mutex = _connection.getFailoverMutex();
        return mutex;
    }

    /**
     * Creates a text message and initialises its body.
     *
     * @param text body text to set on the new message
     * @return the new message with its text set
     * @throws JMSException if the message cannot be created or the text cannot be set
     */
    public TextMessage createTextMessage(String text) throws JMSException {
        final TextMessage textMessage = createTextMessage();
        textMessage.setText(text);
        return textMessage;
    }

    /**
     * Builds a topic destination object from a name.
     * <p>
     * A name containing {@code '/'} or {@code ';'} is handed to the {@code AMQTopic(String)} constructor
     * unchanged. Otherwise the syntax prefix is stripped; for the BURL syntax the topic is created on
     * the default topic exchange, and for any other syntax an {@code "ADDR:<exchange>/<name>"} string
     * is built and handed to {@code AMQTopic(String)}.
     *
     * @param topicName topic name or address string
     * @return the topic destination
     * @throws JMSException if the session is closed, or wrapping a {@link URISyntaxException} raised
     *                      while building the destination
     */
    public Topic createTopic(String topicName) throws JMSException {
        checkNotClosed();
        try {
            final boolean hasSlashOrSemicolon = topicName.indexOf('/') != -1 || topicName.indexOf(';') != -1;
            if (hasSlashOrSemicolon) {
                return new AMQTopic(topicName);
            }
            final AMQDestination.DestSyntax syntax = AMQDestination.getDestType(topicName);
            final String bareName = AMQDestination.stripSyntaxPrefix(topicName);
            if (syntax == AMQDestination.DestSyntax.BURL) {
                return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(bareName));
            }
            return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + bareName);
        }
        catch (URISyntaxException syntaxError) {
            _logger.error("", (Throwable) syntaxError);
            final JMSException wrapped = new JMSException(syntaxError.getReason());
            wrapped.setLinkedException((Exception) syntaxError);
            wrapped.initCause((Throwable) syntaxError);
            throw wrapped;
        }
    }

    /**
     * Declares an exchange using this session's protocol handler.
     *
     * @param name   exchange name
     * @param type   exchange type
     * @param nowait nowait flag passed through to the declaration
     * @throws AMQException if the declaration fails
     */
    public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException {
        declareExchange(name, type, getProtocolHandler(), nowait);
    }

    /**
     * Subclass-defined synchronisation point.
     * NOTE(review): presumably blocks until outstanding commands have been processed by the broker;
     * the contract is not visible in this part of the file - confirm against the implementations.
     *
     * @throws AMQException if the operation fails
     */
    public abstract void sync() throws AMQException;

    /**
     * Returns the acknowledge mode stored on this session.
     *
     * @return the value of the {@code _acknowledgeMode} field
     */
    public int getAcknowledgeMode() {
        return _acknowledgeMode;
    }

    /**
     * Returns the connection this session belongs to.
     *
     * @return the value of the {@code _connection} field
     */
    public AMQConnection getAMQConnection() {
        return _connection;
    }

    /**
     * Returns the channel id of this session.
     *
     * @return the value of the {@code _channelId} field
     */
    public int getChannelId() {
        return _channelId;
    }

    /**
     * Returns the default prefetch, which is the prefetch high mark.
     *
     * @return the value of the {@code _prefetchHighMark} field
     */
    @Override
    public int getDefaultPrefetch() {
        return _prefetchHighMark;
    }

    /**
     * Returns the prefetch high mark.
     *
     * @return the value of the {@code _prefetchHighMark} field
     */
    @Override
    public int getDefaultPrefetchHigh() {
        return _prefetchHighMark;
    }

    /**
     * Returns the prefetch low mark.
     *
     * @return the value of the {@code _prefetchLowMark} field
     */
    @Override
    public int getDefaultPrefetchLow() {
        return _prefetchLowMark;
    }

    /**
     * Returns the prefetch, which is the prefetch high mark.
     *
     * @return the value of the {@code _prefetchHighMark} field
     */
    public int getPrefetch() {
        return _prefetchHighMark;
    }

    /**
     * Returns the default queue exchange name as reported by the owning connection.
     *
     * @return the connection's default queue exchange name
     */
    @Override
    public AMQShortString getDefaultQueueExchangeName() {
        final AMQShortString exchangeName = _connection.getDefaultQueueExchangeName();
        return exchangeName;
    }

    /**
     * Returns the default topic exchange name as reported by the owning connection.
     *
     * @return the connection's default topic exchange name
     */
    @Override
    public AMQShortString getDefaultTopicExchangeName() {
        final AMQShortString exchangeName = _connection.getDefaultTopicExchangeName();
        return exchangeName;
    }

    /**
     * Returns the session-level message listener.
     *
     * @return the value of the {@code _messageListener} field
     * @throws JMSException declared by the signature; this body does not throw it
     */
    public MessageListener getMessageListener() throws JMSException {
        return _messageListener;
    }

    /**
     * Returns the temporary queue exchange name as reported by the owning connection.
     *
     * @return the connection's temporary queue exchange name
     */
    @Override
    public AMQShortString getTemporaryQueueExchangeName() {
        final AMQShortString exchangeName = _connection.getTemporaryQueueExchangeName();
        return exchangeName;
    }

    /**
     * Returns the temporary topic exchange name as reported by the owning connection.
     *
     * @return the connection's temporary topic exchange name
     */
    @Override
    public AMQShortString getTemporaryTopicExchangeName() {
        final AMQShortString exchangeName = _connection.getTemporaryTopicExchangeName();
        return exchangeName;
    }

    /**
     * Returns the ticket stored on this session.
     *
     * @return the value of the {@code _ticket} field
     */
    public int getTicket() {
        return _ticket;
    }

    /**
     * Reports whether the session is transacted, after verifying that it is still open.
     *
     * @return the value of the {@code _transacted} field
     * @throws JMSException if the session is closed
     */
    public boolean getTransacted() throws JMSException {
        checkNotClosed();
        return _transacted;
    }

    /**
     * Reports whether the session is transacted, without checking whether it is closed.
     *
     * @return the value of the {@code _transacted} field
     */
    public boolean isTransacted() {
        return _transacted;
    }

    /**
     * Reports whether the per-destination consumer counter for the destination is present and non-zero.
     *
     * @param destination destination to look up
     * @return {@code true} if a counter exists for the destination and its value is not zero
     */
    public boolean hasConsumer(Destination destination) {
        final AtomicInteger consumerCount = _destinationConsumerCount.get(destination);
        if (consumerCount == null) {
            return false;
        }
        return consumerCount.get() != 0;
    }

    /**
     * Reports the strict-AMQP setting of this session.
     *
     * @return the value of the {@code _strictAMQP} field
     */
    public boolean isStrictAMQP() {
        return _strictAMQP;
    }

    /**
     * Reports whether the session is currently marked as suspended.
     *
     * @return the value of the {@code _suspended} field
     */
    public boolean isSuspended() {
        return _suspended;
    }

    /**
     * Records a delivery tag in the collection of unacknowledged message tags.
     *
     * @param id delivery tag to record
     */
    protected void addUnacknowledgedMessage(long id) {
        _unacknowledgedMessageTags.add(id);
    }

    /**
     * Records a delivery tag in the collection of delivered message tags.
     *
     * @param id delivery tag to record
     */
    protected void addDeliveredMessage(long id) {
        _deliveredMessageTags.add(id);
    }

    /**
     * Accepts an incoming message: logs it at debug level, stores its delivery tag as the highest
     * delivery tag, and appends it to the dispatch queue.
     *
     * @param message the received, not yet processed message
     */
    public void messageReceived(UnprocessedMessage message) {
        final boolean debugEnabled = _logger.isDebugEnabled();
        if (debugEnabled) {
            _logger.debug("Message[" + message.toString() + "] received in session");
        }
        _highestDeliveryTag.set(message.getDeliveryTag());
        _queue.add(message);
    }

    /**
     * Declares the destination's exchange and queue, then binds the queue to the exchange
     * using the destination's routing key and an empty {@code FieldTable}.
     *
     * @param amqd destination to declare and bind
     * @throws AMQException if any of the three steps fails
     */
    public void declareAndBind(AMQDestination amqd) throws AMQException {
        final AMQProtocolHandler handler = getProtocolHandler();
        declareExchange(amqd, handler, false);
        final AMQShortString declaredQueue = declareQueue(amqd, false);
        bindQueue(declaredQueue, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
    }

    /**
     * Recovers the session. Only valid on an open, non-transacted session.
     * <p>
     * The steps run in a fixed order: flush acknowledgements, mark the session as in recovery,
     * suspend the channel (unless it already was), synchronise the dispatch queue, let the
     * dispatcher recover, send the recover request, mark the session clean and finally resume
     * the channel if this method suspended it.
     *
     * @throws JMSException if the session is closed or transacted, or wrapping an
     *                      {@code AMQException}, {@code FailoverException} or
     *                      {@code TransportException} raised during recovery
     */
    public void recover() throws JMSException {
        this.checkNotClosed();
        this.checkNotTransacted();
        try {
            this.flushAcknowledgments();
            this._sessionInRecovery = true;
            // Remember the suspension state on entry so the channel is only resumed if it was
            // suspended by this method.
            boolean isSuspended = this.isSuspended();
            if (!isSuspended) {
                this.suspendChannel(true);
            }
            // The flag is raised only for the duration of the dispatch-queue sync; it is not
            // lowered again if syncDispatchQueue() throws.
            this._usingDispatcherForCleanup = true;
            this.syncDispatchQueue();
            this._usingDispatcherForCleanup = false;
            if (this._dispatcher != null) {
                this._dispatcher.recover();
            }
            this.sendRecover();
            this.markClean();
            if (!isSuspended) {
                this.suspendChannel(false);
            }
        }
        catch (AMQException e) {
            throw new JMSAMQException("Recover failed: " + e.getMessage(), (Exception)((Object)e));
        }
        catch (FailoverException e) {
            throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
        }
        catch (TransportException e) {
            throw this.toJMSException("Recover failed: " + e.getMessage(), e);
        }
    }

    /**
     * Subclass hook that sends the recover request.
     * Called by {@code recover()} after the dispatch queue has been synchronised and the dispatcher,
     * if any, has recovered.
     *
     * @throws AMQException      if the request fails
     * @throws FailoverException if fail-over interrupts the request
     */
    protected abstract void sendRecover() throws AMQException, FailoverException;

    /**
     * Subclass hook for flushing acknowledgements.
     * Called as the first step inside {@code recover()}.
     */
    protected abstract void flushAcknowledgments();

    /**
     * Rejects an unprocessed message by its delivery tag.
     *
     * @param message message to reject
     * @param requeue requeue flag passed to {@code rejectMessage(long, boolean)}
     */
    public void rejectMessage(UnprocessedMessage message, boolean requeue) {
        final long deliveryTag = message.getDeliveryTag();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Rejecting Unacked message:" + deliveryTag);
        }
        rejectMessage(deliveryTag, requeue);
    }

    /**
     * Rejects a JMS message by its delivery tag.
     *
     * @param message message to reject
     * @param requeue requeue flag passed to {@code rejectMessage(long, boolean)}
     */
    public void rejectMessage(AbstractJMSMessage message, boolean requeue) {
        final long deliveryTag = message.getDeliveryTag();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Rejecting Abstract message:" + deliveryTag);
        }
        rejectMessage(deliveryTag, requeue);
    }

    /**
     * Subclass hook that rejects the message with the given delivery tag.
     * Both message-typed {@code rejectMessage} overloads delegate here.
     *
     * @param var1 delivery tag of the message to reject
     * @param var3 requeue flag supplied by the caller
     */
    public abstract void rejectMessage(long var1, boolean var3);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rolls back the current transaction. Only valid on a transacted session.
     * <p>
     * The whole operation runs while holding {@code _suspensionLock}. The steps run in a fixed
     * order: suspend the channel (unless it already was), set the rollback mark, synchronise the
     * dispatch queue, roll back the dispatcher, release messages for rollback, send the rollback
     * request, mark the session clean and finally resume the channel if this method suspended it.
     *
     * @throws JMSException if the session is closed or not transacted, or wrapping an
     *                      {@code AMQException}, {@code FailoverException} or
     *                      {@code TransportException} raised during rollback
     */
    public void rollback() throws JMSException {
        Object object = this._suspensionLock;
        synchronized (object) {
            this.checkTransacted();
            try {
                // Remember the suspension state on entry so the channel is only resumed if it
                // was suspended by this method.
                boolean isSuspended = this.isSuspended();
                if (!isSuspended) {
                    this.suspendChannel(true);
                }
                this.setRollbackMark();
                this.syncDispatchQueue();
                // NOTE(review): unlike recover(), _dispatcher is dereferenced without a null check.
                this._dispatcher.rollback();
                this.releaseForRollback();
                this.sendRollback();
                this.markClean();
                if (!isSuspended) {
                    this.suspendChannel(false);
                }
            }
            catch (AMQException e) {
                throw new JMSAMQException("Failed to rollback: " + (Object)((Object)e), (Exception)((Object)e));
            }
            catch (FailoverException e) {
                throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
            }
            catch (TransportException e) {
                throw this.toJMSException("Failure to rollback:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Subclass hook invoked by {@code rollback()} after the dispatcher has rolled back and before
     * the rollback request is sent.
     */
    public abstract void releaseForRollback();

    /**
     * Subclass hook that sends the rollback request.
     * Called by {@code rollback()} after {@code releaseForRollback()}.
     *
     * @throws AMQException      if the request fails
     * @throws FailoverException if fail-over interrupts the request
     */
    public abstract void sendRollback() throws AMQException, FailoverException;

    /**
     * Not supported by this session.
     *
     * @throws UnsupportedOperationException always
     */
    public void run() {
        throw new UnsupportedOperationException();
    }

    /**
     * Intentionally or not, this is a no-op: the body is empty, so the supplied listener is
     * neither stored nor invoked by this method.
     *
     * @param listener ignored
     * @throws JMSException declared by the signature; this body does not throw it
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
    }

    /**
     * Removes the durable subscription with the given name.
     * Delegates to the private two-argument overload with {@code safe = false} and converts any
     * {@code TransportException} into a {@link JMSException}.
     *
     * @param name subscription name
     * @throws JMSException if unsubscribing fails
     */
    public void unsubscribe(String name) throws JMSException {
        try {
            unsubscribe(name, false);
        }
        catch (TransportException transportFailure) {
            throw toJMSException("Exception while unsubscribing:" + transportFailure.getMessage(), transportFailure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a durable subscription.
     * <p>
     * Under the {@code _subscriberDetails} lock the subscription is looked up and, if present,
     * removed from both subscription maps. Outside the lock:
     * <ul>
     *   <li>a known subscriber is closed and its durable queue deleted;</li>
     *   <li>otherwise, in strict-AMQP mode, either an {@link UnsupportedOperationException} is thrown
     *       (fatal mode) or the queue is deleted regardless after a warning;</li>
     *   <li>otherwise the queue is deleted if it is bound to the default topic exchange;</li>
     *   <li>otherwise an {@link InvalidDestinationException} is thrown unless {@code safe} is set.</li>
     * </ul>
     *
     * @param name subscription name
     * @param safe when {@code true}, an unknown subscription is silently ignored
     * @throws JMSException if the session is closed, the subscription is unknown and {@code safe}
     *                      is {@code false}, or closing/deleting fails
     */
    private void unsubscribe(String name, boolean safe) throws JMSException {
        final TopicSubscriberAdaptor<C> known;
        _subscriberDetails.lock();
        try {
            checkNotClosed();
            known = _subscriptions.get(name);
            if (known != null) {
                _subscriptions.remove(name);
                _reverseSubscriptionMap.remove(known.getMessageConsumer());
            }
        }
        finally {
            _subscriberDetails.unlock();
        }

        if (known != null) {
            known.close();
            deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
            return;
        }

        if (_strictAMQP) {
            if (_strictAMQPFATAL) {
                throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
            }
            _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + " Requesting queue deletion regardless.");
            deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
            return;
        }

        if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) {
            deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
            return;
        }

        if (!safe) {
            throw new InvalidDestinationException("Unknown subscription name: " + name);
        }
    }

    /**
     * Creates a consumer for a destination and registers it with this session, retrying across
     * fail-over via {@code FailoverRetrySupport}.
     * <p>
     * Before the consumer is created the destination is checked with
     * {@code checkTemporaryDestination}, and a browse-only destination is rejected unless
     * {@code noConsume} is set. In strict-AMQP mode a non-empty selector is either refused
     * (fatal mode) or dropped.
     * <p>
     * The consumer local and the retry wrapper are declared with the type parameter {@code C}
     * rather than {@code Object} / a raw type, so that {@code execute()} can return the consumer
     * and the wrapper only propagates {@link JMSException}.
     *
     * @param destination  destination to consume from; cast to {@code AMQDestination}
     * @param prefetchHigh prefetch high mark for the consumer
     * @param prefetchLow  prefetch low mark for the consumer
     * @param noLocal      noLocal flag for the consumer
     * @param exclusive    exclusive flag for the consumer
     * @param selector     message selector; may be {@code null}
     * @param rawSelector  raw selector table passed through to {@code createMessageConsumer}
     * @param noConsume    noConsume flag; must be {@code true} for browse-only destinations
     * @param autoClose    autoClose flag for the consumer
     * @return the registered consumer
     * @throws JMSException                  if validation, creation or registration fails
     * @throws UnsupportedOperationException if a selector is supplied in fatal strict-AMQP mode
     */
    protected C createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException {
        this.checkTemporaryDestination(destination);
        if (!noConsume && this.isBrowseOnlyDestination(destination)) {
            throw new InvalidDestinationException("The consumer being created is not 'noConsume',but a 'browseOnly' Destination has been supplied.");
        }
        final String messageSelector;
        if (this._strictAMQP && selector != null && !selector.equals("")) {
            if (this._strictAMQPFATAL) {
                throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
            }
            // Non-fatal strict mode: the selector is dropped rather than rejected.
            messageSelector = null;
        } else {
            messageSelector = selector;
        }
        return new FailoverRetrySupport<C, JMSException>(new FailoverProtectedOperation<C, JMSException>(){

            @Override
            public C execute() throws JMSException, FailoverException {
                AMQSession.this.checkNotClosed();
                AMQDestination amqd = (AMQDestination)destination;
                C consumer;
                try {
                    consumer = AMQSession.this.createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
                }
                catch (TransportException e) {
                    throw AMQSession.this.toJMSException("Exception while creating consumer: " + e.getMessage(), e);
                }
                // A session-level listener, when present, is applied to every new consumer.
                if (AMQSession.this._messageListener != null) {
                    ((BasicMessageConsumer)consumer).setMessageListener(AMQSession.this._messageListener);
                }
                try {
                    AMQSession.this.registerConsumer(consumer, false);
                }
                catch (AMQInvalidArgumentException ise) {
                    InvalidSelectorException jmse = new InvalidSelectorException(ise.getMessage());
                    jmse.setLinkedException(ise);
                    jmse.initCause(ise);
                    throw jmse;
                }
                catch (AMQInvalidRoutingKeyException e) {
                    InvalidDestinationException jmse = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
                    jmse.setLinkedException(e);
                    jmse.initCause(e);
                    throw jmse;
                }
                catch (AMQException e) {
                    // A closed channel makes the whole session unusable, so close it as well.
                    if (e instanceof AMQChannelClosedException) {
                        AMQSession.this.close(-1L, false);
                    }
                    JMSException ex = new JMSException("Error registering consumer: " + e);
                    ex.setLinkedException(e);
                    ex.initCause(e);
                    throw ex;
                }
                catch (TransportException e) {
                    throw AMQSession.this.toJMSException("Exception while registering consumer:" + e.getMessage(), e);
                }
                return consumer;
            }
        }, this._connection).execute();
    }

    /**
     * Subclass factory for the concrete consumer type.
     * {@code createConsumerImpl} calls it with, in order: destination, prefetch high mark, prefetch
     * low mark, noLocal, exclusive, message selector, raw selector table, noConsume and autoClose.
     *
     * @param var1 destination to consume from
     * @param var2 prefetch high mark
     * @param var3 prefetch low mark
     * @param var4 noLocal flag
     * @param var5 exclusive flag
     * @param var6 message selector; may be {@code null}
     * @param var7 raw selector table; may be {@code null}
     * @param var8 noConsume flag
     * @param var9 autoClose flag
     * @return the new, not yet registered consumer
     * @throws JMSException if the consumer cannot be created
     */
    public abstract C createMessageConsumer(AMQDestination var1, int var2, int var3, boolean var4, boolean var5, String var6, FieldTable var7, boolean var8, boolean var9) throws JMSException;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a consumer from this session's bookkeeping.
     * <p>
     * Nothing happens if the consumer's tag is not present in {@code _consumers}. Otherwise the
     * subscription maps are cleaned up under the {@code _subscriberAccess} lock, the per-destination
     * consumer counter is decremented (and removed when it reaches zero) while synchronised on the
     * destination, and, for a transacted session, the consumer is added to {@code _removedConsumers}.
     *
     * @param consumer consumer to deregister
     */
    void deregisterConsumer(C consumer) {
        final boolean wasRegistered = _consumers.remove(((BasicMessageConsumer) consumer).getConsumerTag()) != null;
        if (!wasRegistered) {
            return;
        }

        _subscriberAccess.lock();
        try {
            final String subscription = _reverseSubscriptionMap.remove(consumer);
            if (subscription != null) {
                _subscriptions.remove(subscription);
            }
        }
        finally {
            _subscriberAccess.unlock();
        }

        final AMQDestination destination = ((BasicMessageConsumer) consumer).getDestination();
        synchronized (destination) {
            if (_destinationConsumerCount.get(destination) != null && _destinationConsumerCount.get(destination).decrementAndGet() == 0) {
                _destinationConsumerCount.remove(destination);
            }
        }

        if (_transacted) {
            _removedConsumers.add(consumer);
        }
    }

    /**
     * Removes the producer stored under the given id from {@code _producers}.
     *
     * @param producerId id of the producer to remove
     */
    void deregisterProducer(long producerId) {
        // Long.valueOf replaces the deprecated Long(long) constructor; the resulting key is equal.
        this._producers.remove(Long.valueOf(producerId));
    }

    /**
     * Reports whether the session is flagged as being in recovery.
     *
     * @return the value of the {@code _sessionInRecovery} field
     */
    boolean isInRecovery() {
        return _sessionInRecovery;
    }

    /**
     * Checks whether a queue is bound to an exchange; forwards to the three-argument overload
     * with {@code null} as the third argument.
     *
     * @param exchangeName exchange name
     * @param queueName    queue name
     * @return result of the three-argument {@code isQueueBound}
     * @throws JMSException if the check fails
     */
    boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException {
        return isQueueBound(exchangeName, queueName, null);
    }

    /**
     * Subclass hook that checks whether a queue is bound to an exchange.
     *
     * @param var1 exchange name
     * @param var2 queue name
     * @param var3 third qualifier; {@code null} when called through the two-argument overload
     *             (presumably a routing key - confirm against the implementations)
     * @return {@code true} if the binding exists
     * @throws JMSException if the check fails
     */
    public abstract boolean isQueueBound(AMQShortString var1, AMQShortString var2, AMQShortString var3) throws JMSException;

    /**
     * Subclass hook that checks a binding described by a destination object.
     * NOTE(review): presumably checks that the destination's queue is bound to its exchange -
     * confirm against the implementations.
     *
     * @param var1 destination to check
     * @return {@code true} if the binding exists
     * @throws JMSException if the check fails
     */
    public abstract boolean isQueueBound(AMQDestination var1) throws JMSException;

    /**
     * Subclass hook that checks a binding described by plain strings and an argument map.
     * NOTE(review): parameter meanings are not visible in this part of the file; presumably
     * exchange name, queue name, binding key and binding arguments - confirm against the
     * implementations.
     *
     * @param var1 first string qualifier
     * @param var2 second string qualifier
     * @param var3 third string qualifier
     * @param var4 argument map
     * @return {@code true} if the binding exists
     * @throws JMSException if the check fails
     */
    public abstract boolean isQueueBound(String var1, String var2, String var3, Map<String, Object> var4) throws JMSException;

    /**
     * Marks the session closed, deregisters it from the connection by channel id, and marks its
     * producers and consumers closed, in that order.
     */
    void markClosed() {
        setClosed();
        _connection.deregisterSession(_channelId);
        markClosedProducersAndConsumers();
    }

    /**
     * Prepares the session for fail-over by synchronising the dispatch queue.
     */
    void failoverPrep() {
        syncDispatchQueue();
    }

    /**
     * Waits until everything currently in the dispatch queue has been dispatched.
     * <p>
     * On the dispatcher thread the queue is emptied inline. On any other thread the dispatcher is
     * started if necessary, a marker entry is appended to the queue, and the caller blocks until
     * the marker has been dispatched.
     * <p>
     * If the calling thread is interrupted while waiting, its interrupt status is restored before
     * the {@link InterruptedException} is rethrown wrapped in a {@link RuntimeException}, so code
     * further up the stack can still observe the interruption.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException}
     */
    void syncDispatchQueue() {
        if (Thread.currentThread() == this._dispatcherThread) {
            while (!super.isClosed() && !this._queue.isEmpty()) {
                Dispatchable disp;
                try {
                    disp = (Dispatchable)this._queue.take();
                }
                catch (InterruptedException e) {
                    // Wrapping alone would lose the interruption; restore the flag first.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                // Defensive: stop instead of dereferencing a null entry.
                if (disp == null) {
                    _logger.debug("_queue became empty during sync.");
                    break;
                }
                disp.dispatch(this);
            }
        } else {
            this.startDispatcherIfNecessary();
            final CountDownLatch signal = new CountDownLatch(1);
            // Marker entry: once the dispatcher reaches it, everything queued before it is done.
            this._queue.add(new Dispatchable(){

                public void dispatch(AMQSession ssn) {
                    signal.countDown();
                }
            });
            try {
                signal.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Waits until everything currently in the dispatch queue has been dispatched.
     * <p>
     * Differs from {@code syncDispatchQueue()} only in how the dispatcher is started on a
     * non-dispatcher thread: {@code startDispatcherIfNecessary(false)} is called directly.
     * <p>
     * If the calling thread is interrupted while waiting, its interrupt status is restored before
     * the {@link InterruptedException} is rethrown wrapped in a {@link RuntimeException}, so code
     * further up the stack can still observe the interruption.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException}
     */
    void drainDispatchQueue() {
        if (Thread.currentThread() == this._dispatcherThread) {
            while (!super.isClosed() && !this._queue.isEmpty()) {
                Dispatchable disp;
                try {
                    disp = (Dispatchable)this._queue.take();
                }
                catch (InterruptedException e) {
                    // Wrapping alone would lose the interruption; restore the flag first.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                // Defensive: stop instead of dereferencing a null entry.
                if (disp == null) {
                    _logger.debug("_queue became empty during sync.");
                    break;
                }
                disp.dispatch(this);
            }
        } else {
            this.startDispatcherIfNecessary(false);
            final CountDownLatch signal = new CountDownLatch(1);
            // Marker entry: once the dispatcher reaches it, everything queued before it is done.
            this._queue.add(new Dispatchable(){

                public void dispatch(AMQSession ssn) {
                    signal.countDown();
                }
            });
            try {
                signal.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Re-establishes the session's producers and consumers.
     * A dirty session is first flagged as failed-over-dirty, and the rollback mark is reset to -1.
     *
     * @throws AMQException if resubscribing producers or consumers fails
     */
    void resubscribe() throws AMQException {
        final boolean dirty = _dirty;
        if (dirty) {
            _failedOverDirty = true;
        }
        _rollbackMark.set(-1L);
        resubscribeProducers();
        resubscribeConsumers();
    }

    /**
     * Records that this session has message listeners by setting {@code _hasMessageListeners}.
     */
    void setHasMessageListeners() {
        _hasMessageListeners = true;
    }

    /**
     * Sets the in-recovery flag of this session.
     *
     * @param inRecovery new value for {@code _sessionInRecovery}
     */
    void setInRecovery(boolean inRecovery) {
        _sessionInRecovery = inRecovery;
    }

    /**
     * Reports whether {@code start()} has been called on this session at least once.
     *
     * @return the current value of {@code _startedAtLeastOnce}
     */
    boolean isStarted() {
        final boolean started = _startedAtLeastOnce.get();
        return started;
    }

    /**
     * Starts the session.
     * If the session had already been started before, the channel is unsuspended. The dispatcher
     * is started when the session has message listeners.
     *
     * @throws AMQException if unsuspending the channel fails
     */
    void start() throws AMQException {
        final boolean startedBefore = _startedAtLeastOnce.getAndSet(true);
        if (startedBefore) {
            suspendChannel(false);
        }
        if (hasMessageListeners()) {
            startDispatcherIfNecessary();
        }
    }

    /**
     * Starts the dispatcher unless called from the dispatcher thread itself.
     * <p>
     * Before starting, the channel is unsuspended once when all of the following hold, evaluated in
     * this order: immediate prefetch is off, the session is suspended, the session has been started
     * at least once, and this is the first dispatcher start (the {@code _firstDispatcher} flag is
     * cleared as part of the check). A failure to unsuspend is logged and otherwise ignored.
     */
    void startDispatcherIfNecessary() {
        if (Thread.currentThread() == _dispatcherThread) {
            return;
        }
        // Short-circuit order matters: _firstDispatcher is only consumed when the rest holds.
        final boolean unsuspendFirst = !_immediatePrefetch
                && isSuspended()
                && _startedAtLeastOnce.get()
                && _firstDispatcher.getAndSet(false);
        if (unsuspendFirst) {
            try {
                suspendChannel(false);
            }
            catch (AMQException failure) {
                _logger.info("Unsuspending channel threw an exception:", (Throwable) failure);
            }
        }
        startDispatcherIfNecessary(false);
    }

    /**
     * Creates and starts the dispatcher and its thread if no dispatcher exists yet; otherwise only
     * updates the existing dispatcher's connection-stopped state.
     *
     * @param initiallyStopped value passed to the dispatcher's {@code setConnectionStopped}
     * @throws Error if the dispatcher thread cannot be created
     */
    synchronized void startDispatcherIfNecessary(boolean initiallyStopped) {
        if (_dispatcher != null) {
            _dispatcher.setConnectionStopped(initiallyStopped);
            return;
        }

        _dispatcher = new Dispatcher();
        try {
            _dispatcherThread = Threading.getThreadFactory().createThread((Runnable) _dispatcher);
        }
        catch (Exception cause) {
            throw new Error("Error creating Dispatcher thread", cause);
        }

        final String threadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();
        _dispatcherThread.setName(threadName);
        _dispatcherThread.setDaemon(this.DEAMON_DISPATCHER_THREAD);
        _dispatcher.setConnectionStopped(initiallyStopped);
        _dispatcherThread.start();

        if (_dispatcherLogger.isDebugEnabled()) {
            _dispatcherLogger.debug(_dispatcherThread.getName() + " created");
        }
    }

    /**
     * Stops the session: suspends the channel and, if a dispatcher exists, marks its connection
     * as stopped.
     *
     * @throws AMQException if suspending the channel fails
     */
    void stop() throws AMQException {
        suspendChannel(true);
        if (_dispatcher == null) {
            return;
        }
        _dispatcher.setConnectionStopped(true);
    }

    /**
     * Fails if the session is transacted.
     *
     * @throws JMSException a {@code javax.jms.IllegalStateException} if the session is transacted,
     *                      or whatever {@code getTransacted()} throws
     */
    private void checkNotTransacted() throws JMSException {
        final boolean transacted = getTransacted();
        if (transacted) {
            throw new javax.jms.IllegalStateException("Session is transacted");
        }
    }

    /**
     * Validates a temporary destination before consuming from it. Non-temporary destinations pass
     * unchecked; a temporary one must belong to this session and must not have been deleted.
     *
     * @param destination destination to check
     * @throws JMSException if the temporary destination belongs to another session or is deleted
     */
    private void checkTemporaryDestination(Destination destination) throws JMSException {
        if (!(destination instanceof TemporaryDestination)) {
            return;
        }
        _logger.debug("destination is temporary");
        final TemporaryDestination temporary = (TemporaryDestination) destination;
        if (temporary.getSession() != this) {
            _logger.debug("destination is on different session");
            throw new JMSException("Cannot consume from a temporary destination created on another session");
        }
        if (temporary.isDeleted()) {
            _logger.debug("destination is deleted");
            throw new JMSException("Cannot consume from a deleted destination");
        }
    }

    /**
     * Fails if the session is not transacted.
     *
     * @throws JMSException a {@code javax.jms.IllegalStateException} if the session is not
     *                      transacted, or whatever {@code getTransacted()} throws
     */
    protected void checkTransacted() throws JMSException {
        final boolean transacted = getTransacted();
        if (transacted) {
            return;
        }
        throw new javax.jms.IllegalStateException("Session is not transacted");
    }

    /**
     * Rejects a {@code null} destination.
     *
     * @param destination destination to check
     * @throws InvalidDestinationException if {@code destination} is {@code null}
     */
    private void checkValidDestination(Destination destination) throws InvalidDestinationException {
        if (destination != null) {
            return;
        }
        throw new InvalidDestinationException("Invalid Queue");
    }

    /**
     * Rejects a {@code null} queue.
     *
     * @param queue queue to check
     * @throws InvalidDestinationException if {@code queue} is {@code null}
     */
    private void checkValidQueue(Queue queue) throws InvalidDestinationException {
        if (queue != null) {
            return;
        }
        throw new InvalidDestinationException("Invalid Queue");
    }

    /**
     * Validates a topic before a subscription is created on it.
     * The topic must be non-null; a temporary topic must belong to this session and cannot be used
     * for a durable subscription; and the topic must be an {@code AMQDestination}.
     *
     * @param topic   topic to validate
     * @param durable whether the subscription being created is durable
     * @return the same topic
     * @throws JMSException an {@link InvalidDestinationException} describing the first failed check
     */
    protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("Invalid Topic");
        }
        if (topic instanceof TemporaryDestination) {
            if (((TemporaryDestination) topic).getSession() != this) {
                throw new InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
            }
            if (durable) {
                throw new InvalidDestinationException("Cannot create a durable subscription with a temporary topic: " + topic);
            }
        }
        final boolean fromThisProvider = topic instanceof AMQDestination && topic instanceof Topic;
        if (!fromThisProvider) {
            throw new InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
        }
        return topic;
    }

    /**
     * Validates a topic for a non-durable subscription; forwards to the two-argument overload
     * with {@code durable = false}.
     *
     * @param topic topic to validate
     * @return the same topic
     * @throws JMSException if the topic is rejected
     */
    protected Topic checkValidTopic(Topic topic) throws JMSException {
        final Topic validated = checkValidTopic(topic, false);
        return validated;
    }

    /**
     * Closes, or notifies of an error, every consumer of this session and then shuts down the
     * dispatcher. The consumers are taken from a copy of {@code _consumers.values()}.
     *
     * @param error when non-null, each consumer is notified of this error instead of being closed
     * @throws JMSException if closing a consumer fails
     */
    private void closeConsumers(Throwable error) throws JMSException {
        final List<C> snapshot = new ArrayList<C>(_consumers.values());
        for (BasicMessageConsumer consumer : snapshot) {
            if (error == null) {
                consumer.close(false);
            } else {
                consumer.notifyError(error);
            }
        }
        if (_dispatcher != null) {
            _dispatcher.close();
            _dispatcher = null;
        }
    }

    /**
     * Closes every producer of this session.
     * <p>
     * The loop variable is typed {@link MessageProducer}, matching the element type of the copied
     * list; the previous {@code BasicMessageProducer} loop variable was not assignable from it.
     * {@code close()} is dispatched virtually, so the same implementation is invoked.
     *
     * @throws JMSException if closing a producer fails
     */
    private void closeProducers() throws JMSException {
        // Work on a copy of the values; presumably closing a producer removes it from _producers
        // (see deregisterProducer) - confirm against BasicMessageProducer.close().
        List<MessageProducer> snapshot = new ArrayList<MessageProducer>(this._producers.values());
        for (MessageProducer producer : snapshot) {
            producer.close();
        }
    }

    /**
     * Closes all producers and then all consumers, always attempting both.
     * Failures are logged; the first {@link JMSException} encountered is rethrown once both
     * attempts have completed.
     *
     * @param amqe error passed to {@code closeConsumers}; may be {@code null}
     * @throws JMSException the first failure raised while closing producers or consumers
     */
    private void closeProducersAndConsumers(AMQException amqe) throws JMSException {
        JMSException firstFailure = null;

        try {
            closeProducers();
        }
        catch (JMSException producerFailure) {
            _logger.error("Error closing session: " + producerFailure, (Throwable) producerFailure);
            firstFailure = producerFailure;
        }

        try {
            closeConsumers(amqe);
        }
        catch (JMSException consumerFailure) {
            _logger.error("Error closing session: " + consumerFailure, (Throwable) consumerFailure);
            if (firstFailure == null) {
                firstFailure = consumerFailure;
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Assigns the next consumer tag to a consumer, registers it in this session's bookkeeping and
     * asks the subclass to start consuming from the queue.
     * <p>
     * The consumer is registered before {@code sendConsume} is called. If {@code sendConsume} fails
     * with an {@link AMQException}, both registrations are undone: the consumer is removed from
     * {@code _consumers} and the per-destination consumer counter is decremented again (and removed
     * when it reaches zero), so that {@code hasConsumer} does not keep reporting a consumer that
     * was never established. A {@code FailoverException} is propagated without clean-up.
     *
     * @param consumer        consumer to register
     * @param queueName       queue to consume from
     * @param protocolHandler protocol handler passed through to {@code sendConsume}
     * @param nowait          nowait flag passed through to {@code sendConsume}
     * @throws AMQException      if the consume request fails
     * @throws FailoverException if fail-over interrupts the request
     */
    private void consumeFromQueue(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException {
        int tagId = this._nextTag++;
        ((BasicMessageConsumer)consumer).setConsumerTag(tagId);
        this._consumers.put(tagId, consumer);
        AMQDestination dest = ((BasicMessageConsumer)consumer).getDestination();
        synchronized (dest) {
            this._destinationConsumerCount.putIfAbsent(dest, new AtomicInteger());
            this._destinationConsumerCount.get(dest).incrementAndGet();
        }
        try {
            this.sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
        }
        catch (AMQException e) {
            this._consumers.remove(tagId);
            // Undo the counter increment as well, mirroring deregisterConsumer.
            synchronized (dest) {
                AtomicInteger count = this._destinationConsumerCount.get(dest);
                if (count != null && count.decrementAndGet() == 0) {
                    this._destinationConsumerCount.remove(dest);
                }
            }
            throw e;
        }
    }

    /**
     * Subclass hook that sends the consume request for an already registered consumer.
     * {@code consumeFromQueue} calls it with, in order: consumer, queue name, protocol handler,
     * nowait flag and the consumer tag that was just assigned.
     *
     * @param var1 consumer being started
     * @param var2 queue name
     * @param var3 protocol handler
     * @param var4 nowait flag
     * @param var5 consumer tag
     * @throws AMQException      if the request fails
     * @throws FailoverException if fail-over interrupts the request
     */
    public abstract void sendConsume(C var1, AMQShortString var2, AMQProtocolHandler var3, boolean var4, int var5) throws AMQException, FailoverException;

    /**
     * Creates a producer and registers it with this session under a fresh producer id, retrying
     * across fail-over via {@code FailoverRetrySupport}.
     * <p>
     * The producer local and the retry wrapper are declared with the type parameter {@code P}
     * rather than {@code Object} / a raw type, so that {@code execute()} can return the producer
     * and the wrapper only propagates {@link JMSException}.
     *
     * @param destination destination the producer will send to
     * @param mandatory   mandatory flag passed through to {@code createMessageProducer}
     * @param immediate   immediate flag passed through to {@code createMessageProducer}
     * @return the registered producer
     * @throws JMSException if the session is closed or the producer cannot be created
     */
    private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException {
        return new FailoverRetrySupport<P, JMSException>(new FailoverProtectedOperation<P, JMSException>(){

            @Override
            public P execute() throws JMSException, FailoverException {
                AMQSession.this.checkNotClosed();
                long producerId = AMQSession.this.getNextProducerId();
                P producer;
                try {
                    producer = AMQSession.this.createMessageProducer(destination, mandatory, immediate, producerId);
                }
                catch (TransportException e) {
                    throw AMQSession.this.toJMSException("Exception while creating producer:" + e.getMessage(), e);
                }
                AMQSession.this.registerProducer(producerId, producer);
                return producer;
            }
        }, this._connection).execute();
    }

    /**
     * Subclass factory for the concrete producer type.
     * {@code createProducerImpl} calls it with, in order: destination, mandatory flag, immediate
     * flag and the producer id obtained from {@code getNextProducerId()}.
     *
     * @param var1 destination the producer will send to
     * @param var2 mandatory flag
     * @param var3 immediate flag
     * @param var4 producer id
     * @return the new, not yet registered producer
     * @throws JMSException if the producer cannot be created
     */
    public abstract P createMessageProducer(Destination var1, Boolean var2, Boolean var3, long var4) throws JMSException;

    /**
     * Declares the exchange described by a destination, using its exchange name and exchange class.
     *
     * @param amqd            destination whose exchange is declared
     * @param protocolHandler protocol handler passed through to the declaration
     * @param nowait          nowait flag passed through to the declaration
     * @throws AMQException if the declaration fails
     */
    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException {
        final AMQShortString exchangeName = amqd.getExchangeName();
        final AMQShortString exchangeClass = amqd.getExchangeClass();
        declareExchange(exchangeName, exchangeClass, protocolHandler, nowait);
    }

    /**
     * Returns the depth of the destination's queue; forwards to the two-argument overload with
     * {@code sync = false}.
     *
     * @param amqd destination to query
     * @return the queue depth
     * @throws AMQException if the request fails
     */
    public long getQueueDepth(AMQDestination amqd) throws AMQException {
        final long depth = getQueueDepth(amqd, false);
        return depth;
    }

    /**
     * Returns the depth of the queue behind {@code amqd}.
     *
     * <p>The request is delegated to {@link #requestQueueDepth} inside a
     * {@code FailoverNoopSupport}; a {@code TransportException} is converted to an
     * {@code AMQException} carrying the derived error code.
     *
     * @param amqd the destination to query
     * @param sync passed through to {@code requestQueueDepth}
     * @return the queue depth
     * @throws AMQException if the query fails
     */
    public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException {
        final FailoverProtectedOperation<Long, AMQException> depthQuery = new FailoverProtectedOperation<Long, AMQException>(){

            @Override
            public Long execute() throws AMQException, FailoverException {
                try {
                    return AMQSession.this.requestQueueDepth(amqd, sync);
                }
                catch (TransportException transportFailure) {
                    final int errorCode = AMQSession.this.getErrorCode(transportFailure);
                    throw new AMQException(AMQConstant.getConstant((int)errorCode), transportFailure.getMessage(), (Throwable)transportFailure);
                }
            }
        };
        return new FailoverNoopSupport<Long, AMQException>(depthQuery, this._connection).execute();
    }

    /**
     * Performs the protocol-specific queue depth request; implemented by subclasses.
     *
     * <p>Called by {@code getQueueDepth(AMQDestination, boolean)}, which returns the result
     * unboxed to {@code long}.
     *
     * @param var1 the destination whose queue depth is requested
     * @param var2 the caller's {@code sync} flag
     * @return the queue depth
     * @throws AMQException declared failure
     * @throws FailoverException declared for implementations
     */
    protected abstract Long requestQueueDepth(AMQDestination var1, boolean var2) throws AMQException, FailoverException;

    /**
     * Declares an exchange by name and type, delegating to {@link #sendExchangeDeclare}
     * inside a {@code FailoverNoopSupport}.
     *
     * @param name            exchange name
     * @param type            exchange type
     * @param protocolHandler protocol handler passed through to the send call
     * @param nowait          passed through to the send call
     * @throws AMQException if the declare fails
     */
    private void declareExchange(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException {
        final FailoverProtectedOperation<Object, AMQException> declareOp = new FailoverProtectedOperation<Object, AMQException>(){

            @Override
            public Object execute() throws AMQException, FailoverException {
                AMQSession.this.sendExchangeDeclare(name, type, protocolHandler, nowait);
                return null;
            }
        };
        new FailoverNoopSupport<Object, AMQException>(declareOp, this._connection).execute();
    }

    /**
     * Sends the protocol-specific exchange declare; implemented by subclasses.
     *
     * <p>The call site visible in this class passes, in order: the exchange name, the exchange
     * type, the protocol handler and the caller's {@code nowait} flag.
     *
     * @param var1 exchange name
     * @param var2 exchange type
     * @param var3 the protocol handler supplied by the caller
     * @param var4 the caller's {@code nowait} flag
     * @throws AMQException declared failure
     * @throws FailoverException declared for implementations
     */
    public abstract void sendExchangeDeclare(AMQShortString var1, AMQShortString var2, AMQProtocolHandler var3, boolean var4) throws AMQException, FailoverException;

    /**
     * Declares the queue with {@code passive} set to {@code true} (and {@code noLocal} and
     * {@code nowait} both {@code false}).
     *
     * @param queue the destination whose queue is declared
     * @throws AMQException if the declare fails
     */
    void declareQueuePassive(AMQDestination queue) throws AMQException {
        declareQueue(queue, false, false, true);
    }

    /**
     * Declares the queue for {@code amqd} with {@code nowait} set to {@code false}.
     *
     * @param amqd    the destination whose queue is declared
     * @param noLocal passed through to the three-argument overload
     * @return the queue name reported by the destination after the declare
     * @throws AMQException if the declare fails
     */
    protected AMQShortString declareQueue(AMQDestination amqd, boolean noLocal) throws AMQException {
        return declareQueue(amqd, noLocal, false);
    }

    /**
     * Declares the queue for {@code amqd} with {@code passive} set to {@code false}.
     *
     * @param amqd    the destination whose queue is declared
     * @param noLocal passed through to the four-argument overload
     * @param nowait  passed through to the four-argument overload
     * @return the queue name reported by the destination after the declare
     * @throws AMQException if the declare fails
     */
    protected AMQShortString declareQueue(AMQDestination amqd, boolean noLocal, boolean nowait) throws AMQException {
        return declareQueue(amqd, noLocal, nowait, false);
    }

    /**
     * Declares the queue for {@code amqd} inside a {@code FailoverNoopSupport}.
     *
     * <p>If the destination reports that a name is required, a queue name generated by the
     * protocol handler is assigned to it before the declare is sent.
     *
     * @param amqd    the destination whose queue is declared
     * @param noLocal not referenced by this method body
     * @param nowait  passed through to {@link #sendQueueDeclare}
     * @param passive passed through to {@link #sendQueueDeclare}
     * @return the queue name reported by the destination after the declare
     * @throws AMQException if the declare fails
     */
    protected AMQShortString declareQueue(final AMQDestination amqd, boolean noLocal, final boolean nowait, final boolean passive) throws AMQException {
        final AMQProtocolHandler handler = this.getProtocolHandler();
        final FailoverProtectedOperation<AMQShortString, AMQException> declareOp = new FailoverProtectedOperation<AMQShortString, AMQException>(){

            @Override
            public AMQShortString execute() throws AMQException, FailoverException {
                final boolean needsGeneratedName = amqd.isNameRequired();
                if (needsGeneratedName) {
                    amqd.setQueueName(handler.generateQueueName());
                }
                AMQSession.this.sendQueueDeclare(amqd, handler, nowait, passive);
                return amqd.getAMQQueueName();
            }
        };
        return new FailoverNoopSupport<AMQShortString, AMQException>(declareOp, this._connection).execute();
    }

    /**
     * Sends the protocol-specific queue declare; implemented by subclasses.
     *
     * <p>The call site visible in this class passes, in order: the destination, the protocol
     * handler, the caller's {@code nowait} flag and the caller's {@code passive} flag.
     *
     * @param var1 the destination whose queue is declared
     * @param var2 the protocol handler supplied by the caller
     * @param var3 the caller's {@code nowait} flag
     * @param var4 the caller's {@code passive} flag
     * @throws AMQException declared failure
     * @throws FailoverException declared for implementations
     */
    public abstract void sendQueueDeclare(AMQDestination var1, AMQProtocolHandler var2, boolean var3, boolean var4) throws AMQException, FailoverException;

    /**
     * Deletes the named queue, delegating to {@link #sendQueueDelete} inside a
     * {@code FailoverRetrySupport}.
     *
     * @param queueName name of the queue to delete
     * @throws JMSException a {@code JMSAMQException} wrapping any {@code AMQException}
     *                      raised by the delete
     */
    protected void deleteQueue(final AMQShortString queueName) throws JMSException {
        final FailoverProtectedOperation<Object, AMQException> deleteOp = new FailoverProtectedOperation<Object, AMQException>(){

            @Override
            public Object execute() throws AMQException, FailoverException {
                AMQSession.this.sendQueueDelete(queueName);
                return null;
            }
        };
        try {
            new FailoverRetrySupport<Object, AMQException>(deleteOp, this._connection).execute();
        }
        catch (AMQException deleteFailure) {
            throw new JMSAMQException("The queue deletion failed: " + deleteFailure.getMessage(), (Exception)((Object)deleteFailure));
        }
    }

    /**
     * Deletes the queue backing a temporary destination.
     *
     * @param amqQueue the temporary destination whose queue is deleted
     * @throws JMSException if the queue deletion fails
     */
    protected void deleteTemporaryDestination(TemporaryDestination amqQueue) throws JMSException {
        deleteQueue(amqQueue.getAMQQueueName());
    }

    /**
     * Sends the protocol-specific queue delete; implemented by subclasses.
     *
     * @param var1 name of the queue to delete
     * @throws AMQException declared failure; {@code deleteQueue} wraps it in a {@code JMSAMQException}
     * @throws FailoverException declared for implementations
     */
    public abstract void sendQueueDelete(AMQShortString var1) throws AMQException, FailoverException;

    /**
     * Increments the producer id counter and returns the new value.
     *
     * @return the next producer id
     */
    private long getNextProducerId() {
        return ++_nextProducerId;
    }

    /**
     * Returns the protocol handler of the connection this session belongs to.
     *
     * @return the connection's protocol handler
     */
    protected AMQProtocolHandler getProtocolHandler() {
        return _connection.getProtocolHandler();
    }

    /**
     * Returns the protocol major version reported by the protocol handler.
     *
     * @return the protocol major version
     */
    public byte getProtocolMajorVersion() {
        final AMQProtocolHandler handler = getProtocolHandler();
        return handler.getProtocolMajorVersion();
    }

    /**
     * Returns the protocol minor version reported by the protocol handler.
     *
     * @return the protocol minor version
     */
    public byte getProtocolMinorVersion() {
        final AMQProtocolHandler handler = getProtocolHandler();
        return handler.getProtocolMinorVersion();
    }

    /**
     * Returns the current value of the session's {@code _hasMessageListeners} flag.
     *
     * @return {@code true} if the flag is set
     */
    protected boolean hasMessageListeners() {
        return _hasMessageListeners;
    }

    /**
     * Closes and discards the dispatcher (if any), then marks every registered consumer closed.
     *
     * <p>The consumers are marked from a snapshot copy of the consumer map's values.
     *
     * @throws JMSException declared; propagated from the calls made here
     */
    private void markClosedConsumers() throws JMSException {
        if (this._dispatcher != null) {
            this._dispatcher.close();
            this._dispatcher = null;
        }
        // Iterate over a copy so the loop is independent of later changes to the map.
        final ArrayList<C> snapshot = new ArrayList<C>(this._consumers.values());
        for (BasicMessageConsumer registered : snapshot) {
            registered.markClosed();
        }
    }

    /**
     * Closes the producers and then marks the consumers closed.
     *
     * <p>Each step is attempted independently: a {@code JMSException} from either one is
     * logged at error level and does not prevent the other step or propagate to the caller.
     */
    private void markClosedProducersAndConsumers() {
        try {
            closeProducers();
        }
        catch (JMSException producerFailure) {
            _logger.error("Error closing session: " + producerFailure, (Throwable)producerFailure);
        }

        try {
            markClosedConsumers();
        }
        catch (JMSException consumerFailure) {
            _logger.error("Error closing session: " + consumerFailure, (Throwable)consumerFailure);
        }
    }

    /**
     * Prepares the consumer's destination on the broker and starts consuming from its queue.
     *
     * <p>Address-syntax destinations are handed to {@link #handleAddressBasedDestination}.
     * Otherwise the exchange and queue are declared (subject to the session's declare flags,
     * or when the destination requires a name) and the queue is bound. When immediate prefetch
     * is off and no dispatcher exists and the session is not suspended, the channel is
     * suspended first; a failure to suspend is only logged.
     *
     * @param consumer the consumer to register
     * @param nowait   passed through to the declare, bind and consume calls
     * @throws AMQException if a declare/bind/consume step fails, or if fail-over interrupts
     *                      the consume
     */
    private void registerConsumer(C consumer, boolean nowait) throws AMQException {
        final BasicMessageConsumer basicConsumer = (BasicMessageConsumer)consumer;
        final AMQDestination destination = basicConsumer.getDestination();
        final AMQProtocolHandler handler = this.getProtocolHandler();

        final boolean addressBased = destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR;
        if (addressBased) {
            this.handleAddressBasedDestination(destination, true, basicConsumer.isNoLocal(), nowait);
        } else {
            if (this._declareExchanges) {
                this.declareExchange(destination, handler, nowait);
            }
            if (this._delareQueues || destination.isNameRequired()) {
                this.declareQueue(destination, basicConsumer.isNoLocal(), nowait);
            }
            this.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(), basicConsumer.getArguments(), destination.getExchangeName(), destination, nowait);
        }

        final AMQShortString queueName = destination.getAMQQueueName();
        basicConsumer.setQueuename(queueName);

        if (this._immediatePrefetch) {
            _logger.debug("Immediately prefetching existing messages to new consumer.");
        } else if (this._dispatcher == null && !this.isSuspended()) {
            try {
                this.suspendChannel(true);
                _logger.debug("Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
            }
            catch (AMQException suspendFailure) {
                // Best effort: registration continues even if the channel could not be suspended.
                _logger.info("Suspending channel threw an exception:", (Throwable)suspendFailure);
            }
        }

        try {
            this.consumeFromQueue(consumer, queueName, handler, nowait);
        }
        catch (FailoverException failover) {
            throw new AMQException(null, "Fail-over exception interrupted basic consume.", (Throwable)failover);
        }
    }

    /**
     * Handles a destination that uses the address ({@code ADDR}) syntax; implemented by subclasses.
     *
     * <p>{@code registerConsumer} calls this instead of its own exchange/queue declare and bind
     * steps, passing the destination, the literal {@code true}, the consumer's no-local setting
     * and its {@code nowait} flag.
     *
     * @param var1 the address-based destination
     * @param var2 {@code true} at the visible call site (presumably marks the call as being for a consumer - confirm in implementations)
     * @param var3 the consumer's no-local setting
     * @param var4 the caller's {@code nowait} flag
     * @throws AMQException declared failure
     */
    public abstract void handleAddressBasedDestination(AMQDestination var1, boolean var2, boolean var3, boolean var4) throws AMQException;

    /**
     * Records a producer in the session's producer map under its id.
     *
     * @param producerId id allocated for the producer
     * @param producer   the producer to register
     */
    private void registerProducer(long producerId, MessageProducer producer) {
        // Long.valueOf replaces the deprecated Long(long) constructor; map key equality is unchanged.
        this._producers.put(Long.valueOf(producerId), producer);
    }

    /**
     * Removes messages from the session queue and rejects them.
     *
     * <p>A queued message is removed and rejected when {@code rejectAllConsumers} is set or
     * when its consumer tag equals {@code consumerTag}; all other messages are left in place.
     *
     * @param consumerTag        tag of the consumer whose messages are rejected
     * @param requeue            passed through to {@code rejectMessage}
     * @param rejectAllConsumers if {@code true}, every queued message is rejected regardless of tag
     */
    private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) {
        final Iterator pending = this._queue.iterator();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + requeue);
            if (pending.hasNext()) {
                _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
            } else {
                _logger.debug("No messages in _queue to reject");
            }
        }
        while (pending.hasNext()) {
            final UnprocessedMessage candidate = (UnprocessedMessage)pending.next();
            if (rejectAllConsumers || candidate.getConsumerTag() == consumerTag) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Removing message(" + System.identityHashCode(candidate) + ") from _queue DT:" + candidate.getDeliveryTag());
                }
                pending.remove();
                this.rejectMessage(candidate, requeue);
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Rejected the message(" + candidate.toString() + ") for consumer :" + consumerTag);
                }
            }
        }
    }

    /**
     * Re-registers every consumer of this session.
     *
     * <p>The consumer map is copied and cleared; each consumer from the copy is then notified
     * before re-registration, registered again with {@code nowait} set to {@code true}, and
     * notified afterwards.
     *
     * @throws AMQException if re-registering a consumer fails
     */
    private void resubscribeConsumers() throws AMQException {
        final ArrayList<C> previous = new ArrayList<C>(this._consumers.values());
        this._consumers.clear();
        for (C each : previous) {
            each.failedOverPre();
            registerConsumer(each, true);
            each.failedOverPost();
        }
    }

    /**
     * Asks every registered producer to resubscribe.
     *
     * <p>Works on a snapshot copy of the producer map's values.
     *
     * @throws AMQException if a producer fails to resubscribe
     */
    private void resubscribeProducers() throws AMQException {
        final List<MessageProducer> producers = new ArrayList<MessageProducer>(this._producers.values());
        // Only build the (potentially large) message when debug logging is actually on,
        // matching the guarded logging used elsewhere in this class.
        if (_logger.isDebugEnabled()) {
            _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size()));
        }
        for (MessageProducer producer : producers) {
            // The map is typed as MessageProducer; resubscribe() lives on BasicMessageProducer.
            ((BasicMessageProducer)producer).resubscribe();
        }
    }

    /**
     * Records the requested suspension state and sends it to the broker, holding the
     * suspension lock for the duration.
     *
     * <p>The {@code _suspended} flag is updated before the request is sent.
     *
     * @param suspend {@code true} to suspend the channel, {@code false} to unsuspend it
     * @throws AMQException if fail-over interrupts the request or a transport error occurs
     */
    protected void suspendChannel(boolean suspend) throws AMQException {
        synchronized (this._suspensionLock) {
            try {
                if (_logger.isDebugEnabled()) {
                    final String state = suspend ? "suspended" : "unsuspended";
                    _logger.debug("Setting channel flow : " + state);
                }
                this._suspended = suspend;
                this.sendSuspendChannel(suspend);
            }
            catch (FailoverException failover) {
                throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", (Throwable)failover);
            }
            catch (TransportException transportFailure) {
                final int errorCode = this.getErrorCode(transportFailure);
                throw new AMQException(AMQConstant.getConstant((int)errorCode), transportFailure.getMessage(), (Throwable)transportFailure);
            }
        }
    }

    /**
     * Sends the protocol-specific suspend/unsuspend request; implemented by subclasses.
     *
     * <p>Called by {@code suspendChannel} while it holds the suspension lock.
     *
     * @param var1 {@code true} to suspend the channel, {@code false} to unsuspend it
     * @throws AMQException declared failure
     * @throws FailoverException declared; {@code suspendChannel} converts it to an {@code AMQException}
     */
    public abstract void sendSuspendChannel(boolean var1) throws AMQException, FailoverException;

    /**
     * Returns the lock object the dispatcher holds while notifying a consumer of a message.
     *
     * @return the message delivery lock
     */
    Object getMessageDeliveryLock() {
        return _messageDeliveryLock;
    }

    /**
     * Reports whether the prefetch high mark is greater than zero.
     *
     * @return {@code true} if {@code _prefetchHighMark > 0}
     */
    public boolean prefetch() {
        final boolean highMarkPositive = _prefetchHighMark > 0;
        return highMarkPositive;
    }

    /**
     * Sets the session's dirty flag.
     */
    public void markDirty() {
        _dirty = true;
    }

    /**
     * Clears both the dirty flag and the failed-over-dirty flag.
     */
    public void markClean() {
        _dirty = false;
        _failedOverDirty = false;
    }

    /**
     * Returns the failed-over-dirty flag.
     *
     * @return {@code true} if the flag is set
     */
    public boolean hasFailedOverDirty() {
        return _failedOverDirty;
    }

    /**
     * Stores the ticket value on this session.
     *
     * @param ticket the ticket to store
     */
    public void setTicket(int ticket) {
        _ticket = ticket;
    }

    /**
     * Reports whether sending is currently blocked by flow control.
     *
     * <p>Reads the indicator while holding its monitor.
     *
     * @return {@code true} when the flow control indicator reports {@code false}
     */
    public boolean isFlowBlocked() {
        synchronized (this._flowControl) {
            final boolean flowing = this._flowControl.getFlowControl();
            return !flowing;
        }
    }

    /**
     * Updates the flow control indicator and logs the change at info level.
     *
     * @param active {@code true} when flow control is no longer in effect,
     *               {@code false} when it has been enforced
     */
    public void setFlowControl(boolean active) {
        this._flowControl.setFlowControl(active);
        if (!_logger.isInfoEnabled()) {
            return;
        }
        final String outcome = active ? "no longer in effect" : "has been enforced";
        _logger.info("Broker enforced flow control " + outcome);
    }

    /**
     * Blocks the caller while the broker is enforcing flow control, up to a maximum wait.
     *
     * <p>While the indicator reports that flow is blocked, the caller waits on the indicator
     * in slices of {@code _flowControlWaitPeriod} milliseconds until either flow resumes or
     * {@code _flowControlWaitFailure} milliseconds have elapsed since the first blocked check.
     * The deadline is computed once, when blocking is first observed, so the wait is bounded
     * and the timeout failure below is reachable.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws JMSException if flow control is still enforced when the deadline passes
     */
    public void checkFlowControl() throws InterruptedException, JMSException {
        final FlowControlIndicator flowControl = this._flowControl;
        synchronized (flowControl) {
            if (!flowControl.getFlowControl()) {
                // Fix the deadline once; recomputing it on every pass would make it unreachable.
                final long startTime = System.currentTimeMillis();
                final long expiryTime = startTime + this._flowControlWaitFailure;
                while (!flowControl.getFlowControl() && expiryTime >= System.currentTimeMillis()) {
                    flowControl.wait(this._flowControlWaitPeriod);
                    if (_logger.isInfoEnabled()) {
                        _logger.info("Message send delayed by " + (System.currentTimeMillis() - startTime) / 1000L + "s due to broker enforced flow control");
                    }
                }
            }
            if (!flowControl.getFlowControl()) {
                _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
                throw new JMSException("Unable to send message for " + this._flowControlWaitFailure / 1000L + " seconds due to broker enforced flow control");
            }
        }
    }

    /**
     * Hands a message to the session's dispatcher.
     *
     * <p>The dispatcher reference is read once into a local: {@code markClosedConsumers} sets
     * the field to {@code null}, and re-reading it after the null check could otherwise fail
     * with a {@code NullPointerException} instead of the intended exception.
     *
     * @param message the message to dispatch
     * @throws IllegalStateException if no dispatcher is set
     */
    public void dispatch(UnprocessedMessage message) {
        final Dispatcher dispatcher = this._dispatcher;
        if (dispatcher == null) {
            throw new IllegalStateException("dispatcher is not started");
        }
        dispatcher.dispatchMessage(message);
    }

    /**
     * Protocol-specific "less than or equal" comparison of two delivery tags; implemented by
     * subclasses.
     *
     * <p>The dispatcher calls this with a message's delivery tag and the current rollback mark,
     * and rejects the message when the result is {@code true}.
     *
     * @param var1 the first tag (the delivery tag at the visible call site)
     * @param var3 the second tag (the rollback mark at the visible call site)
     * @return {@code true} if the first tag is considered less than or equal to the second
     */
    protected abstract boolean tagLE(long var1, long var3);

    /**
     * Decides whether the rollback mark should be advanced; implemented by subclasses.
     *
     * <p>The dispatcher calls this with the current rollback mark and the delivery tag of the
     * message it just handled; when the result is {@code true} it attempts a compare-and-set of
     * the mark from the current value to that delivery tag.
     *
     * @param var1 the current rollback mark
     * @param var3 the delivery tag of the handled message
     * @return {@code true} if the rollback mark should be updated to the delivery tag
     */
    protected abstract boolean updateRollbackMark(long var1, long var3);

    /**
     * Returns the message delegate factory for this session; implemented by subclasses.
     *
     * @return the session's {@code AMQMessageDelegateFactory}
     */
    public abstract AMQMessageDelegateFactory getMessageDelegateFactory();

    /**
     * Reports whether this session or its connection is closed.
     *
     * @return {@code true} if the superclass reports closed, or otherwise if the connection is closed
     */
    @Override
    public boolean isClosed() {
        if (super.isClosed()) {
            return true;
        }
        return this._connection.isClosed();
    }

    /**
     * Reports whether this session or its connection is in the process of closing.
     *
     * @return {@code true} if the superclass reports closing, or otherwise if the connection is closing
     */
    @Override
    public boolean isClosing() {
        if (super.isClosing()) {
            return true;
        }
        return this._connection.isClosing();
    }

    /**
     * Returns the flag that controls whether {@code registerConsumer} declares exchanges.
     *
     * @return the {@code _declareExchanges} flag
     */
    public boolean isDeclareExchanges() {
        return _declareExchanges;
    }

    /**
     * Converts a transport exception into a {@code JMSException}.
     *
     * <p>The JMS error code is the decimal string of the code derived by
     * {@code getErrorCode}; the transport exception is attached both as the linked
     * exception and as the cause.
     *
     * @param message the message for the new exception
     * @param e       the transport exception being converted
     * @return the new {@code JMSException}
     */
    JMSException toJMSException(String message, TransportException e) {
        final String errorCode = Integer.toString(this.getErrorCode(e));
        final JMSException converted = new JMSException(message, errorCode);
        converted.setLinkedException((Exception)((Object)e));
        converted.initCause((Throwable)e);
        return converted;
    }

    /**
     * Derives a numeric error code from a transport exception.
     *
     * <p>Defaults to the code of {@code AMQConstant.INTERNAL_ERROR}. When {@code e} is a
     * {@code SessionException} that carries a nested exception with a non-null error code,
     * that error code's value is returned instead.
     *
     * @param e the transport exception to inspect
     * @return the derived error code
     */
    private int getErrorCode(TransportException e) {
        final int fallback = AMQConstant.INTERNAL_ERROR.getCode();
        if (!(e instanceof SessionException)) {
            return fallback;
        }
        final SessionException sessionFailure = (SessionException)((Object)e);
        if (sessionFailure.getException() == null) {
            return fallback;
        }
        if (sessionFailure.getException().getErrorCode() == null) {
            return fallback;
        }
        return sessionFailure.getException().getErrorCode().getValue();
    }

    /**
     * Reports whether the destination is an {@code AMQDestination} flagged as browse-only.
     *
     * @param destination the destination to test (a {@code null} or foreign destination yields {@code false})
     * @return {@code true} only for a browse-only {@code AMQDestination}
     */
    private boolean isBrowseOnlyDestination(Destination destination) {
        if (!(destination instanceof AMQDestination)) {
            return false;
        }
        return ((AMQDestination)destination).isBrowseOnly();
    }

    /**
     * Sets the rollback mark to the current highest delivery tag and logs the new mark at
     * debug level.
     */
    private void setRollbackMark() {
        final long highest = this._highestDeliveryTag.get();
        this._rollbackMark.set(highest);
        if (_logger.isDebugEnabled()) {
            _logger.debug("Rollback mark is set to " + this._rollbackMark.get());
        }
    }

    /**
     * Runnable that applies a suspend/unsuspend request to the enclosing session.
     *
     * <p>The desired state is read from the supplied {@code AtomicBoolean} at the time the
     * task runs, not when it is created.
     */
    private class SuspenderRunner
    implements Runnable {
        private AtomicBoolean _suspend;

        /**
         * @param suspend holder of the desired state: {@code true} to suspend, {@code false} to unsuspend
         */
        public SuspenderRunner(AtomicBoolean suspend) {
            this._suspend = suspend;
        }

        /**
         * Suspends or unsuspends the channel unless the session is closed or closing.
         *
         * <p>An {@code AMQException} is logged as a warning (with extra queue/dispatcher
         * detail at debug level) and not propagated.
         */
        public void run() {
            try {
                synchronized (AMQSession.this._suspensionLock) {
                    final boolean shuttingDown = AMQSession.this.isClosed() || AMQSession.this.isClosing();
                    if (!shuttingDown) {
                        AMQSession.this.suspendChannel(this._suspend.get());
                    }
                }
            }
            catch (AMQException e) {
                _logger.warn("Unable to " + (this._suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", (Throwable)e);
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Is the _queue empty?" + AMQSession.this._queue.isEmpty());
                    _logger.debug("Is the dispatcher closed?" + (AMQSession.this._dispatcher == null ? "it's Null" : AMQSession.this._dispatcher.getClosed()));
                }
            }
        }
    }

    /**
     * Takes work items from the enclosing session's queue and delivers messages to consumers.
     *
     * <p>All state changes that affect delivery (connection stopped/started, rollback,
     * recover, dispatching a message) are made while holding the private {@code _lock}.
     */
    class Dispatcher
    implements Runnable {
        private final AtomicBoolean _closed = new AtomicBoolean(false);
        private final Object _lock = new Object();
        private final String dispatcherID = "" + System.identityHashCode(this);

        /**
         * Marks the dispatcher closed and interrupts the dispatcher thread so that a blocked
         * {@link #run()} loop terminates.
         */
        public void close() {
            this._closed.set(true);
            AMQSession.this._dispatcherThread.interrupt();
        }

        /** Returns the closed flag holder (used by the enclosing session for diagnostics). */
        private AtomicBoolean getClosed() {
            return this._closed;
        }

        /**
         * Rolls back the consumer's pending messages, rejects (with requeue) the messages still
         * queued for its tag, and marks the consumer closed.
         *
         * @param consumer the consumer whose pending work is rejected
         */
        public void rejectPending(C consumer) {
            ((BasicMessageConsumer)consumer).rollbackPendingMessages();
            AMQSession.this.rejectMessagesForConsumerTag(((BasicMessageConsumer)consumer).getConsumerTag(), true, false);
            ((BasicMessageConsumer)consumer).markClosed();
        }

        /**
         * Rolls back all consumers while delivery is paused.
         *
         * <p>Delivery is stopped for the duration (and restored to its previous state
         * afterwards), the rollback mark is set, every registered consumer is rolled back
         * (browse-only consumers just have their receive queue cleared), and every entry of
         * the removed-consumers list is rolled back and dropped from that list.
         */
        public void rollback() {
            synchronized (this._lock) {
                boolean isStopped = this.connectionStopped();
                if (!isStopped) {
                    this.setConnectionStopped(true);
                }
                AMQSession.this.setRollbackMark();
                _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
                for (BasicMessageConsumer consumer : AMQSession.this._consumers.values()) {
                    if (consumer.isBrowseOnly()) {
                        consumer.clearReceiveQueue();
                    } else {
                        consumer.rollback();
                    }
                }
                // Always take the head: removing at an advancing index shifts the remaining
                // elements down and would skip every other removed consumer.
                while (AMQSession.this._removedConsumers.size() > 0) {
                    ((BasicMessageConsumer)AMQSession.this._removedConsumers.get(0)).rollback();
                    AMQSession.this._removedConsumers.remove(0);
                }
                this.setConnectionStopped(isStopped);
            }
        }

        /**
         * Drains every consumer's receive queue while delivery is paused and records the
         * drained delivery tags in the session's prefetched-tag collection. The previous
         * stopped/started state is restored afterwards.
         */
        public void recover() {
            synchronized (this._lock) {
                boolean isStopped = this.connectionStopped();
                if (!isStopped) {
                    this.setConnectionStopped(true);
                }
                _dispatcherLogger.debug("Session clearing the consumer queues");
                for (BasicMessageConsumer consumer : AMQSession.this._consumers.values()) {
                    List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
                    AMQSession.this._prefetchedMessageTags.addAll(tags);
                }
                this.setConnectionStopped(isStopped);
            }
        }

        /**
         * Dispatcher thread body.
         *
         * <p>Waits while the connection is stopped (unless closed), then repeatedly takes the
         * next item from the session queue and dispatches it until the dispatcher is closed or
         * the thread is interrupted.
         */
        @Override
        public void run() {
            if (_dispatcherLogger.isDebugEnabled()) {
                _dispatcherLogger.debug(AMQSession.this._dispatcherThread.getName() + " started");
            }
            synchronized (this._lock) {
                while (!this._closed.get() && this.connectionStopped()) {
                    try {
                        this._lock.wait();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            try {
                Dispatchable disp;
                while ((disp = (Dispatchable)AMQSession.this._queue.take()) != null && !this._closed.get()) {
                    disp.dispatch(AMQSession.this);
                }
            }
            catch (InterruptedException ignored) {
                // close() interrupts this thread to end the loop; nothing further to do here.
            }
            if (_dispatcherLogger.isDebugEnabled()) {
                _dispatcherLogger.debug(AMQSession.this._dispatcherThread.getName() + " thread terminating for channel " + AMQSession.this._channelId + ":" + AMQSession.this);
            }
        }

        /** Returns the session's current connection-stopped flag. */
        final boolean connectionStopped() {
            return AMQSession.this._connectionStopped;
        }

        /**
         * Updates the session's connection-stopped flag and wakes one thread waiting on the
         * dispatcher lock.
         *
         * @param connectionStopped the new value of the flag
         * @return the value the flag had before this call
         */
        boolean setConnectionStopped(boolean connectionStopped) {
            boolean currently;
            synchronized (this._lock) {
                currently = AMQSession.this._connectionStopped;
                AMQSession.this._connectionStopped = connectionStopped;
                this._lock.notify();
                if (_dispatcherLogger.isDebugEnabled()) {
                    _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + ": Currently " + (currently ? "Stopped" : "Started"));
                }
            }
            return currently;
        }

        /**
         * Delivers, rejects or records a single message.
         *
         * <p>Waits while the connection is stopped. A message whose delivery tag is at or below
         * the rollback mark (and which is not a {@code CloseConsumerMessage}) is rejected with
         * requeue; while the dispatcher is being used for cleanup the tag is only recorded;
         * otherwise the consumer is notified under the message delivery lock. Finally the
         * rollback mark is advanced if the session says it should be.
         *
         * @param message the message to handle
         */
        private void dispatchMessage(UnprocessedMessage message) {
            long deliveryTag = message.getDeliveryTag();
            synchronized (this._lock) {
                try {
                    while (this.connectionStopped()) {
                        this._lock.wait();
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (!(message instanceof CloseConsumerMessage) && AMQSession.this.tagLE(deliveryTag, AMQSession.this._rollbackMark.get())) {
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Rejecting message because delivery tag " + deliveryTag + " <= rollback mark " + AMQSession.this._rollbackMark.get());
                    }
                    AMQSession.this.rejectMessage(message, true);
                } else if (AMQSession.this._usingDispatcherForCleanup) {
                    AMQSession.this._prefetchedMessageTags.add(deliveryTag);
                } else {
                    synchronized (AMQSession.this._messageDeliveryLock) {
                        this.notifyConsumer(message);
                    }
                }
            }
            long current = AMQSession.this._rollbackMark.get();
            if (AMQSession.this.updateRollbackMark(current, deliveryTag)) {
                AMQSession.this._rollbackMark.compareAndSet(current, deliveryTag);
            }
        }

        /**
         * Passes the message to its consumer, or disposes of it when the consumer is missing,
         * closed or closing.
         *
         * <p>A message for a closed/closing browse-only consumer is dropped. Any other
         * undeliverable message is rejected with requeue unless this dispatcher is itself
         * closed. The drop-versus-reject decision does not depend on the logging level.
         *
         * @param message the message to deliver
         */
        private void notifyConsumer(UnprocessedMessage message) {
            Object consumer = AMQSession.this._consumers.get(message.getConsumerTag());
            if (consumer == null || ((Closeable)consumer).isClosed() || ((Closeable)consumer).isClosing()) {
                if (consumer == null) {
                    if (_dispatcherLogger.isInfoEnabled()) {
                        _dispatcherLogger.info("Dispatcher(" + this.dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" + message.getDeliveryTag() + "] from queue " + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                    }
                } else if (((BasicMessageConsumer)consumer).isBrowseOnly()) {
                    if (_dispatcherLogger.isInfoEnabled()) {
                        _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + message.getDeliveryTag() + "] from queue " + " consumer(" + message.getConsumerTag() + ") is closed and a browser so dropping...");
                    }
                    // Drop unconditionally; previously this only happened with INFO logging on.
                    return;
                } else if (_dispatcherLogger.isInfoEnabled()) {
                    _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + message.getDeliveryTag() + "] from queue " + " consumer(" + message.getConsumerTag() + ") is closed rejecting(requeue)...");
                }
                if (!this._closed.get()) {
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() + " for closing consumer " + String.valueOf(consumer == null ? null : Integer.valueOf(((BasicMessageConsumer)consumer).getConsumerTag())));
                    }
                    AMQSession.this.rejectMessage(message, true);
                }
            } else {
                ((BasicMessageConsumer)consumer).notifyMessage((UnprocessedMessage)message);
            }
        }
    }

    public static interface Dispatchable {
        public void dispatch(AMQSession var1);
    }

    /**
     * Holder for a single flow control flag that also serves as the monitor threads wait on.
     *
     * <p>The flag starts as {@code true}. Setting it notifies one thread waiting on this object.
     */
    private static final class FlowControlIndicator {
        private volatile boolean _flowControl = true;

        private FlowControlIndicator() {
        }

        /**
         * Returns the current flag value without taking the monitor.
         *
         * @return the flow control flag
         */
        public boolean getFlowControl() {
            return _flowControl;
        }

        /**
         * Stores the new flag value and notifies one waiting thread, under this object's monitor.
         *
         * @param flowControl the new flag value
         */
        public void setFlowControl(boolean flowControl) {
            synchronized (this) {
                _flowControl = flowControl;
                notify();
            }
        }
    }

    /**
     * Map from integer consumer id to consumer.
     *
     * <p>Ids in the range {@code 0..15} are stored in a fixed array; every other id
     * (including negative ones) is stored in a {@code ConcurrentHashMap}.
     *
     * @param <C> the consumer type
     */
    public static final class IdToConsumerMap<C extends BasicMessageConsumer> {
        /** Number of ids served from the array. */
        private static final int FAST_ACCESS_SIZE = 16;
        /** Bits that must all be zero for an id to fall in the array range (0xFFFFFFF0). */
        private static final int FAST_ACCESS_MASK = ~(FAST_ACCESS_SIZE - 1);

        private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[FAST_ACCESS_SIZE];
        private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();

        /** Returns {@code true} when {@code id} is stored in the array rather than the map. */
        private static boolean isFastAccess(int id) {
            return (id & FAST_ACCESS_MASK) == 0;
        }

        /**
         * Looks up the consumer registered under {@code id}.
         *
         * @param id the consumer id
         * @return the consumer, or {@code null} if none is registered
         */
        @SuppressWarnings("unchecked") // the array only ever receives C instances via put()
        public C get(int id) {
            if (isFastAccess(id)) {
                return (C)this._fastAccessConsumers[id];
            }
            return this._slowAccessConsumers.get(id);
        }

        /**
         * Registers {@code consumer} under {@code id}.
         *
         * @param id       the consumer id
         * @param consumer the consumer to store
         * @return the consumer previously stored under {@code id}, or {@code null}
         */
        @SuppressWarnings("unchecked") // the array only ever receives C instances via put()
        public C put(int id, C consumer) {
            if (isFastAccess(id)) {
                final C previous = (C)this._fastAccessConsumers[id];
                this._fastAccessConsumers[id] = consumer;
                return previous;
            }
            return this._slowAccessConsumers.put(id, consumer);
        }

        /**
         * Removes the consumer registered under {@code id}.
         *
         * @param id the consumer id
         * @return the removed consumer, or {@code null} if none was registered
         */
        @SuppressWarnings("unchecked") // the array only ever receives C instances via put()
        public C remove(int id) {
            if (isFastAccess(id)) {
                final C removed = (C)this._fastAccessConsumers[id];
                this._fastAccessConsumers[id] = null;
                return removed;
            }
            return this._slowAccessConsumers.remove(id);
        }

        /**
         * Returns a new collection holding all registered consumers: array entries first, then
         * the map's values.
         *
         * <p>Each array slot is read once, so a slot cleared by another thread between the
         * null check and the add cannot put a {@code null} into the result.
         *
         * @return a freshly allocated collection of the registered consumers
         */
        @SuppressWarnings("unchecked") // the array only ever receives C instances via put()
        public Collection<C> values() {
            final ArrayList<C> values = new ArrayList<C>();
            for (int i = 0; i < FAST_ACCESS_SIZE; ++i) {
                final BasicMessageConsumer slot = this._fastAccessConsumers[i];
                if (slot != null) {
                    values.add((C)slot);
                }
            }
            values.addAll(this._slowAccessConsumers.values());
            return values;
        }

        /** Removes every registered consumer from both the map and the array. */
        public void clear() {
            this._slowAccessConsumers.clear();
            for (int i = 0; i < FAST_ACCESS_SIZE; ++i) {
                this._fastAccessConsumers[i] = null;
            }
        }
    }
}

