/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.jms.v500;

import com.swiftmq.client.thread.PoolManager;
import com.swiftmq.jms.BytesMessageImpl;
import com.swiftmq.jms.DestinationImpl;
import com.swiftmq.jms.ExceptionConverter;
import com.swiftmq.jms.MapMessageImpl;
import com.swiftmq.jms.MessageImpl;
import com.swiftmq.jms.ObjectMessageImpl;
import com.swiftmq.jms.QueueImpl;
import com.swiftmq.jms.SessionExtended;
import com.swiftmq.jms.StreamMessageImpl;
import com.swiftmq.jms.SwiftMQSession;
import com.swiftmq.jms.TemporaryQueueImpl;
import com.swiftmq.jms.TemporaryTopicImpl;
import com.swiftmq.jms.TextMessageImpl;
import com.swiftmq.jms.TopicImpl;
import com.swiftmq.jms.smqp.v500.AcknowledgeMessageRequest;
import com.swiftmq.jms.smqp.v500.AssociateMessageRequest;
import com.swiftmq.jms.smqp.v500.AsyncMessageDeliveryRequest;
import com.swiftmq.jms.smqp.v500.CloseSessionRequest;
import com.swiftmq.jms.smqp.v500.CommitReply;
import com.swiftmq.jms.smqp.v500.CommitRequest;
import com.swiftmq.jms.smqp.v500.CreateBrowserReply;
import com.swiftmq.jms.smqp.v500.CreateBrowserRequest;
import com.swiftmq.jms.smqp.v500.CreateConsumerReply;
import com.swiftmq.jms.smqp.v500.CreateConsumerRequest;
import com.swiftmq.jms.smqp.v500.CreateDurableReply;
import com.swiftmq.jms.smqp.v500.CreateDurableRequest;
import com.swiftmq.jms.smqp.v500.CreateProducerReply;
import com.swiftmq.jms.smqp.v500.CreateProducerRequest;
import com.swiftmq.jms.smqp.v500.CreatePublisherReply;
import com.swiftmq.jms.smqp.v500.CreatePublisherRequest;
import com.swiftmq.jms.smqp.v500.CreateShadowConsumerRequest;
import com.swiftmq.jms.smqp.v500.CreateSubscriberReply;
import com.swiftmq.jms.smqp.v500.CreateSubscriberRequest;
import com.swiftmq.jms.smqp.v500.CreateTmpQueueReply;
import com.swiftmq.jms.smqp.v500.CreateTmpQueueRequest;
import com.swiftmq.jms.smqp.v500.DeleteDurableReply;
import com.swiftmq.jms.smqp.v500.DeleteDurableRequest;
import com.swiftmq.jms.smqp.v500.RecoverSessionRequest;
import com.swiftmq.jms.smqp.v500.RollbackRequest;
import com.swiftmq.jms.v500.ConnectionImpl;
import com.swiftmq.jms.v500.MessageConsumerImpl;
import com.swiftmq.jms.v500.QueueBrowserImpl;
import com.swiftmq.jms.v500.QueueReceiverImpl;
import com.swiftmq.jms.v500.QueueSenderImpl;
import com.swiftmq.jms.v500.TopicPublisherImpl;
import com.swiftmq.jms.v500.TopicSubscriberImpl;
import com.swiftmq.swiftlet.queue.MessageEntry;
import com.swiftmq.swiftlet.queue.MessageIndex;
import com.swiftmq.swiftlet.threadpool.AsyncTask;
import com.swiftmq.swiftlet.threadpool.ThreadPool;
import com.swiftmq.tools.collection.RingBuffer;
import com.swiftmq.tools.queue.SingleProcessorQueue;
import com.swiftmq.tools.requestreply.Reply;
import com.swiftmq.tools.requestreply.Request;
import com.swiftmq.tools.requestreply.RequestRegistry;
import com.swiftmq.tools.requestreply.RequestService;
import com.swiftmq.util.SwiftUtilities;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

/**
 * Client-side JMS session of the v500 protocol package. The same class serves as
 * plain {@link Session}, {@link QueueSession} and {@link TopicSession}; the
 * {@code type} field decides which domain-specific operations are rejected.
 *
 * <p>Incoming requests are queued in a {@code SessionDeliveryQueue} and processed
 * by a {@code SessionTask} dispatched on the session thread pool.
 */
