/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.jms.v750;

import com.swiftmq.auth.ChallengeResponseFactory;
import com.swiftmq.client.thread.PoolManager;
import com.swiftmq.jms.ConnectionLostException;
import com.swiftmq.jms.DestinationImpl;
import com.swiftmq.jms.ExceptionConverter;
import com.swiftmq.jms.InvalidVersionException;
import com.swiftmq.jms.QueueImpl;
import com.swiftmq.jms.ReconnectListener;
import com.swiftmq.jms.SwiftMQConnection;
import com.swiftmq.jms.TopicImpl;
import com.swiftmq.jms.smqp.SMQPFactory;
import com.swiftmq.jms.smqp.SMQPVersionRequest;
import com.swiftmq.jms.smqp.v750.AuthResponseRequest;
import com.swiftmq.jms.smqp.v750.CreateSessionReply;
import com.swiftmq.jms.smqp.v750.CreateSessionRequest;
import com.swiftmq.jms.smqp.v750.DeleteTmpQueueRequest;
import com.swiftmq.jms.smqp.v750.DisconnectRequest;
import com.swiftmq.jms.smqp.v750.GetAuthChallengeReply;
import com.swiftmq.jms.smqp.v750.GetAuthChallengeRequest;
import com.swiftmq.jms.smqp.v750.GetClientIdReply;
import com.swiftmq.jms.smqp.v750.GetClientIdRequest;
import com.swiftmq.jms.smqp.v750.GetMetaDataReply;
import com.swiftmq.jms.smqp.v750.GetMetaDataRequest;
import com.swiftmq.jms.smqp.v750.KeepAliveRequest;
import com.swiftmq.jms.smqp.v750.SMQPBulkRequest;
import com.swiftmq.jms.smqp.v750.SetClientIdReply;
import com.swiftmq.jms.smqp.v750.SetClientIdRequest;
import com.swiftmq.jms.v750.ConnectionConsumerImpl;
import com.swiftmq.jms.v750.ConnectionMetaDataImpl;
import com.swiftmq.jms.v750.Connector;
import com.swiftmq.jms.v750.QueueConnectionConsumerImpl;
import com.swiftmq.jms.v750.RecreatableConnection;
import com.swiftmq.jms.v750.SessionImpl;
import com.swiftmq.jms.v750.TemporaryQueueRecreator;
import com.swiftmq.jms.v750.TopicConnectionConsumerImpl;
import com.swiftmq.jms.v750.po.POReconnect;
import com.swiftmq.net.client.ExceptionHandler;
import com.swiftmq.net.client.InboundHandler;
import com.swiftmq.net.client.Reconnector;
import com.swiftmq.swiftlet.threadpool.AsyncTask;
import com.swiftmq.swiftlet.threadpool.ThreadPool;
import com.swiftmq.tools.collection.ListSet;
import com.swiftmq.tools.concurrent.Semaphore;
import com.swiftmq.tools.dump.Dumpable;
import com.swiftmq.tools.dump.DumpableFactory;
import com.swiftmq.tools.dump.Dumpalizer;
import com.swiftmq.tools.queue.SingleProcessorQueue;
import com.swiftmq.tools.requestreply.Reply;
import com.swiftmq.tools.requestreply.ReplyHandler;
import com.swiftmq.tools.requestreply.Request;
import com.swiftmq.tools.requestreply.RequestHandler;
import com.swiftmq.tools.requestreply.RequestRegistry;
import com.swiftmq.tools.requestreply.RequestRetryValidator;
import com.swiftmq.tools.requestreply.RequestServiceRegistry;
import com.swiftmq.tools.requestreply.TransportException;
import com.swiftmq.tools.requestreply.ValidationException;
import com.swiftmq.tools.timer.TimerEvent;
import com.swiftmq.tools.timer.TimerListener;
import com.swiftmq.tools.timer.TimerRegistry;
import com.swiftmq.tools.util.DataStreamOutputStream;
import com.swiftmq.tools.util.LengthCaptureDataInput;
import com.swiftmq.util.SwiftUtilities;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Queue;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;

