/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.client.BasicMessageConsumer_0_10;
import org.apache.qpid.client.BasicMessageProducer_0_10;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.QueueQueryResult;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQSession_0_10
extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
implements SessionListener {
    /** Class-wide SLF4J logger. */
    private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
    /** Optional session name passed to the transport connection in {@link #createSession()}; may be null. */
    private final String _name;
    /** Transport-level session on which this class issues its commands. */
    private Session _qpidSession;
    /** Monitor guarding {@link #_currentException} in the getter and setter below. */
    private Object _currentExceptionLock = new Object();
    /** Last session error recorded by {@link #setCurrentException}; reset to null when read through {@link #getCurrentException()}. */
    private QpidException _currentException;
    /** Transport connection used to create the session; replaced in {@link #resumed(Session)}. */
    private Connection _qpidConnection;
    /**
     * Period, in milliseconds, used when scheduling the Flusher task in the constructor.
     * Read from the system property {@code qpid.session.max_ack_delay} (default 1000).
     * A value <= 0 means no task is scheduled and every acknowledgement is flushed immediately.
     */
    private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000L);
    /** Handle of the scheduled Flusher task, or null when none is scheduled. */
    private ScheduledFuture<?> _flushTaskFuture = null;
    /** Ids acknowledged by the application but not yet sent to the broker; also the monitor guarding {@link #unackedCount}. */
    private RangeSet unacked = RangeSetFactory.createRangeSet();
    /** Number of ids added to {@link #unacked} since it was last cleared. */
    private int unackedCount = 0;
    /** Ids of messages delivered since the last commit, rollback release or resubscribe. */
    private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
    /** Number of ids added to {@link #_txRangeSet} since it was last cleared. */
    private int _txSize = 0;
    /**
     * Read from the system property {@code qpid.session.legacy_exception_behaviour}.
     * When true, {@link #setCurrentException} closes the owning connection instead of only this session.
     */
    private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour");

    AMQSession_0_10(Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHighMark, int defaultPrefetchLowMark, String name) {
        super(con, channelId, transacted, acknowledgeMode, defaultPrefetchHighMark, defaultPrefetchLowMark);
        this._qpidConnection = qpidConnection;
        this._name = name;
        this._qpidSession = this.createSession();
        if (this.maxAckDelay > 0L) {
            Flusher flusher = new Flusher(this);
            this._flushTaskFuture = con.scheduleTask(flusher, 0L, this.maxAckDelay, TimeUnit.MILLISECONDS);
            flusher.setFuture(this._flushTaskFuture);
        }
    }

    /**
     * Creates the transport session, named when a session name was supplied, and registers
     * this object as its listener. For a transacted session a tx-select is issued and the
     * transport session is flagged as transacted.
     *
     * @return the newly created transport session
     */
    protected Session createSession() {
        final Session session;
        if (_name == null) {
            session = _qpidConnection.createSession(1L);
        } else {
            session = _qpidConnection.createSession(_name, 1L);
        }
        if (isTransacted()) {
            session.txSelect(new Option[0]);
            session.setTransacted(true);
        }
        session.setSessionListener((SessionListener) this);
        return session;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds an id to the pending-acknowledgement set and increments the pending count,
     * both while holding the monitor of {@code unacked}.
     *
     * @param id message id to queue for acknowledgement
     */
    private void addUnacked(int id) {
        synchronized (unacked) {
            unacked.add(id);
            unackedCount++;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Empties the pending-acknowledgement set and resets its counter to zero,
     * both while holding the monitor of {@code unacked}.
     */
    private void clearUnacked() {
        synchronized (unacked) {
            unacked.clear();
            unackedCount = 0;
        }
    }

    /**
     * @return the transport connection currently associated with this session
     */
    protected Connection getQpidConnection() {
        return _qpidConnection;
    }

    /**
     * Prepares for failover: synchronises the dispatch queue, then discards every
     * acknowledgement that has not been flushed yet.
     */
    void failoverPrep() {
        syncDispatchQueue(true);
        clearUnacked();
    }

    /**
     * Queues an acknowledgement for one message, or for every unacknowledged message whose
     * tag is numerically less than or equal to {@code deliveryTag}, and flushes the queued
     * acknowledgements when a threshold is met.
     *
     * <p>A flush happens when the pending count reaches half the connection's max prefetch,
     * when {@code maxAckDelay} is not positive, or when the acknowledge mode equals 1
     * (presumably {@code javax.jms.Session.AUTO_ACKNOWLEDGE} - confirm).
     *
     * @param deliveryTag tag of the message being acknowledged
     * @param multiple true to acknowledge all outstanding tags up to and including {@code deliveryTag}
     */
    @Override
    public void acknowledgeMessage(long deliveryTag, boolean multiple) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId());
        }
        if (!multiple) {
            addUnacked((int) deliveryTag);
            getUnacknowledgedMessageTags().remove(deliveryTag);
        } else {
            for (Long pendingTag : getUnacknowledgedMessageTags()) {
                if (pendingTag <= deliveryTag) {
                    addUnacked(pendingTag.intValue());
                    getUnacknowledgedMessageTags().remove(pendingTag);
                }
            }
        }
        final long halfPrefetch = getAMQConnection().getMaxPrefetch() / 2L;
        final boolean batchFull = (long) unackedCount >= halfPrefetch;
        final boolean noDelayedFlush = maxAckDelay <= 0L;
        if (batchFull || noDelayedFlush || getAcknowledgeMode() == 1) {
            flushAcknowledgments();
        }
    }

    /**
     * Flushes queued acknowledgements without requesting the sync bit.
     */
    @Override
    protected void flushAcknowledgments() {
        final boolean setSyncBit = false;
        flushAcknowledgments(setSyncBit);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends every queued acknowledgement and then empties the queue, all while holding the
     * monitor of {@code unacked}. Does nothing when no acknowledgement is pending.
     *
     * <p>Messages are accepted unless the acknowledge mode equals 257
     * (presumably Qpid's no-acknowledge mode - confirm).
     *
     * @param setSyncBit true to add the SYNC option to the accept command
     */
    void flushAcknowledgments(boolean setSyncBit) {
        synchronized (unacked) {
            if (unackedCount <= 0) {
                return;
            }
            final boolean accept = getAcknowledgeMode() != 257;
            messageAcknowledge(unacked, accept, setSyncBit);
            clearUnacked();
        }
    }

    /**
     * Acknowledges the given ranges without the sync bit.
     *
     * @param ranges message ids to mark as processed
     * @param accept true to also send a message-accept for the ranges
     */
    void messageAcknowledge(RangeSet ranges, boolean accept) {
        final boolean setSyncBit = false;
        messageAcknowledge(ranges, accept, setSyncBit);
    }

    /**
     * Marks the ranges as processed and, when {@code accept} is true, sends a message-accept
     * for them with the UNRELIABLE option.
     *
     * @param ranges message ids to acknowledge
     * @param accept true to send the accept; also selects BATCH mode for the processed flush
     * @param setSyncBit true to add the SYNC option to the accept command
     */
    void messageAcknowledge(RangeSet ranges, boolean accept, boolean setSyncBit) {
        final Session session = getQpidSession();
        flushProcessed(ranges, accept);
        if (!accept) {
            return;
        }
        final Option syncOption = setSyncBit ? Option.SYNC : Option.NONE;
        session.messageAccept(ranges, new Option[]{Option.UNRELIABLE, syncOption});
    }

    /**
     * Reports each range as processed on the transport session and then flushes the
     * processed state.
     *
     * @param ranges message ids to report as processed
     * @param batch true to flush with the BATCH option, false to flush with NONE
     */
    void flushProcessed(RangeSet ranges, boolean batch) {
        final Session session = getQpidSession();
        for (Range processedRange : ranges) {
            session.processed(processedRange);
        }
        final Option flushMode = batch ? Option.BATCH : Option.NONE;
        session.flushProcessed(new Option[]{flushMode});
    }

    /**
     * Binds a queue to an exchange.
     *
     * <ul>
     * <li>No destination: a single bind with {@code routingKey}.</li>
     * <li>BURL destination: one bind per binding key of the destination, plus one for
     *     {@code routingKey} when it is not among those keys.</li>
     * <li>Any other syntax: one bind per binding declared on the destination's node.
     *     A binding without a queue falls back to {@code queueName}; a binding without an
     *     exchange falls back to the address name when the address type is 2, else to
     *     {@code "amq.topic"}. A binding is skipped when neither it nor the caller names a queue.</li>
     * </ul>
     *
     * @param queueName queue to bind; fallback queue for node bindings
     * @param routingKey binding key used for the no-destination and BURL cases
     * @param arguments bind arguments for the no-destination and BURL cases
     * @param exchangeName exchange for the no-destination and BURL cases
     * @param destination destination describing the bindings; may be null
     * @param nowait when false the session is synced after the binds are issued
     * @throws QpidException if the sync reports a session error
     */
    @Override
    public void sendQueueBind(String queueName, String routingKey, Map<String, Object> arguments, String exchangeName, AMQDestination destination, boolean nowait) throws QpidException {
        if (destination == null) {
            doSendQueueBind(queueName, exchangeName, arguments, routingKey);
        } else if (destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) {
            final String[] bindingKeys = destination.getBindingKeys();
            for (String bindingKey : bindingKeys) {
                doSendQueueBind(queueName, exchangeName, arguments, bindingKey);
            }
            if (!Arrays.asList(bindingKeys).contains(routingKey)) {
                doSendQueueBind(queueName, exchangeName, arguments, routingKey);
            }
        } else {
            // Iterate over a private copy of the node's bindings.
            final List<AMQDestination.Binding> nodeBindings = new ArrayList<AMQDestination.Binding>(destination.getNode().getBindings());
            final String fallbackExchange = destination.getAddressType() == 2 ? destination.getAddressName() : "amq.topic";
            for (AMQDestination.Binding binding : nodeBindings) {
                final String declaredQueue = binding.getQueue();
                if (declaredQueue == null && queueName == null) {
                    continue;
                }
                final String queue = declaredQueue != null ? declaredQueue : queueName;
                final String declaredExchange = binding.getExchange();
                final String exchange = declaredExchange != null ? declaredExchange : fallbackExchange;
                _logger.debug("Binding queue : " + queue + " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs()));
                doBind(destination, binding, queue, exchange);
            }
        }
        if (!nowait) {
            sync();
        }
    }

    /**
     * Logs and issues a single exchange-bind on the transport session.
     *
     * @param queueName queue being bound
     * @param exchangeName exchange the queue is bound to
     * @param args bind arguments
     * @param rk binding key
     */
    private void doSendQueueBind(String queueName, String exchangeName, Map args, String rk) {
        final String logLine = "Binding queue : " + queueName + " exchange: " + exchangeName + " using binding key " + rk;
        _logger.debug(logLine);
        getQpidSession().exchangeBind(queueName, exchangeName, rk, args, new Option[0]);
    }

    /**
     * Closes the transport session: cancels the timer task, flushes queued acknowledgements,
     * then syncs and closes the session. A {@link SessionException} raised by the sync or
     * close is recorded, and any recorded session error is rethrown to the caller.
     *
     * @param timeout not used by this implementation
     * @throws QpidException if a session error was recorded
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendClose(long timeout) throws QpidException, FailoverException {
        cancelTimerTask();
        flushAcknowledgments();
        try {
            getQpidSession().sync();
            getQpidSession().close();
        }
        catch (SessionException sessionError) {
            setCurrentException(sessionError);
        }
        final QpidException pending = getCurrentException();
        if (pending != null) {
            throw pending;
        }
    }

    /**
     * Declares a queue with the requested durability, auto-delete and exclusivity options,
     * then syncs the session.
     *
     * @param name queue name
     * @param autoDelete true to add the AUTO_DELETE option
     * @param durable true to add the DURABLE option
     * @param exclusive true to add the EXCLUSIVE option
     * @param arguments declare arguments
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendCreateQueue(String name, boolean autoDelete, boolean durable, boolean exclusive, Map<String, Object> arguments) throws QpidException, FailoverException {
        final Option durableOption = durable ? Option.DURABLE : Option.NONE;
        final Option autoDeleteOption = autoDelete ? Option.AUTO_DELETE : Option.NONE;
        final Option exclusiveOption = exclusive ? Option.EXCLUSIVE : Option.NONE;
        getQpidSession().queueDeclare(name, null, arguments, new Option[]{durableOption, autoDeleteOption, exclusiveOption});
        sync();
    }

    /**
     * Releases every delivered-but-unacknowledged and every prefetched message back to the
     * broker. Both tag queues are drained. Delivered messages are released with the
     * SET_REDELIVERED option, prefetched ones without it. The session is synced afterwards.
     *
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendRecover() throws QpidException, FailoverException {
        final RangeSet everything = RangeSetFactory.createRangeSet();
        final RangeSet deliveredRanges = gatherRangeSet(getUnacknowledgedMessageTags());
        final RangeSet prefetchedRanges = gatherRangeSet(getPrefetchedMessageTags());
        for (Range deliveredRange : deliveredRanges) {
            everything.add(deliveredRange);
        }
        for (Range prefetchedRange : prefetchedRanges) {
            everything.add(prefetchedRange);
        }
        // Report the union as processed first, then release the two groups separately.
        flushProcessed(everything, false);
        getQpidSession().messageRelease(deliveredRanges, new Option[]{Option.SET_REDELIVERED});
        getQpidSession().messageRelease(prefetchedRanges, new Option[0]);
        sync();
    }

    /**
     * Drains the given tag queue into a new range set. The queue is empty on return.
     *
     * @param messageTags queue of delivery tags; each tag is narrowed to an int
     * @return a range set holding every tag that was polled
     */
    private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags) {
        final RangeSet drained = RangeSetFactory.createRangeSet();
        for (Long tag = messageTags.poll(); tag != null; tag = messageTags.poll()) {
            drained.add(tag.intValue());
        }
        return drained;
    }

    /**
     * Releases, with the SET_REDELIVERED option, every message delivered in the current
     * transaction and resets the transaction bookkeeping. Does nothing when no message
     * has been recorded.
     */
    @Override
    public void releaseForRollback() {
        if (_txSize <= 0) {
            return;
        }
        flushProcessed(_txRangeSet, false);
        getQpidSession().messageRelease(_txRangeSet, new Option[]{Option.SET_REDELIVERED});
        _txRangeSet.clear();
        _txSize = 0;
    }

    /**
     * Marks a single message as processed and releases it back to the broker.
     * With {@code requeue} true the release carries no options; otherwise it carries
     * SET_REDELIVERED.
     *
     * @param deliveryTag tag of the message, narrowed to an int
     * @param requeue selects whether the release omits the SET_REDELIVERED option
     */
    @Override
    public void rejectMessage(long deliveryTag, boolean requeue) {
        final RangeSet single = RangeSetFactory.createRangeSet();
        single.add((int) deliveryTag);
        flushProcessed(single, false);
        final Option[] releaseOptions = requeue ? new Option[0] : new Option[]{Option.SET_REDELIVERED};
        getQpidSession().messageRelease(single, releaseOptions);
    }

    /**
     * Creates a 0-10 consumer bound to this session, forwarding the session's channel id,
     * connection, message factory registry and acknowledge mode alongside the arguments.
     *
     * @return the new consumer
     * @throws JMSException declared by the overridden method
     */
    @Override
    public BasicMessageConsumer_0_10 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String messageSelector, Map<String, Object> rawSelector, boolean noConsume, boolean autoClose) throws JMSException {
        final BasicMessageConsumer_0_10 consumer = new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
        return consumer;
    }

    /**
     * Checks a binding without matching on arguments.
     *
     * @return true when the broker reports the queue as bound to the exchange with the key
     */
    @Override
    public boolean isQueueBound(String exchangeName, String queueName, String routingKey) {
        final Map<String, Object> noArguments = null;
        return isQueueBound(exchangeName, queueName, routingKey, noArguments);
    }

    /**
     * Checks the binding described by a destination: its exchange, queue, routing key and
     * binding keys.
     *
     * @param destination destination whose binding is checked
     * @return true when the broker reports the binding as present
     */
    @Override
    public boolean isQueueBound(AMQDestination destination) {
        final String[] bindingKeys = destination.getBindingKeys();
        return isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), destination.getRoutingKey(), bindingKeys);
    }

    /**
     * Checks a binding using the first binding key when one is supplied, otherwise the
     * routing key (which may be null). Arguments are not matched.
     *
     * @param bindingKeys candidate binding keys; only element 0 is used
     * @return true when the broker reports the binding as present
     */
    public boolean isQueueBound(String exchangeName, String queueName, String routingKey, String[] bindingKeys) {
        final boolean hasBindingKey = bindingKeys != null && bindingKeys.length > 0;
        final String effectiveKey = hasBindingKey ? bindingKeys[0] : routingKey;
        return isQueueBound(exchangeName, queueName, effectiveKey, (Map<String, Object>) null);
    }

    /**
     * Asks the broker whether a queue is bound to an exchange and interprets the reply.
     *
     * <p>The exchange and queue must always exist. When a binding key is given, the key and
     * queue must also match. When arguments are given as well, they must match too.
     *
     * @param bindingKey key to match, or null to check existence only
     * @param args arguments to match, or null to skip the argument check
     * @return true when every applicable check passes
     */
    @Override
    public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String, Object> args) {
        final ExchangeBoundResult reply = (ExchangeBoundResult) getQpidSession().exchangeBound(exchangeName, queueName, bindingKey, args, new Option[0]).get();
        boolean bound = !reply.getExchangeNotFound() && !reply.getQueueNotFound();
        if (bindingKey != null) {
            bound = bound && !reply.getKeyNotMatched() && !reply.getQueueNotMatched();
            if (args != null) {
                bound = bound && !reply.getArgsNotMatched();
            }
        }
        return bound;
    }

    /**
     * Delegates to {@link #isQueueBound(String, String, String)}.
     *
     * @return true when the broker reports the binding as present
     */
    @Override
    protected boolean isBound(String exchangeName, String amqQueueName, String routingKey) {
        final boolean bound = isQueueBound(exchangeName, amqQueueName, routingKey);
        return bound;
    }

    /**
     * Subscribes a consumer to its queue and sets up message flow for it.
     *
     * <p>Steps: subscribe (accept mode NONE when the acknowledge mode is 257, otherwise
     * EXPLICIT), set the flow mode (CREDIT for zero capacity, WINDOW otherwise), grant byte
     * credit of -1, and grant message credit equal to the capacity when it is positive, a
     * dispatcher exists and the session is started or prefetches immediately. The session is
     * synced at the end.
     *
     * <p>Subscription arguments declared on the destination's link are added to the
     * consumer's own argument map.
     *
     * @param consumer consumer being subscribed
     * @param queueName queue to consume from; the address name is used when this resolves to null
     * @param nowait not used by this implementation
     * @param tag numeric consumer tag, sent as the subscription destination
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendConsume(BasicMessageConsumer_0_10 consumer, String queueName, boolean nowait, int tag) throws QpidException, FailoverException {
        final String resolvedQueue = preprocessAddressTopic(consumer, queueName);
        final boolean preAcquire = consumer.isPreAcquire();
        final AMQDestination destination = consumer.getDestination();
        final long capacity = consumer.getCapacity();
        final Map<String, Object> arguments = consumer.getArguments();
        final Link link = destination.getLink();
        if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) {
            arguments.putAll(link.getSubscription().getArgs());
        }
        final MessageAcceptMode acceptMode = getAcknowledgeMode() == 257 ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT;
        final MessageAcquireMode acquireMode = preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED;
        final String queue = resolvedQueue != null ? resolvedQueue : destination.getAddressName();
        final Option exclusivity = consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE;
        getQpidSession().messageSubscribe(queue, String.valueOf(tag), acceptMode, acquireMode, null, 0L, arguments, new Option[]{exclusivity});

        final String consumerTag = consumer.getConsumerTagString();
        final MessageFlowMode flowMode = capacity == 0L ? MessageFlowMode.CREDIT : MessageFlowMode.WINDOW;
        getQpidSession().messageSetFlowMode(consumerTag, flowMode, new Option[0]);
        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, -1L, new Option[]{Option.UNRELIABLE});
        if (capacity > 0L && getDispatcher() != null && (isStarted() || isImmediatePrefetch())) {
            getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, capacity, new Option[]{Option.UNRELIABLE});
        }
        sync();
    }

    /**
     * Creates a 0-10 producer for the destination, translating Qpid and transport failures
     * into JMS exceptions.
     *
     * @param destination target destination; cast to {@link AMQDestination}
     * @param mandatory mandatory flag forwarded to the producer
     * @param immediate immediate flag forwarded to the producer
     * @param producerId id forwarded to the producer
     * @return the new producer
     * @throws JMSException if the producer could not be created
     */
    @Override
    public BasicMessageProducer_0_10 createMessageProducer(Destination destination, Boolean mandatory, Boolean immediate, long producerId) throws JMSException {
        final AMQDestination target = (AMQDestination) destination;
        try {
            return new BasicMessageProducer_0_10(getAMQConnection(), target, isTransacted(), getChannelId(), this, producerId, immediate, mandatory);
        }
        catch (QpidException qpidError) {
            throw toJMSException("Error creating producer", qpidError);
        }
        catch (TransportException transportError) {
            throw toJMSException("Exception while creating message producer:" + transportError.getMessage(), transportError);
        }
    }

    /**
     * Declares an exchange with no alternate exchange and no arguments.
     *
     * @param internal not used by this implementation
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendExchangeDeclare(String name, String type, boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws QpidException, FailoverException {
        final String alternateExchange = null;
        final Map<String, Object> arguments = null;
        sendExchangeDeclare(name, type, alternateExchange, arguments, nowait, durable, autoDelete);
    }

    /**
     * Declares an exchange with the given arguments and no alternate exchange.
     *
     * @param passive not used by this implementation
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendExchangeDeclare(String name, String type, boolean nowait, boolean durable, boolean autoDelete, Map<String, Object> arguments, boolean passive) throws QpidException, FailoverException {
        final String alternateExchange = null;
        sendExchangeDeclare(name, type, alternateExchange, arguments, nowait, durable, autoDelete);
    }

    /**
     * Issues an exchange-declare. Names starting with {@code "amq."} are declared with the
     * PASSIVE option. The session is synced unless {@code nowait} is set.
     *
     * @param name exchange name; must not be null
     * @param type exchange type
     * @param alternateExchange alternate exchange, or null
     * @param args declare arguments, or null
     * @param nowait true to skip the sync
     * @param durable true to add the DURABLE option
     * @param autoDelete true to add the AUTO_DELETE option
     * @throws QpidException if the sync reports a session error
     */
    public void sendExchangeDeclare(String name, String type, String alternateExchange, Map<String, Object> args, boolean nowait, boolean durable, boolean autoDelete) throws QpidException {
        final Option passiveOption = name.startsWith("amq.") ? Option.PASSIVE : Option.NONE;
        final Option durableOption = durable ? Option.DURABLE : Option.NONE;
        final Option autoDeleteOption = autoDelete ? Option.AUTO_DELETE : Option.NONE;
        getQpidSession().exchangeDeclare(name, type, alternateExchange, args, new Option[]{passiveOption, durableOption, autoDeleteOption});
        if (nowait) {
            return;
        }
        sync();
    }

    /**
     * Deletes an exchange and syncs the session unless {@code nowait} is set.
     *
     * @param name exchange name
     * @param nowait true to skip the sync
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendExchangeDelete(String name, boolean nowait) throws QpidException, FailoverException {
        getQpidSession().exchangeDelete(name, new Option[0]);
        if (nowait) {
            return;
        }
        sync();
    }

    /**
     * Declares the queue behind a destination, generating and storing a temporary queue name
     * when the destination has none.
     *
     * <p>For BURL destinations the queue options come from the destination itself and a
     * {@code no-local} argument is only sent when {@code noLocal} is true. For other
     * destinations the options, alternate exchange and declare arguments come from the
     * destination's node, and {@code no-local} is added when the node does not set it.
     *
     * @param amqd destination whose queue is declared
     * @param noLocal value for the {@code no-local} declare argument
     * @param nowait true to skip the sync after the declare
     * @param passive true to add the PASSIVE option; only applied to BURL destinations
     * @return the name of the declared queue
     * @throws QpidException if the sync reports a session error
     */
    public String send0_10QueueDeclare(AMQDestination amqd, boolean noLocal, boolean nowait, boolean passive) throws QpidException {
        String queueName = amqd.getAMQQueueName();
        if (queueName == null) {
            queueName = createTemporaryQueueName();
            amqd.setQueueName(queueName);
        }
        final boolean burlSyntax = amqd.getDestSyntax() == AMQDestination.DestSyntax.BURL;
        if (!burlSyntax) {
            final Node node = amqd.getNode();
            final HashMap<String, Object> declareArgs = new HashMap<String, Object>();
            declareArgs.putAll(node.getDeclareArgs());
            if (declareArgs.get("no-local") == null) {
                declareArgs.put("no-local", noLocal);
            }
            final Option[] nodeOptions = new Option[]{node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE};
            getQpidSession().queueDeclare(queueName, node.getAlternateExchange(), declareArgs, nodeOptions);
        } else {
            final HashMap<String, Object> declareArgs = new HashMap<String, Object>();
            if (noLocal) {
                declareArgs.put("no-local", true);
            }
            final Option[] burlOptions = new Option[]{amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE, passive ? Option.PASSIVE : Option.NONE};
            getQpidSession().queueDeclare(queueName, "", declareArgs, burlOptions);
        }
        if (!nowait) {
            sync();
        }
        return queueName;
    }

    /**
     * Deletes a queue and syncs the session.
     *
     * @param queueName queue to delete
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendQueueDelete(String queueName) throws QpidException, FailoverException {
        final Option[] noOptions = new Option[0];
        getQpidSession().queueDelete(queueName, noOptions);
        sync();
    }

    /**
     * Stops or restarts message flow for every consumer of this session.
     *
     * <p>Suspending sends a message-stop per consumer. Resuming grants message credit: the
     * consumer's capacity, or a single message when the capacity is zero and a listener is
     * set. It then grants byte credit of -1. Any exception raised while resuming a consumer
     * is wrapped in an {@link AMQException} with INTERNAL_ERROR.
     *
     * @param suspend true to stop the flow, false to restart it
     * @throws QpidException if resuming fails or a sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException {
        if (!suspend) {
            for (BasicMessageConsumer_0_10 consumer : getConsumers()) {
                final String consumerTag = String.valueOf(consumer.getConsumerTag());
                try {
                    final long capacity = consumer.getCapacity();
                    if (capacity != 0L) {
                        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, capacity, new Option[]{Option.UNRELIABLE});
                    } else if (consumer.getMessageListener() != null) {
                        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, 1L, new Option[]{Option.UNRELIABLE});
                    }
                    getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, -1L, new Option[]{Option.UNRELIABLE});
                }
                catch (Exception e) {
                    throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", (Throwable)e);
                }
            }
        } else {
            for (BasicMessageConsumer consumer : getConsumers()) {
                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), new Option[]{Option.UNRELIABLE});
            }
            // NOTE(review): the suspend path syncs here and again below; kept as-is.
            sync();
        }
        sync();
    }

    /**
     * Issues a tx-rollback on the transport session and syncs.
     *
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     */
    @Override
    public void sendRollback() throws QpidException, FailoverException {
        final Option[] noOptions = new Option[0];
        getQpidSession().txRollback(noOptions);
        sync();
    }

    /**
     * @return the transport session backing this JMS session
     */
    protected Session getQpidSession() {
        return _qpidSession;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the recorded session error, if any, and clears it so that it is reported
     * only once. The read and the reset happen under {@code _currentExceptionLock}.
     *
     * @return the recorded error, or null when none is pending
     */
    public QpidException getCurrentException() {
        synchronized (_currentExceptionLock) {
            final QpidException pending = _currentException;
            _currentException = null;
            return pending;
        }
    }

    /**
     * Session-opened callback; this implementation does nothing.
     *
     * @param ssn the transport session (unused)
     */
    public void opened(Session ssn) {
    }

    /**
     * Session-resumed callback: adopts the connection the resumed session now belongs to.
     *
     * @param ssn the resumed transport session
     */
    public void resumed(Session ssn) {
        final Connection current = ssn.getConnection();
        _qpidConnection = current;
    }

    /**
     * Incoming-message callback: wraps the transfer and hands it to {@code messageReceived}.
     *
     * @param ssn the transport session (unused)
     * @param xfr the received message transfer
     */
    public void message(Session ssn, MessageTransfer xfr) {
        final UnprocessedMessage_0_10 incoming = new UnprocessedMessage_0_10(xfr);
        messageReceived(incoming);
    }

    /**
     * Session-error callback: records the error through {@link #setCurrentException}.
     *
     * @param ssn the transport session (unused)
     * @param exc the error reported for the session
     */
    public void exception(Session ssn, SessionException exc) {
        setCurrentException(exc);
    }

    /**
     * Session-closed callback: closes the JMS session with no cause and then cancels the
     * flush task, if one is scheduled. Any exception is logged and not propagated.
     *
     * @param ssn the transport session (unused)
     */
    public void closed(Session ssn) {
        try {
            super.closed(null);
            if (_flushTaskFuture == null) {
                return;
            }
            // Cancel without interrupting a run that is already in progress.
            _flushTaskFuture.cancel(false);
            _flushTaskFuture = null;
        }
        catch (Exception e) {
            _logger.error("Error closing JMS session", (Throwable)e);
        }
    }

    /**
     * Same as {@link #getCurrentException()}: returns the recorded session error and clears it.
     *
     * @return the recorded error, or null when none is pending
     */
    @Override
    public QpidException getLastException() {
        final QpidException pending = getCurrentException();
        return pending;
    }

    /**
     * Declares the destination's queue through a failover no-op wrapper. When the destination
     * requires a name, one is generated first from its binding keys (each prefixed with
     * {@code "_"}), {@code "@"}, the exchange name, {@code "_"} and a random UUID.
     *
     * @param amqd destination whose queue is declared
     * @param noLocal forwarded to {@link #send0_10QueueDeclare}
     * @param nowait forwarded to {@link #send0_10QueueDeclare}
     * @param passive forwarded to {@link #send0_10QueueDeclare}
     * @return the name of the declared queue
     * @throws QpidException if the declare fails
     */
    @Override
    protected String declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait, final boolean passive) throws QpidException {
        final FailoverProtectedOperation<String, QpidException> declareOperation = new FailoverProtectedOperation<String, QpidException>(){

            @Override
            public String execute() throws QpidException, FailoverException {
                if (amqd.isNameRequired()) {
                    final StringBuilder generatedName = new StringBuilder();
                    for (String key : amqd.getBindingKeys()) {
                        generatedName.append("_").append(key);
                    }
                    generatedName.append("@").append(amqd.getExchangeName()).append("_").append(UUID.randomUUID());
                    amqd.setQueueName(generatedName.toString());
                }
                return AMQSession_0_10.this.send0_10QueueDeclare(amqd, noLocal, nowait, passive);
            }
        };
        return new FailoverNoopSupport<String, QpidException>(declareOperation, getAMQConnection()).execute();
    }

    /**
     * Queries the broker for the message count of the destination's queue. Queued
     * acknowledgements are flushed first.
     *
     * @param amqd destination whose queue is queried
     * @param sync true to sync the transport session before querying
     * @return the message count reported by the broker
     */
    @Override
    protected Long requestQueueDepth(AMQDestination amqd, boolean sync) {
        flushAcknowledgments();
        if (sync) {
            getQpidSession().sync();
        }
        final QueueQueryResult reply = (QueueQueryResult) getQpidSession().queueQuery(amqd.getQueueName(), new Option[0]).get();
        return reply.getMessageCount();
    }

    /**
     * Records a delivered message against the current transaction.
     *
     * @param id delivery id, narrowed to an int
     */
    @Override
    protected void addDeliveredMessage(long id) {
        final int narrowedId = (int) id;
        _txRangeSet.add(narrowedId);
        _txSize++;
    }

    /**
     * Reports the messages of the current transaction as processed, without accepting them,
     * when at least one message is recorded and either the max prefetch is 1 or the
     * recorded count is a multiple of half the (non-zero) max prefetch.
     */
    protected void sendTxCompletionsIfNecessary() {
        if (_txSize <= 0) {
            return;
        }
        final long maxPrefetch = getAMQConnection().getMaxPrefetch();
        // The == 1 test must come first: half of 1 is 0 and would otherwise be a zero divisor.
        final boolean due = maxPrefetch == 1L || (maxPrefetch != 0L && (long) _txSize % (maxPrefetch / 2L) == 0L);
        if (due) {
            messageAcknowledge(_txRangeSet, false);
        }
    }

    /**
     * Commits the current transaction: accepts every message recorded for it, issues a
     * tx-commit with auto-sync switched on for the duration of the call, then syncs.
     *
     * @throws QpidException if the sync reports a session error
     * @throws FailoverException declared by the overridden method
     * @throws TransportException declared by the overridden method
     */
    @Override
    public void commitImpl() throws QpidException, FailoverException, TransportException {
        final boolean hasDeliveries = _txSize > 0;
        if (hasDeliveries) {
            messageAcknowledge(_txRangeSet, true);
            _txRangeSet.clear();
            _txSize = 0;
        }
        getQpidSession().setAutoSync(true);
        try {
            getQpidSession().txCommit(new Option[0]);
        }
        finally {
            // Always restore auto-sync, even when the commit throws.
            getQpidSession().setAutoSync(false);
        }
        sync();
    }

    /**
     * Compares two delivery tags, narrowed to ints, with {@code Serial.le}.
     *
     * @return the result of {@code Serial.le} for the two narrowed tags
     */
    @Override
    protected final boolean tagLE(long tag1, long tag2) {
        final int first = (int) tag1;
        final int second = (int) tag2;
        return Serial.le(first, second);
    }

    /**
     * Compares the current rollback mark with a delivery tag, both narrowed to ints,
     * using {@code Serial.lt}.
     *
     * @return the result of {@code Serial.lt} for the narrowed mark and tag
     */
    @Override
    protected final boolean updateRollbackMark(long currentMark, long deliveryTag) {
        final int mark = (int) currentMark;
        final int tag = (int) deliveryTag;
        return Serial.lt(mark, tag);
    }

    /**
     * Syncs the transport session. A {@link SessionException} raised by the sync is
     * recorded, and any recorded session error is then thrown to the caller.
     *
     * @throws QpidException if a session error was recorded
     */
    @Override
    public void sync() throws QpidException {
        try {
            getQpidSession().sync();
        }
        catch (SessionException sessionError) {
            setCurrentException(sessionError);
        }
        final QpidException pending = getCurrentException();
        if (pending == null) {
            return;
        }
        throw pending;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a transport session error as the session's current exception and notifies
     * the owning connection.
     *
     * <p>The error code is taken from the wrapped execution exception when there is one,
     * otherwise INTERNAL_ERROR is used. When hard errors are disabled the timer task and
     * dispatcher thread are stopped, this session is closed with the new exception and the
     * connection's exception listener path is invoked. When hard errors are enabled the
     * whole connection is closed with the exception instead.
     *
     * <p>The notifications use the exception created by this call rather than re-reading
     * the field. {@link #getCurrentException()} clears the field and may run concurrently,
     * which could otherwise turn the notified cause into null.
     *
     * @param se the transport-level session error
     */
    public void setCurrentException(SessionException se) {
        final QpidException recorded;
        synchronized (_currentExceptionLock) {
            final ExecutionException ee = se.getException();
            int code = AMQConstant.INTERNAL_ERROR.getCode();
            if (ee != null) {
                code = ee.getErrorCode().getValue();
            }
            recorded = new AMQException(AMQConstant.getConstant(code), _isHardError, se.getMessage(), se.getCause());
            _currentException = recorded;
        }
        if (!_isHardError) {
            cancelTimerTask();
            stopDispatcherThread();
            try {
                closed(recorded);
            }
            catch (Exception e) {
                // Best effort: a failure to close must not stop the connection notification.
                _logger.warn("Error closing session", (Throwable)e);
            }
            getAMQConnection().exceptionReceived(recorded);
        } else {
            getAMQConnection().closed(recorded);
        }
    }

    /**
     * @return the 0-10 message delegate factory
     */
    @Override
    public AMQMessageDelegateFactory getMessageDelegateFactory() {
        final AMQMessageDelegateFactory factory = AMQMessageDelegateFactory.FACTORY_0_10;
        return factory;
    }

    /**
     * Queries the broker for the exchange named by the destination's address.
     *
     * <p>When the exchange exists and {@code assertNode} is set, its durability, type and
     * arguments must match the destination's node. When it exists and {@code assertNode}
     * is not set, the reported type is copied onto the node and the destination.
     *
     * @param dest destination whose address name is the exchange name
     * @param assertNode true to verify the exchange against the node and fail on mismatch
     * @return true when the exchange exists (and, with {@code assertNode}, matches)
     * @throws QpidException if {@code assertNode} is set and the exchange is missing or differs
     */
    @Override
    public boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws QpidException {
        final ExchangeQueryResult reply = (ExchangeQueryResult) getQpidSession().exchangeQuery(dest.getAddressName(), new Option[]{Option.NONE}).get();
        boolean match = !reply.getNotFound();
        final Node node = dest.getNode();
        if (match && assertNode) {
            match = reply.getDurable() == node.isDurable()
                    && node.getExchangeType() != null
                    && node.getExchangeType().equals(reply.getType())
                    && matchProps(reply.getArguments(), node.getDeclareArgs());
        } else if (match) {
            _logger.debug("Setting Exchange type " + reply.getType());
            node.setExchangeType(reply.getType());
            dest.setExchangeClass(reply.getType());
        }
        if (assertNode && !match) {
            throw new QpidException("Assert failed for address : " + dest + ", Result was : " + reply);
        }
        return match;
    }

    /**
     * Checks whether the queue named by the destination's address exists, using the
     * durability, auto-delete, exclusivity and declare arguments of the destination's node
     * for the optional assertion.
     *
     * @param dest destination whose address name is the queue name
     * @param assertNode true to verify the queue against the node
     * @return the result of the detailed {@code isQueueExist} overload
     * @throws QpidException propagated from the detailed overload
     */
    @Override
    public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException {
        final Node node = dest.getNode();
        final String queueName = dest.getAddressName();
        return isQueueExist(queueName, assertNode, node.isDurable(), node.isAutoDelete(), node.isExclusive(), node.getDeclareArgs());
    }

    /**
     * Queries the broker for a queue and optionally asserts its properties.
     *
     * <p>The queue exists when the name in the reply equals {@code queueName}. With
     * {@code assertNode} set, durability, auto-delete, exclusivity and arguments must also
     * match, and a mismatch raises a {@link QpidException}.
     *
     * <p>A session error with code RESOURCE_DELETED or NOT_FOUND means the queue does not
     * exist and yields {@code false}. Previously the not-found result was computed and then
     * discarded by an unconditional throw. Any other session error is rethrown as an
     * {@link AMQException}; INTERNAL_ERROR is used when the session error carries no
     * execution exception.
     *
     * @param queueName queue to look up; must not be null
     * @param assertNode true to verify the queue's properties and fail on mismatch
     * @param durable expected durability
     * @param autoDelete expected auto-delete flag
     * @param exclusive expected exclusivity
     * @param args expected declare arguments
     * @return true when the queue exists (and, with {@code assertNode}, matches)
     * @throws QpidException on assertion failure or on a session error other than not-found
     */
    public boolean isQueueExist(String queueName, boolean assertNode, boolean durable, boolean autoDelete, boolean exclusive, Map<String, Object> args) throws QpidException {
        boolean match;
        try {
            QueueQueryResult result = (QueueQueryResult)this.getQpidSession().queueQuery(queueName, new Option[]{Option.NONE}).get();
            match = queueName.equals(result.getQueue());
            if (match && assertNode) {
                match = result.getDurable() == durable && result.getAutoDelete() == autoDelete && result.getExclusive() == exclusive && this.matchProps(result.getArguments(), args);
            }
            if (assertNode && !match) {
                throw new QpidException("Assert failed for queue : " + queueName + ", Result was : " + result);
            }
        }
        catch (SessionException e) {
            ExecutionException cause = e.getException();
            if (cause == null) {
                // No broker-side error code available; mirror setCurrentException's fallback.
                throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error querying queue", (Throwable)e);
            }
            ExecutionErrorCode errorCode = cause.getErrorCode();
            if (errorCode != ExecutionErrorCode.RESOURCE_DELETED && errorCode != ExecutionErrorCode.NOT_FOUND) {
                throw new AMQException(AMQConstant.getConstant(errorCode.getValue()), "Error querying queue", (Throwable)e);
            }
            // The broker says the queue is gone or never existed: that is an answer, not a failure.
            match = false;
        }
        return match;
    }

    /**
     * Checks that every property in {@code source} is present in {@code target} with an
     * equal value. Two {@link Number} values also match when their {@code longValue()}s
     * are equal. The first mismatch is logged at debug level and ends the comparison.
     *
     * <p>Null handling: a null or empty {@code source} requests nothing and therefore
     * matches. A null {@code target} matches only when nothing is requested. A null value
     * stored in {@code target} matches only a null expected value.
     *
     * @param target properties reported by the broker; may be null
     * @param source properties that must be present; may be null
     * @return true when every requested property matches
     */
    private boolean matchProps(Map<String, Object> target, Map<String, Object> source) {
        if (source == null) {
            return true;
        }
        for (Map.Entry<String, Object> wanted : source.entrySet()) {
            String key = wanted.getKey();
            Object expected = wanted.getValue();
            boolean present = target != null && target.containsKey(key);
            Object actual = present ? target.get(key) : null;
            boolean same;
            if (!present) {
                same = false;
            } else if (actual == null) {
                same = expected == null;
            } else {
                same = actual.equals(expected) || actual instanceof Number && expected instanceof Number && ((Number)actual).longValue() == ((Number)expected).longValue();
            }
            if (same) {
                continue;
            }
            if (_logger.isDebugEnabled()) {
                StringBuilder buf = new StringBuilder();
                buf.append("Property given in address did not match with the args sent by the broker.");
                buf.append(" Expected { ").append(key).append(" : ").append(expected).append(" }, ");
                buf.append(" Actual { ").append(key).append(" : ").append(actual).append(" }");
                _logger.debug(buf.toString());
            }
            return false;
        }
        return true;
    }

    /**
     * Resolves the address type of a destination whose type is 3 by asking the broker
     * whether a queue and/or an exchange with the address name exists. Any other type is
     * returned unchanged without contacting the broker.
     *
     * <p>The resolved type is 1 when no exchange of that name exists (whether or not a queue
     * does), and 2 when an exchange exists but no queue. It is stored on the destination.
     * (1, 2 and 3 are presumably the queue, topic and unknown address types - confirm.)
     *
     * @param dest destination to resolve
     * @return the existing or newly resolved address type
     * @throws QpidException if both a queue and an exchange exist under the name
     */
    @Override
    public int resolveAddressType(AMQDestination dest) throws QpidException {
        final int declaredType = dest.getAddressType();
        final String name = dest.getAddressName();
        if (declaredType != 3) {
            return declaredType;
        }
        final ExchangeBoundResult reply = (ExchangeBoundResult) getQpidSession().exchangeBound(name, name, null, null, new Option[0]).get();
        final boolean queueMissing = reply.getQueueNotFound();
        final boolean exchangeMissing = reply.getExchangeNotFound();
        final int resolvedType;
        if (exchangeMissing) {
            resolvedType = 1;
        } else if (queueMissing) {
            resolvedType = 2;
        } else {
            throw new QpidException("Ambiguous address, please specify queue or topic as node type");
        }
        dest.setAddressType(resolvedType);
        return resolvedType;
    }

    /**
     * Declares the subscription queue for a destination and binds it to the destination's
     * address with the message selector as a binding argument.
     *
     * <p>When the destination has no queue name, the link name is used, or
     * {@code "TempQueue"} plus a random UUID when the link is unnamed. The chosen name is
     * stored on the destination. A {@code no-local} entry is added to the link's
     * subscription-queue declare arguments when absent.
     *
     * @param dest destination being subscribed to
     * @param noLocal value for the {@code no-local} declare argument when not already set
     * @param messageSelector selector for the binding; null is sent as an empty string
     * @throws QpidException if the link is durable but the queue name starts with {@code "TempQueue"}
     */
    @Override
    void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws QpidException {
        final Link link = dest.getLink();
        String queueName = dest.getQueueName();
        if (queueName == null) {
            final String linkName = link.getName();
            queueName = linkName == null ? "TempQueue" + UUID.randomUUID() : link.getName();
            dest.setQueueName(queueName);
        }
        final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue();
        final Map<String, Object> declareArgs = queueProps.getDeclareArgs();
        if (!declareArgs.containsKey("no-local")) {
            declareArgs.put("no-local", noLocal);
        }
        if (link.isDurable() && queueName.startsWith("TempQueue")) {
            throw new QpidException("You cannot mark a subscription queue as durable without providing a name for the link.");
        }
        final Option autoDeleteOption = queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE;
        final Option durableOption = link.isDurable() ? Option.DURABLE : Option.NONE;
        final Option exclusiveOption = queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE;
        getQpidSession().queueDeclare(queueName, queueProps.getAlternateExchange(), declareArgs, new Option[]{autoDeleteOption, durableOption, exclusiveOption});

        final HashMap<String, Object> bindingArguments = new HashMap<String, Object>();
        bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
        getQpidSession().exchangeBind(queueName, dest.getAddressName(), dest.getSubject(), bindingArguments, new Option[0]);
    }

    /**
     * Acknowledges the ranges gathered from the unacknowledged message tags and then syncs the
     * Qpid session. Does nothing when the gathered range set is empty.
     */
    @Override
    protected void acknowledgeImpl() {
        RangeSet pending = this.gatherRangeSet(this.getUnacknowledgedMessageTags());
        if (pending.size() <= 0) {
            return;
        }
        this.messageAcknowledge(pending, true);
        this.getQpidSession().sync();
    }

    /**
     * Resets this session's transactional bookkeeping ({@code _txRangeSet} is cleared and
     * {@code _txSize} set to zero), delegates to {@code super.resubscribe()}, and finally
     * syncs the Qpid session.
     *
     * @throws QpidException declared by the method signature; nothing visible in this method
     *                       throws it directly
     */
    @Override
    void resubscribe() throws QpidException {
        // Reset the transactional state before the superclass re-creates the subscriptions.
        this._txRangeSet.clear();
        this._txSize = 0;
        super.resubscribe();
        this.getQpidSession().sync();
    }

    /**
     * Stops message delivery on this session and hands unconsumed messages back.
     *
     * <p>In order: suspends the channel (unless closing), drains the dispatch queue, stops the
     * dispatcher, and moves every consumer's drained delivery tags into the prefetched tag
     * collection. The delivered (unacknowledged) and prefetched tags are then gathered into
     * range sets, the combined set is passed to {@code flushProcessed(..., false)}, and both
     * sets are released: delivered ranges with {@link Option#SET_REDELIVERED}, prefetched
     * ranges without any option. The session is synced at the end.
     *
     * @throws QpidException declared by the method signature
     */
    @Override
    void stop() throws QpidException {
        this.suspendChannelIfNotClosing();
        this.drainDispatchQueueWithDispatcher();
        this.stopExistingDispatcher();

        for (BasicMessageConsumer consumer : this.getConsumers()) {
            List<Long> drainedTags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
            this.getPrefetchedMessageTags().addAll(drainedTags);
        }

        RangeSet deliveredRanges = this.gatherRangeSet(this.getUnacknowledgedMessageTags());
        RangeSet prefetchedRanges = this.gatherRangeSet(this.getPrefetchedMessageTags());

        // Combine both sets, delivered ranges first, for the single flushProcessed call.
        RangeSet processed = RangeSetFactory.createRangeSet((int)(deliveredRanges.size() + prefetchedRanges.size()));
        for (RangeSet part : new RangeSet[]{deliveredRanges, prefetchedRanges}) {
            for (Range range : part) {
                processed.add(range);
            }
        }
        this.flushProcessed(processed, false);

        // Only messages that were actually delivered are released with the redelivered option.
        this.getQpidSession().messageRelease(deliveredRanges, new Option[]{Option.SET_REDELIVERED});
        this.getQpidSession().messageRelease(prefetchedRanges, new Option[0]);
        this.sync();
    }

    /**
     * Reports whether flow is currently blocked, as answered by the underlying
     * {@code _qpidSession}.
     *
     * @return the value of {@code _qpidSession.isFlowBlocked()}
     */
    @Override
    public boolean isFlowBlocked() {
        return this._qpidSession.isFlowBlocked();
    }

    /**
     * Not supported by this session implementation.
     *
     * @param active ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void setFlowControl(boolean active) {
        throw new UnsupportedOperationException("Operation not supported by this protocol");
    }

    /**
     * Cancels the pending flush task, if there is one, without interrupting a run that is
     * already in progress, and forgets the reference to it.
     */
    private void cancelTimerTask() {
        if (this._flushTaskFuture == null) {
            return;
        }
        this._flushTaskFuture.cancel(false);
        this._flushTaskFuture = null;
    }

    /**
     * Declares the queue described by the node of {@code dest}, creates the node's bindings,
     * and syncs the session.
     *
     * <p>A {@code "no-local"} declare argument is added only when the node's declare arguments
     * do not already contain that key. The auto-delete, durable and exclusive options are taken
     * from the node.
     *
     * @param dest    the destination whose node describes the queue; the address name is used
     *                as the queue name
     * @param noLocal value for the {@code "no-local"} declare argument when none is preset
     * @throws QpidException declared by the method signature
     */
    @Override
    protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws QpidException {
        Node queueNode = dest.getNode();
        Map<String, Object> declareArgs = queueNode.getDeclareArgs();
        boolean noLocalPreset = declareArgs.containsKey("no-local");
        if (!noLocalPreset) {
            declareArgs.put("no-local", noLocal);
        }

        Option autoDeleteOpt = queueNode.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE;
        Option durableOpt = queueNode.isDurable() ? Option.DURABLE : Option.NONE;
        Option exclusiveOpt = queueNode.isExclusive() ? Option.EXCLUSIVE : Option.NONE;
        this.getQpidSession().queueDeclare(dest.getAddressName(), queueNode.getAlternateExchange(), declareArgs, new Option[]{autoDeleteOpt, durableOpt, exclusiveOpt});

        this.createBindings(dest, dest.getNode().getBindings());
        this.sync();
    }

    /**
     * Declares the exchange described by the node of {@code dest}, creates the node's bindings,
     * and syncs the session.
     *
     * <p>The exchange is named after the destination's address name; its type, alternate
     * exchange, declare arguments, durability and auto-delete setting all come from the node.
     *
     * @param dest the destination whose node describes the exchange
     * @throws QpidException declared by the method signature
     */
    @Override
    void handleExchangeNodeCreation(AMQDestination dest) throws QpidException {
        Node node = dest.getNode();
        // NOTE(review): the literal false is the fifth argument of sendExchangeDeclare; its meaning is not visible here - confirm against that method.
        this.sendExchangeDeclare(dest.getAddressName(), node.getExchangeType(), node.getAlternateExchange(), node.getDeclareArgs(), false, node.isDurable(), node.isAutoDelete());
        this.createBindings(dest, dest.getNode().getBindings());
        this.sync();
    }

    /**
     * Binds {@code queue} to {@code exchange} on the Qpid session, using the binding key and
     * arguments carried by {@code binding} and no additional options.
     *
     * @param dest     the destination the binding belongs to; not used by this implementation
     * @param binding  supplies the binding key and the binding arguments
     * @param queue    name of the queue to bind
     * @param exchange name of the exchange to bind the queue to
     */
    @Override
    protected void doBind(AMQDestination dest, AMQDestination.Binding binding, String queue, String exchange) {
        this.getQpidSession().exchangeBind(queue, exchange, binding.getBindingKey(), binding.getArgs(), new Option[0]);
    }

    /**
     * Removes every binding listed on the link of {@code dest}.
     *
     * <p>A binding that names no exchange falls back to the destination's address name when the
     * address type is 2, and to {@code "amq.topic"} otherwise. A binding that names no queue
     * falls back to the destination's queue name when the address type is 1 or the link is
     * unnamed, and to the link name otherwise.
     * NOTE(review): 1 and 2 are presumably the queue and topic address types - confirm against
     * the constants in AMQDestination.
     *
     * @param dest the destination whose link bindings are to be removed
     * @throws QpidException declared by the method signature
     */
    @Override
    void handleLinkDelete(AMQDestination dest) throws QpidException {
        String fallbackExchange;
        if (dest.getAddressType() == 2) {
            fallbackExchange = dest.getAddressName();
        } else {
            fallbackExchange = "amq.topic";
        }

        String fallbackQueue;
        if (1 == dest.getAddressType() || dest.getLink().getName() == null) {
            fallbackQueue = dest.getQueueName();
        } else {
            fallbackQueue = dest.getLink().getName();
        }

        for (AMQDestination.Binding binding : dest.getLink().getBindings()) {
            String queue = binding.getQueue();
            if (queue == null) {
                queue = fallbackQueue;
            }
            String exchange = binding.getExchange();
            if (exchange == null) {
                exchange = fallbackExchange;
            }
            if (_logger.isDebugEnabled()) {
                _logger.debug("Unbinding queue : " + queue + " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs()));
            }
            this.getQpidSession().exchangeUnbind(queue, exchange, binding.getBindingKey(), new Option[0]);
        }
    }

    /**
     * Deletes the subscription queue of {@code dest}, but only when all of the following hold:
     * the address type is 2, the link's subscription queue is exclusive, and a queue with the
     * destination's queue name currently exists.
     * NOTE(review): 2 is presumably the topic address type - confirm against AMQDestination.
     *
     * @param dest the destination whose subscription queue may be deleted
     * @throws QpidException declared by the method signature
     */
    void deleteSubscriptionQueue(AMQDestination dest) throws QpidException {
        if (dest.getAddressType() != 2) {
            return;
        }
        if (!dest.getLink().getSubscriptionQueue().isExclusive()) {
            return;
        }
        // Checked last, after the cheaper local conditions above.
        if (!this.isQueueExist(dest.getQueueName(), false, false, false, false, null)) {
            return;
        }
        this.getQpidSession().queueDelete(dest.getQueueName(), new Option[0]);
    }

    /**
     * Deletes the node behind {@code dest} if it exists and then marks the destination as
     * unresolved.
     *
     * <p>When the address type is 2 the node is treated as an exchange, otherwise as a queue;
     * in both cases the address name identifies the node. Nothing happens when the node does
     * not exist.
     * NOTE(review): 2 is presumably the topic address type - confirm against AMQDestination.
     *
     * @param dest the destination whose node is to be deleted
     * @throws QpidException declared by the method signature
     */
    @Override
    void handleNodeDelete(AMQDestination dest) throws QpidException {
        boolean exchangeNode = 2 == dest.getAddressType();
        boolean nodeExists = exchangeNode ? this.isExchangeExist(dest, false) : this.isQueueExist(dest, false);
        if (!nodeExists) {
            return;
        }
        if (exchangeNode) {
            this.getQpidSession().exchangeDelete(dest.getAddressName(), new Option[0]);
        } else {
            this.getQpidSession().queueDelete(dest.getAddressName(), new Option[0]);
        }
        this.setUnresolved(dest);
    }

    private static class Flusher
    implements Runnable {
        private WeakReference<AMQSession_0_10> session;
        private ScheduledFuture<?> _future;

        public Flusher(AMQSession_0_10 session) {
            this.session = new WeakReference<AMQSession_0_10>(session);
        }

        public void setFuture(ScheduledFuture<?> future) {
            this._future = future;
        }

        @Override
        public void run() {
            AMQSession_0_10 ssn = (AMQSession_0_10)this.session.get();
            if (ssn == null) {
                if (this._future != null) {
                    this._future.cancel(false);
                }
            } else {
                try {
                    ssn.flushAcknowledgments(true);
                }
                catch (Exception t) {
                    _logger.error("error flushing acks", (Throwable)t);
                }
            }
        }
    }
}