public class SessionImpl
implements Session,
RequestService,
QueueSession,
TopicSession,
SwiftMQSession,
SessionExtended {
    /** Dispatch token reported by the session's {@code SessionTask}. */
    public static final String DISPATCH_TOKEN = "sys$jms.client.session.sessiontask";
    // Session type codes stored in "type". Queue-only operations are rejected when
    // type == 2, topic-only operations when type == 1.
    static final int TYPE_SESSION = 0;
    static final int TYPE_QUEUE_SESSION = 1;
    static final int TYPE_TOPIC_SESSION = 2;
    // When true, close() returns immediately without closing anything.
    boolean ignoreClose = false;
    // Set to true by close() and cancel(); checked by verifyState().
    boolean closed = false;
    // Whether the session is transacted (constructor argument).
    boolean transacted = false;
    // JMS acknowledge mode (constructor argument). The value 2 disables the automatic
    // acknowledge in this class - presumably Session.CLIENT_ACKNOWLEDGE; confirm.
    int acknowledgeMode = 0;
    // Id passed into every request this session creates.
    int dispatchId = 0;
    // Id handed to ConnectionImpl.removeRequestService() when the session is closed.
    int myDispatchId = 0;
    // Client id, forwarded to topic publishers created by this session.
    String clientId = null;
    // Registry used to send requests and obtain their replies.
    RequestRegistry requestRegistry = null;
    // Owning connection.
    ConnectionImpl myConnection = null;
    // Hostname forwarded to senders and publishers created by this session.
    String myHostname = null;
    // User name; only stored and returned by this class.
    String userName = null;
    // Exception listener; only stored by this class.
    ExceptionListener exceptionListener = null;
    // Maps consumer id (Integer) to its MessageConsumerImpl.
    Map consumerMap = new HashMap();
    // Last consumer id handed out; wraps to 0 after Integer.MAX_VALUE.
    int lastConsumerId = -1;
    // Requests / serialized messages of the current transaction; also used as the
    // monitor that guards access to itself.
    ArrayList transactedRequestList = new ArrayList();
    // Session-level listener invoked by run().
    MessageListener messageListener = null;
    // Entries added by addMessageEntry() and drained by run(); created lazily.
    RingBuffer messageChunk = null;
    // Set once createShadowConsumer() succeeded.
    boolean shadowConsumerCreated = false;
    // Entry currently being processed by run(); null outside of processing.
    MessageEntry lastMessage = null;
    // When true, run() calls assignLastMessage() for every entry.
    boolean autoAssign = true;
    // Thread pool on which the session task is dispatched.
    ThreadPool sessionPool = null;
    // Queue of incoming requests for this session.
    SessionDeliveryQueue sessionQueue = null;
    // Task that drains sessionQueue.
    SessionTask sessionTask = null;
    // Incremented each time a recovery starts; deliveries carrying a smaller epoch are dropped.
    int recoveryEpoche = 0;
    // True between startRecoverConsumers() and endRecoverConsumers().
    boolean recoveryInProgress = false;
    // One of the TYPE_* codes (constructor argument).
    int type = 0;
    // Taken from the connection; applied to each message in run().
    boolean useThreadContextCL = false;

    /**
     * Creates a session that belongs to the given connection.
     *
     * @param type            one of the {@code TYPE_*} codes
     * @param myConnection    owning connection; also the source of the thread-context-classloader flag
     * @param transacted      whether the session is transacted
     * @param acknowledgeMode JMS acknowledge mode
     * @param dispatchId      id passed into the requests this session creates
     * @param requestRegistry registry used to send requests
     * @param myHostname      hostname forwarded to producers
     * @param clientId        client id forwarded to topic publishers
     */
    protected SessionImpl(int type, ConnectionImpl myConnection, boolean transacted, int acknowledgeMode, int dispatchId, RequestRegistry requestRegistry, String myHostname, String clientId) {
        // Values handed in by the creator.
        this.type = type;
        this.transacted = transacted;
        this.acknowledgeMode = acknowledgeMode;
        this.dispatchId = dispatchId;
        this.clientId = clientId;
        this.myHostname = myHostname;
        this.requestRegistry = requestRegistry;
        this.myConnection = myConnection;

        // Derived state and delivery infrastructure.
        sessionPool = PoolManager.getInstance().getSessionPool();
        useThreadContextCL = myConnection.isUseThreadContextCL();
        sessionTask = new SessionTask();
        sessionQueue = new SessionDeliveryQueue();
    }

    /** Starts the session's delivery queue. */
    void startSession() {
        sessionQueue.startQueue();
    }

    /** Stops the session's delivery queue. */
    void stopSession() {
        sessionQueue.stopQueue();
    }

    /**
     * @return the user name stored on this session, possibly {@code null}
     */
    String getUserName() {
        return userName;
    }

    /**
     * Stores the user name on this session.
     *
     * @param userName the user name to remember
     */
    void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * @return the connection this session was created with
     */
    public ConnectionImpl getMyConnection() {
        return myConnection;
    }

    /**
     * Guards operations that require an open session.
     *
     * @throws JMSException an {@code IllegalStateException} if the session is closed
     */
    void verifyState() throws JMSException {
        if (!closed) {
            return;
        }
        throw new IllegalStateException("Session is closed");
    }

    /**
     * @return {@code true} if {@link #close()} is currently a no-op for this session
     */
    public boolean isIgnoreClose() {
        return ignoreClose;
    }

    /**
     * Enables or disables ignoring of {@link #close()} calls.
     *
     * @param ignoreClose {@code true} to make {@code close()} return without doing anything
     */
    @Override
    public void setIgnoreClose(boolean ignoreClose) {
        this.ignoreClose = ignoreClose;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends a request to the list of work belonging to the current transaction.
     *
     * @param req the request to buffer
     */
    public void storeTransactedRequest(Request req) {
        synchronized (transactedRequestList) {
            transactedRequestList.add(req);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Serializes a produced message (producer id followed by the message content)
     * and appends the resulting byte array to the current transaction.
     *
     * <p>A serialization failure is only printed; whatever bytes were written up to
     * that point are still appended.
     *
     * @param producerId id of the producer that sent the message
     * @param msg        the message to serialize
     */
    public void storeTransactedMessage(int producerId, MessageImpl msg) {
        synchronized (transactedRequestList) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            try {
                out.writeInt(producerId);
                msg.writeContent(out);
            } catch (Exception e) {
                e.printStackTrace();
            }
            byte[] serialized = buffer.toByteArray();
            transactedRequestList.add(serialized);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the buffered transaction content into the commit request, empties the
     * buffer and sends the request.
     *
     * @param req the commit request to fill and send
     * @return the reply obtained from the request registry
     */
    public Reply requestTransaction(CommitRequest req) {
        synchronized (transactedRequestList) {
            Object[] pending = transactedRequestList.toArray();
            req.setMessages(pending);
            transactedRequestList.clear();
        }
        // The request is sent after the buffer lock has been released.
        return requestRegistry.request(req);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes a snapshot of the buffered transaction content and empties the buffer.
     *
     * @return the buffered entries in insertion order; empty if nothing was buffered
     */
    public Object[] getAndClearCurrentTransaction() {
        synchronized (transactedRequestList) {
            Object[] snapshot = transactedRequestList.toArray();
            transactedRequestList.clear();
            return snapshot;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the buffered transaction content.
     *
     * @param current the new content; {@code null} simply empties the buffer
     */
    public void setCurrentTransaction(Object[] current) {
        synchronized (transactedRequestList) {
            transactedRequestList.clear();
            if (current == null) {
                return;
            }
            for (Object entry : current) {
                transactedRequestList.add(entry);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /** Discards everything buffered for the current transaction. */
    public void dropTransaction() {
        synchronized (transactedRequestList) {
            transactedRequestList.clear();
        }
    }

    /**
     * Stores the exception listener on this session.
     *
     * @param exceptionListener the listener to remember
     */
    void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }

    /**
     * @return the dispatch id stored via {@code setMyDispatchId}
     */
    int getMyDispatchId() {
        return myDispatchId;
    }

    /**
     * Stores the dispatch id that {@code close()} later passes to
     * {@code ConnectionImpl.removeRequestService}.
     *
     * @param myDispatchId the id to remember
     */
    void setMyDispatchId(int myDispatchId) {
        this.myDispatchId = myDispatchId;
    }

    /**
     * Registers a consumer under the next consumer id and tells the consumer its id.
     * The id counter restarts at 0 after {@code Integer.MAX_VALUE}.
     *
     * @param consumer the consumer to register
     */
    synchronized void addMessageConsumerImpl(MessageConsumerImpl consumer) {
        int assignedId = (lastConsumerId == Integer.MAX_VALUE) ? 0 : lastConsumerId + 1;
        lastConsumerId = assignedId;
        consumerMap.put(Integer.valueOf(assignedId), consumer);
        consumer.setConsumerId(assignedId);
    }

    /**
     * Removes the registration of a consumer, looked up by its consumer id.
     *
     * @param consumer the consumer to deregister
     */
    synchronized void removeMessageConsumerImpl(MessageConsumerImpl consumer) {
        Integer key = Integer.valueOf(consumer.getConsumerId());
        consumerMap.remove(key);
    }

    /**
     * Creates a receiver for the queue without a message selector.
     *
     * @param queue the queue to receive from
     * @return the new receiver
     * @throws JMSException if the session is closed or the receiver cannot be created
     */
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        verifyState();
        String noSelector = null;
        return createReceiver(queue, noSelector);
    }

    /**
     * Creates a receiver for the queue: requests a consumer via the request registry,
     * then registers the local receiver with this session.
     *
     * @param queue           the queue to receive from; must not be {@code null}
     * @param messageSelector optional selector; a blank selector is sent as {@code null}
     * @return the new receiver
     * @throws JMSException if the session is closed, the queue is {@code null}, the
     *                      request fails or the reply reports an error
     */
    public synchronized QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        verifyState();
        if (queue == null) {
            throw new InvalidDestinationException("createReceiver, queue is null!");
        }
        boolean blankSelector = messageSelector != null && messageSelector.trim().length() == 0;
        String effectiveSelector = blankSelector ? null : messageSelector;
        CreateConsumerReply reply;
        try {
            // The cast stays inside the try so that a foreign Queue implementation is converted, too.
            CreateConsumerRequest request = new CreateConsumerRequest(dispatchId, (QueueImpl) queue, effectiveSelector);
            reply = (CreateConsumerReply) requestRegistry.request(request);
        } catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        int serverConsumerId = reply.getQueueConsumerId();
        // The receiver keeps the selector exactly as passed in by the caller.
        QueueReceiverImpl receiver = new QueueReceiverImpl(transacted, acknowledgeMode, dispatchId, requestRegistry, queue, messageSelector, this);
        receiver.setServerQueueConsumerId(serverConsumerId);
        boolean suppressAck = transacted || acknowledgeMode == 2;
        receiver.setDoAck(!suppressAck);
        addMessageConsumerImpl(receiver);
        return receiver;
    }

    /**
     * Creates a sender. For a {@code null} queue an unidentified sender with producer
     * id -1 is returned without contacting the request registry.
     *
     * @param queue the target queue, or {@code null} for an unidentified sender
     * @return the new sender
     * @throws JMSException if the session is closed, the request fails or the reply
     *                      reports an error
     */
    public QueueSender createSender(Queue queue) throws JMSException {
        verifyState();
        if (queue == null) {
            return new QueueSenderImpl(this, queue, dispatchId, -1, requestRegistry, myHostname);
        }
        CreateProducerReply reply;
        try {
            CreateProducerRequest request = new CreateProducerRequest(dispatchId, (QueueImpl) queue);
            reply = (CreateProducerReply) requestRegistry.request(request);
        } catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        QueueSenderImpl sender = new QueueSenderImpl(this, queue, dispatchId, reply.getQueueProducerId(), requestRegistry, myHostname);
        sender.setDestinationImpl((Destination) queue);
        return sender;
    }

    /**
     * Creates a non-durable subscriber without selector and with {@code noLocal} off.
     *
     * @param topic the topic to subscribe to
     * @return the new subscriber
     * @throws JMSException if the subscriber cannot be created
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        String noSelector = null;
        return createSubscriber(topic, noSelector, false);
    }

    /**
     * Creates a non-durable topic subscriber and registers it with this session.
     * Automatic acknowledge is always switched off for the returned subscriber.
     *
     * @param topic           the topic to subscribe to; must not be {@code null}
     * @param messageSelector optional selector; a blank selector is sent as {@code null}
     * @param noLocal         forwarded to the request and the subscriber
     * @return the new subscriber
     * @throws JMSException if the session is closed, the topic is {@code null}, the
     *                      request fails or the reply reports an error
     */
    public synchronized TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        verifyState();
        if (topic == null) {
            throw new InvalidDestinationException("createSubscriber, topic is null!");
        }
        boolean blankSelector = messageSelector != null && messageSelector.trim().length() == 0;
        String effectiveSelector = blankSelector ? null : messageSelector;
        CreateSubscriberReply reply;
        try {
            CreateSubscriberRequest request = new CreateSubscriberRequest(dispatchId, (TopicImpl) topic, effectiveSelector, noLocal);
            reply = (CreateSubscriberReply) requestRegistry.request(request);
        } catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        int serverSubscriberId = reply.getTopicSubscriberId();
        TopicSubscriberImpl subscriber = new TopicSubscriberImpl(transacted, acknowledgeMode, dispatchId, requestRegistry, topic, messageSelector, this, noLocal);
        subscriber.setServerQueueConsumerId(serverSubscriberId);
        subscriber.setDoAck(false);
        addMessageConsumerImpl(subscriber);
        return subscriber;
    }

    /**
     * Creates a durable subscriber without selector and with {@code noLocal} off.
     *
     * @param topic the topic to subscribe to
     * @param name  the durable subscription name
     * @return the new subscriber
     * @throws JMSException if the subscriber cannot be created
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        String noSelector = null;
        return createDurableSubscriber(topic, name, noSelector, false);
    }

    /**
     * Creates a durable topic subscriber and registers it with this session.
     *
     * @param topic           the topic to subscribe to; must not be {@code null}
     * @param name            the durable subscription name; must not be {@code null} and
     *                        must pass {@code SwiftUtilities.verifyDurableName}
     * @param messageSelector optional selector; a blank selector is sent as {@code null}
     * @param noLocal         forwarded to the request and the subscriber
     * @return the new subscriber
     * @throws JMSException         if the session is closed, the connection has no client
     *                              id, the topic is {@code null}, the name is invalid,
     *                              the request fails or the reply reports an error
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public synchronized TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        this.verifyState();
        if (this.myConnection.getClientID() == null) {
            throw new IllegalStateException("unable to create durable subscriber, no client ID has been set");
        }
        if (topic == null) {
            throw new InvalidDestinationException("createDurableSubscriber, topic is null!");
        }
        if (name == null) {
            throw new NullPointerException("createDurableSubscriber, name is null!");
        }
        try {
            SwiftUtilities.verifyDurableName(name);
        }
        catch (Exception e) {
            // Keep the original failure reachable for diagnostics instead of dropping it.
            JMSException invalidName = new JMSException(e.getMessage());
            invalidName.setLinkedException(e);
            throw invalidName;
        }
        TopicSubscriberImpl ts = null;
        CreateDurableReply reply = null;
        try {
            String ms = messageSelector;
            if (messageSelector != null && messageSelector.trim().length() == 0) {
                ms = null;
            }
            reply = (CreateDurableReply)this.requestRegistry.request(new CreateDurableRequest(this.dispatchId, (TopicImpl)topic, ms, noLocal, name));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        int tsId = reply.getTopicSubscriberId();
        ts = new TopicSubscriberImpl(this.transacted, this.acknowledgeMode, this.dispatchId, this.requestRegistry, topic, messageSelector, this, noLocal);
        ts.setServerQueueConsumerId(tsId);
        // Automatic acknowledge only for non-transacted sessions whose mode is not 2.
        ts.setDoAck(!this.transacted && this.acknowledgeMode != 2);
        this.addMessageConsumerImpl(ts);
        return ts;
    }

    /**
     * Creates a publisher. For a {@code null} topic an unidentified publisher with
     * publisher id -1 is returned without contacting the request registry.
     *
     * @param topic the target topic, or {@code null} for an unidentified publisher
     * @return the new publisher
     * @throws JMSException if the session is closed, the request fails or the reply
     *                      reports an error
     */
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        verifyState();
        if (topic == null) {
            return new TopicPublisherImpl(this, topic, dispatchId, -1, requestRegistry, myHostname, clientId);
        }
        CreatePublisherReply reply;
        try {
            CreatePublisherRequest request = new CreatePublisherRequest(dispatchId, (TopicImpl) topic);
            reply = (CreatePublisherReply) requestRegistry.request(request);
        } catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        TopicPublisherImpl publisher = new TopicPublisherImpl(this, topic, dispatchId, reply.getTopicPublisherId(), requestRegistry, myHostname, clientId);
        publisher.setDestinationImpl((Destination) topic);
        return publisher;
    }

    /**
     * Creates a producer for the destination, delegating to {@code createSender} for
     * destination types 0 and 3 and to {@code createPublisher} for types 1 and 2.
     *
     * @param destination the target destination, or {@code null} for an unidentified
     *                    producer (created via {@code createSender(null)})
     * @return the new producer, or {@code null} if the destination type is none of 0-3
     * @throws JMSException if the delegated creation fails
     */
    public MessageProducer createProducer(Destination destination) throws JMSException {
        if (destination == null) {
            return this.createSender(null);
        }
        DestinationImpl destImpl = (DestinationImpl)destination;
        // Typed as the common supertype: the result is either a QueueSender or a TopicPublisher.
        MessageProducer producer = null;
        switch (destImpl.getType()) {
            case 0:
            case 3: {
                producer = this.createSender((Queue)destination);
                break;
            }
            case 1:
            case 2: {
                producer = this.createPublisher((Topic)destination);
                break;
            }
            default: {
                break;
            }
        }
        return producer;
    }

    /**
     * Creates a consumer without selector and with {@code noLocal} off.
     *
     * @param destination the destination to consume from
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        String noSelector = null;
        return createConsumer(destination, noSelector, false);
    }

    /**
     * Creates a consumer with the given selector and with {@code noLocal} off.
     *
     * @param destination the destination to consume from
     * @param selector    optional message selector
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
        final boolean noLocal = false;
        return createConsumer(destination, selector, noLocal);
    }

    /**
     * Creates a consumer for the destination, delegating to {@code createReceiver} for
     * destination types 0 and 3 and to {@code createSubscriber} for types 1 and 2.
     *
     * @param destination the destination to consume from; must not be {@code null}
     * @param selector    optional message selector
     * @param noLocal     only used for topic destinations
     * @return the new consumer, or {@code null} if the destination type is none of 0-3
     * @throws JMSException if the destination is {@code null} or the delegated creation fails
     */
    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("createConsumer, destination is null!");
        }
        DestinationImpl destImpl = (DestinationImpl)destination;
        // Typed as the common supertype: the result is either a QueueReceiver or a TopicSubscriber.
        MessageConsumer consumer = null;
        switch (destImpl.getType()) {
            case 0:
            case 3: {
                consumer = this.createReceiver((Queue)destination, selector);
                break;
            }
            case 1:
            case 2: {
                consumer = this.createSubscriber((Topic)destination, selector, noLocal);
                break;
            }
            default: {
                break;
            }
        }
        return consumer;
    }

    /**
     * Creates a queue identity for the given name. No request is sent.
     *
     * @param queueName the queue name; must not be {@code null} and must pass
     *                  {@code SwiftUtilities.verifyQueueName}
     * @return a new {@code QueueImpl} carrying the name
     * @throws JMSException if the session is closed, is a topic session (type 2),
     *                      or the name is {@code null} or invalid
     */
    public Queue createQueue(String queueName) throws JMSException {
        this.verifyState();
        if (this.type == 2) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        if (queueName == null) {
            throw new InvalidDestinationException("createQueue, queueName is null!");
        }
        try {
            SwiftUtilities.verifyQueueName(queueName);
        }
        catch (Exception e) {
            // Keep the original failure reachable for diagnostics instead of dropping it.
            JMSException invalidName = new JMSException(e.getMessage());
            invalidName.setLinkedException(e);
            throw invalidName;
        }
        return new QueueImpl(queueName);
    }

    /**
     * Creates a topic identity for the given name. No request is sent.
     *
     * @param topicName the topic name; must not be {@code null}
     * @return a new {@code TopicImpl} carrying the name
     * @throws JMSException if the session is closed, is a queue session (type 1),
     *                      or the name is {@code null}
     */
    public Topic createTopic(String topicName) throws JMSException {
        verifyState();
        if (type == 1) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        if (topicName != null) {
            return new TopicImpl(topicName);
        }
        throw new InvalidDestinationException("createTopic, topicName is null!");
    }

    /**
     * Creates a browser for the queue without a message selector.
     *
     * @param queue the queue to browse
     * @return the new browser
     * @throws JMSException if the session is closed, is a topic session (type 2),
     *                      or the browser cannot be created
     */
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        verifyState();
        if (type == 2) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        String noSelector = null;
        return createBrowser(queue, noSelector);
    }

    /**
     * Creates a browser for the queue by requesting one via the request registry.
     *
     * @param queue           the queue to browse; must not be {@code null}
     * @param messageSelector optional selector; a blank selector is sent as {@code null}
     * @return the new browser
     * @throws JMSException if the session is closed, is a topic session (type 2), the
     *                      queue is {@code null}, the request fails or the reply
     *                      reports an error
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        verifyState();
        if (type == 2) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        if (queue == null) {
            throw new InvalidDestinationException("createBrowser, queue is null!");
        }
        boolean blankSelector = messageSelector != null && messageSelector.trim().length() == 0;
        String effectiveSelector = blankSelector ? null : messageSelector;
        CreateBrowserReply reply;
        try {
            CreateBrowserRequest request = new CreateBrowserRequest(dispatchId, (QueueImpl) queue, effectiveSelector);
            reply = (CreateBrowserReply) requestRegistry.request(request);
        } catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        // The browser keeps the selector exactly as passed in by the caller.
        return new QueueBrowserImpl(this, queue, messageSelector, dispatchId, reply.getQueueBrowserId(), requestRegistry);
    }

    /**
     * Requests a temporary queue and wraps the returned queue name.
     *
     * <p>The reply is checked before its queue name is used, so an error reported
     * by the reply is never masked by a failure while building the queue object.
     *
     * @return the new temporary queue, bound to this session's connection
     * @throws JMSException if the session is closed, is a topic session (type 2),
     *                      the request fails or the reply reports an error
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        this.verifyState();
        if (this.type == 2) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        CreateTmpQueueReply reply;
        try {
            reply = (CreateTmpQueueReply)this.requestRegistry.request(new CreateTmpQueueRequest());
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        try {
            return new TemporaryQueueImpl(reply.getQueueName(), this.myConnection);
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
    }

    /**
     * Requests a temporary destination (the same request type as for temporary
     * queues) and wraps the returned name as a temporary topic.
     *
     * <p>The reply is checked before its queue name is used, so an error reported
     * by the reply is never masked by a failure while building the topic object.
     *
     * @return the new temporary topic, bound to this session's connection
     * @throws JMSException if the session is closed, is a queue session (type 1),
     *                      the request fails or the reply reports an error
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        this.verifyState();
        if (this.type == 1) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        CreateTmpQueueReply reply;
        try {
            reply = (CreateTmpQueueReply)this.requestRegistry.request(new CreateTmpQueueRequest());
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        try {
            return new TemporaryTopicImpl(reply.getQueueName(), this.myConnection);
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
    }

    /**
     * Deletes a durable subscription by sending a delete request.
     *
     * @param name the durable subscription name; must not be {@code null}
     * @throws JMSException         if the session is closed, is a queue session (type 1),
     *                              the request fails or the reply reports an error
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public void unsubscribe(String name) throws JMSException {
        verifyState();
        if (type == 1) {
            throw new IllegalStateException("Operation not allowed on this session type");
        }
        if (name == null) {
            throw new NullPointerException("unsubscribe, name is null!");
        }
        DeleteDurableReply reply;
        try {
            DeleteDurableRequest request = new DeleteDurableRequest(dispatchId, name);
            reply = (DeleteDurableReply) requestRegistry.request(request);
        } catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (reply.isOk()) {
            return;
        }
        throw ExceptionConverter.convert(reply.getException());
    }

    /**
     * @return the acknowledge mode this session was created with
     * @throws JMSException if the session is closed
     */
    public int getAcknowledgeMode() throws JMSException {
        verifyState();
        return acknowledgeMode;
    }

    /**
     * @return a new, empty bytes message
     * @throws JMSException if the session is closed
     */
    public BytesMessage createBytesMessage() throws JMSException {
        verifyState();
        BytesMessageImpl message = new BytesMessageImpl();
        return message;
    }

    /**
     * @return a new, empty map message
     * @throws JMSException if the session is closed
     */
    public MapMessage createMapMessage() throws JMSException {
        verifyState();
        MapMessageImpl message = new MapMessageImpl();
        return message;
    }

    /**
     * @return a new message without a body
     * @throws JMSException if the session is closed
     */
    public Message createMessage() throws JMSException {
        verifyState();
        MessageImpl message = new MessageImpl();
        return message;
    }

    /**
     * @return a new object message without a payload
     * @throws JMSException if the session is closed
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        verifyState();
        ObjectMessageImpl message = new ObjectMessageImpl();
        return message;
    }

    /**
     * Creates an object message and sets its payload.
     *
     * @param object the payload handed to {@code setObject}
     * @return the new object message
     * @throws JMSException if the session is closed or the payload cannot be set
     */
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        verifyState();
        ObjectMessage message = createObjectMessage();
        message.setObject(object);
        return message;
    }

    /**
     * @return a new, empty stream message
     * @throws JMSException if the session is closed
     */
    public StreamMessage createStreamMessage() throws JMSException {
        verifyState();
        StreamMessageImpl message = new StreamMessageImpl();
        return message;
    }

    /**
     * @return a new text message without text
     * @throws JMSException if the session is closed
     */
    public TextMessage createTextMessage() throws JMSException {
        verifyState();
        TextMessageImpl message = new TextMessageImpl();
        return message;
    }

    /**
     * Creates a text message and sets its text.
     *
     * <p>The text is passed on as is: strings are immutable, so no defensive copy is
     * needed, and a {@code null} text no longer fails with a
     * {@code NullPointerException} before reaching {@code setText}.
     *
     * @param s the text handed to {@code setText}; may be {@code null}
     * @return the new text message
     * @throws JMSException if the session is closed or the text cannot be set
     */
    public TextMessage createTextMessage(String s) throws JMSException {
        this.verifyState();
        TextMessage msg = this.createTextMessage();
        msg.setText(s);
        return msg;
    }

    /**
     * @return {@code true} if this session is transacted
     * @throws JMSException if the session is closed
     */
    public boolean getTransacted() throws JMSException {
        verifyState();
        return transacted;
    }

    /**
     * Commits the current transaction: sends the buffered transaction content with a
     * commit request and, if the reply carries a positive delay, sleeps for that
     * many milliseconds before returning.
     *
     * <p>If the thread is interrupted during that sleep, the wait ends early and the
     * interrupt flag is restored so callers can still observe the interruption.
     *
     * @throws JMSException if the session is closed or not transacted, the request
     *                      fails or the reply reports an error
     */
    public void commit() throws JMSException {
        this.verifyState();
        if (!this.transacted) {
            throw new IllegalStateException("Session is not transacted - commit not allowed");
        }
        CommitReply reply;
        try {
            reply = (CommitReply)this.requestTransaction(new CommitRequest(this.dispatchId));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        long delay = reply.getDelay();
        if (delay > 0L) {
            try {
                Thread.sleep(delay);
            }
            catch (InterruptedException interrupted) {
                // The commit itself succeeded; only the delay is cut short.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Enters recovery: stops the delivery queue, marks recovery as in progress,
     * advances the recovery epoch, flags every registered consumer as recovered and
     * clears its cache, and finally empties the delivery queue.
     */
    synchronized void startRecoverConsumers() {
        this.sessionQueue.stopQueue();
        this.recoveryInProgress = true;
        ++this.recoveryEpoche;
        // Iterate the values directly; the map is raw, so each element needs a cast.
        for (Iterator iter = this.consumerMap.values().iterator(); iter.hasNext();) {
            MessageConsumerImpl c = (MessageConsumerImpl)iter.next();
            c.setWasRecovered(true);
            c.clearCache();
        }
        this.sessionQueue.clear();
    }

    /** Leaves recovery: clears the in-progress flag and restarts the delivery queue. */
    synchronized void endRecoverConsumers() {
        recoveryInProgress = false;
        sessionQueue.startQueue();
    }

    /**
     * Rolls back the current transaction: enters recovery, discards the buffered
     * transaction content and sends a rollback request.
     *
     * <p>Recovery is always left again, even when the request fails; otherwise the
     * session would stay flagged as recovering with its delivery queue stopped.
     *
     * @throws JMSException if the session is closed or not transacted, the request
     *                      fails or the reply reports an error
     */
    public void rollback() throws JMSException {
        this.verifyState();
        if (!this.transacted) {
            throw new IllegalStateException("Session is not transacted - rollback not allowed");
        }
        this.startRecoverConsumers();
        try {
            this.dropTransaction();
            Reply reply;
            try {
                reply = this.requestRegistry.request(new RollbackRequest(this.dispatchId));
            }
            catch (Exception e) {
                throw ExceptionConverter.convert(e);
            }
            if (!reply.isOk()) {
                throw ExceptionConverter.convert(reply.getException());
            }
        }
        finally {
            this.endRecoverConsumers();
        }
    }

    /**
     * @return {@code true} once the session has been closed or cancelled
     */
    boolean isClosed() {
        return closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the session. Does nothing if {@code ignoreClose} is set or the session
     * is already closed. Otherwise the delivery queue is stopped and emptied, all
     * consumers are cancelled, a pending transaction is dropped and a close request
     * is sent.
     *
     * <p>The session is removed from its connection even when the close request
     * fails, so a failed close does not leave a closed session registered.
     *
     * @throws JMSException if the close request fails or its reply reports an error
     */
    public void close() throws JMSException {
        if (this.ignoreClose || this.closed) {
            return;
        }
        this.sessionQueue.stopQueue();
        this.sessionQueue.clear();
        synchronized (this) {
            this.closed = true;
            // Iterate the values directly; the map is raw, so each element needs a cast.
            for (Iterator iter = this.consumerMap.values().iterator(); iter.hasNext();) {
                MessageConsumerImpl consumer = (MessageConsumerImpl)iter.next();
                consumer.cancel();
            }
            this.consumerMap.clear();
            if (this.transacted) {
                this.dropTransaction();
            }
        }
        Reply reply;
        try {
            reply = this.requestRegistry.request(new CloseSessionRequest(this.dispatchId));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        finally {
            this.myConnection.removeRequestService(this.myDispatchId);
            this.myConnection.removeSession(this);
        }
        // A missing reply is treated as success.
        if (reply != null && !reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
    }

    /**
     * Shuts the session down locally without sending any request: marks it closed,
     * stops and empties the delivery queue, cancels all consumers and drops a
     * pending transaction.
     */
    void cancel() {
        this.closed = true;
        this.sessionQueue.stopQueue();
        this.sessionQueue.clear();
        // Iterate the values directly; the map is raw, so each element needs a cast.
        for (Iterator iter = this.consumerMap.values().iterator(); iter.hasNext();) {
            MessageConsumerImpl consumer = (MessageConsumerImpl)iter.next();
            consumer.cancel();
        }
        this.consumerMap.clear();
        if (this.transacted) {
            this.dropTransaction();
        }
    }

    /**
     * Recovers a non-transacted session: enters recovery and sends a recover request.
     *
     * <p>Recovery is always left again, even when the request fails; otherwise the
     * session would stay flagged as recovering with its delivery queue stopped.
     *
     * @throws JMSException if the session is closed or transacted, the request fails
     *                      or the reply reports an error
     */
    public synchronized void recover() throws JMSException {
        this.verifyState();
        if (this.transacted) {
            throw new IllegalStateException("Session is transacted - recover not allowed");
        }
        this.startRecoverConsumers();
        try {
            Reply reply;
            try {
                reply = this.requestRegistry.request(new RecoverSessionRequest(this.dispatchId));
            }
            catch (Exception e) {
                throw ExceptionConverter.convert(e);
            }
            if (!reply.isOk()) {
                throw ExceptionConverter.convert(reply.getException());
            }
        }
        finally {
            this.endRecoverConsumers();
        }
    }

    /**
     * @return the session-level message listener, possibly {@code null}
     * @throws JMSException if the session is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        verifyState();
        return messageListener;
    }

    /**
     * Sets the session-level message listener used by {@link #run()}.
     *
     * @param messageListener the listener, or {@code null} to remove it
     * @throws JMSException if the session is closed
     */
    public synchronized void setMessageListener(MessageListener messageListener) throws JMSException {
        verifyState();
        this.messageListener = messageListener;
    }

    /**
     * @return {@code true} once {@code createShadowConsumer} has succeeded
     */
    boolean isShadowConsumerCreated() {
        return shadowConsumerCreated;
    }

    /**
     * Sends a shadow-consumer request for the queue and records success.
     *
     * @param queueName name of the queue passed into the request
     * @throws Exception the exception carried by the reply if it is not ok
     */
    void createShadowConsumer(String queueName) throws Exception {
        CreateShadowConsumerRequest request = new CreateShadowConsumerRequest(dispatchId, queueName);
        Reply reply = requestRegistry.request(request);
        if (reply.isOk()) {
            shadowConsumerCreated = true;
            return;
        }
        throw reply.getException();
    }

    /**
     * Stops the delivery queue and appends the entry to the chunk that
     * {@link #run()} drains. The chunk is created on first use.
     *
     * @param messageEntry the entry to append
     */
    void addMessageEntry(MessageEntry messageEntry) {
        sessionQueue.stopQueue();
        RingBuffer chunk = messageChunk;
        if (chunk == null) {
            chunk = new RingBuffer(32);
            messageChunk = chunk;
        }
        chunk.add(messageEntry);
    }

    /**
     * Sends an associate request for the message currently being processed.
     * Does nothing when there is no such message.
     *
     * @throws Exception the exception carried by the reply if it is not ok
     */
    void assignLastMessage() throws Exception {
        if (lastMessage == null) {
            return;
        }
        AssociateMessageRequest request = new AssociateMessageRequest(dispatchId, lastMessage.getMessageIndex());
        Reply reply = requestRegistry.request(request);
        if (!reply.isOk()) {
            throw reply.getException();
        }
    }

    /**
     * Controls whether {@code run()} calls {@code assignLastMessage()} for each entry.
     *
     * @param autoAssign {@code true} to associate each message automatically
     */
    void setAutoAssign(boolean autoAssign) {
        this.autoAssign = autoAssign;
    }

    /**
     * Sends an acknowledge request for the given message index.
     *
     * @param messageIndex index of the message to acknowledge
     * @return always {@code false}
     * @throws JMSException if the session is closed or the reply reports an error
     */
    @Override
    public boolean acknowledgeMessage(MessageIndex messageIndex) throws JMSException {
        if (closed) {
            throw new IllegalStateException("Connection is closed");
        }
        AcknowledgeMessageRequest request = new AcknowledgeMessageRequest(dispatchId, 0, messageIndex);
        Reply reply = requestRegistry.request(request);
        if (reply.isOk()) {
            return false;
        }
        throw ExceptionConverter.convert(reply.getException());
    }

    /**
     * Delivers every entry of the message chunk to the session-level message
     * listener. Each entry is optionally associated ({@code autoAssign}), prepared
     * (session reference, read-only, reset, classloader flag), handed to the
     * listener and then acknowledged unless the session is transacted or uses
     * acknowledge mode 2.
     *
     * <p>Returns immediately if the session is closed or no chunk exists.
     *
     * @throws RuntimeException if no listener is set, or if processing an entry
     *                          fails; in the latter case the original exception is
     *                          attached as the cause
     */
    public synchronized void run() {
        if (this.closed) {
            return;
        }
        if (this.messageListener == null) {
            throw new RuntimeException("No MessageListener has been set!");
        }
        if (this.messageChunk == null) {
            return;
        }
        while (this.messageChunk.getSize() > 0) {
            try {
                this.lastMessage = (MessageEntry)this.messageChunk.remove();
                this.lastMessage.moveMessageAttributes();
                if (this.autoAssign) {
                    this.assignLastMessage();
                }
                this.lastMessage.getMessage().setSessionImpl(this);
                this.lastMessage.getMessage().setReadOnly(true);
                this.lastMessage.getMessage().reset();
                this.lastMessage.getMessage().setUseThreadContextCL(this.useThreadContextCL);
                this.messageListener.onMessage((Message)this.lastMessage.getMessage());
                if (!this.transacted && this.acknowledgeMode != 2) {
                    this.acknowledgeMessage(this.lastMessage.getMessageIndex());
                }
            }
            catch (Exception e) {
                // Same message as before, but the cause is kept for diagnostics.
                throw new RuntimeException(e.toString(), e);
            }
            this.lastMessage = null;
        }
    }

    /**
     * Hands an asynchronous delivery to the cache of the consumer it is addressed to.
     * The delivery is dropped when the session is closed, a recovery is in progress,
     * the delivery belongs to an older recovery epoch, or no consumer is registered
     * under its listener id.
     *
     * @param request the delivery request to route
     */
    private synchronized void doDeliverMessage(AsyncMessageDeliveryRequest request) {
        if (closed || recoveryInProgress) {
            return;
        }
        if (request.getRecoveryEpoche() < recoveryEpoche) {
            return;
        }
        Integer key = Integer.valueOf(request.getListenerId());
        MessageConsumerImpl target = (MessageConsumerImpl) consumerMap.get(key);
        if (target == null) {
            return;
        }
        if (request.isBulk()) {
            target.addToCache(request.createRequests(), request.isRequiresRestart());
        } else {
            target.addToCache(request);
        }
    }

    /** Asks the delivery queue to run another consumer invocation round. */
    void triggerInvocation() {
        sessionQueue.triggerInvocation();
    }

    /**
     * Accepts an incoming request by placing it on the session's delivery queue.
     *
     * @param request the request to enqueue
     */
    @Override
    public void serviceRequest(Request request) {
        sessionQueue.enqueue(request);
    }

    /**
     * Pool task that drains the session's delivery queue. While the session is open
     * and {@code dequeue()} returns {@code true}, the task dispatches itself again.
     */
    private class SessionTask
    implements AsyncTask {
        private SessionTask() {
        }

        /** The task is valid as long as the session is not closed. */
        @Override
        public boolean isValid() {
            boolean sessionClosed = SessionImpl.this.closed;
            return !sessionClosed;
        }

        @Override
        public String getDispatchToken() {
            return DISPATCH_TOKEN;
        }

        @Override
        public String getDescription() {
            String host = SessionImpl.this.myConnection.myHostname;
            return host + "/Session/SessionTask";
        }

        @Override
        public void run() {
            if (SessionImpl.this.closed) {
                return;
            }
            boolean dispatchAgain = SessionImpl.this.sessionQueue.dequeue();
            if (dispatchAgain) {
                SessionImpl.this.sessionPool.dispatchTask(this);
            }
        }

        /** Nothing to release. */
        @Override
        public void stop() {
        }
    }

    /**
     * Delivery queue of the session. Real entries are asynchronous delivery requests;
     * the {@code dummy} marker object is enqueued only to trigger another round of
     * consumer invocations without delivering anything.
     */
    private class SessionDeliveryQueue
    extends SingleProcessorQueue {
        /** Marker entry that triggers processing but carries no delivery. */
        Object dummy;

        public SessionDeliveryQueue() {
            super(100);
            this.dummy = new Object();
        }

        /** Dispatches the session task on the session pool unless the session is closed. */
        @Override
        protected void startProcessor() {
            if (!SessionImpl.this.closed) {
                SessionImpl.this.sessionPool.dispatchTask(SessionImpl.this.sessionTask);
            }
        }

        /** Enqueues the marker entry so that {@code process} runs. */
        void triggerInvocation() {
            this.enqueue(this.dummy);
        }

        /**
         * Routes the first {@code n} entries of {@code bulk} to their consumers
         * (marker entries are skipped) and then invokes every registered consumer
         * once. If any consumer reports more work, the marker is enqueued again.
         * Consumers are not invoked while a recovery is in progress.
         *
         * @param bulk entries taken from the queue
         * @param n    number of valid entries in {@code bulk}
         */
        @Override
        protected void process(Object[] bulk, int n) {
            for (int i = 0; i < n; ++i) {
                if (bulk[i] == this.dummy) {
                    continue;
                }
                SessionImpl.this.doDeliverMessage((AsyncMessageDeliveryRequest)bulk[i]);
            }
            boolean moreToDeliver = false;
            synchronized (SessionImpl.this) {
                if (SessionImpl.this.recoveryInProgress) {
                    return;
                }
                // Iterate the values directly; the map is raw, so each element needs a cast.
                for (Iterator iter = SessionImpl.this.consumerMap.values().iterator(); iter.hasNext();) {
                    MessageConsumerImpl c = (MessageConsumerImpl)iter.next();
                    if (c.invokeConsumer()) {
                        moreToDeliver = true;
                    }
                }
            }
            if (moreToDeliver) {
                this.enqueue(this.dummy);
            }
        }
    }
}