public class ConnectionImpl
extends RequestServiceRegistry
implements SwiftMQConnection,
Connection,
ReplyHandler,
RequestHandler,
TimerListener,
InboundHandler,
ExceptionHandler,
RecreatableConnection {
    public static final String DISPATCH_TOKEN = "sys$jms.client.connection.connectiontask";
    public static final int CLIENT_VERSION = 750;
    public static final int DISCONNECTED = -1;
    public static final int CONNECTED_STOPPED = 0;
    public static final int CONNECTED_STARTED = 1;
    public static final int INITIAL_KEEPALIVE_COUNT = Integer.parseInt(System.getProperty("swiftmq.jms.keepalive.missing.threshold", "5"));
    KeepAliveRequest keepaliveRequest = new KeepAliveRequest();
    boolean closed = false;
    int connectionState = -1;
    ConnectionMetaDataImpl metaData = null;
    String clientID = null;
    String internalCID = null;
    String originalCID = null;
    boolean clientIdAdministratively = false;
    String myHostname = null;
    ExceptionListener exceptionListener = null;
    RequestRegistry requestRegistry = null;
    List sessionList = Collections.synchronizedList(new ArrayList());
    List connectionConsumerList = Collections.synchronizedList(new ArrayList());
    Map tmpQueues = Collections.synchronizedMap(new HashMap());
    DumpableFactory dumpableFactory = new SMQPFactory(new com.swiftmq.jms.smqp.v750.SMQPFactory());
    boolean cancelled = false;
    boolean clientIdAllowed = true;
    ChallengeResponseFactory crFactory = null;
    String userName = null;
    String password = null;
    com.swiftmq.net.client.Connection connection = null;
    long keepaliveInterval = 0L;
    int smqpProducerReplyInterval = 0;
    int smqpConsumerCacheSize = 0;
    int smqpConsumerCacheSizeKB = -1;
    int jmsDeliveryMode = 0;
    int jmsPriority = 0;
    long jmsTTL = 0L;
    boolean jmsMessageIdEnabled = false;
    boolean jmsMessageTimestampEnabled = false;
    boolean useThreadContextCL = false;
    boolean duplicateMessageDetection = false;
    int duplicateBacklogSize = 500;
    ListSet duplicateLog = new ListSet(500);
    ConnectionQueue connectionQueue = null;
    ConnectionTask connectionTask = null;
    ThreadPool connectionPool = null;
    DataStreamOutputStream outStream = null;
    Reconnector reconnector = null;
    GetAuthChallengeReply authReply = null;
    Connector connector = null;
    List reconnectListeners = new ArrayList();
    boolean reconnectInProgress = false;
    volatile long lastConnectionLost = -1L;
    volatile int connectionId = -1;
    volatile int keepaliveCount = INITIAL_KEEPALIVE_COUNT;
    volatile JMSSecurityException lastSecurityException = null;
    volatile InvalidVersionException lastInvalidVersionException = null;
    AtomicBoolean inputActiveIndicator = null;

    protected ConnectionImpl(String userName, String password, Reconnector reconnector) throws JMSException {
        this.userName = userName;
        this.password = password;
        this.reconnector = reconnector;
        this.connector = new Connector(reconnector);
        this.reconnectAndWait();
        if (this.connection == null) {
            if (this.lastSecurityException != null) {
                throw this.lastSecurityException;
            }
            if (this.lastInvalidVersionException != null) {
                throw this.lastInvalidVersionException;
            }
            throw new JMSException("Unable to create a connection to: " + reconnector.getServers());
        }
        this.connectionState = 0;
        this.connectionPool = PoolManager.getInstance().getConnectionPool();
        this.connectionTask = new ConnectionTask();
        this.connectionQueue = new ConnectionQueue();
        this.connectionQueue.startQueue();
        this.requestRegistry = new RequestRegistry(this.toString());
        this.requestRegistry.setRequestTimeoutEnabled(!reconnector.isEnabled());
        this.requestRegistry.setRequestHandler(this);
        this.setReplyHandler(this);
    }

    @Override
    public boolean isReconnectEnabled() {
        return this.reconnector.isEnabled();
    }

    private void resetSessions(boolean reset) {
        int i;
        for (i = 0; i < this.sessionList.size(); ++i) {
            SessionImpl session = (SessionImpl)this.sessionList.get(i);
            session.setResetInProgress(reset);
            if (reset || this.connectionState != 1) continue;
            session.startSession();
        }
        for (i = 0; i < this.connectionConsumerList.size(); ++i) {
            ConnectionConsumerImpl cc = (ConnectionConsumerImpl)this.connectionConsumerList.get(i);
            cc.setResetInProgress(reset);
        }
    }

    private void setSessionBlockState(boolean blocked) {
        for (int i = 0; i < this.sessionList.size(); ++i) {
            SessionImpl session = (SessionImpl)this.sessionList.get(i);
            session.setBlocked(blocked);
        }
    }

    private void reconnect() {
        if (this.reconnector.isDebug()) {
            System.out.println(new Date() + " " + this.toString() + ": initiate reconnect...");
        }
        this.lastConnectionLost = System.currentTimeMillis();
        POReconnect po = new POReconnect(null, this);
        this.connector.dispatch(po);
        if (this.requestRegistry != null) {
            if (this.reconnector.isDebug()) {
                System.out.println(new Date() + " " + this.toString() + ": cancel: retry all requests");
            }
            this.requestRegistry.cancelRetryAllRequests();
        }
    }

    private void reconnectAndWait() {
        if (this.reconnector.isDebug()) {
            System.out.println(new Date() + " " + this.toString() + ": initiate reconnect and wait ...");
        }
        this.lastConnectionLost = System.currentTimeMillis();
        Semaphore sem = new Semaphore();
        POReconnect po = new POReconnect(sem, this);
        this.connector.dispatch(po);
        sem.waitHere();
        if (this.reconnector.isDebug()) {
            System.out.println(new Date() + " " + this.toString() + ": reconnect done, connection =" + this.connection);
        }
    }

    public long getLastConnectionLost() {
        return this.lastConnectionLost;
    }

    public int getConnectionId() {
        return this.connection == null ? -1 : this.connectionId;
    }

    @Override
    public void prepareForReconnect() {
        if (this.connectionQueue != null) {
            this.connectionQueue.stopQueue();
            this.connectionQueue.clear();
        }
        if (this.requestRegistry != null) {
            this.requestRegistry.setPaused(true);
        }
        this.resetSessions(true);
        if (this.connection != null) {
            this.reconnector.invalidateConnection();
            this.connection = null;
        }
    }

    @Override
    public Request getVersionRequest() {
        return new SMQPVersionRequest(750);
    }

    @Override
    public void setVersionReply(Reply reply) throws Exception {
        if (!reply.isOk()) {
            this.lastInvalidVersionException = new InvalidVersionException(reply.getException().getMessage());
            throw this.lastInvalidVersionException;
        }
    }

    @Override
    public Request getAuthenticateRequest() {
        return new GetAuthChallengeRequest(0, this.userName);
    }

    @Override
    public void setAuthenticateReply(Reply reply) throws Exception {
        if (!reply.isOk()) {
            this.lastSecurityException = new JMSSecurityException(reply.getException().getMessage());
            throw this.lastSecurityException;
        }
        this.lastSecurityException = null;
        this.authReply = (GetAuthChallengeReply)reply;
        this.crFactory = (ChallengeResponseFactory)Class.forName(this.authReply.getFactoryClass()).newInstance();
    }

    @Override
    public Request getAuthenticateResponse() {
        byte[] challenge = this.authReply.getChallenge();
        byte[] response = this.crFactory.createBytesResponse(challenge, this.password);
        return new AuthResponseRequest(0, response);
    }

    @Override
    public void setAuthenticateResponseReply(Reply reply) throws Exception {
        if (!reply.isOk()) {
            this.lastSecurityException = new JMSSecurityException(reply.getException().getMessage());
            throw this.lastSecurityException;
        }
        this.lastSecurityException = null;
    }

    @Override
    public Request getMetaDataRequest() {
        return new GetMetaDataRequest();
    }

    @Override
    public void setMetaDataReply(Reply reply) throws Exception {
        GetMetaDataReply mdReply = (GetMetaDataReply)reply;
        this.metaData = mdReply.getMetaData();
    }

    @Override
    public Request getGetClientIdRequest() {
        return this.internalCID == null ? null : new GetClientIdRequest(0);
    }

    @Override
    public void setGetClientIdReply(Reply reply) throws Exception {
        this.internalCID = ((GetClientIdReply)reply).getClientId();
    }

    @Override
    public Request getSetClientIdRequest() {
        if (this.clientID == null) {
            return null;
        }
        String s = this.clientID;
        if (this.clientID.indexOf(64) != -1) {
            s = this.clientID.substring(0, this.clientID.indexOf(64));
        }
        return new SetClientIdRequest(0, s);
    }

    @Override
    public void setSetClientIdReply(Reply reply) throws Exception {
        if (!reply.isOk()) {
            throw reply.getException();
        }
        this.clientID = ((SetClientIdReply)reply).getClientId();
    }

    @Override
    public Request getRecreateRequest() {
        return null;
    }

    @Override
    public void setRecreateReply(Reply reply) {
    }

    @Override
    public List getRecreatables() {
        int i;
        ArrayList<TemporaryQueueRecreator> list = new ArrayList<TemporaryQueueRecreator>();
        Iterator iter = this.tmpQueues.entrySet().iterator();
        while (iter.hasNext()) {
            list.add(new TemporaryQueueRecreator(this, (QueueImpl)iter.next().getValue()));
        }
        for (i = 0; i < this.sessionList.size(); ++i) {
            list.add((TemporaryQueueRecreator)this.sessionList.get(i));
        }
        for (i = 0; i < this.connectionConsumerList.size(); ++i) {
            list.add((TemporaryQueueRecreator)this.connectionConsumerList.get(i));
        }
        return list;
    }

    @Override
    public void handOver(com.swiftmq.net.client.Connection connection) {
        this.connection = connection;
        if (connection != null) {
            ++this.connectionId;
            this.myHostname = connection.getLocalHostname();
            connection.setInboundHandler(this);
            connection.setExceptionHandler(this);
            this.inputActiveIndicator = new AtomicBoolean(false);
            connection.setInputActiveIndicator(this.inputActiveIndicator);
            this.outStream = new DataStreamOutputStream(connection.getOutputStream());
            if (this.connectionQueue != null) {
                this.connectionQueue.clear();
                this.connectionQueue.startQueue();
            }
            this.setSessionBlockState(true);
            if (this.requestRegistry != null) {
                this.requestRegistry.setPaused(false);
                Semaphore retrySem = new Semaphore();
                this.requestRegistry.retryAllRequests(retrySem);
                retrySem.waitHere();
            }
            this.resetSessions(false);
            this.notifyReconnectListener();
            this.setSessionBlockState(false);
        }
    }

    private void notifyReconnectListener() {
        if (this.reconnectListeners.size() == 0) {
            return;
        }
        new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                List list = ConnectionImpl.this.reconnectListeners;
                synchronized (list) {
                    for (int i = 0; i < ConnectionImpl.this.reconnectListeners.size(); ++i) {
                        ((ReconnectListener)ConnectionImpl.this.reconnectListeners.get(i)).reconnected(ConnectionImpl.this.connection.getHostname(), ConnectionImpl.this.connection.getPort());
                    }
                }
            }
        }.start();
    }

    @Override
    public String getUserName() {
        return this.userName;
    }

    void assignClientId(String clientId) throws JMSException {
        this.clientID = clientId;
        this.originalCID = clientId;
        if (this.clientID != null) {
            this.clientIdAdministratively = true;
            try {
                SetClientIdReply reply = (SetClientIdReply)this.requestRegistry.request(new SetClientIdRequest(0, this.clientID));
                if (!reply.isOk()) {
                    throw new JMSException(reply.getException().toString());
                }
                this.clientID = reply.getClientId();
            }
            catch (Exception e) {
                if (e instanceof JMSException) {
                    throw (JMSException)((Object)e);
                }
                e.printStackTrace();
            }
        } else {
            this.clientIdAdministratively = false;
            try {
                GetClientIdReply reply = (GetClientIdReply)this.requestRegistry.request(new GetClientIdRequest());
                if (!reply.isOk()) {
                    throw new JMSException(reply.getException().toString());
                }
                this.internalCID = reply.getClientId();
            }
            catch (Exception e) {
                if (e instanceof JMSException) {
                    throw (JMSException)((Object)e);
                }
                e.printStackTrace();
            }
        }
    }

    public String getInternalCID() {
        return this.internalCID;
    }

    int getSmqpProducerReplyInterval() {
        return this.smqpProducerReplyInterval;
    }

    void setSmqpProducerReplyInterval(int smqpProducerReplyInterval) {
        this.smqpProducerReplyInterval = smqpProducerReplyInterval;
    }

    int getSmqpConsumerCacheSize() {
        return this.smqpConsumerCacheSize;
    }

    void setSmqpConsumerCacheSize(int smqpConsumerCacheSize) {
        this.smqpConsumerCacheSize = smqpConsumerCacheSize;
    }

    int getSmqpConsumerCacheSizeKB() {
        return this.smqpConsumerCacheSizeKB;
    }

    void setSmqpConsumerCacheSizeKB(int smqpConsumerCacheSizeKB) {
        this.smqpConsumerCacheSizeKB = smqpConsumerCacheSizeKB;
    }

    int getJmsDeliveryMode() {
        return this.jmsDeliveryMode;
    }

    void setJmsDeliveryMode(int jmsDeliveryMode) {
        this.jmsDeliveryMode = jmsDeliveryMode;
    }

    int getJmsPriority() {
        return this.jmsPriority;
    }

    void setJmsPriority(int jmsPriority) {
        this.jmsPriority = jmsPriority;
    }

    long getJmsTTL() {
        return this.jmsTTL;
    }

    void setJmsTTL(long jmsTTL) {
        this.jmsTTL = jmsTTL;
    }

    boolean isJmsMessageIdEnabled() {
        return this.jmsMessageIdEnabled;
    }

    void setJmsMessageIdEnabled(boolean jmsMessageIdEnabled) {
        this.jmsMessageIdEnabled = jmsMessageIdEnabled;
    }

    boolean isJmsMessageTimestampEnabled() {
        return this.jmsMessageTimestampEnabled;
    }

    void setJmsMessageTimestampEnabled(boolean jmsMessageTimestampEnabled) {
        this.jmsMessageTimestampEnabled = jmsMessageTimestampEnabled;
    }

    boolean isUseThreadContextCL() {
        return this.useThreadContextCL;
    }

    void setUseThreadContextCL(boolean useThreadContextCL) {
        this.useThreadContextCL = useThreadContextCL;
    }

    public boolean isDuplicateMessageDetection() {
        return this.duplicateMessageDetection;
    }

    public void setDuplicateMessageDetection(boolean duplicateMessageDetection) {
        this.duplicateMessageDetection = duplicateMessageDetection;
    }

    public synchronized void setDuplicateBacklogSize(int duplicateBacklogSize) {
        this.duplicateBacklogSize = duplicateBacklogSize;
        this.duplicateLog.resize(duplicateBacklogSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addReconnectListener(ReconnectListener listener) {
        List list = this.reconnectListeners;
        synchronized (list) {
            this.reconnectListeners.add(listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeReconnectListener(ReconnectListener listener) {
        List list = this.reconnectListeners;
        synchronized (list) {
            this.reconnectListeners.remove(listener);
        }
    }

    void startKeepAlive(long keepaliveInterval) {
        this.keepaliveInterval = keepaliveInterval;
        TimerRegistry.Singleton().addTimerListener(keepaliveInterval, (TimerListener)this);
    }

    @Override
    public void performTimeAction(TimerEvent evt) {
        if (this.connection != null) {
            boolean wasActive = this.inputActiveIndicator.getAndSet(false);
            if (wasActive) {
                this.keepaliveCount = INITIAL_KEEPALIVE_COUNT;
                this.performRequest(this.keepaliveRequest);
                if (this.reconnector.isDebug()) {
                    System.out.println(new Date() + " " + this.toString() + ": inputActiveIndicator was true, reset keepalive counter to " + INITIAL_KEEPALIVE_COUNT);
                }
            } else {
                --this.keepaliveCount;
                if (this.reconnector.isDebug()) {
                    System.out.println(new Date() + " " + this.toString() + ": decrementing keepalive counter to " + this.keepaliveCount);
                }
                if (this.keepaliveCount <= 0) {
                    if (this.reconnector.isDebug()) {
                        System.out.println(new Date() + " " + this.toString() + ": keepalive counter reaches 0, invalidating connection!");
                    }
                    if (this.reconnector.isEnabled()) {
                        this.keepaliveCount = INITIAL_KEEPALIVE_COUNT;
                        this.reconnect();
                    } else {
                        this.cancelAndNotify(new Exception("Keepalive Counter reaches 0!"), true);
                    }
                } else {
                    this.performRequest(this.keepaliveRequest);
                }
            }
        }
    }

    protected void verifyState() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Connection is closed");
        }
    }

    synchronized void increaseDuplicateLogSize(int extend) {
        this.duplicateLog.resize(this.duplicateLog.getMax() + extend);
    }

    synchronized void decreaseDuplicateLogSize(int extend) {
        this.duplicateLog.resize(Math.max(500, this.duplicateLog.getMax() - extend));
    }

    synchronized void addToDuplicateLog(Set rollbackLog) {
        this.duplicateLog.addAll(rollbackLog);
    }

    synchronized boolean isDuplicate(String id) {
        boolean rc = false;
        if (this.duplicateLog.contains(id)) {
            rc = true;
        } else {
            this.duplicateLog.add(id);
        }
        return rc;
    }

    synchronized void removeFromDuplicateLog(String id) {
        this.duplicateLog.remove(id);
    }

    void addSession(Session session) {
        this.clientIdAllowed = false;
        this.sessionList.add(session);
        if (this.connectionState == 1) {
            ((SessionImpl)session).startSession();
        }
    }

    void removeSession(Session session) {
        this.sessionList.remove(session);
    }

    void addConnectionConsumer(ConnectionConsumerImpl connectionConsumer) {
        this.connectionConsumerList.add(connectionConsumer);
        if (this.connectionState == 1) {
            connectionConsumer.startConsumer();
        }
    }

    void removeConnectionConsumer(ConnectionConsumerImpl connectionConsumer) {
        this.connectionConsumerList.remove(connectionConsumer);
    }

    void addTmpQueue(QueueImpl queue) {
        try {
            this.tmpQueues.put(queue.getQueueName(), queue);
        }
        catch (JMSException jMSException) {
            // empty catch block
        }
    }

    void removeTmpQueue(String queueName) {
        this.tmpQueues.remove(queueName);
    }

    public int getConnectionState() {
        return this.connectionState;
    }

    @Override
    public void deleteTempQueue(String queueName) throws JMSException {
        Reply reply = null;
        try {
            reply = this.requestRegistry.request(new DeleteTmpQueueRequest(0, queueName));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        this.tmpQueues.remove(queueName);
    }

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        this.verifyState();
        SessionImpl session = null;
        CreateSessionReply reply = null;
        try {
            reply = (CreateSessionReply)this.requestRegistry.request(new CreateSessionRequest(0, transacted, acknowledgeMode, 2, 0));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        int dispatchId = reply.getSessionDispatchId();
        String cid = this.clientID != null ? this.clientID : this.internalCID;
        session = new SessionImpl(0, this, transacted, acknowledgeMode, dispatchId, this.requestRegistry, this.myHostname, cid);
        session.setUserName(this.getUserName());
        session.setMyDispatchId(this.addRequestService(session));
        this.addSession(session);
        return session;
    }

    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        String ms;
        this.verifyState();
        QueueConnectionConsumerImpl cc = null;
        CreateSessionReply reply = null;
        try {
            reply = (CreateSessionReply)this.requestRegistry.request(new CreateSessionRequest(0, false, 0, 0, 0));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (reply.isOk()) {
            int dispatchId = reply.getSessionDispatchId();
            cc = new QueueConnectionConsumerImpl(this, dispatchId, this.requestRegistry, sessionPool, maxMessages);
            cc.setMyDispatchId(this.addRequestService(cc));
            ms = messageSelector;
            if (messageSelector != null && messageSelector.trim().length() == 0) {
                ms = null;
            }
        } else {
            throw ExceptionConverter.convert(reply.getException());
        }
        cc.createConsumer((QueueImpl)queue, ms);
        this.addConnectionConsumer(cc);
        return cc;
    }

    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        String ms;
        this.verifyState();
        TopicConnectionConsumerImpl cc = null;
        CreateSessionReply reply = null;
        try {
            reply = (CreateSessionReply)this.requestRegistry.request(new CreateSessionRequest(0, false, 0, 1, 0));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (reply.isOk()) {
            int dispatchId = reply.getSessionDispatchId();
            cc = new TopicConnectionConsumerImpl(this, dispatchId, this.requestRegistry, sessionPool, maxMessages);
            cc.setMyDispatchId(this.addRequestService(cc));
            ms = messageSelector;
            if (messageSelector != null && messageSelector.trim().length() == 0) {
                ms = null;
            }
        } else {
            throw ExceptionConverter.convert(reply.getException());
        }
        cc.createSubscriber((TopicImpl)topic, ms);
        this.addConnectionConsumer(cc);
        return cc;
    }

    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.verifyState();
        TopicConnectionConsumerImpl cc = null;
        CreateSessionReply reply = null;
        try {
            reply = (CreateSessionReply)this.requestRegistry.request(new CreateSessionRequest(0, false, 0, 1, 0));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        int dispatchId = reply.getSessionDispatchId();
        cc = new TopicConnectionConsumerImpl(this, dispatchId, this.requestRegistry, sessionPool, maxMessages);
        cc.setMyDispatchId(this.addRequestService(cc));
        cc.createDurableSubscriber((TopicImpl)topic, messageSelector, subscriptionName);
        this.addConnectionConsumer(cc);
        return cc;
    }

    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("createConnectionConsumer, destination is null!");
        }
        DestinationImpl destImpl = (DestinationImpl)destination;
        ConnectionConsumer consumer = null;
        switch (destImpl.getType()) {
            case 0: 
            case 3: {
                consumer = this.createConnectionConsumer((Queue)destination, messageSelector, sessionPool, maxMessages);
                break;
            }
            case 1: 
            case 2: {
                consumer = this.createConnectionConsumer((Topic)destination, messageSelector, sessionPool, maxMessages);
            }
        }
        return consumer;
    }

    public String getClientID() throws JMSException {
        this.verifyState();
        return this.originalCID;
    }

    public void setClientID(String s) throws JMSException {
        this.verifyState();
        if (this.clientIdAdministratively) {
            throw new IllegalStateException("Client ID was set administratively and cannot be changed");
        }
        if (!this.clientIdAllowed) {
            throw new IllegalStateException("setClientID is only allowed immediatly after connection creation");
        }
        this.clientIdAllowed = false;
        try {
            SwiftUtilities.verifyClientId(s);
        }
        catch (Exception e) {
            throw new InvalidClientIDException(e.getMessage());
        }
        SetClientIdReply reply = null;
        try {
            reply = (SetClientIdReply)this.requestRegistry.request(new SetClientIdRequest(0, s));
        }
        catch (Exception e) {
            throw new JMSException(e.getMessage());
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        this.clientID = reply.getClientId();
        this.originalCID = s;
    }

    public ConnectionMetaData getMetaData() throws JMSException {
        this.verifyState();
        return this.metaData;
    }

    public ExceptionListener getExceptionListener() throws JMSException {
        this.verifyState();
        return this.exceptionListener;
    }

    public void setExceptionListener(ExceptionListener listener) throws JMSException {
        if (listener != null) {
            this.verifyState();
        }
        this.exceptionListener = listener;
    }

    private void writeObject(Dumpable obj) throws Exception {
        try {
            Dumpalizer.dump(this.outStream, obj);
            this.outStream.flush();
        }
        catch (IOException e) {
            if (this.reconnector.isDebug()) {
                System.out.println(new Date() + " " + this.toString() + ": writeObject, exception= " + e);
            }
            throw e;
        }
    }

    @Override
    public void performRequest(Request request) {
        int id = request.getConnectionId();
        RequestRetryValidator validator = request.getValidator();
        if (id != -1 && id != this.connectionId && validator != null) {
            try {
                validator.validate(request);
            }
            catch (ValidationException e) {
                return;
            }
        }
        this.connectionQueue.enqueue(request);
    }

    @Override
    public void performReply(Reply reply) {
        this.connectionQueue.enqueue(reply);
    }

    @Override
    public void onException(IOException exception) {
        if (this.closed) {
            return;
        }
        if (this.reconnector.isDebug()) {
            System.out.println(new Date() + " " + this.toString() + ": onException, exception= " + exception);
        }
        if (this.reconnector.isEnabled()) {
            this.reconnect();
        } else {
            this.cancelAndNotify(exception, true);
        }
    }

    private void dispatchDumpable(Dumpable obj) {
        if (obj.getDumpId() != 152) {
            if (obj instanceof Reply) {
                this.requestRegistry.setReply((Reply)obj);
            } else if (obj instanceof Request) {
                Request req = (Request)obj;
                req.setConnectionId(this.connectionId);
                this.dispatch(req);
            }
        } else {
            this.keepaliveCount = INITIAL_KEEPALIVE_COUNT;
            if (this.reconnector.isDebug()) {
                System.out.println(new Date() + " " + this.toString() + ": setting keepalive counter to " + this.keepaliveCount);
            }
        }
    }

    @Override
    public void dataAvailable(LengthCaptureDataInput in) {
        try {
            Dumpable obj = Dumpalizer.construct(in, this.dumpableFactory);
            if (obj == null) {
                return;
            }
            if (obj.getDumpId() == 100) {
                SMQPBulkRequest bulkRequest = (SMQPBulkRequest)obj;
                for (int i = 0; i < bulkRequest.len; ++i) {
                    if (this.connection == null) {
                        return;
                    }
                    this.dispatchDumpable((Dumpable)bulkRequest.dumpables[i]);
                }
            } else {
                if (this.connection == null) {
                    return;
                }
                this.dispatchDumpable(obj);
            }
        }
        catch (Exception e) {
            if (this.reconnector.isDebug()) {
                System.out.println(new Date() + " " + this.toString() + ": dataAvailable, exception= " + e);
            }
            if (this.closed) {
                return;
            }
            if (this.reconnector.isEnabled()) {
                this.reconnect();
            }
            this.cancelAndNotify(e, true);
        }
    }

    public synchronized void start() throws JMSException {
        this.verifyState();
        this.clientIdAllowed = false;
        if (this.connectionState == 0) {
            int i;
            for (i = 0; i < this.sessionList.size(); ++i) {
                ((SessionImpl)this.sessionList.get(i)).startSession();
            }
            for (i = 0; i < this.connectionConsumerList.size(); ++i) {
                ((ConnectionConsumerImpl)this.connectionConsumerList.get(i)).startConsumer();
            }
            this.connectionState = 1;
        } else if (this.connectionState == -1) {
            throw new IllegalStateException("could not start - connection is disconnected!");
        }
    }

    public void stop() throws JMSException {
        this.verifyState();
        this.clientIdAllowed = false;
        if (this.connectionState == 1) {
            int i;
            for (i = 0; i < this.sessionList.size(); ++i) {
                ((SessionImpl)this.sessionList.get(i)).stopSession();
            }
            for (i = 0; i < this.connectionConsumerList.size(); ++i) {
                ((ConnectionConsumerImpl)this.connectionConsumerList.get(i)).stopConsumer();
            }
            this.connectionState = 0;
        } else if (this.connectionState == -1) {
            throw new IllegalStateException("could not stop - connection is disconnected!");
        }
    }

    public void close() throws JMSException {
        if (this.closed) {
            return;
        }
        if (this.connectionState == -1) {
            throw new IllegalStateException("could not close - connection is disconnected!");
        }
        try {
            TimerRegistry.Singleton().removeTimerListener(this.keepaliveInterval, (TimerListener)this);
            this.setExceptionListener(null);
            SessionImpl[] si = this.sessionList.toArray(new SessionImpl[this.sessionList.size()]);
            for (int i = 0; i < si.length; ++i) {
                SessionImpl session = si[i];
                if (session.isClosed()) continue;
                session.close();
            }
            ConnectionConsumerImpl[] ci = this.connectionConsumerList.toArray(new ConnectionConsumerImpl[this.connectionConsumerList.size()]);
            for (int i = 0; i < ci.length; ++i) {
                ConnectionConsumerImpl cc = ci[i];
                if (cc.isClosed()) continue;
                cc.close();
            }
            this.requestRegistry.request(new DisconnectRequest());
            this.connector.close();
            this.closed = true;
            this.connectionQueue.stopQueue();
            this.reconnector.invalidateConnection();
            this.requestRegistry.cancelAllRequests(new TransportException("Connection closed"), false);
            this.requestRegistry.close();
            this.sessionList.clear();
            this.tmpQueues.clear();
            this.duplicateLog.clear();
            this.connectionState = -1;
        }
        catch (Exception e) {
            throw new JMSException(e.getMessage());
        }
    }

    public void cancel(boolean closeReconnector) {
        if (!this.cancelled) {
            int i;
            if (closeReconnector) {
                this.connector.close();
            }
            if (this.connectionQueue != null) {
                this.connectionQueue.stopQueue();
            }
            this.cancelled = true;
            this.closed = true;
            for (i = 0; i < this.sessionList.size(); ++i) {
                SessionImpl session = (SessionImpl)this.sessionList.get(i);
                session.cancel();
            }
            this.sessionList.clear();
            for (i = 0; i < this.connectionConsumerList.size(); ++i) {
                ConnectionConsumerImpl cc = (ConnectionConsumerImpl)this.connectionConsumerList.get(i);
                cc.cancel();
            }
            this.connectionConsumerList.clear();
            TimerRegistry.Singleton().removeTimerListener(this.keepaliveInterval, (TimerListener)this);
            this.reconnector.invalidateConnection();
        }
        if (this.requestRegistry != null) {
            this.requestRegistry.cancelAllRequests(new TransportException("Connection closed"), false);
            this.requestRegistry.close();
        }
        this.tmpQueues.clear();
        this.duplicateLog.clear();
        this.connectionState = -1;
    }

    @Override
    public void cancelAndNotify(Exception exception, boolean closeReconnector) {
        this.cancel(closeReconnector);
        if (this.exceptionListener != null) {
            this.exceptionListener.onException((JMSException)new ConnectionLostException(exception.getMessage()));
            this.exceptionListener = null;
        }
    }

    private class ConnectionTask
    implements AsyncTask {
        private ConnectionTask() {
        }

        @Override
        public boolean isValid() {
            return !ConnectionImpl.this.closed;
        }

        @Override
        public String getDispatchToken() {
            return ConnectionImpl.DISPATCH_TOKEN;
        }

        @Override
        public String getDescription() {
            return ConnectionImpl.this.myHostname + "/Connection/ConnectionTask";
        }

        @Override
        public void run() {
            if (!ConnectionImpl.this.closed && ConnectionImpl.this.connectionQueue.dequeue()) {
                ConnectionImpl.this.connectionPool.dispatchTask(this);
            }
        }

        @Override
        public void stop() {
        }
    }

    private class ConnectionQueue
    extends SingleProcessorQueue {
        SMQPBulkRequest bulkRequest;

        public ConnectionQueue() {
            super(100);
            this.bulkRequest = new SMQPBulkRequest();
        }

        @Override
        protected void startProcessor() {
            if (!ConnectionImpl.this.closed) {
                ConnectionImpl.this.connectionPool.dispatchTask(ConnectionImpl.this.connectionTask);
            }
        }

        @Override
        protected void process(Object[] bulk, int n) {
            try {
                if (n == 1) {
                    ConnectionImpl.this.writeObject((Dumpable)bulk[0]);
                } else {
                    this.bulkRequest.dumpables = bulk;
                    this.bulkRequest.len = n;
                    ConnectionImpl.this.writeObject(this.bulkRequest);
                }
            }
            catch (Exception e) {
                if (ConnectionImpl.this.reconnector.isEnabled()) {
                    ConnectionImpl.this.reconnect();
                }
                ConnectionImpl.this.cancelAndNotify(e, true);
            }
        }
    }
}

