/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.BasicMessageConsumer_0_10;
import org.apache.qpid.client.BasicMessageProducer_0_10;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.QueueQueryResult;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AMQSession_0_10
extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
implements SessionListener {
    /** Class-wide SLF4J logger. */
    private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
    /** Shared daemon timer (thread name "ack-flusher"); the constructor schedules a Flusher task on it. */
    private static Timer timer = new Timer("ack-flusher", true);
    /** Transport-level session on which every protocol command in this class is issued. */
    private Session _qpidSession;
    /** Monitor guarding reads and writes of {@code _currentException}. */
    private Object _currentExceptionLock = new Object();
    /** Last session error recorded by setCurrentException and not yet collected by getCurrentException. */
    private AMQException _currentException;
    /** Transport connection the session was created on; reassigned in resumed(). */
    protected Connection _qpidConnection;
    /**
     * Period, in milliseconds, passed to the flush timer. Read from the system property
     * {@code qpid.session.max_ack_delay} (default 1000). A value <= 0 means no timer task is
     * scheduled and acknowledgeMessage flushes on every call.
     */
    private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000L);
    /** Periodic flush task; null when no task was scheduled or after sendClose cancelled it. */
    private TimerTask flushTask = null;
    /** Ids acknowledged by the application but not yet sent; also the monitor guarding itself and {@code unackedCount}. */
    private RangeSet unacked = new RangeSet();
    /** Number of ids added to {@code unacked} since it was last cleared. */
    private int unackedCount = 0;
    /** Ids delivered within the current transaction; cleared on commit and on releaseForRollback. */
    private RangeSet _txRangeSet = new RangeSet();
    /** Number of ids added to {@code _txRangeSet} since it was last cleared. */
    private int _txSize = 0;

    AMQSession_0_10(Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) {
        super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark);
        this._qpidConnection = qpidConnection;
        this._qpidSession = this._qpidConnection.createSession(1L);
        this._qpidSession.setSessionListener((SessionListener)this);
        if (this._transacted) {
            this._qpidSession.txSelect(new Option[0]);
            this._qpidSession.setTransacted(true);
        }
        if (this.maxAckDelay > 0L) {
            this.flushTask = new Flusher(this);
            timer.schedule(this.flushTask, new Date(), this.maxAckDelay);
        }
    }

    /**
     * Convenience constructor that uses {@code MessageFactoryRegistry.newDefaultRegistry()}
     * as the message factory registry and otherwise delegates to the full constructor.
     */
    AMQSession_0_10(Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) {
        this(
                qpidConnection,
                con,
                channelId,
                transacted,
                acknowledgeMode,
                MessageFactoryRegistry.newDefaultRegistry(),
                defaultPrefetchHigh,
                defaultPrefetchLow);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records one id as acknowledged-but-not-yet-flushed and bumps the pending counter,
     * holding the {@code unacked} monitor for both updates.
     *
     * @param id transfer id to add to the pending range set
     */
    private void addUnacked(int id) {
        synchronized (unacked) {
            unacked.add(id);
            unackedCount++;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Empties the pending-acknowledgement range set and resets its counter to zero,
     * holding the {@code unacked} monitor for both updates.
     */
    private void clearUnacked() {
        synchronized (unacked) {
            unackedCount = 0;
            unacked.clear();
        }
    }

    /**
     * Runs the superclass failover preparation and then discards every acknowledgement
     * that has been queued locally but not yet flushed.
     */
    @Override
    void failoverPrep() {
        super.failoverPrep();
        clearUnacked();
    }

    /**
     * Queues an acknowledgement for one delivery tag, or for every outstanding tag up to and
     * including it, and flushes the queued acknowledgements once enough have accumulated.
     *
     * <p>A flush is triggered when the number of pending acknowledgements reaches half of the
     * connection's max prefetch, or on every call when {@code maxAckDelay <= 0} (in which case
     * the constructor schedules no periodic flush task).
     *
     * @param deliveryTag tag of the message being acknowledged
     * @param multiple when true, acknowledge every tag in {@code _unacknowledgedMessageTags}
     *                 that is {@code <= deliveryTag}; otherwise only {@code deliveryTag}
     */
    @Override
    public void acknowledgeMessage(long deliveryTag, boolean multiple) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + this._channelId);
        }
        if (multiple) {
            // NOTE(review): removes from the collection while iterating it; this relies on the
            // collection (declared in the superclass, not visible here) tolerating that.
            for (Long messageTag : this._unacknowledgedMessageTags) {
                if (messageTag > deliveryTag) continue;
                this.addUnacked(messageTag.intValue());
                this._unacknowledgedMessageTags.remove(messageTag);
            }
        } else {
            this.addUnacked((int)deliveryTag);
            this._unacknowledgedMessageTags.remove(deliveryTag);
        }
        long prefetch = this.getAMQConnection().getMaxPrefetch();
        // Read the counter under the same monitor that guards every write to it, so a
        // concurrent flush on another thread cannot leave this thread with a stale value.
        int pending;
        synchronized (this.unacked) {
            pending = this.unackedCount;
        }
        if ((long)pending >= prefetch / 2L || this.maxAckDelay <= 0L) {
            this.flushAcknowledgments();
        }
    }

    /**
     * Sends any pending acknowledgements without requesting the sync bit.
     */
    @Override
    protected void flushAcknowledgments() {
        flushAcknowledgments(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends every pending acknowledgement to the broker and clears the pending set, all while
     * holding the {@code unacked} monitor. Does nothing when no acknowledgement is pending.
     *
     * @param setSyncBit forwarded to {@code messageAcknowledge}; requests the SYNC option on the accept
     */
    void flushAcknowledgments(boolean setSyncBit) {
        synchronized (unacked) {
            if (unackedCount <= 0) {
                return;
            }
            // NOTE(review): 257 is presumably the NO_ACKNOWLEDGE session mode; confirm against the Session constants.
            boolean accept = _acknowledgeMode != 257;
            messageAcknowledge(unacked, accept, setSyncBit);
            clearUnacked();
        }
    }

    /**
     * Acknowledges the given ranges without requesting the sync bit.
     *
     * @param ranges transfer ids to mark as processed
     * @param accept whether a message-accept should also be sent for the ranges
     */
    void messageAcknowledge(RangeSet ranges, boolean accept) {
        messageAcknowledge(ranges, accept, false);
    }

    /**
     * Marks every range as processed on the transport session, flushes the processed state,
     * and optionally sends a message-accept for the same ranges.
     *
     * @param ranges transfer ids to mark as processed
     * @param accept when true the flush uses the BATCH option and an accept is sent afterwards;
     *               when false only the processed state is flushed
     * @param setSyncBit when true (and {@code accept} is true) the accept carries the SYNC option
     */
    void messageAcknowledge(RangeSet ranges, boolean accept, boolean setSyncBit) {
        Session ssn = getQpidSession();
        for (Range processed : ranges) {
            ssn.processed(processed);
        }
        Option flushOption = accept ? Option.BATCH : Option.NONE;
        ssn.flushProcessed(new Option[]{flushOption});
        if (!accept) {
            return;
        }
        Option syncOption = setSyncBit ? Option.SYNC : Option.NONE;
        ssn.messageAccept(ranges, new Option[]{Option.UNRELIABLE, syncOption});
    }

    /**
     * Binds a queue to an exchange for every binding the destination describes.
     *
     * <p>For BURL destinations one bind is issued per binding key of the destination, using
     * {@code arguments} converted to a map. For all other destinations the bindings of the
     * source and target nodes are used; a binding without a queue falls back to
     * {@code queueName}, and a binding without an exchange falls back to the address name when
     * the address type is 2, otherwise to {@code "amq.topic"}.
     *
     * <p>The {@code routingKey} parameter is not consulted by this implementation.
     *
     * @param queueName queue to bind (default queue for address-based bindings)
     * @param routingKey unused here
     * @param arguments binding arguments, used only for BURL destinations
     * @param exchangeName exchange to bind to, used only for BURL destinations
     * @param destination destination that supplies the binding keys / bindings
     * @param nowait when false, the session is synced after the binds are issued
     * @throws AMQException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination, boolean nowait) throws AMQException, FailoverException {
        // Build the (fairly expensive) debug strings only when they will actually be logged.
        boolean debug = _logger.isDebugEnabled();
        if (destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) {
            Map<String, Object> args = FieldTableSupport.convertToMap(arguments);
            for (AMQShortString rk : destination.getBindingKeys()) {
                if (debug) {
                    _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
                }
                this.getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args, new Option[0]);
            }
        } else {
            ArrayList<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>();
            bindings.addAll(destination.getSourceNode().getBindings());
            bindings.addAll(destination.getTargetNode().getBindings());
            String defaultExchange = destination.getAddressType() == 2 ? destination.getAddressName() : "amq.topic";
            for (AMQDestination.Binding binding : bindings) {
                String queue = binding.getQueue() == null ? queueName.asString() : binding.getQueue();
                String exchange = binding.getExchange() == null ? defaultExchange : binding.getExchange();
                if (debug) {
                    _logger.debug("Binding queue : " + queue + " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + this.printMap(binding.getArgs()));
                }
                this.getQpidSession().exchangeBind(queue, exchange, binding.getBindingKey(), binding.getArgs(), new Option[0]);
            }
        }
        if (!nowait) {
            this.sync();
        }
    }

    /**
     * Closes the transport session: cancels the periodic flush task if one is scheduled,
     * flushes pending acknowledgements, then syncs and closes the session. A
     * {@code SessionException} raised by sync/close is recorded, and any recorded session
     * error is rethrown as an {@code AMQException}.
     *
     * @param timeout not consulted by this implementation
     * @throws AMQException if a session error was recorded
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendClose(long timeout) throws AMQException, FailoverException {
        TimerTask scheduled = flushTask;
        if (scheduled != null) {
            scheduled.cancel();
            flushTask = null;
        }
        flushAcknowledgments();
        Session ssn = getQpidSession();
        try {
            ssn.sync();
            ssn.close();
        }
        catch (SessionException se) {
            setCurrentException(se);
        }
        AMQException failure = getCurrentException();
        if (failure != null) {
            throw failure;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Issues a tx-commit with auto-sync temporarily enabled on the transport session, restores
     * auto-sync to false even if the commit throws, and then syncs the session.
     *
     * @throws AMQException if the final sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendCommit() throws AMQException, FailoverException {
        Session ssn = getQpidSession();
        ssn.setAutoSync(true);
        try {
            ssn.txCommit(new Option[0]);
        }
        finally {
            ssn.setAutoSync(false);
        }
        sync();
    }

    /**
     * Declares a queue with the requested durability, auto-delete and exclusivity options and
     * then syncs the session.
     *
     * @param name queue name
     * @param autoDelete whether to pass the AUTO_DELETE option
     * @param durable whether to pass the DURABLE option
     * @param exclusive whether to pass the EXCLUSIVE option
     * @param arguments declare arguments passed through unchanged
     * @throws AMQException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendCreateQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException {
        Option durableOption = durable ? Option.DURABLE : Option.NONE;
        Option autoDeleteOption = autoDelete ? Option.AUTO_DELETE : Option.NONE;
        Option exclusiveOption = exclusive ? Option.EXCLUSIVE : Option.NONE;
        Option[] options = new Option[]{durableOption, autoDeleteOption, exclusiveOption};
        getQpidSession().queueDeclare(name.toString(), null, arguments, options);
        sync();
    }

    /**
     * Drains every outstanding unacknowledged tag, releases the corresponding messages with
     * the SET_REDELIVERED option, and syncs the session.
     *
     * @throws AMQException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendRecover() throws AMQException, FailoverException {
        RangeSet released = new RangeSet();
        while (true) {
            Long tag = (Long)_unacknowledgedMessageTags.poll();
            if (tag == null) {
                break;
            }
            released.add(tag.intValue());
        }
        getQpidSession().messageRelease(released, new Option[]{Option.SET_REDELIVERED});
        sync();
    }

    /**
     * Releases every message delivered in the current transaction (with SET_REDELIVERED) and
     * resets the transaction's delivered-message bookkeeping.
     */
    @Override
    public void releaseForRollback() {
        Option[] redelivered = new Option[]{Option.SET_REDELIVERED};
        getQpidSession().messageRelease(_txRangeSet, redelivered);
        _txRangeSet.clear();
        _txSize = 0;
    }

    /**
     * Releases a single message back to the broker with the SET_REDELIVERED option.
     *
     * @param deliveryTag tag of the message to release (narrowed to an int transfer id)
     * @param requeue not consulted by this implementation; the message is always released
     */
    @Override
    public void rejectMessage(long deliveryTag, boolean requeue) {
        RangeSet single = new RangeSet();
        single.add((int)deliveryTag);
        Option[] redelivered = new Option[]{Option.SET_REDELIVERED};
        getQpidSession().messageRelease(single, redelivered);
    }

    /**
     * Creates the 0-10 consumer implementation for this session, passing through the supplied
     * settings together with this session's channel id, connection, message factory registry,
     * protocol handler and acknowledge mode.
     *
     * @return a new {@code BasicMessageConsumer_0_10}
     * @throws JMSException propagated from the consumer constructor
     */
    @Override
    public BasicMessageConsumer_0_10 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String messageSelector, FieldTable ft, boolean noConsume, boolean autoClose) throws JMSException {
        AMQProtocolHandler handler = getProtocolHandler();
        return new BasicMessageConsumer_0_10(
                _channelId,
                _connection,
                destination,
                messageSelector,
                noLocal,
                _messageFactoryRegistry,
                this,
                handler,
                ft,
                prefetchHigh,
                prefetchLow,
                exclusive,
                _acknowledgeMode,
                noConsume,
                autoClose);
    }

    /**
     * Checks whether the queue is bound to the exchange with the given routing key; delegates
     * to the overload that also accepts binding keys, passing none.
     */
    @Override
    public boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException {
        AMQShortString[] noBindingKeys = null;
        return isQueueBound(exchangeName, queueName, routingKey, noBindingKeys);
    }

    /**
     * Checks whether the destination's queue is bound to its exchange, using the destination's
     * exchange name, queue name, routing key and binding keys.
     */
    @Override
    public boolean isQueueBound(AMQDestination destination) throws JMSException {
        AMQShortString exchange = destination.getExchangeName();
        AMQShortString queue = destination.getAMQQueueName();
        AMQShortString routingKey = destination.getRoutingKey();
        AMQShortString[] bindingKeys = destination.getBindingKeys();
        return isQueueBound(exchange, queue, routingKey, bindingKeys);
    }

    /**
     * Checks a binding using the first binding key when any are supplied, otherwise the
     * routing key (which may be null), and no binding arguments.
     *
     * @param bindingKeys optional binding keys; only the first element is used
     */
    public boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, AMQShortString[] bindingKeys) throws JMSException {
        String key;
        if (bindingKeys != null && bindingKeys.length > 0) {
            key = bindingKeys[0].toString();
        } else {
            key = routingKey != null ? routingKey.toString() : null;
        }
        return isQueueBound(exchangeName.toString(), queueName.toString(), key, null);
    }

    /**
     * Queries the broker for a binding and interprets the result.
     *
     * <ul>
     *   <li>No binding key: bound when both the exchange and the queue were found.</li>
     *   <li>Binding key, no args: bound when the key matched and the queue was found and matched.</li>
     *   <li>Binding key and args: as above, and the arguments must have matched too.</li>
     * </ul>
     */
    @Override
    public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String, Object> args) throws JMSException {
        ExchangeBoundResult answer = (ExchangeBoundResult)getQpidSession().exchangeBound(exchangeName, queueName, bindingKey, args, new Option[0]).get();
        if (bindingKey == null) {
            return !answer.getExchangeNotFound() && !answer.getQueueNotFound();
        }
        boolean bound = !answer.getKeyNotMatched() && !answer.getQueueNotFound() && !answer.getQueueNotMatched();
        if (args != null) {
            bound = bound && !answer.getArgsNotMatched();
        }
        return bound;
    }

    /**
     * Subscribes a consumer to a queue and issues its initial flow-control commands.
     *
     * <p>The {@code protocolHandler} and {@code messageSelector} parameters are not read here;
     * the selector is taken from the consumer itself.
     *
     * @param consumer consumer being registered; supplies destination, arguments and flags
     * @param queueName queue to subscribe to
     * @param protocolHandler unused here
     * @param nowait when false, the session is synced after the commands are issued
     * @param messageSelector unused here
     * @param tag consumer tag; its string form is used as the subscription destination
     * @throws AMQException if a JMSException occurs while building the subscription, or if the
     *                      sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException {
        long capacity = this.getCapacity(consumer.getDestination());
        try {
            boolean acceptModeNone;
            boolean preAcquire;
            boolean isTopic;
            Map arguments = FieldTable.convertToMap((FieldTable)consumer.getArguments());
            if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) {
                // BURL: topics are always pre-acquired; otherwise only when the consumer
                // consumes (not browse-only) and has no message selector.
                isTopic = consumer.getDestination() instanceof AMQTopic || consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
                preAcquire = isTopic || !consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""));
            } else {
                // Address syntax: address type 2 is treated as a topic; a no-consume consumer is
                // never pre-acquired here. The link's subscription args are merged in.
                isTopic = consumer.getDestination().getAddressType() == 2;
                preAcquire = !consumer.isNoConsume() && (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""));
                arguments.putAll(consumer.getDestination().getLink().getSubscription().getArgs());
            }
            // NOTE(review): 257 is presumably the NO_ACKNOWLEDGE session mode; confirm.
            boolean bl = acceptModeNone = this.getAcknowledgeMode() == 257;
            if (consumer.getDestination().getLink() != null) {
                // When a link is present its reliability setting overrides the session mode.
                acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
            }
            this.getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0L, arguments, new Option[]{consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE});
        }
        catch (JMSException e) {
            throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", (Throwable)e);
        }
        String consumerTag = consumer.getConsumerTagString();
        // Zero capacity selects credit-based flow; any other capacity selects window-based flow.
        if (capacity == 0L) {
            this.getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT, new Option[0]);
        } else {
            this.getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW, new Option[0]);
        }
        // NOTE(review): byte credit of -1 presumably means "unlimited"; confirm against the protocol spec.
        this.getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, -1L, new Option[]{Option.UNRELIABLE});
        // Message credit is granted up front only if there is capacity, a dispatcher exists, and
        // the session is started or immediate prefetch is enabled.
        if (capacity > 0L && this._dispatcher != null && (this.isStarted() || this._immediatePrefetch)) {
            this.getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, capacity, new Option[]{Option.UNRELIABLE});
        }
        if (!nowait) {
            this.sync();
        }
    }

    /**
     * Determines the message capacity to grant a consumer of the given destination.
     *
     * @param destination destination the consumer reads from
     * @return the link's consumer capacity for ADDR destinations when it is positive; otherwise
     *         the connection's max prefetch when {@code prefetch()} is true; otherwise 0
     */
    private long getCapacity(AMQDestination destination) {
        boolean addressSyntax = destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR;
        if (addressSyntax && destination.getLink().getConsumerCapacity() > 0) {
            return destination.getLink().getConsumerCapacity();
        }
        if (prefetch()) {
            return getAMQConnection().getMaxPrefetch();
        }
        return 0L;
    }

    /**
     * Creates the 0-10 producer implementation for this session.
     *
     * @return a new {@code BasicMessageProducer_0_10}
     * @throws JMSException with message "Error creating producer" wrapping the underlying
     *                      {@code AMQException} as both cause and linked exception
     */
    @Override
    public BasicMessageProducer_0_10 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId) throws JMSException {
        try {
            AMQDestination target = (AMQDestination)destination;
            return new BasicMessageProducer_0_10(_connection, target, _transacted, _channelId, this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
        }
        catch (AMQException e) {
            JMSException wrapped = new JMSException("Error creating producer");
            wrapped.initCause((Throwable)e);
            wrapped.setLinkedException((Exception)((Object)e));
            throw wrapped;
        }
    }

    /**
     * Declares an exchange by name and type with no alternate exchange and no arguments.
     * The {@code protocolHandler} parameter is not consulted by this implementation.
     */
    @Override
    public void sendExchangeDeclare(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException {
        String exchangeName = name.asString();
        String exchangeType = type.asString();
        sendExchangeDeclare(exchangeName, exchangeType, null, null, nowait);
    }

    /**
     * Declares an exchange. Names starting with {@code "amq."} are declared with the PASSIVE
     * option; all other names are declared with no option.
     *
     * @param name exchange name
     * @param type exchange type
     * @param alternateExchange alternate exchange, passed through unchanged
     * @param args declare arguments, passed through unchanged
     * @param nowait when false, the session is synced after the declare
     * @throws AMQException if the sync reports a session error
     */
    public void sendExchangeDeclare(String name, String type, String alternateExchange, Map<String, Object> args, boolean nowait) throws AMQException {
        Option passiveOption = name.startsWith("amq.") ? Option.PASSIVE : Option.NONE;
        getQpidSession().exchangeDeclare(name, type, alternateExchange, args, new Option[]{passiveOption});
        if (nowait) {
            return;
        }
        sync();
    }

    /**
     * Deletes the named exchange and, unless {@code nowait} is set, syncs the session.
     *
     * @throws AMQException if the sync reports a session error
     * @throws FailoverException declared in the signature
     */
    public void sendExchangeDelete(String name, boolean nowait) throws AMQException, FailoverException {
        getQpidSession().exchangeDelete(name, new Option[0]);
        if (nowait) {
            return;
        }
        sync();
    }

    /**
     * Deliberately empty: this override sends nothing.
     */
    @Override
    public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException {
        // no-op
    }

    /**
     * Declares the destination's queue, generating a {@code "TempQueue" + UUID} name (and
     * storing it on the destination) when the destination has none.
     *
     * <p>For BURL destinations the queue options come from the destination and a
     * {@code "no-local"} argument is added when requested. For other destinations the options
     * and declare arguments come from the destination's source node. The
     * {@code protocolHandler} parameter is not consulted.
     *
     * @param nowait when false, the session is synced after the declare
     * @return the name of the declared queue
     * @throws AMQException if the sync reports a session error
     */
    public AMQShortString send0_10QueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean noLocal, boolean nowait) throws AMQException {
        AMQShortString queueName = amqd.getAMQQueueName();
        if (queueName == null) {
            queueName = new AMQShortString("TempQueue" + UUID.randomUUID());
            amqd.setQueueName(queueName);
        }
        if (amqd.getDestSyntax() != AMQDestination.DestSyntax.BURL) {
            Node.QueueNode node = (Node.QueueNode)amqd.getSourceNode();
            Option[] nodeOptions = new Option[]{node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE};
            getQpidSession().queueDeclare(queueName.toString(), "", node.getDeclareArgs(), nodeOptions);
        } else {
            HashMap<String, Boolean> arguments = new HashMap<String, Boolean>();
            if (noLocal) {
                arguments.put("no-local", true);
            }
            Option[] destOptions = new Option[]{amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE};
            getQpidSession().queueDeclare(queueName.toString(), "", arguments, destOptions);
        }
        if (!nowait) {
            sync();
        }
        return queueName;
    }

    /**
     * Deletes the named queue and syncs the session.
     *
     * @throws AMQException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendQueueDelete(AMQShortString queueName) throws AMQException, FailoverException {
        String name = queueName.toString();
        getQpidSession().queueDelete(name, new Option[0]);
        sync();
    }

    /**
     * Suspends or resumes message flow for every consumer of this session, then syncs.
     *
     * <p>Suspending sends a message-stop per consumer. Resuming re-issues credit: the
     * consumer's capacity in messages (or a single message when the capacity is zero and the
     * consumer has a message listener), followed by byte credit of -1.
     *
     * @param suspend true to stop flow, false to restart it
     * @throws AMQException wrapping any exception raised while re-issuing credit, or if the
     *                      sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException {
        for (BasicMessageConsumer_0_10 consumer : _consumers.values()) {
            String consumerTag = String.valueOf(consumer.getConsumerTag());
            if (suspend) {
                getQpidSession().messageStop(consumerTag, new Option[]{Option.UNRELIABLE});
                continue;
            }
            try {
                long capacity = getCapacity(consumer.getDestination());
                if (capacity != 0L) {
                    getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, capacity, new Option[]{Option.UNRELIABLE});
                } else if (consumer.getMessageListener() != null) {
                    getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, 1L, new Option[]{Option.UNRELIABLE});
                }
                getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, -1L, new Option[]{Option.UNRELIABLE});
            }
            catch (Exception e) {
                throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", (Throwable)e);
            }
        }
        sync();
    }

    /**
     * Issues a tx-rollback on the transport session and syncs.
     *
     * @throws AMQException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendRollback() throws AMQException, FailoverException {
        Session ssn = getQpidSession();
        ssn.txRollback(new Option[0]);
        sync();
    }

    /**
     * @return the transport session created for this JMS session in the constructor
     */
    protected Session getQpidSession() {
        return _qpidSession;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes the recorded session error, if any, and clears it so that it is reported once.
     *
     * @return the recorded exception, or null when none is pending
     */
    public AMQException getCurrentException() {
        synchronized (_currentExceptionLock) {
            AMQException pending = _currentException;
            _currentException = null;
            return pending;
        }
    }

    /** Session-opened notification; nothing is done here. */
    public void opened(Session ssn) {
        // intentionally empty
    }

    /**
     * Session-resumed notification; adopts the connection the session now reports.
     */
    public void resumed(Session ssn) {
        Connection current = ssn.getConnection();
        _qpidConnection = current;
    }

    /**
     * Incoming-transfer notification; wraps the transfer as an unprocessed 0-10 message and
     * hands it to {@code messageReceived}.
     */
    public void message(Session ssn, MessageTransfer xfr) {
        UnprocessedMessage_0_10 incoming = new UnprocessedMessage_0_10(xfr);
        messageReceived(incoming);
    }

    /**
     * Session-error notification; records the error via {@code setCurrentException}.
     */
    public void exception(Session ssn, SessionException exc) {
        setCurrentException(exc);
    }

    /** Session-closed notification; nothing is done here. */
    public void closed(Session ssn) {
        // intentionally empty
    }

    /**
     * Declares the destination's queue through {@code FailoverNoopSupport}.
     *
     * <p>When the destination requires a name, one is generated first in the form
     * {@code _key1_key2...@exchange_UUID} from its binding keys and exchange name.
     *
     * @return the name of the declared queue
     * @throws AMQException propagated from the queue declare
     */
    @Override
    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, final boolean noLocal, final boolean nowait) throws AMQException {
        FailoverProtectedOperation<AMQShortString, AMQException> declareOperation = new FailoverProtectedOperation<AMQShortString, AMQException>(){

            @Override
            public AMQShortString execute() throws AMQException, FailoverException {
                if (amqd.isNameRequired()) {
                    StringBuilder generated = new StringBuilder();
                    for (AMQShortString key : amqd.getBindingKeys()) {
                        generated.append("_").append(key.toString());
                    }
                    generated.append("@").append(amqd.getExchangeName().toString()).append("_").append(UUID.randomUUID());
                    amqd.setQueueName(new AMQShortString(generated.toString()));
                }
                return AMQSession_0_10.this.send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
            }
        };
        return new FailoverNoopSupport<AMQShortString, AMQException>(declareOperation, _connection).execute();
    }

    /**
     * Flushes pending acknowledgements and then asks the broker how many messages the
     * destination's queue holds.
     *
     * @return the message count reported by the queue query
     */
    @Override
    protected Long requestQueueDepth(AMQDestination amqd) {
        flushAcknowledgments();
        QueueQueryResult answer = (QueueQueryResult)getQpidSession().queueQuery(amqd.getQueueName(), new Option[0]).get();
        return answer.getMessageCount();
    }

    /**
     * Records a message as delivered within the current transaction and, at regular intervals,
     * marks the delivered ranges as processed (without accepting them).
     *
     * <p>The ranges are marked when the max prefetch is exactly 1, or each time the number of
     * recorded messages reaches a multiple of half the max prefetch.
     *
     * @param id delivery id of the message (narrowed to an int transfer id)
     */
    @Override
    protected void addDeliveredMessage(long id) {
        this._txRangeSet.add((int)id);
        ++this._txSize;
        // Read the prefetch once so the guard and the divisor are computed from the same value.
        long maxPrefetch = this._connection.getMaxPrefetch();
        long batchSize = maxPrefetch / 2L;
        // Guard on the divisor itself: a prefetch of -1 also halves to 0, which previously
        // caused a modulo-by-zero ArithmeticException.
        boolean atBatchBoundary = batchSize != 0L && (long)this._txSize % batchSize == 0L;
        if (maxPrefetch == 1L || atBatchBoundary) {
            this.messageAcknowledge(this._txRangeSet, false);
        }
    }

    /**
     * Commits the current transaction: accepts every message delivered in it (if any), resets
     * the delivered-message bookkeeping, and sends the commit.
     *
     * @throws JMSException if the session is not transacted (via {@code checkTransacted}), if
     *                      the commit fails, or if failover interrupts it
     */
    @Override
    public void commit() throws JMSException {
        checkTransacted();
        try {
            boolean hasDeliveries = _txSize > 0;
            if (hasDeliveries) {
                messageAcknowledge(_txRangeSet, true);
                _txRangeSet.clear();
                _txSize = 0;
            }
            sendCommit();
        }
        catch (AMQException e) {
            throw new JMSAMQException("Failed to commit: " + e.getMessage(), (Exception)((Object)e));
        }
        catch (FailoverException e) {
            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
        }
    }

    /**
     * Compares two tags, narrowed to int, with {@code Serial.le}.
     *
     * @return the result of {@code Serial.le((int) tag1, (int) tag2)}
     */
    @Override
    protected final boolean tagLE(long tag1, long tag2) {
        int first = (int)tag1;
        int second = (int)tag2;
        return Serial.le(first, second);
    }

    /**
     * Decides whether the rollback mark should move, by comparing the two values (narrowed to
     * int) with {@code Serial.lt}.
     *
     * @return the result of {@code Serial.lt((int) currentMark, (int) deliveryTag)}
     */
    @Override
    protected final boolean updateRollbackMark(long currentMark, long deliveryTag) {
        int mark = (int)currentMark;
        int tag = (int)deliveryTag;
        return Serial.lt(mark, tag);
    }

    /**
     * Syncs the transport session. A {@code SessionException} from the sync is recorded, and
     * any recorded session error (from this call or an earlier notification) is thrown.
     *
     * @throws AMQException if a session error was recorded
     */
    @Override
    public void sync() throws AMQException {
        try {
            getQpidSession().sync();
        }
        catch (SessionException se) {
            setCurrentException(se);
        }
        AMQException failure = getCurrentException();
        if (failure == null) {
            return;
        }
        throw failure;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Converts a transport session error into an {@code AMQException}, notifies the connection,
     * and records it as the current exception, all while holding the exception lock.
     *
     * <p>The error code comes from the wrapped execution exception when there is one, and is
     * {@code AMQConstant.INTERNAL_ERROR} otherwise.
     *
     * @param se session error to record
     */
    public void setCurrentException(SessionException se) {
        synchronized (_currentExceptionLock) {
            ExecutionException detail = se.getException();
            int code = detail == null ? AMQConstant.INTERNAL_ERROR.getCode() : detail.getErrorCode().getValue();
            AMQException converted = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
            _connection.exceptionReceived(converted);
            _currentException = converted;
        }
    }

    /**
     * @return the 0-10 message delegate factory, {@code AMQMessageDelegateFactory.FACTORY_0_10}
     */
    @Override
    public AMQMessageDelegateFactory getMessageDelegateFactory() {
        AMQMessageDelegateFactory factory = AMQMessageDelegateFactory.FACTORY_0_10;
        return factory;
    }

    /**
     * Queries the broker for the exchange named by the destination's address.
     *
     * <ul>
     *   <li>Not found: returns false.</li>
     *   <li>{@code assertNode}: true only when durability, a non-null exchange type and the
     *       declare arguments all match the broker's answer.</li>
     *   <li>Otherwise, when the node specifies a type: true when it equals the broker's type.</li>
     *   <li>Otherwise: adopts the broker's type on both the node and the destination and
     *       returns true.</li>
     * </ul>
     */
    public boolean isExchangeExist(AMQDestination dest, Node.ExchangeNode node, boolean assertNode) {
        ExchangeQueryResult result = (ExchangeQueryResult)getQpidSession().exchangeQuery(dest.getAddressName(), new Option[]{Option.NONE}).get();
        if (result.getNotFound()) {
            return false;
        }
        if (assertNode) {
            return result.getDurable() == node.isDurable()
                    && node.getExchangeType() != null
                    && node.getExchangeType().equals(result.getType())
                    && matchProps(result.getArguments(), node.getDeclareArgs());
        }
        if (node.getExchangeType() != null) {
            boolean sameType = node.getExchangeType().equals(result.getType());
            if (!sameType) {
                _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + " actual " + result.getType());
            }
            return sameType;
        }
        _logger.debug("Setting Exchange type " + result.getType());
        node.setExchangeType(result.getType());
        dest.setExchangeClass(new AMQShortString(result.getType()));
        return true;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Queries the broker for the queue named by the destination's address.
     *
     * <p>Without {@code assertNode} the queue exists when the broker echoes back the same queue
     * name. With {@code assertNode} the durable, auto-delete and exclusive flags and the
     * declare arguments must additionally match the node.
     *
     * @param dest destination whose address name is queried
     * @param node expected queue properties, consulted only when {@code assertNode} is true
     * @param assertNode whether to verify the queue's properties as well as its existence
     * @return true when the queue exists (and matches, if asserted); false otherwise, including
     *         when the broker reports RESOURCE_DELETED
     * @throws AMQException for any other session error; the error code is taken from the
     *                      execution exception when present and is INTERNAL_ERROR otherwise
     */
    public boolean isQueueExist(AMQDestination dest, Node.QueueNode node, boolean assertNode) throws AMQException {
        try {
            QueueQueryResult result = (QueueQueryResult)this.getQpidSession().queueQuery(dest.getAddressName(), new Option[]{Option.NONE}).get();
            boolean match = dest.getAddressName().equals(result.getQueue());
            if (match && assertNode) {
                match = result.getDurable() == node.isDurable()
                        && result.getAutoDelete() == node.isAutoDelete()
                        && result.getExclusive() == node.isExclusive()
                        && this.matchProps(result.getArguments(), node.getDeclareArgs());
            }
            return match;
        }
        catch (SessionException e) {
            // A SessionException may carry no ExecutionException (setCurrentException handles
            // the same case); report it as an internal error instead of failing with an NPE.
            ExecutionException detail = e.getException();
            if (detail == null) {
                throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error querying queue", (Throwable)e);
            }
            if (detail.getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) {
                return false;
            }
            throw new AMQException(AMQConstant.getConstant((int)detail.getErrorCode().getValue()), "Error querying queue", (Throwable)e);
        }
    }

    /**
     * Checks that every property requested in the address is present, with an equal value, in
     * the properties reported by the broker. Extra broker-side properties are ignored.
     *
     * <p>Null-safe: a null or empty {@code source} imposes no requirements and matches
     * anything; a null {@code target} matches only an empty {@code source}; a null value
     * matches only a null value for a key that is present.
     *
     * @param target properties reported by the broker (may be null)
     * @param source properties the address requires (may be null)
     * @return true when all required properties match; false at the first mismatch, which is
     *         also logged at debug level
     */
    private boolean matchProps(Map<String, Object> target, Map<String, Object> source) {
        if (source == null || source.isEmpty()) {
            return true;
        }
        for (Map.Entry<String, Object> required : source.entrySet()) {
            String key = required.getKey();
            Object expected = required.getValue();
            boolean present = target != null && target.containsKey(key);
            Object actual = present ? target.get(key) : null;
            boolean match = present && (actual == null ? expected == null : actual.equals(expected));
            if (match) continue;
            if (_logger.isDebugEnabled()) {
                StringBuilder buf = new StringBuilder();
                buf.append("Property given in address did not match with the args sent by the broker.");
                buf.append(" Expected { ").append(key).append(" : ").append(expected).append(" }, ");
                buf.append(" Actual { ").append(key).append(" : ").append(actual).append(" }");
                _logger.debug(buf.toString());
            }
            return false;
        }
        return true;
    }

    /**
     * Resolves an address-syntax destination against the broker, creating or asserting the
     * underlying node as the destination's options request, and marks it as resolved.
     *
     * <p>If the destination is already resolved, the only action is to create a subscription
     * queue for a consumer on an address of type 2.
     *
     * <p>Otherwise the address type is resolved, a default link reliability is chosen when none
     * was specified (AT_LEAST_ONCE for type 1, UNRELIABLE for type 2), and the node is looked
     * up, optionally asserted, and optionally created.
     *
     * @param dest destination to resolve
     * @param isConsumer whether the caller is a consumer (affects assert/create options and
     *                   whether a subscription queue is created)
     * @param noWait forwarded as the nowait flag of the queue declare
     * @throws AMQException if AT_LEAST_ONCE is requested for type 2, if the name resolves to
     *                      neither a queue nor an exchange, or if a broker command fails
     */
    @Override
    public void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, boolean noWait) throws AMQException {
        if (dest.isAddressResolved()) {
            if (isConsumer && 2 == dest.getAddressType()) {
                this.createSubscriptionQueue(dest);
            }
        } else {
            // The assert/create options apply when set to ALWAYS, or when set to the role
            // (RECEIVER/SENDER) that matches this caller.
            boolean assertNode = dest.getAssert() == AMQDestination.AddressOption.ALWAYS || isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER || !isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER;
            boolean createNode = dest.getCreate() == AMQDestination.AddressOption.ALWAYS || isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER || !isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER;
            int type = this.resolveAddressType(dest);
            // NOTE(review): type 1 is handled as a queue and type 2 as a topic/exchange below;
            // the named constants live on AMQDestination and are not visible here.
            if (type == 1 && dest.getLink().getReliability() == Link.Reliability.UNSPECIFIED) {
                dest.getLink().setReliability(Link.Reliability.AT_LEAST_ONCE);
            } else if (type == 2 && dest.getLink().getReliability() == Link.Reliability.UNSPECIFIED) {
                dest.getLink().setReliability(Link.Reliability.UNRELIABLE);
            } else if (type == 2 && dest.getLink().getReliability() == Link.Reliability.AT_LEAST_ONCE) {
                throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
            }
            switch (type) {
                case 1: {
                    if (this.isQueueExist(dest, (Node.QueueNode)dest.getSourceNode(), assertNode)) {
                        this.setLegacyFiledsForQueueType(dest);
                        break;
                    }
                    if (createNode) {
                        this.setLegacyFiledsForQueueType(dest);
                        this.send0_10QueueDeclare(dest, null, false, noWait);
                        break;
                    }
                    // NOTE(review): when the queue neither exists nor may be created, control
                    // falls through into case 2, which casts the target node to ExchangeNode.
                    // Confirm whether this is intended or whether it should go to the default error.
                }
                case 2: {
                    if (this.isExchangeExist(dest, (Node.ExchangeNode)dest.getTargetNode(), assertNode)) {
                        this.setLegacyFiledsForTopicType(dest);
                        this.verifySubject(dest);
                        // A consumer needs a subscription queue unless a matching one already exists.
                        if (!isConsumer || this.isQueueExist(dest, (Node.QueueNode)dest.getSourceNode(), true)) break;
                        this.createSubscriptionQueue(dest);
                        break;
                    }
                    if (createNode) {
                        this.setLegacyFiledsForTopicType(dest);
                        this.verifySubject(dest);
                        this.sendExchangeDeclare(dest.getAddressName(), dest.getExchangeClass().asString(), dest.getTargetNode().getAlternateExchange(), dest.getTargetNode().getDeclareArgs(), false);
                        if (!isConsumer || this.isQueueExist(dest, (Node.QueueNode)dest.getSourceNode(), true)) break;
                        this.createSubscriptionQueue(dest);
                        break;
                    }
                    // Falls through to the error when the exchange neither exists nor may be created.
                }
                default: {
                    throw new AMQException("The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue");
                }
            }
            dest.setAddressResolved(true);
        }
    }

    /**
     * Determines the concrete address type of a destination.
     *
     * <p>Any address type other than 3 is returned unchanged. For type 3 the broker is
     * asked whether a queue and/or an exchange named after the address exists:
     * <ul>
     *   <li>no exchange with that name (queue present or not) resolves to type 1;</li>
     *   <li>an exchange but no queue resolves to type 2;</li>
     *   <li>both present is ambiguous and is rejected.</li>
     * </ul>
     * The resolved type is stored on the destination and its target/source nodes are rebuilt.
     *
     * @param dest the destination to inspect and, for type 3, to update
     * @return the address type in effect after resolution
     * @throws AMQException if both a queue and an exchange with the address name exist
     */
    public int resolveAddressType(AMQDestination dest) throws AMQException {
        int declaredType = dest.getAddressType();
        String addressName = dest.getAddressName();
        if (declaredType != 3) {
            return declaredType;
        }

        // The same name is used for both the exchange and the queue lookup.
        ExchangeBoundResult bound = (ExchangeBoundResult)this.getQpidSession().exchangeBound(addressName, addressName, null, null, new Option[0]).get();
        boolean queueMissing = bound.getQueueNotFound();
        boolean exchangeMissing = bound.getExchangeNotFound();

        if (!queueMissing && !exchangeMissing) {
            throw new AMQException("Ambiguous address, please specify queue or topic as node type");
        }

        // A missing exchange always yields type 1, whether or not the queue exists;
        // otherwise only the queue is missing, which yields type 2.
        int resolvedType = exchangeMissing ? 1 : 2;
        dest.setAddressType(resolvedType);
        dest.rebuildTargetAndSourceNodes(resolvedType);
        return resolvedType;
    }

    /**
     * Supplies a default routing key and subject when the destination has no usable subject.
     *
     * <p>A subject that is {@code null} or blank after trimming is replaced: for an exchange
     * class of {@code "topic"} the routing key becomes {@code "#"} and the subject is set to
     * that routing key's string form; for any other exchange class both become empty.
     * A non-blank subject is left untouched.
     *
     * @param dest the destination to check and possibly update
     * @throws AMQException declared by the signature; not thrown directly by this method
     */
    private void verifySubject(AMQDestination dest) throws AMQException {
        String subject = dest.getSubject();
        if (subject != null && !"".equals(subject.trim())) {
            // A real subject was supplied; nothing to default.
            return;
        }

        boolean topicExchange = "topic".equals(dest.getExchangeClass().toString());
        if (!topicExchange) {
            dest.setRoutingKey(new AMQShortString(""));
            dest.setSubject("");
            return;
        }

        dest.setRoutingKey(new AMQShortString("#"));
        dest.setSubject(dest.getRoutingKey().toString());
    }

    /**
     * Declares the subscription queue for a destination and records its binding.
     *
     * <p>If the destination has no queue name yet and its link carries a name, the link name
     * is used as the queue name. The source queue node is made exclusive and is marked
     * auto-delete exactly when it is not durable. The queue is then declared, and a binding
     * from the address name to the queue with the destination's subject is added to the node.
     *
     * @param dest the destination whose subscription queue is to be created
     * @throws AMQException if the queue declare fails
     */
    private void createSubscriptionQueue(AMQDestination dest) throws AMQException {
        Node.QueueNode queueNode = (Node.QueueNode)dest.getSourceNode();

        if (dest.getQueueName() == null) {
            Link link = dest.getLink();
            if (link != null && link.getName() != null) {
                dest.setQueueName(new AMQShortString(link.getName()));
            }
        }

        queueNode.setExclusive(true);
        boolean durable = queueNode.isDurable();
        queueNode.setAutoDelete(!durable);

        this.send0_10QueueDeclare(dest, null, false, true);

        // The binding is recorded on the node only after the declare has been issued.
        AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), dest.getQueueName(), dest.getSubject(), Collections.<String, Object>emptyMap());
        queueNode.addBinding(binding);
    }

    /**
     * Populates the legacy queue/exchange fields of a destination treated as a queue.
     *
     * <p>The queue name is taken from the address name, the exchange name and exchange class
     * are set to empty values, and the routing key is set to the destination's AMQ queue name.
     *
     * @param dest the destination to update
     */
    public void setLegacyFiledsForQueueType(AMQDestination dest) {
        AMQShortString queueName = new AMQShortString(dest.getAddressName());
        dest.setQueueName(queueName);

        AMQShortString emptyExchangeName = new AMQShortString("");
        dest.setExchangeName(emptyExchangeName);

        AMQShortString emptyExchangeClass = new AMQShortString("");
        dest.setExchangeClass(emptyExchangeClass);

        // Read back after the queue name has been set above.
        dest.setRoutingKey(dest.getAMQQueueName());
    }

    /**
     * Populates the legacy exchange fields of a destination treated as a topic.
     *
     * <p>The exchange name is taken from the address name. The exchange class comes from the
     * target exchange node's type, falling back to
     * {@link ExchangeDefaults#TOPIC_EXCHANGE_CLASS} when the node has none. The routing key
     * is built from the destination's subject.
     *
     * @param dest the destination to update
     */
    public void setLegacyFiledsForTopicType(AMQDestination dest) {
        dest.setExchangeName(new AMQShortString(dest.getAddressName()));

        Node.ExchangeNode exchangeNode = (Node.ExchangeNode)dest.getTargetNode();
        AMQShortString exchangeClass;
        if (exchangeNode.getExchangeType() == null) {
            exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
        } else {
            exchangeClass = new AMQShortString(exchangeNode.getExchangeType());
        }
        dest.setExchangeClass(exchangeClass);

        dest.setRoutingKey(new AMQShortString(dest.getSubject()));
    }

    /**
     * Renders a map as {@code <key = value key = value >} for diagnostic output.
     *
     * <p>Each entry is written as {@code key = value} followed by a single space, in the
     * map's iteration order. A {@code null} or empty map renders as {@code <>}.
     *
     * @param map the map to render; may be {@code null}
     * @return the rendered text, never {@code null}
     */
    private String printMap(Map<String, Object> map) {
        StringBuilder out = new StringBuilder("<");
        if (map != null) {
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                out.append(entry.getKey());
                out.append(" = ");
                out.append(entry.getValue());
                out.append(" ");
            }
        }
        return out.append(">").toString();
    }

    private static class Flusher
    extends TimerTask {
        private WeakReference<AMQSession_0_10> session;

        public Flusher(AMQSession_0_10 session) {
            this.session = new WeakReference<AMQSession_0_10>(session);
        }

        public void run() {
            AMQSession_0_10 ssn = (AMQSession_0_10)this.session.get();
            if (ssn == null) {
                this.cancel();
            } else {
                try {
                    ssn.flushAcknowledgments(true);
                }
                catch (Throwable t) {
                    _logger.error("error flushing acks", t);
                }
            }
        }
    }
}

