/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client;

import io.nats.client.AsyncSubscription;
import io.nats.client.AsyncSubscriptionImpl;
import io.nats.client.Channel;
import io.nats.client.ClosedCallback;
import io.nats.client.Connection;
import io.nats.client.ConnectionEvent;
import io.nats.client.Constants;
import io.nats.client.DisconnectedCallback;
import io.nats.client.ExceptionHandler;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.MsgArg;
import io.nats.client.NATSException;
import io.nats.client.NATSThread;
import io.nats.client.NATSThreadFactory;
import io.nats.client.Options;
import io.nats.client.Parser;
import io.nats.client.ReconnectedCallback;
import io.nats.client.ServerInfo;
import io.nats.client.Statistics;
import io.nats.client.Subscription;
import io.nats.client.SubscriptionImpl;
import io.nats.client.SyncSubscription;
import io.nats.client.SyncSubscriptionImpl;
import io.nats.client.TCPConnection;
import io.nats.client.Utilities;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectionImpl
implements Connection {
    final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class);
    String version = null;
    private static final String inboxPrefix = "_INBOX.";
    public Constants.ConnState status = Constants.ConnState.DISCONNECTED;
    protected static final String STALE_CONNECTION = "Stale Connection";
    protected static final String THREAD_POOL = "natsthreadpool";
    protected static final String LANG_STRING = "java";
    protected static final int DEFAULT_BUF_SIZE = 65536;
    protected static final int DEFAULT_STREAM_BUF_SIZE = 8192;
    protected static final int DEFAULT_PENDING_SIZE = 0x100000;
    protected static final int FLUSH_CHAN_SIZE = 1024;
    public static final String _CRLF_ = "\r\n";
    public static final String _EMPTY_ = "";
    public static final String _SPC_ = " ";
    public static final String _PUB_P_ = "PUB ";
    public static final String _OK_OP_ = "+OK";
    public static final String _ERR_OP_ = "-ERR";
    public static final String _MSG_OP_ = "MSG";
    public static final String _PING_OP_ = "PING";
    public static final String _PONG_OP_ = "PONG";
    public static final String _INFO_OP_ = "INFO";
    public static final String CONN_PROTO = "CONNECT %s\r\n";
    public static final String PING_PROTO = "PING\r\n";
    public static final String PONG_PROTO = "PONG\r\n";
    public static final String PUB_PROTO = "PUB %s %s %d\r\n";
    public static final String SUB_PROTO = "SUB %s %s %d\r\n";
    public static final String UNSUB_PROTO = "UNSUB %d %s\r\n";
    public static final String OK_PROTO = "+OK\r\n";
    protected final Lock mu = new ReentrantLock();
    private AtomicLong sidCounter = new AtomicLong();
    private URI url = null;
    protected Options opts = null;
    private TCPConnection conn = null;
    byte[] pubProtoBuf = null;
    private BufferedOutputStream bw = null;
    private BufferedInputStream br = null;
    private ByteArrayOutputStream pending = null;
    private ReentrantLock flusherLock = new ReentrantLock();
    private boolean flusherDone = false;
    private Map<Long, SubscriptionImpl> subs = new ConcurrentHashMap<Long, SubscriptionImpl>();
    protected List<Srv> srvPool = null;
    private Exception lastEx = null;
    private ServerInfo info = null;
    private int pout;
    protected Parser ps = null;
    protected MsgArg msgArgs = null;
    protected byte[] pingProtoBytes = null;
    protected int pingProtoBytesLen = 0;
    protected byte[] pongProtoBytes = null;
    protected int pongProtoBytesLen = 0;
    protected byte[] pubPrimBytes = null;
    protected int pubPrimBytesLen = 0;
    protected byte[] crlfProtoBytes = null;
    protected int crlfProtoBytesLen = 0;
    protected Statistics stats = null;
    private Queue<Channel<Boolean>> pongs = new LinkedBlockingQueue<Channel<Boolean>>();
    private ExecutorService executor = Executors.newCachedThreadPool(new NATSThreadFactory("natsthreadpool"));
    private ScheduledExecutorService ptmr = null;
    private Random r = null;
    private Phaser phaser = new Phaser();
    private Channel<Boolean> fch = new Channel();

    ConnectionImpl(Options o) {
        this(o, null);
    }

    ConnectionImpl(Options o, TCPConnection tcpconn) {
        Properties props = this.getProperties("jnats.properties");
        this.version = props.getProperty("client.version");
        this.opts = o;
        this.stats = new Statistics();
        this.ps = new Parser(this);
        this.msgArgs = new MsgArg();
        this.conn = tcpconn != null ? tcpconn : new TCPConnection();
        this.sidCounter.set(0L);
        this.pingProtoBytes = PING_PROTO.getBytes();
        this.pingProtoBytesLen = this.pingProtoBytes.length;
        this.pongProtoBytes = PONG_PROTO.getBytes();
        this.pongProtoBytesLen = this.pongProtoBytes.length;
        this.pubPrimBytes = _PUB_P_.getBytes();
        this.pubPrimBytesLen = this.pubPrimBytes.length;
        this.crlfProtoBytes = _CRLF_.getBytes();
        this.crlfProtoBytesLen = this.crlfProtoBytes.length;
        this.buildPublishProtocolBuffer(1024);
        this.setupServerPool();
    }

    private void setup() {
        this.subs.clear();
        this.pongs.clear();
    }

    protected Properties getProperties(InputStream inputStream) {
        Properties rv = new Properties();
        try {
            if (inputStream == null) {
                rv = null;
            } else {
                rv.load(inputStream);
            }
        }
        catch (IOException e) {
            rv = null;
        }
        return rv;
    }

    protected Properties getProperties(String resourceName) {
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(resourceName);
        return this.getProperties(is);
    }

    private void buildPublishProtocolBuffer(int size) {
        this.pubProtoBuf = new byte[size];
        System.arraycopy(this.pubPrimBytes, 0, this.pubProtoBuf, 0, this.pubPrimBytesLen);
    }

    protected void setupServerPool() {
        URI url = this.opts.getUrl();
        List<URI> servers = this.opts.getServers();
        this.srvPool = new ArrayList<Srv>();
        if (url != null) {
            this.srvPool.add(new Srv(url));
        }
        if (servers != null) {
            if (!this.opts.isNoRandomize()) {
                Collections.shuffle(servers, new Random(System.nanoTime()));
            }
            for (URI s : servers) {
                this.srvPool.add(new Srv(s));
            }
        }
        if (this.srvPool.isEmpty()) {
            this.srvPool.add(new Srv(URI.create("nats://localhost:4222")));
        }
        this.url = this.srvPool.get((int)0).url;
    }

    protected Srv currentServer() {
        Srv rv = null;
        for (Srv s : this.srvPool) {
            if (!s.url.equals(this.url)) continue;
            rv = s;
            break;
        }
        return rv;
    }

    protected Srv selectNextServer() throws IOException {
        this.logger.trace("In selectNextServer()");
        Srv s = this.currentServer();
        if (s == null) {
            throw new IOException("nats: No servers available for connection");
        }
        this.logger.trace("selectNextServer, removing {}", (Object)s);
        this.srvPool.remove(s);
        int maxReconnect = this.opts.getMaxReconnect();
        if (maxReconnect < 0 || s.reconnects < maxReconnect) {
            this.logger.trace("selectNextServer: maxReconnect: {}", (Object)maxReconnect);
            this.logger.trace("selectNextServer adding {}", (Object)s);
            this.srvPool.add(s);
        }
        if (this.srvPool.isEmpty()) {
            this.url = null;
            throw new IOException("nats: No servers available for connection");
        }
        this.url = this.srvPool.get((int)0).url;
        return this.srvPool.get(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connect() throws IOException, TimeoutException {
        this.mu.lock();
        try {
            Exception e;
            for (Srv s : this.srvPool) {
                this.url = s.url;
                try {
                    this.logger.debug("Connecting to {}", (Object)this.url);
                    this.createConn();
                    this.logger.debug("Connected to {}", (Object)this.url);
                    this.setup();
                    try {
                        this.processConnectInit();
                        this.logger.trace("connect() Resetting reconnects for {}", (Object)s);
                        s.reconnects = 0;
                        break;
                    }
                    catch (IOException e2) {
                        this.logger.trace("{} Exception: {}", (Object)this.url, (Object)e2.getMessage());
                        this.setLastError(e2);
                        this.mu.unlock();
                        this.close(Constants.ConnState.DISCONNECTED, false);
                        this.mu.lock();
                        this.url = null;
                    }
                }
                catch (IOException e3) {
                    if (!(e3 instanceof SocketException) || e3.getMessage() == null || !e3.getMessage().contains("Connection refused")) continue;
                    this.setLastError(null);
                }
            }
            if (this.getLastException() == null && this.status != Constants.ConnState.CONNECTED) {
                this.setLastError(new IOException("nats: No servers available for connection"));
            }
            if ((e = this.getLastException()) != null) {
                if (e instanceof IOException) {
                    throw (IOException)e;
                }
                if (e instanceof TimeoutException) {
                    throw (TimeoutException)e;
                }
                throw new Error("Unexpected error", e);
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    protected void createConn() throws IOException {
        Srv s = this.currentServer();
        if (s == null) {
            throw new IOException("nats: No servers available for connection");
        }
        s.updateLastAttempt();
        this.logger.trace("createConn(): {}", (Object)s.url);
        try {
            this.logger.trace("Opening {}", (Object)s.url);
            if (this.conn == null) {
                this.conn = new TCPConnection();
            }
            this.conn.open(s.url.getHost(), s.url.getPort(), this.opts.getConnectionTimeout());
            this.logger.trace("Opened {}", (Object)s.url);
        }
        catch (IOException e) {
            this.logger.trace("Couldn't establish connection to {}: {}", (Object)s.url, (Object)e.getMessage());
            throw e;
        }
        if (this.pending != null && this.bw != null) {
            this.logger.trace("Flushing old outputstream to pending");
            try {
                this.bw.flush();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        this.bw = this.conn.getBufferedOutputStream(8192);
        this.br = this.conn.getBufferedInputStream(8192);
    }

    private void clearPendingFlushCalls() {
        this.logger.trace("clearPendingFlushCalls()");
        this.mu.lock();
        try {
            this.logger.trace("clearPendingFlushCalls(): acquired lock");
            for (Channel channel : this.pongs) {
                if (channel == null) continue;
                channel.add(true);
                this.logger.trace("Cleared PONG");
            }
            this.pongs.clear();
        }
        finally {
            this.mu.unlock();
        }
    }

    @Override
    public void close() {
        this.close(Constants.ConnState.CLOSED, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void close(Constants.ConnState closeState, boolean doCBs) {
        this.logger.trace("close({}, {})", (Object)closeState, (Object)String.valueOf(doCBs));
        final ConnectionImpl nc = this;
        this.logger.trace("Acquiring lock");
        this.mu.lock();
        this.logger.trace("Acquired lock");
        if (this.isClosed()) {
            this.logger.trace("ALREADY CLOSED, WE'RE DONE");
            this.status = closeState;
            this.mu.unlock();
            return;
        }
        this.status = Constants.ConnState.CLOSED;
        this.logger.trace("Kicking flusher");
        this.kickFlusher();
        this.mu.unlock();
        this.logger.trace("Kicked flusher");
        this.clearPendingFlushCalls();
        this.mu.lock();
        if (this.ptmr != null) {
            this.ptmr.shutdownNow();
        }
        if (this.conn != null) {
            try {
                this.bw.flush();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        this.logger.trace("Closing subscriptions");
        for (Long key : this.subs.keySet()) {
            SubscriptionImpl s = this.subs.get(key);
            s.mu.lock();
            s.closeChannel();
            s.closed = true;
            s.connClosed = true;
            s.mu.unlock();
        }
        this.subs.clear();
        final DisconnectedCallback dcb = this.opts.getDisconnectedCallback();
        if (doCBs && this.conn != null && dcb != null) {
            this.logger.trace("Calling disconnectedCB from Connection.close()");
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    Thread.currentThread().setName("disconnect-callback");
                    dcb.onDisconnect(new ConnectionEvent(nc));
                }
            });
            this.logger.trace("Spawned disconnectedCB from Connection.close()");
        }
        ClosedCallback ccb = this.opts.getClosedCallback();
        this.mu.unlock();
        if (doCBs && ccb != null) {
            this.logger.trace("In close(), calling ccb, doCBs={}, ccb={}", (Object)doCBs, (Object)ccb);
            ccb.onClose(new ConnectionEvent(this));
            this.logger.trace("In close(), ccb executed");
        }
        this.mu.lock();
        try {
            this.status = closeState;
        }
        finally {
            try {
                this.conn.close();
            }
            catch (Exception exception) {}
            this.mu.unlock();
        }
    }

    protected void processConnectInit() throws IOException {
        this.logger.trace("processConnectInit(): {}", (Object)this.url);
        this.status = Constants.ConnState.CONNECTING;
        this.processExpectedInfo();
        this.sendConnect();
        this.pout = 0;
        this.spinUpSocketWatchers();
    }

    private void checkForSecure() throws IOException {
        if (this.opts.isSecure() && !this.info.isTlsRequired()) {
            throw new IOException("nats: Secure Connection not available");
        }
        if (this.info.isTlsRequired() && !this.opts.isSecure()) {
            throw new IOException("nats: Secure Connection required");
        }
        if (this.opts.isSecure()) {
            this.makeTLSConn();
        }
    }

    private void makeTLSConn() throws IOException {
        this.conn.setTlsDebug(this.opts.isTlsDebug());
        this.conn.makeTLS(this.opts.getSslContext());
        this.bw = this.conn.getBufferedOutputStream(65536);
        this.br = this.conn.getBufferedInputStream(65536);
    }

    protected void processExpectedInfo() throws IOException {
        Control c;
        try {
            c = this.readOp();
        }
        catch (IOException e) {
            this.processOpError(e);
            return;
        }
        if (!c.op.equals(_INFO_OP_)) {
            throw new IOException("nats: Protocol Error, INFO not received");
        }
        this.processInfo(c.args);
        this.checkForSecure();
    }

    protected void processPing() {
        try {
            this.sendProto(this.pongProtoBytes, this.pongProtoBytesLen);
        }
        catch (IOException e) {
            this.setLastError(e);
        }
    }

    protected void processPong() {
        this.logger.trace("Processing PONG");
        Channel<Object> ch = new Channel(1);
        this.mu.lock();
        try {
            if (this.pongs.size() > 0) {
                ch = this.pongs.poll();
            }
            this.pout = 0;
            if (ch != null) {
                ch.add(true);
                this.logger.trace("Processed PONG");
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    protected void processOK() {
    }

    protected void processInfo(String info) {
        if (info == null || info.isEmpty()) {
            return;
        }
        this.info = new ServerInfo(info);
    }

    private void processOpError(Exception e) {
        this.logger.trace("processOpError(e={}) state={} reconnectAllowed={} ", new Object[]{e.getClass().getName(), this.status, this.opts.isReconnectAllowed()});
        this.mu.lock();
        try {
            if (this.isConnecting() || this.isClosed() || this.isReconnecting()) {
                return;
            }
            if (this.opts.isReconnectAllowed() && this.status == Constants.ConnState.CONNECTED) {
                this.status = Constants.ConnState.RECONNECTING;
                if (this.ptmr != null) {
                    this.ptmr.shutdownNow();
                }
                if (this.conn != null) {
                    try {
                        this.bw.flush();
                    }
                    catch (IOException iOException) {
                        // empty catch block
                    }
                    this.conn.close();
                    this.conn = null;
                }
                this.logger.trace("\t\tspawning doReconnect() in state {}", (Object)this.status);
                this.go(new Runnable(){

                    @Override
                    public void run() {
                        Thread.currentThread().setName("reconnect");
                        ConnectionImpl.this.doReconnect();
                    }
                }, "reconnect", "phaser", this.phaser, true);
                this.logger.trace("\t\tspawned doReconnect() in state {}", (Object)this.status);
                return;
            }
            this.logger.trace("\t\tcalling processDisconnect() in state {}", (Object)this.status);
            this.processDisconnect();
            this.setLastError(e);
            this.close();
        }
        finally {
            this.logger.trace("processOpError() releasing lock");
            this.mu.unlock();
            this.logger.trace("processOpError() released lock");
        }
    }

    protected void processDisconnect() {
        this.logger.trace("processDisconnect()");
        this.status = Constants.ConnState.DISCONNECTED;
    }

    @Override
    public boolean isReconnecting() {
        this.mu.lock();
        try {
            boolean bl = this.status == Constants.ConnState.RECONNECTING;
            return bl;
        }
        finally {
            this.mu.unlock();
        }
    }

    private boolean isConnected() {
        return this.status == Constants.ConnState.CONNECTING || this.status == Constants.ConnState.CONNECTED;
    }

    @Override
    public boolean isClosed() {
        this.mu.lock();
        try {
            boolean bl = this.status == Constants.ConnState.CLOSED;
            return bl;
        }
        finally {
            this.mu.unlock();
        }
    }

    protected void flushReconnectPendingItems() {
        this.logger.trace("flushReconnectPendingItems()");
        if (this.pending == null) {
            return;
        }
        if (this.pending.size() > 0) {
            try {
                this.logger.trace("flushReconnectPendingItems() writing {} bytes.", (Object)this.pending.size());
                this.bw.write(this.pending.toByteArray(), 0, this.pending.size());
                this.bw.flush();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        this.pending = null;
        this.logger.trace("flushReconnectPendingItems() DONE");
    }

    private void doReconnect() {
        this.logger.trace("doReconnect()");
        this.waitForExits();
        this.mu.lock();
        this.pending = new ByteArrayOutputStream(0x100000);
        this.bw = new BufferedOutputStream(this.pending);
        this.setLastError(null);
        if (this.opts.getDisconnectedCallback() != null) {
            this.mu.unlock();
            DisconnectedCallback dcb = this.opts.getDisconnectedCallback();
            this.logger.trace("Spawning disconnectCB from doReconnect()");
            dcb.onDisconnect(new ConnectionEvent(this));
            this.logger.trace("Spawned disconnectCB from doReconnect()");
            this.mu.lock();
        }
        while (this.srvPool.size() > 0) {
            Srv cur = null;
            try {
                cur = this.selectNextServer();
            }
            catch (IOException nse) {
                this.logger.trace("doReconnect() calling setLastError({})", (Object)nse.getMessage());
                this.setLastError(nse);
                break;
            }
            long elapsedMillis = cur.timeSinceLastAttempt();
            if (elapsedMillis < this.opts.getReconnectWait()) {
                long sleepTime = this.opts.getReconnectWait() - elapsedMillis;
                this.mu.unlock();
                this.sleepMsec((int)sleepTime);
                this.mu.lock();
            }
            if (this.isClosed()) {
                this.logger.trace("Connection has been closed while in doReconnect()");
                break;
            }
            ++cur.reconnects;
            this.logger.trace("doReconnect() incremented cur.reconnects: {}", (Object)cur);
            this.logger.trace("doReconnect: trying createConn() for {}", (Object)cur);
            try {
                this.createConn();
                this.logger.trace("doReconnect: createConn() successful for {}", (Object)cur);
            }
            catch (Exception e) {
                this.logger.trace("doReconnect: createConn() failed for {}", (Object)cur);
                this.logger.trace("createConn failed", (Throwable)e);
                this.setLastError(null);
                continue;
            }
            this.stats.incrementReconnects();
            this.logger.trace("Successful reconnect; Resetting reconnects for {}", (Object)cur);
            cur.reconnects = 0;
            try {
                this.processConnectInit();
            }
            catch (IOException e) {
                this.setLastError(e);
                this.status = Constants.ConnState.RECONNECTING;
                continue;
            }
            this.resendSubscriptions();
            this.flushReconnectPendingItems();
            try {
                this.bw.flush();
            }
            catch (IOException e) {
                this.setLastError(e);
                this.status = Constants.ConnState.RECONNECTING;
                continue;
            }
            this.setPending(null);
            this.status = Constants.ConnState.CONNECTED;
            ReconnectedCallback rcb = this.opts.getReconnectedCallback();
            this.mu.unlock();
            try {
                this.flush();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.logger.trace("doReconnect checking rcb != null");
            if (rcb != null) {
                this.logger.trace("doReconnect invoking rcb");
                rcb.onReconnect(new ConnectionEvent(this));
            }
            this.logger.trace("doReconnect finished successfully!");
            return;
        }
        this.logger.trace("Reconnect FAILED");
        if (this.getLastException() == null) {
            this.setLastError(new IOException("nats: No servers available for connection"));
        }
        this.conn.close();
        this.conn = null;
        this.mu.unlock();
        this.logger.trace("Calling   close() from doReconnect()");
        this.close();
        this.logger.trace("Completed close() from doReconnect()");
    }

    private boolean isConnecting() {
        this.mu.lock();
        try {
            boolean bl = this.status == Constants.ConnState.CONNECTING;
            return bl;
        }
        finally {
            this.mu.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processErr(ByteBuffer errorStream) {
        boolean doCBs = false;
        NATSException ex = null;
        String s = new String(errorStream.array(), 0, errorStream.position());
        this.logger.trace("processErr(errorStream={})", (Object)s);
        if (STALE_CONNECTION.equals(s)) {
            this.processOpError(new IOException("nats: Stale Connection"));
        } else {
            ex = new NATSException(s);
            this.mu.lock();
            try {
                this.setLastError(ex);
                if (this.status != Constants.ConnState.CONNECTING) {
                    doCBs = true;
                }
            }
            finally {
                this.mu.unlock();
            }
            this.close(Constants.ConnState.CLOSED, doCBs);
        }
    }

    protected void sendConnect() throws IOException {
        String line = null;
        this.logger.trace("sendConnect()");
        this.bw.write(this.connectProto().getBytes());
        this.logger.trace("=> {}", (Object)this.connectProto().trim());
        this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
        this.logger.trace("=> {}", (Object)new String(this.pingProtoBytes).trim());
        this.bw.flush();
        this.logger.trace("=> FLUSH");
        try {
            line = this.readLine();
        }
        catch (IOException e) {
            throw new IOException("nats: Connection Read Error", e);
        }
        if (this.opts.isVerbose() && line.equals(_OK_OP_)) {
            try {
                line = this.readLine();
            }
            catch (IOException e) {
                throw new IOException("nats: Connection Read Error", e);
            }
        }
        if (!PONG_PROTO.trim().equals(line)) {
            if (line.startsWith(_ERR_OP_)) {
                throw new IOException("nats: " + line.substring(_ERR_OP_.length()).trim());
            }
            String msg = line;
            if (line.startsWith("nats: ")) {
                msg = line.replace("nats: ", _EMPTY_);
            }
            throw new IOException("nats: " + msg);
        }
        this.status = Constants.ConnState.CONNECTED;
    }

    protected String readLine() throws IOException {
        String s = null;
        this.logger.trace("readLine() Reading from input stream");
        BufferedReader breader = this.conn.getBufferedInputStreamReader();
        s = breader.readLine();
        if (s == null) {
            throw new EOFException("nats: Connection Read Error");
        }
        this.logger.trace("<= {}", (Object)(s != null ? s.trim() : "null"));
        return s;
    }

    protected void sendProto(byte[] value, int length) throws IOException {
        this.logger.trace("in sendProto()");
        this.mu.lock();
        try {
            this.logger.trace("in sendProto(), writing");
            this.bw.write(value, 0, length);
            this.logger.trace("=> {}", (Object)new String(value).trim());
            this.kickFlusher();
        }
        finally {
            this.mu.unlock();
        }
    }

    private String connectProto() {
        String[] userpass;
        String u = this.url.getUserInfo();
        String user = null;
        String pass = null;
        if (u != null && (userpass = u.split(":"))[0].length() > 0) {
            user = userpass[0];
            switch (userpass.length) {
                case 2: {
                    pass = userpass[1];
                    break;
                }
            }
        }
        ConnectInfo info = new ConnectInfo(this.opts.isVerbose(), this.opts.isPedantic(), user, pass, this.opts.isSecure(), this.opts.getConnectionName());
        String result = String.format(CONN_PROTO, info.toJson());
        return result;
    }

    protected Control readOp() throws IOException {
        String s = this.readLine();
        Control c = new Control(s);
        this.logger.trace("readOp returning: " + c);
        return c;
    }

    private void waitForExits() {
        this.logger.trace("waitForExits()");
        this.setFlusherDone(true);
        this.kickFlusher();
        this.phaser.register();
        int nParties = this.phaser.getRegisteredParties();
        this.logger.trace("Num registered parties: {}", (Object)nParties);
        while (!this.phaser.isTerminated()) {
            this.phaser.arriveAndDeregister();
        }
        this.logger.trace("Done waiting: Num registered parties: {}", (Object)this.phaser.getRegisteredParties());
    }

    void runTasks(List<Runnable> tasks) {
        final Phaser phaser = new Phaser(1);
        for (final Runnable task : tasks) {
            phaser.register();
            new Thread(){

                @Override
                public void run() {
                    phaser.arriveAndAwaitAdvance();
                    task.run();
                    phaser.arriveAndDeregister();
                }
            }.start();
        }
        phaser.arriveAndDeregister();
    }

    protected void spinUpSocketWatchers() {
        this.logger.trace("Spinning up threads");
        if (this.phaser.getPhase() != 0) {
            this.waitForExits();
        }
        ArrayList<Runnable> tasks = new ArrayList<Runnable>();
        tasks.add(new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.logger.trace("READLOOP STARTING");
                Thread.currentThread().setName("readloop");
                ConnectionImpl.this.readLoop();
                ConnectionImpl.this.logger.trace("READLOOP EXITING");
            }
        });
        tasks.add(new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.logger.trace("FLUSHER STARTING");
                Thread.currentThread().setName("flusher");
                ConnectionImpl.this.flusher();
                ConnectionImpl.this.logger.trace("FLUSHER EXITING");
            }
        });
        this.runTasks(tasks);
        this.resetPingTimer();
    }

    protected Thread go(final Runnable task, final String name, final String group, final Phaser ph, final boolean dereg) {
        NATSThread.setDebug(true);
        NATSThread t = new NATSThread(task, name){

            @Override
            public void run() {
                if (ph != null) {
                    ph.register();
                    ConnectionImpl.this.logger.trace("{} registered in group {}. # registered for phase {} = {}", new Object[]{name, group, ph.getPhase(), ph.getRegisteredParties()});
                    ConnectionImpl.this.logger.trace(name + " starting");
                    ph.arriveAndAwaitAdvance();
                } else {
                    ConnectionImpl.this.logger.trace("Untracked thread " + name + " starting.");
                }
                task.run();
                if (ph != null) {
                    int oldPhase = ph.getPhase();
                    if (dereg) {
                        ConnectionImpl.this.logger.trace(name + " arrive and deregister for phase {}", (Object)ph.getPhase());
                        ConnectionImpl.this.logger.trace("{} (group {}) ending phase {}: Registered = {}, Arrived = {}, Unarrived={}", new Object[]{name, group, oldPhase, ph.getRegisteredParties(), ph.getArrivedParties(), ph.getUnarrivedParties()});
                        int phase = ph.arriveAndDeregister();
                        ConnectionImpl.this.logger.trace(name + " deregistered going into phase {}", (Object)phase);
                    } else {
                        ConnectionImpl.this.logger.trace("{} (group {}) ending phase {}: Registered = {}, Arrived = {}, Unarrived={}", new Object[]{name, group, ph.getPhase(), ph.getRegisteredParties(), ph.getArrivedParties(), ph.getUnarrivedParties()});
                        ConnectionImpl.this.logger.trace("phase before: {}", (Object)ph.getPhase());
                        int phase = ph.arrive();
                        ConnectionImpl.this.logger.trace(name + " advanced to phase {} after phase {} arrival", (Object)phase, (Object)oldPhase);
                        ConnectionImpl.this.logger.trace("{} (group {}) beginning phase {}: Registered = {}", new Object[]{name, group, ph.getPhase(), ph.getRegisteredParties(), ph.getArrivedParties(), ph.getUnarrivedParties()});
                    }
                } else {
                    ConnectionImpl.this.logger.trace("Untracked thread " + name + " completed.");
                }
            }
        };
        t.start();
        return t;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readLoop() {
        Parser parser = null;
        TCPConnection conn = null;
        this.mu.lock();
        try {
            if (this.ps == null) {
                this.ps = new Parser(this);
            }
            parser = this.ps;
        }
        finally {
            this.mu.unlock();
        }
        byte[] buffer = new byte[65536];
        while (true) {
            boolean sb;
            this.mu.lock();
            try {
                boolean bl = sb = this.isClosed() || this.isReconnecting();
                if (sb) {
                    this.ps = parser;
                }
                conn = this.conn;
            }
            finally {
                this.mu.unlock();
            }
            if (sb || conn == null) break;
            try {
                int len = this.br.read(buffer, 0, 65536);
                if (len == -1) {
                    throw new IOException("nats: Stale Connection");
                }
                parser.parse(buffer, len);
            }
            catch (IOException | ParseException e) {
                this.logger.trace("Exception in readLoop(): ConnState was {}", (Object)this.status, (Object)e);
                if (this.status == Constants.ConnState.CLOSED) break;
                this.processOpError(e);
                break;
            }
        }
        this.mu.lock();
        try {
            this.ps = null;
        }
        finally {
            this.mu.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void deliverMsgs(Channel<Message> ch) {
        Message m = null;
        this.mu.lock();
        try {
            if (this.status == Constants.ConnState.CLOSED) {
                return;
            }
        }
        finally {
            this.mu.unlock();
        }
        while ((m = ch.get()) != null) {
            if (m.sub.processMsg(m)) continue;
            this.mu.lock();
            try {
                this.removeSub(m.sub);
                continue;
            }
            finally {
                this.mu.unlock();
                continue;
            }
            break;
        }
        return;
    }

    protected void processMsgArgs(byte[] buffer, long length) throws ParseException {
        String s = new String(buffer, 0, (int)length);
        String[] args = s.split(_SPC_);
        switch (args.length) {
            case 3: {
                this.msgArgs.subject = args[0];
                this.msgArgs.sid = Long.parseLong(args[1]);
                this.msgArgs.reply = null;
                this.msgArgs.size = Integer.parseInt(args[2]);
                break;
            }
            case 4: {
                this.msgArgs.subject = args[0];
                this.msgArgs.sid = Long.parseLong(args[1]);
                this.msgArgs.reply = args[2];
                this.msgArgs.size = Integer.parseInt(args[3]);
                break;
            }
            default: {
                throw new ParseException("Unable to parse message arguments: " + s, 0);
            }
        }
        if (this.msgArgs.size < 0) {
            throw new ParseException("Invalid Message - Bad or Missing Size: " + s, 9);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processMsg(byte[] data, long length) {
        SubscriptionImpl s;
        boolean maxReached = false;
        this.mu.lock();
        try {
            this.stats.incrementInMsgs();
            this.stats.incrementInBytes(length);
            s = this.subs.get(this.msgArgs.sid);
            if (s == null) {
                return;
            }
            s.mu.lock();
            try {
                Message m;
                maxReached = s.tallyMessage(length);
                if (!maxReached && !s.addMessage(m = new Message(this.msgArgs, s, data, length))) {
                    this.processSlowConsumer(s);
                }
            }
            finally {
                s.mu.unlock();
            }
        }
        finally {
            this.mu.unlock();
        }
        if (maxReached) {
            this.removeSub(s);
        }
    }

    void removeSub(SubscriptionImpl sub) {
        this.subs.remove(sub.getSid());
        sub.getLock().lock();
        try {
            if (sub.getChannel() != null) {
                sub.mch.close();
                sub.mch = null;
            }
            sub.setConnection(null);
            sub.closed = true;
        }
        finally {
            sub.getLock().unlock();
        }
    }

    void processAsyncException(SubscriptionImpl s, Throwable e) {
        SubscriptionImpl sub = s;
        ConnectionImpl nc = this;
        final ExceptionHandler eh = this.opts.getExceptionHandler();
        final NATSException nex = new NATSException(e, nc, sub);
        if (this.opts.getExceptionHandler() != null && !s.isSlowConsumer()) {
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    eh.onException(nex);
                }
            });
        }
    }

    void processSlowConsumer(SubscriptionImpl s) {
        IOException ex = new IOException("nats: Slow Consumer, messages dropped");
        this.setLastError(ex);
        this.processAsyncException(s, ex);
        s.setSlowConsumer(true);
    }

    protected boolean removeFlushEntry(Channel<Boolean> chan) {
        Channel<Boolean> start;
        if (this.pongs.isEmpty()) {
            return false;
        }
        Channel<Boolean> c = start = this.pongs.poll();
        do {
            if (c == chan) {
                return true;
            }
            this.pongs.add(c);
        } while ((c = this.pongs.poll()) != start);
        return false;
    }

    protected void sendPing(Channel<Boolean> ch) {
        if (ch != null) {
            this.pongs.add(ch);
        }
        try {
            this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
            this.logger.trace("=> {}", (Object)new String(this.pingProtoBytes).trim());
            this.bw.flush();
        }
        catch (IOException e) {
            this.setLastError(e);
        }
    }

    protected void resetPingTimer() {
        this.mu.lock();
        try {
            Runnable pingRunnable = new Runnable(){

                @Override
                public void run() {
                    ConnectionImpl.this.processPingTimer();
                }
            };
            if (this.opts.getPingInterval() > 0L) {
                if (this.ptmr != null) {
                    this.ptmr.shutdownNow();
                }
                this.ptmr = Executors.newSingleThreadScheduledExecutor(new NATSThreadFactory("pinger"));
                this.ptmr.scheduleAtFixedRate(pingRunnable, this.opts.getPingInterval(), this.opts.getPingInterval(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    protected void processPingTimer() {
        block6: {
            this.mu.lock();
            try {
                if (this.status != Constants.ConnState.CONNECTED) {
                    return;
                }
                ++this.pout;
                if (this.pout > this.opts.getMaxPingsOut()) {
                    this.processOpError(new IOException("nats: Stale Connection"));
                    break block6;
                }
                this.logger.trace("Sending PING after {} seconds.", (Object)TimeUnit.MILLISECONDS.toSeconds(this.opts.getPingInterval()));
                this.sendPing(null);
                return;
            }
            finally {
                this.mu.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void unsubscribe(SubscriptionImpl sub, int max) throws IOException {
        this.mu.lock();
        try {
            if (this.isClosed()) {
                throw new IllegalStateException("nats: Connection Closed");
            }
            if (!this.subs.containsKey(sub.getSid())) {
                return;
            }
            if (max > 0) {
                sub.setMax(max);
            } else {
                this.removeSub(sub);
            }
            if (!this.isReconnecting()) {
                String str = String.format(UNSUB_PROTO, sub.getSid(), max > 0 ? Integer.toString(max) : _EMPTY_);
                str = str.replaceAll(" +\r\n", _CRLF_);
                byte[] unsub = str.getBytes();
                this.bw.write(unsub);
                this.logger.trace("=> {}", (Object)str.trim());
            }
        }
        finally {
            this.kickFlusher();
            this.mu.unlock();
        }
    }

    protected void kickFlusher() {
        if (this.bw != null && this.fch.getCount() == 0) {
            this.fch.add(true);
        }
    }

    private void setFlusherDone(boolean value) {
        this.flusherLock.lock();
        try {
            this.flusherDone = value;
            if (this.flusherDone) {
                this.kickFlusher();
            }
        }
        finally {
            this.flusherLock.unlock();
        }
    }

    private boolean isFlusherDone() {
        this.flusherLock.lock();
        try {
            boolean bl = this.flusherDone;
            return bl;
        }
        finally {
            this.flusherLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flusher() {
        BufferedOutputStream bw = null;
        TCPConnection conn = null;
        Channel<Boolean> fch = null;
        this.setFlusherDone(false);
        this.mu.lock();
        bw = this.bw;
        conn = this.conn;
        fch = this.fch;
        this.mu.unlock();
        if (conn == null || bw == null || !conn.isConnected()) {
            return;
        }
        while (!this.isFlusherDone()) {
            if (!fch.get().booleanValue()) {
                return;
            }
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.mu.lock();
            try {
                if (!this.isConnected() || this.isConnecting() || bw != this.bw || conn != this.conn) {
                    return;
                }
                bw.flush();
                this.stats.incrementFlushes();
            }
            catch (IOException iOException) {}
            continue;
            finally {
                this.mu.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flush(int timeout) throws Exception {
        long t0 = System.nanoTime();
        long elapsed = 0L;
        boolean timedOut = false;
        Exception err = null;
        if (timeout <= 0) {
            throw new IllegalArgumentException("nats: Bad timeout value");
        }
        Channel<Boolean> ch = null;
        this.mu.lock();
        try {
            if (this.isClosed()) {
                throw new IllegalStateException("nats: Connection Closed");
            }
            ch = new Channel<Boolean>(1);
            this.logger.trace("flush({}): calling sendPing(ch)", (Object)timeout);
            this.sendPing(ch);
            this.logger.trace("flush({}): returned from sendPing(ch)", (Object)timeout);
        }
        finally {
            this.mu.unlock();
        }
        this.logger.trace("flush({}): awaiting PONG", (Object)timeout);
        Boolean rv = null;
        while (!timedOut && !this.isClosed() && rv == null) {
            elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
            timedOut = elapsed >= (long)timeout;
            rv = ch.poll();
        }
        this.logger.trace("elapsed = {}", (Object)elapsed);
        if (rv != null && !rv.booleanValue()) {
            err = new IllegalStateException("nats: Connection Closed");
        } else {
            this.logger.trace("flush({}): received PONG", (Object)timeout);
            this.mu.lock();
            err = this.getLastException();
            this.mu.unlock();
            ch.close();
        }
        if (err != null) {
            this.logger.trace("flush({}): calling removeFlushEntry(ch)", (Object)timeout, (Object)timeout);
            this.removeFlushEntry(ch);
            this.logger.trace("flush({}): after removeFlushEntry(ch), throwing", (Object)timeout, (Object)timeout);
            throw err;
        }
    }

    @Override
    public void flush() throws Exception {
        this.logger.trace("FLUSH");
        this.flush(60000);
        this.logger.trace("FLUSHED!");
    }

    private void resendSubscriptions() {
        for (Long key : this.subs.keySet()) {
            SubscriptionImpl s = this.subs.get(key);
            if (s instanceof AsyncSubscription) {
                ((AsyncSubscriptionImpl)s).enable();
            }
            this.logger.trace("Resending subscriptions:");
            this.sendSubscriptionMessage(s);
            if (s.getMaxPending() <= 0) continue;
            try {
                this.unsubscribe(s, s.getMaxPending());
            }
            catch (Exception exception) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Subscription subscribe(String subj, String queue, MessageHandler cb) {
        SubscriptionImpl sub = null;
        boolean async = cb != null;
        this.mu.lock();
        try {
            if (this.isClosed()) {
                throw new IllegalStateException("nats: Connection Closed");
            }
            sub = async ? new AsyncSubscriptionImpl(this, subj, queue, cb, this.opts.getMaxPendingMsgs()) : new SyncSubscriptionImpl(this, subj, queue, this.opts.getMaxPendingMsgs());
            this.addSubscription(sub);
            if (!this.isReconnecting()) {
                if (async) {
                    ((AsyncSubscriptionImpl)sub).start();
                } else {
                    this.sendSubscriptionMessage(sub);
                }
            }
            this.kickFlusher();
        }
        finally {
            this.mu.unlock();
        }
        return sub;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb) {
        AsyncSubscriptionImpl s = null;
        this.mu.lock();
        try {
            if (this.isClosed()) {
                throw new IllegalStateException("nats: Connection Closed");
            }
            s = new AsyncSubscriptionImpl(this, subject, queue, null, this.opts.getMaxPendingMsgs());
            this.addSubscription(s);
            if (cb != null) {
                s.setMessageHandler(cb);
                s.start();
            }
        }
        finally {
            this.mu.unlock();
        }
        return s;
    }

    @Override
    public AsyncSubscription subscribeAsync(String subj, String queue) {
        return this.subscribeAsync(subj, queue, null);
    }

    @Override
    public AsyncSubscription subscribeAsync(String subj, MessageHandler cb) {
        return this.subscribeAsync(subj, null, cb);
    }

    @Override
    public AsyncSubscription subscribeAsync(String subj) {
        return this.subscribeAsync(subj, null, null);
    }

    private void addSubscription(SubscriptionImpl s) {
        s.setSid(this.sidCounter.incrementAndGet());
        this.subs.put(s.getSid(), s);
        this.logger.trace("Successfully added subscription to {} [{}]", (Object)s.getSubject(), (Object)s.getSid());
    }

    @Override
    public AsyncSubscription subscribe(String subject, MessageHandler cb) {
        return (AsyncSubscription)this.subscribe(subject, _EMPTY_, cb);
    }

    @Override
    public SyncSubscription subscribeSync(String subject, String queue) {
        return (SyncSubscription)this.subscribe(subject, queue, null);
    }

    @Override
    public SyncSubscription subscribeSync(String subject) {
        return (SyncSubscription)this.subscribe(subject, _EMPTY_, null);
    }

    private int writePublishProto(byte[] dst, String subject, String reply, int msgSize) {
        int index = this.pubPrimBytesLen;
        index = Utilities.stringToBytesASCII(dst, index, subject);
        if (reply != null) {
            dst[index] = 32;
            ++index;
            index = Utilities.stringToBytesASCII(dst, index, reply);
        }
        dst[index] = 32;
        ++index;
        index = Utilities.stringToBytesASCII(dst, index, String.valueOf(msgSize));
        System.arraycopy(this.crlfProtoBytes, 0, dst, index, this.crlfProtoBytesLen);
        return index += this.crlfProtoBytesLen;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void publish(String subject, String reply, byte[] data) {
        if (subject == null) {
            throw new NullPointerException("nats: Invalid Subject");
        }
        if (subject.isEmpty()) {
            throw new IllegalArgumentException("nats: Invalid Subject");
        }
        int msgSize = data != null ? data.length : 0;
        this.mu.lock();
        try {
            int pubProtoLen;
            if ((long)msgSize > this.info.getMaxPayload()) {
                throw new IllegalArgumentException("nats: Maximum Payload Exceeded");
            }
            if (this.status == Constants.ConnState.CLOSED) {
                throw new IllegalStateException("nats: Connection Closed");
            }
            try {
                pubProtoLen = this.writePublishProto(this.pubProtoBuf, subject, reply, msgSize);
            }
            catch (IndexOutOfBoundsException e) {
                int resizeAmount = 1024 + subject.length() + (reply != null ? reply.length() : 0);
                this.buildPublishProtocolBuffer(resizeAmount);
                pubProtoLen = this.writePublishProto(this.pubProtoBuf, subject, reply, msgSize);
            }
            try {
                this.bw.write(this.pubProtoBuf, 0, pubProtoLen);
                if (msgSize > 0) {
                    this.bw.write(data, 0, msgSize);
                }
                this.bw.write(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
            }
            catch (IOException e) {
                this.setLastError(e);
                this.mu.unlock();
                return;
            }
            this.kickFlusher();
            this.stats.incrementOutMsgs();
            this.stats.incrementOutBytes(msgSize);
        }
        finally {
            this.mu.unlock();
        }
    }

    @Override
    public void publish(String subject, byte[] data) {
        this.publish(subject, null, data);
    }

    @Override
    public void publish(Message msg) {
        this.publish(msg.getSubject(), msg.getReplyTo(), msg.getData());
    }

    private Message _request(String subject, byte[] data, long timeout, TimeUnit unit) throws TimeoutException, IOException {
        String inbox = this.newInbox();
        Message m = null;
        try (SyncSubscription s = this.subscribeSync(inbox, null);){
            s.autoUnsubscribe(1);
            this.publish(subject, inbox, data);
            m = s.nextMessage(timeout, unit);
        }
        return m;
    }

    @Override
    public Message request(String subject, byte[] data, long timeout) throws TimeoutException, IOException {
        return this.request(subject, data, timeout, TimeUnit.MILLISECONDS);
    }

    @Override
    public Message request(String subject, byte[] data, long timeout, TimeUnit unit) throws TimeoutException, IOException {
        if (timeout <= 0L) {
            throw new IllegalArgumentException("Timeout must be greater that 0.");
        }
        return this._request(subject, data, timeout, unit);
    }

    @Override
    public Message request(String subject, byte[] data) throws IOException, TimeoutException {
        return this._request(subject, data, -1L, TimeUnit.MILLISECONDS);
    }

    @Override
    public String newInbox() {
        if (this.r == null) {
            this.r = new Random();
        }
        byte[] buf = new byte[13];
        this.r.nextBytes(buf);
        String s1 = inboxPrefix;
        String s2 = Utilities.bytesToHex(buf);
        return s1.concat(s2);
    }

    @Override
    public synchronized Statistics getStats() {
        return new Statistics(this.stats);
    }

    @Override
    public synchronized void resetStats() {
        this.stats.clear();
    }

    @Override
    public synchronized long getMaxPayload() {
        return this.info.getMaxPayload();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendSubscriptionMessage(SubscriptionImpl sub) {
        this.mu.lock();
        try {
            if (!this.isReconnecting()) {
                String s = String.format(SUB_PROTO, sub.getSubject(), sub.getQueue(), sub.getSid());
                try {
                    this.bw.write(Utilities.stringToBytesASCII(s));
                    this.kickFlusher();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    @Override
    public ClosedCallback getClosedCallback() {
        return this.opts.getClosedCallback();
    }

    @Override
    public void setClosedCallback(ClosedCallback cb) {
        this.opts.setClosedCallback(cb);
    }

    @Override
    public DisconnectedCallback getDisconnectedCallback() {
        return this.opts.getDisconnectedCallback();
    }

    @Override
    public void setDisconnectedCallback(DisconnectedCallback cb) {
        this.opts.setDisconnectedCallback(cb);
    }

    @Override
    public ReconnectedCallback getReconnectedCallback() {
        return this.opts.getReconnectedCallback();
    }

    @Override
    public void setReconnectedCallback(ReconnectedCallback cb) {
        this.opts.setReconnectedCallback(cb);
    }

    @Override
    public ExceptionHandler getExceptionHandler() {
        return this.opts.getExceptionHandler();
    }

    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.opts.setExceptionHandler(exceptionHandler);
    }

    @Override
    public String getConnectedUrl() {
        this.mu.lock();
        try {
            if (this.status != Constants.ConnState.CONNECTED) {
                String string = null;
                return string;
            }
            String string = this.url.toString();
            return string;
        }
        finally {
            this.mu.unlock();
        }
    }

    @Override
    public String getConnectedServerId() {
        this.mu.lock();
        try {
            if (this.status != Constants.ConnState.CONNECTED) {
                String string = null;
                return string;
            }
            String string = this.info.getId();
            return string;
        }
        finally {
            this.mu.unlock();
        }
    }

    @Override
    public Constants.ConnState getState() {
        this.mu.lock();
        try {
            Constants.ConnState connState = this.status;
            return connState;
        }
        finally {
            this.mu.unlock();
        }
    }

    public void finalize() {
        try {
            this.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public ServerInfo getConnectedServerInfo() {
        return this.info;
    }

    void setConnectedServerInfo(ServerInfo info) {
        this.info = info;
    }

    void setConnectedServerInfo(String info) {
        this.processInfo(info);
    }

    @Override
    public Exception getLastException() {
        return this.lastEx;
    }

    private void setLastError(Exception e) {
        this.lastEx = e;
    }

    protected Options getOptions() {
        return this.opts;
    }

    void setPending(ByteArrayOutputStream pending) {
        this.pending = pending;
    }

    ByteArrayOutputStream getPending() {
        return this.pending;
    }

    protected void sleepMsec(long msec) {
        try {
            this.logger.trace("Sleeping for {} ms", (Object)msec);
            Thread.sleep(msec);
            this.logger.trace("Slept    for {} ms", (Object)msec);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    void setOutputStream(BufferedOutputStream out) {
        this.mu.lock();
        try {
            this.bw = out;
        }
        finally {
            this.mu.unlock();
        }
    }

    protected Queue<Channel<Boolean>> getPongs() {
        return this.pongs;
    }

    protected void setPongs(Queue<Channel<Boolean>> pongs) {
        this.pongs = pongs;
    }

    protected Map<Long, SubscriptionImpl> getSubs() {
        return this.subs;
    }

    protected void setSubs(Map<Long, SubscriptionImpl> subs) {
        this.subs = subs;
    }

    protected List<Srv> getServerPool() {
        return this.srvPool;
    }

    protected void setServerPool(List<Srv> pool) {
        this.srvPool = pool;
    }

    class Srv {
        URI url = null;
        int reconnects = 0;
        long lastAttempt = 0L;
        long lastAttemptNanos = 0L;

        protected Srv(URI url) {
            this.url = url;
        }

        void updateLastAttempt() {
            this.lastAttemptNanos = System.nanoTime();
            this.lastAttempt = System.currentTimeMillis();
        }

        long timeSinceLastAttempt() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastAttemptNanos);
        }

        public String toString() {
            SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy hh:mm:ss");
            String DateToStr = format.format(new Date(this.lastAttempt));
            return String.format("{url=%s, reconnects=%d, lastAttempt=%s, timeSinceLastAttempt=%dms}", this.url.toString(), this.reconnects, DateToStr, this.timeSinceLastAttempt());
        }
    }

    class ConnectInfo {
        private Boolean verbose;
        private Boolean pedantic;
        private String user;
        private String pass;
        private Boolean ssl;
        private String name;
        private String lang = "java";
        private String version;

        public ConnectInfo(boolean verbose, boolean pedantic, String username, String password, boolean secure, String connectionName) {
            this.version = ConnectionImpl.this.version;
            this.verbose = new Boolean(verbose);
            this.pedantic = new Boolean(pedantic);
            this.user = username;
            this.pass = password;
            this.ssl = new Boolean(secure);
            this.name = connectionName;
        }

        public String toJson() {
            StringBuilder sb = new StringBuilder(1024);
            sb.append(String.format("{\"verbose\":%s,\"pedantic\":%s,", this.verbose.toString(), this.pedantic.toString()));
            if (this.user != null) {
                sb.append(String.format("\"user\":\"%s\",", this.user));
                if (this.pass != null) {
                    sb.append(String.format("\"pass\":\"%s\",", this.pass));
                }
            }
            sb.append(String.format("\"ssl_required\":%s,\"name\":\"%s\",\"lang\":\"%s\",\"version\":\"%s\"}", this.ssl.toString(), this.name != null ? this.name : ConnectionImpl._EMPTY_, this.lang, this.version));
            return sb.toString();
        }
    }

    protected class Control {
        String op = null;
        String args = null;

        protected Control(String s) {
            if (s == null) {
                return;
            }
            String[] parts = s.split(ConnectionImpl._SPC_, 2);
            switch (parts.length) {
                case 1: {
                    this.op = parts[0].trim();
                    break;
                }
                case 2: {
                    this.op = parts[0].trim();
                    this.args = parts[1].trim();
                    if (!this.args.isEmpty()) break;
                    this.args = null;
                    break;
                }
            }
        }

        public String toString() {
            return "{op=" + this.op + ", args=" + this.args + "}";
        }
    }
}

