/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client;

import io.nats.client.AsyncSubscription;
import io.nats.client.AsyncSubscriptionImpl;
import io.nats.client.Channel;
import io.nats.client.ClosedCallback;
import io.nats.client.Connection;
import io.nats.client.ConnectionEvent;
import io.nats.client.Constants;
import io.nats.client.DisconnectedCallback;
import io.nats.client.ExceptionHandler;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.NATSException;
import io.nats.client.NATSThread;
import io.nats.client.NATSThreadFactory;
import io.nats.client.NUID;
import io.nats.client.Options;
import io.nats.client.Parser;
import io.nats.client.ReconnectedCallback;
import io.nats.client.ServerInfo;
import io.nats.client.Statistics;
import io.nats.client.Subscription;
import io.nats.client.SubscriptionImpl;
import io.nats.client.SyncSubscription;
import io.nats.client.SyncSubscriptionImpl;
import io.nats.client.TCPConnection;
import io.nats.client.Utilities;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectionImpl
implements Connection {
    // SLF4J logger used for all trace/debug output of this connection.
    final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class);
    // Client version; the (Options, TCPConnection) constructor reads it from the
    // "client.version" key of the "jnats.properties" resource.
    String version = null;
    private static final String inboxPrefix = "_INBOX.";
    // Current connection state; starts out DISCONNECTED.
    public Constants.ConnState status = Constants.ConnState.DISCONNECTED;
    // Error text that processErr() compares (case-insensitively) against server errors.
    protected static final String STALE_CONNECTION = "Stale Connection";
    protected static final String THREAD_POOL = "natsthreadpool";
    protected static final String LANG_STRING = "java";
    // Size constants: 65536 bytes, 8192 bytes, 0x100000 (1048576) bytes, and 1024 entries.
    protected static final int DEFAULT_BUF_SIZE = 65536;
    protected static final int DEFAULT_STREAM_BUF_SIZE = 8192;
    protected static final int DEFAULT_PENDING_SIZE = 0x100000;
    protected static final int FLUSH_CHAN_SIZE = 1024;
    // Protocol tokens and operation names.
    public static final String _CRLF_ = "\r\n";
    public static final String _EMPTY_ = "";
    public static final String _SPC_ = " ";
    public static final String _PUB_P_ = "PUB ";
    public static final String _OK_OP_ = "+OK";
    public static final String _ERR_OP_ = "-ERR";
    public static final String _MSG_OP_ = "MSG";
    public static final String _PING_OP_ = "PING";
    public static final String _PONG_OP_ = "PONG";
    public static final String _INFO_OP_ = "INFO";
    // Protocol line templates; the "%..." placeholders are String.format specifiers.
    public static final String CONN_PROTO = "CONNECT %s\r\n";
    public static final String PING_PROTO = "PING\r\n";
    public static final String PONG_PROTO = "PONG\r\n";
    public static final String PUB_PROTO = "PUB %s %s %d\r\n";
    public static final String SUB_PROTO = "SUB %s%s %d\r\n";
    public static final String UNSUB_PROTO = "UNSUB %d %s\r\n";
    public static final String OK_PROTO = "+OK\r\n";
    // Self reference (set to `this` by the full constructor); used when building ConnectionEvents.
    private ConnectionImpl nc = null;
    // Main connection lock; taken by connect(), close(), processOpError(), doReconnect(), etc.
    protected final Lock mu = new ReentrantLock();
    private AtomicLong sidCounter = new AtomicLong();
    // URL of the server currently selected from srvPool (null when none is selected).
    private URI url = null;
    protected Options opts = null;
    private TCPConnection conn = null;
    // Buffer pre-loaded with the "PUB " prefix by buildPublishProtocolBuffer().
    ByteBuffer pubProtoBuf = null;
    // Output/input streams obtained from conn in createConn() and makeTLSConn().
    private OutputStream bw = null;
    private BufferedInputStream br = null;
    // Holds bytes written while reconnecting; replayed by flushReconnectPendingItems().
    private ByteArrayOutputStream pending = null;
    private ReentrantLock flusherLock = new ReentrantLock();
    private boolean flusherDone = false;
    // Subscriptions keyed by a Long id (presumably the subscription id -- confirm in subscribe code).
    protected Map<Long, SubscriptionImpl> subs = new ConcurrentHashMap<Long, SubscriptionImpl>();
    // Candidate servers, built by setupServerPool().
    protected List<Srv> srvPool = null;
    private Exception lastEx = null;
    // Server INFO payload parsed by processInfo().
    private ServerInfo info = null;
    // Reset to 0 by processConnectInit() and processPong(); presumably counts outstanding PINGs -- TODO confirm.
    private int pout;
    protected Parser parser = new Parser(this);
    protected Parser.ParseState ps = new Parser.ParseState(this.parser);
    // Pre-encoded protocol fragments and their lengths, filled in by the full constructor.
    protected byte[] pingProtoBytes = null;
    protected int pingProtoBytesLen = 0;
    protected byte[] pongProtoBytes = null;
    protected int pongProtoBytesLen = 0;
    protected byte[] pubPrimBytes = null;
    protected int pubPrimBytesLen = 0;
    protected byte[] crlfProtoBytes = null;
    protected int crlfProtoBytesLen = 0;
    protected Statistics stats = null;
    // Channels waiting for a PONG: processPong() signals the first one, clearPendingFlushCalls() closes all.
    private ArrayList<Channel<Boolean>> pongs = null;
    // Single-threaded executor on which disconnected/reconnected/closed callbacks are run.
    private ExecutorService cbexec = Executors.newSingleThreadExecutor(new NATSThreadFactory("natsthreadpool"));
    // Shut down in close() and processOpError(); presumably drives the ping timer -- TODO confirm.
    private ScheduledExecutorService ptmr = null;
    // Phaser that waitForExits() uses to wait for worker threads.
    private Phaser phaser = new Phaser();
    private Channel<Boolean> fch = new Channel(1024);
    private List<Thread> threads = new ArrayList<Thread>();
    // ASCII codes of the characters '0' through '9'.
    static final byte[] digits = new byte[]{48, 49, 50, 51, 52, 53, 54, 55, 56, 57};

    /**
     * No-argument constructor. It performs no initialization beyond the field initializers, so
     * {@code opts}, {@code conn} and {@code srvPool} remain {@code null}.
     */
    ConnectionImpl() {
    }

    /**
     * Creates a connection object for the given options, delegating to the two-argument
     * constructor with a {@code null} transport so that a new {@code TCPConnection} is created.
     *
     * @param opts the connection options
     */
    ConnectionImpl(Options opts) {
        this(opts, null);
    }

    /**
     * Creates a connection object for the given options and transport. This pre-encodes the
     * protocol fragments, allocates the publish buffer and builds the server pool; it does not
     * open a socket.
     *
     * @param opts the connection options; read by {@code setupServerPool()}
     * @param tcpconn the transport to use, or {@code null} to create a new {@code TCPConnection}
     */
    ConnectionImpl(Options opts, TCPConnection tcpconn) {
        Properties props = this.getProperties("jnats.properties");
        // getProperties() returns null when the resource is missing or unreadable; a missing
        // version file must not prevent the connection object from being constructed.
        this.version = props != null ? props.getProperty("client.version") : null;
        this.nc = this;
        this.opts = opts;
        this.stats = new Statistics();
        this.conn = tcpconn != null ? tcpconn : new TCPConnection();
        this.sidCounter.set(0L);
        this.pingProtoBytes = PING_PROTO.getBytes();
        this.pingProtoBytesLen = this.pingProtoBytes.length;
        this.pongProtoBytes = PONG_PROTO.getBytes();
        this.pongProtoBytesLen = this.pongProtoBytes.length;
        this.pubPrimBytes = _PUB_P_.getBytes();
        this.pubPrimBytesLen = this.pubPrimBytes.length;
        this.crlfProtoBytes = _CRLF_.getBytes();
        this.crlfProtoBytesLen = this.crlfProtoBytes.length;
        this.buildPublishProtocolBuffer(1024);
        this.setupServerPool();
    }

    /**
     * Resets per-connection bookkeeping: forgets all subscriptions and starts with an empty
     * list of pending PONG channels.
     */
    private void setup() {
        subs.clear();
        pongs = new ArrayList<>();
    }

    /**
     * Loads a {@link Properties} set from the given stream. The stream is not closed here.
     *
     * @param inputStream the stream to read, may be {@code null}
     * @return the loaded properties, or {@code null} if the stream is {@code null} or reading
     *         it fails with an {@link IOException}
     */
    protected Properties getProperties(InputStream inputStream) {
        if (inputStream == null) {
            return null;
        }
        Properties loaded = new Properties();
        try {
            loaded.load(inputStream);
        }
        catch (IOException e) {
            return null;
        }
        return loaded;
    }

    /**
     * Loads a {@link Properties} set from a classpath resource, resolved through this class's
     * class loader.
     *
     * @param resourceName the name of the resource to load
     * @return the loaded properties, or {@code null} if the resource does not exist or cannot
     *         be read
     */
    protected Properties getProperties(String resourceName) {
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(resourceName);
        try {
            return this.getProperties(is);
        }
        finally {
            // The stream is opened here, so it is closed here; previously it was leaked.
            if (is != null) {
                try {
                    is.close();
                }
                catch (IOException ignored) {
                    // The properties are already loaded (or already known to be unreadable);
                    // a failure while closing changes nothing for the caller.
                }
            }
        }
    }

    /**
     * Allocates the publish protocol buffer, writes the {@code "PUB "} prefix bytes into it and
     * marks the position right after the prefix.
     *
     * @param size capacity of the new buffer in bytes
     */
    private void buildPublishProtocolBuffer(int size) {
        pubProtoBuf = ByteBuffer.allocate(size);
        pubProtoBuf.put(pubPrimBytes, 0, pubPrimBytesLen).mark();
    }

    /**
     * Builds the server pool from the options. The single URL (if any) comes first, followed by
     * the configured server list, which is shuffled unless randomization is disabled. When
     * neither is configured the pool contains only {@code nats://localhost:4222}. The first pool
     * entry becomes the current URL.
     */
    protected void setupServerPool() {
        URI url = this.opts.getUrl();
        List<URI> servers = this.opts.getServers();
        this.srvPool = new ArrayList<Srv>();
        if (url != null) {
            this.srvPool.add(new Srv(url));
        }
        if (servers != null) {
            // Shuffle a private copy: the list belongs to the Options object, so reordering it
            // in place would leak into the caller's state and fail outright on an unmodifiable
            // list.
            List<URI> candidates = new ArrayList<URI>(servers);
            if (!this.opts.isNoRandomize()) {
                Collections.shuffle(candidates, new Random(System.nanoTime()));
            }
            for (URI s : candidates) {
                this.srvPool.add(new Srv(s));
            }
        }
        if (this.srvPool.isEmpty()) {
            this.srvPool.add(new Srv(URI.create("nats://localhost:4222")));
        }
        this.url = this.srvPool.get((int)0).url;
    }

    /**
     * Finds the pool entry whose URL equals the currently selected URL.
     *
     * @return the matching server, or {@code null} if no pool entry matches
     */
    protected Srv currentServer() {
        for (Srv candidate : srvPool) {
            if (candidate.url.equals(url)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Rotates the server pool. The current server is removed from the pool and, if it has not
     * used up its reconnect attempts (a negative maximum means unlimited), appended to the end.
     *
     * @return the server now at the head of the pool
     * @throws IOException if there is no current server or the pool ends up empty
     */
    protected Srv selectNextServer() throws IOException {
        logger.trace("In selectNextServer()");
        Srv current = currentServer();
        if (current == null) {
            throw new IOException("nats: no servers available for connection");
        }
        logger.trace("selectNextServer, removing {}", current);
        srvPool.remove(current);

        int limit = opts.getMaxReconnect();
        boolean mayRetry = limit < 0 || current.reconnects < limit;
        if (mayRetry) {
            logger.trace("selectNextServer: adding {}, maxReconnect: {}", current, limit);
            srvPool.add(current);
        }

        if (!srvPool.isEmpty()) {
            return srvPool.get(0);
        }
        url = null;
        throw new IOException("nats: no servers available for connection");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Walks the server pool in order and stops at the first server for which both the socket
     * connection and the connect handshake succeed. The connection lock {@code mu} is held for
     * the duration, except while a failed attempt is being closed.
     *
     * @throws IOException if the last handshake attempt failed, or if the loop finished without
     *         the status becoming CONNECTED
     * @throws TimeoutException declared, but see the note on the unreachable branch below
     */
    protected void connect() throws IOException, TimeoutException {
        IOException returnedErr = null;
        this.mu.lock();
        try {
            for (Srv s : this.srvPool) {
                this.url = s.url;
                try {
                    this.logger.debug("Connecting to {}", (Object)this.url);
                    this.createConn();
                    this.logger.debug("Connected to {}", (Object)this.url);
                    this.setup();
                    try {
                        this.processConnectInit();
                        this.logger.trace("connect() Resetting reconnects for {}", (Object)s);
                        s.reconnects = 0;
                        returnedErr = null;
                        break;
                    }
                    catch (IOException e) {
                        // Handshake failed on an open socket: remember the error and tear the
                        // attempt down. The lock is released around close(), which takes it itself.
                        returnedErr = e;
                        this.logger.trace("{} Exception: {}", (Object)this.url, (Object)e.getMessage());
                        this.mu.unlock();
                        this.close(Constants.ConnState.DISCONNECTED, false);
                        this.mu.lock();
                        this.url = null;
                    }
                }
                catch (IOException e) {
                    // createConn() failed. Only a SocketException whose message contains
                    // "Connection refused" clears the last error; anything else just moves on.
                    if (!(e instanceof SocketException) || e.getMessage() == null || !e.getMessage().contains("Connection refused")) continue;
                    this.setLastError(null);
                }
            }
            // No handshake error recorded but still not connected: every socket attempt failed.
            if (returnedErr == null && this.status != Constants.ConnState.CONNECTED) {
                returnedErr = new IOException("nats: no servers available for connection");
            }
            if (returnedErr != null) {
                // returnedErr is declared as IOException, so this first branch always throws and
                // the two branches after it are never reached.
                if (returnedErr instanceof IOException) {
                    throw (IOException)returnedErr;
                }
                if (returnedErr instanceof TimeoutException) {
                    throw (TimeoutException)((Object)returnedErr);
                }
                throw new Error("Unexpected error", returnedErr);
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Opens the transport to the currently selected server and obtains fresh buffered streams
     * from it. If output is currently being captured in the pending buffer, the old output
     * stream is flushed first on a best-effort basis.
     *
     * @throws IOException if no server is selected or the transport cannot be opened
     */
    protected void createConn() throws IOException {
        Srv srv = this.currentServer();
        if (srv == null) {
            throw new IOException("nats: no servers available for connection");
        }
        srv.updateLastAttempt();
        this.logger.trace("createConn(): {}", srv.url);
        try {
            this.logger.trace("Opening {}", srv.url);
            if (this.conn == null) {
                this.conn = new TCPConnection();
            }
            this.conn.open(srv.url.getHost(), srv.url.getPort(), this.opts.getConnectionTimeout());
            this.logger.trace("Opened {}", srv.url);
        }
        catch (IOException e) {
            this.logger.trace("Couldn't establish connection to {}: {}", srv.url, e.getMessage());
            throw e;
        }
        if (this.pending != null && this.bw != null) {
            this.logger.trace("Flushing old outputstream to pending");
            try {
                this.bw.flush();
            }
            catch (Exception e) {
                // Best effort only: a failed flush of the old stream must not abort the new
                // connection, but it is recorded instead of being dropped silently.
                this.logger.trace("Ignoring failure while flushing old outputstream", e);
            }
        }
        this.bw = this.conn.getBufferedOutputStream(8192);
        this.br = this.conn.getBufferedInputStream(8192);
    }

    /**
     * Closes every channel that is still waiting for a PONG and empties the list. Does nothing
     * when the list has not been created yet, which is the case until {@code setup()} has run
     * (for example when {@code close()} is called on a connection that never connected).
     */
    private void clearPendingFlushCalls() {
        this.logger.trace("clearPendingFlushCalls()");
        if (this.pongs == null) {
            return;
        }
        for (Channel<Boolean> ch : this.pongs) {
            if (ch == null) continue;
            ch.close();
            this.logger.trace("Cleared PONG");
        }
        this.pongs.clear();
    }

    /**
     * Closes the connection: the final state is CLOSED and callbacks are invoked.
     */
    @Override
    public void close() {
        close(Constants.ConnState.CLOSED, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the connection down and leaves it in {@code closeState}.
     *
     * <p>Pending flush waiters are released, the ping timer is stopped, buffered output is
     * flushed on a best-effort basis, all subscriptions are closed and forgotten, and the
     * transport is closed. If the connection is already CLOSED only the status is updated.
     *
     * @param closeState the status to leave the connection in
     * @param doCBs whether to schedule the disconnected/closed callbacks and then shut down the
     *        callback executor, waiting for it to terminate
     */
    private void close(Constants.ConnState closeState, boolean doCBs) {
        this.logger.trace("close({}, {})", closeState, String.valueOf(doCBs));
        final ConnectionImpl nc = this;
        this.mu.lock();
        try {
            if (this._isClosed()) {
                this.status = closeState;
                return;
            }
            this.status = Constants.ConnState.CLOSED;
            this.kickFlusher();
        }
        finally {
            // Always release the lock, even if kickFlusher() fails.
            this.mu.unlock();
        }
        this.mu.lock();
        try {
            this.clearPendingFlushCalls();
            if (this.ptmr != null) {
                this.ptmr.shutdownNow();
            }
            // bw is only set once a socket has been opened; conn alone being non-null is not
            // enough to know there is a stream to flush.
            if (this.conn != null && this.bw != null) {
                try {
                    this.bw.flush();
                }
                catch (IOException e) {
                    // Best effort: the connection is going away regardless.
                    this.logger.trace("Ignoring flush failure during close", e);
                }
            }
            this.logger.trace("Closing subscriptions");
            // Iterate the values directly: a keySet()/get() pair can observe null for an entry
            // that is removed concurrently.
            for (SubscriptionImpl sub : this.subs.values()) {
                sub.mu.lock();
                try {
                    sub.closeChannel();
                    sub.closed = true;
                    sub.connClosed = true;
                }
                finally {
                    sub.mu.unlock();
                }
            }
            this.subs.clear();
            if (doCBs) {
                if (this.opts.getDisconnectedCallback() != null && this.conn != null) {
                    this.cbexec.execute(new Runnable(){

                        @Override
                        public void run() {
                            ConnectionImpl.this.opts.getDisconnectedCallback().onDisconnect(new ConnectionEvent(nc));
                            ConnectionImpl.this.logger.trace("executed DisconnectedCB");
                        }
                    });
                }
                if (this.opts.getClosedCallback() != null) {
                    this.cbexec.execute(new Runnable(){

                        @Override
                        public void run() {
                            ConnectionImpl.this.opts.getClosedCallback().onClose(new ConnectionEvent(nc));
                            ConnectionImpl.this.logger.trace("executed ClosedCB");
                        }
                    });
                }
            }
            this.status = closeState;
        }
        finally {
            if (this.conn != null) {
                this.conn.close();
            }
            this.mu.unlock();
            if (doCBs) {
                this.cbexec.shutdown();
                try {
                    while (!this.cbexec.awaitTermination(5L, TimeUnit.SECONDS)) {
                        this.logger.debug("Awaiting completion of threads.");
                    }
                }
                catch (InterruptedException e) {
                    this.logger.debug("Interrupted waiting to shutdown cbexec", e);
                    // Preserve the interrupt for whoever called close().
                    Thread.currentThread().interrupt();
                }
            }
            this.logger.trace("close(state, doCBs): released lock and returning");
        }
    }

    /**
     * Runs the connect handshake on a freshly opened socket: marks the connection CONNECTING,
     * consumes the expected INFO, sends CONNECT, resets {@code pout} and starts the socket
     * watcher threads.
     *
     * @throws IOException if any handshake step fails
     */
    protected void processConnectInit() throws IOException {
        logger.trace("processConnectInit(): {}", url);
        status = Constants.ConnState.CONNECTING;

        processExpectedInfo();
        sendConnect();

        pout = 0;
        spinUpSocketWatchers();
    }

    /**
     * Reconciles the client's TLS setting with what the server announced in INFO and upgrades
     * the connection to TLS when the options ask for it or the URL scheme is {@code tls}.
     *
     * @throws IOException if exactly one side requires TLS, or if the TLS upgrade fails
     */
    private void checkForSecure() throws IOException {
        boolean clientWantsTls = opts.isSecure();
        boolean serverRequiresTls = info.isTlsRequired();

        if (clientWantsTls && !serverRequiresTls) {
            throw new IOException("nats: secure connection not available");
        }
        if (serverRequiresTls && !clientWantsTls) {
            throw new IOException("nats: secure connection required");
        }
        if (clientWantsTls || "tls".equals(url.getScheme())) {
            makeTLSConn();
        }
    }

    /**
     * Upgrades the transport to TLS using the SSL context from the options and replaces both
     * buffered streams with 65536-byte ones obtained from the upgraded transport.
     *
     * @throws IOException if the TLS upgrade fails
     */
    private void makeTLSConn() throws IOException {
        conn.setTlsDebug(opts.isTlsDebug());
        conn.makeTLS(opts.getSSLContext());

        final int bufferSize = 65536;
        bw = conn.getBufferedOutputStream(bufferSize);
        br = conn.getBufferedInputStream(bufferSize);
    }

    /**
     * Reads the first protocol line from the server, which must be an INFO message, stores its
     * payload and then checks the TLS requirements.
     *
     * @throws IOException if the line cannot be read, is not an INFO message, or the TLS check
     *         fails
     */
    protected void processExpectedInfo() throws IOException {
        Control control;
        try {
            control = this.readOp();
        }
        catch (IOException e) {
            this.processOpError(e);
            // Propagate the failure: returning normally here would let the handshake continue
            // (CONNECT would be sent) even though no INFO was ever received.
            throw e;
        }
        // Constant-first comparison so an unparsable line (null op) is reported as a protocol
        // error rather than a NullPointerException.
        if (!_INFO_OP_.equals(control.op)) {
            throw new IOException("nats: protocol error, INFO not received");
        }
        this.processInfo(control.args);
        this.checkForSecure();
    }

    /**
     * Answers a server PING by sending PONG. A send failure is recorded as the last error
     * instead of being thrown.
     */
    protected void processPing() {
        try {
            sendProto(pongProtoBytes, pongProtoBytesLen);
        }
        catch (IOException sendFailure) {
            setLastError(sendFailure);
        }
    }

    /**
     * Handles a PONG from the server: takes the oldest waiting channel (if any) off the list
     * under the connection lock, resets {@code pout}, and then signals that channel with
     * {@code true} outside the lock.
     */
    protected void processPong() {
        this.logger.trace("Processing PONG");
        // No placeholder channel is allocated: when nobody is waiting there is nothing to signal.
        Channel<Boolean> ch = null;
        this.mu.lock();
        try {
            if (!this.pongs.isEmpty()) {
                ch = this.pongs.remove(0);
            }
            this.pout = 0;
        }
        finally {
            this.mu.unlock();
        }
        if (ch != null) {
            ch.add(true);
        }
        this.logger.trace("Processed PONG");
    }

    /**
     * Hook for a {@code +OK} from the server; intentionally does nothing.
     */
    protected void processOK() {
    }

    /**
     * Parses the INFO payload into a {@code ServerInfo} and stores it. A {@code null} or empty
     * payload leaves the previously stored value untouched.
     *
     * @param info the raw INFO arguments
     */
    protected void processInfo(String info) {
        boolean hasPayload = info != null && !info.isEmpty();
        if (hasPayload) {
            this.info = new ServerInfo(info);
        }
    }

    /**
     * Reacts to an I/O or protocol failure on the connection.
     *
     * <p>Nothing happens while the connection is connecting, closed or already reconnecting.
     * If reconnecting is allowed and the connection is currently CONNECTED, output is
     * redirected into a pending buffer and a "reconnect" thread is started. Otherwise the
     * connection is marked disconnected, the error is recorded and the connection is closed.
     *
     * @param err the failure that was observed
     */
    private void processOpError(Exception err) {
        logger.trace("processOpError(e={}) state={} reconnectAllowed={} ", err.getClass().getName(), status, opts.isReconnectAllowed());
        mu.lock();
        try {
            if (isConnecting() || _isClosed() || _isReconnecting()) {
                return;
            }

            if (!opts.isReconnectAllowed() || status != Constants.ConnState.CONNECTED) {
                // No reconnect possible: this is a terminal failure.
                logger.trace("\t\tcalling processDisconnect() in state {}", status);
                processDisconnect();
                setLastError(err);
                close();
                return;
            }

            status = Constants.ConnState.RECONNECTING;
            if (ptmr != null) {
                ptmr.shutdownNow();
            }
            if (conn != null) {
                try {
                    bw.flush();
                }
                catch (IOException flushFailure) {
                    logger.error("I/O error during flush", flushFailure);
                }
                conn.close();
            }

            // From here on, writes are captured in memory until a new socket is available.
            logger.trace("processOpError: redirecting output to pending buffer");
            setPending(new ByteArrayOutputStream(opts.getReconnectBufSize()));
            setOutputStream(getPending());

            logger.trace("\t\tspawning doReconnect() in state {}", status);
            Runnable reconnectTask = new Runnable(){

                @Override
                public void run() {
                    Thread.currentThread().setName("reconnect");
                    ConnectionImpl.this.doReconnect();
                }
            };
            threads.add(go(reconnectTask, "reconnect", "phaser", phaser));
            logger.trace("\t\tspawned doReconnect() in state {}", status);
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Marks the connection as DISCONNECTED.
     */
    protected void processDisconnect() {
        logger.trace("processDisconnect()");
        status = Constants.ConnState.DISCONNECTED;
    }

    /**
     * Reports, under the connection lock, whether the connection is in the RECONNECTING state.
     *
     * @return {@code true} if the status is RECONNECTING
     */
    @Override
    public boolean isReconnecting() {
        mu.lock();
        try {
            return _isReconnecting();
        }
        finally {
            mu.unlock();
        }
    }

    /** Lock-free check for the RECONNECTING state. */
    private boolean _isReconnecting() {
        return Constants.ConnState.RECONNECTING == status;
    }

    /**
     * Lock-free check that treats both CONNECTING and CONNECTED as "connected".
     *
     * @return {@code true} if the status is CONNECTING or CONNECTED
     */
    private boolean isConnected() {
        if (status == Constants.ConnState.CONNECTING) {
            return true;
        }
        return status == Constants.ConnState.CONNECTED;
    }

    /**
     * Reports, under the connection lock, whether the connection is in the CLOSED state.
     *
     * @return {@code true} if the status is CLOSED
     */
    @Override
    public boolean isClosed() {
        mu.lock();
        try {
            return _isClosed();
        }
        finally {
            mu.unlock();
        }
    }

    /** Lock-free check for the CLOSED state. */
    private boolean _isClosed() {
        return Constants.ConnState.CLOSED == status;
    }

    /**
     * Replays the bytes captured in the pending buffer onto the current output stream and then
     * discards the buffer. A write failure is logged, not thrown; the buffer is discarded in
     * either case. Does nothing if there is no pending buffer.
     */
    protected void flushReconnectPendingItems() {
        logger.trace("flushReconnectPendingItems()");
        if (pending == null) {
            return;
        }
        boolean hasData = pending.size() > 0;
        if (hasData) {
            try {
                logger.trace("flushReconnectPendingItems() writing {} bytes.", pending.size());
                byte[] captured = pending.toByteArray();
                bw.write(captured, 0, pending.size());
                bw.flush();
            }
            catch (IOException writeFailure) {
                logger.error("Error flushing pending items", writeFailure);
            }
        }
        pending = null;
        logger.trace("flushReconnectPendingItems() DONE");
    }

    /**
     * Reconnect loop, run on the "reconnect" thread started by processOpError().
     *
     * <p>After waiting for the worker threads to exit, it repeatedly picks the next server from
     * the pool, waits out the reconnect delay, opens a socket and redoes the handshake. On
     * success it resends subscriptions, replays the pending buffer, marks the connection
     * CONNECTED, schedules the reconnected callback and returns. When the pool is exhausted,
     * server selection fails, or the connection was closed in the meantime, the connection is
     * closed.
     *
     * <p>The connection lock is taken and released by hand (no try/finally); it is dropped
     * around the sleep and before the final flush()/close() calls.
     */
    private void doReconnect() {
        this.logger.trace("doReconnect()");
        this.waitForExits();
        this.mu.lock();
        // Release anyone blocked waiting for a PONG from the old connection.
        this.nc.clearPendingFlushCalls();
        this.setLastError(null);
        if (this.opts.getDisconnectedCallback() != null) {
            this.logger.trace("Spawning disconnectCB from doReconnect()");
            this.cbexec.execute(new Runnable(){

                @Override
                public void run() {
                    ConnectionImpl.this.opts.getDisconnectedCallback().onDisconnect(new ConnectionEvent(ConnectionImpl.this.nc));
                }
            });
            this.logger.trace("Spawned disconnectCB from doReconnect()");
        }
        while (!this.srvPool.isEmpty()) {
            Srv cur = null;
            try {
                cur = this.selectNextServer();
                this.url = cur.url;
            }
            catch (IOException nse) {
                // No server left to try: remember why and fall through to close().
                this.logger.trace("doReconnect() calling setLastError({})", (Object)nse.getMessage());
                this.setLastError(nse);
                break;
            }
            // Honor the reconnect wait relative to the last attempt on this server; the lock is
            // released while sleeping.
            long elapsedMillis = cur.timeSinceLastAttempt();
            if (elapsedMillis < this.opts.getReconnectWait()) {
                long sleepTime = this.opts.getReconnectWait() - elapsedMillis;
                this.mu.unlock();
                this.sleepMsec((int)sleepTime);
                this.mu.lock();
            }
            if (this.isClosed()) {
                this.logger.trace("Connection has been closed while in doReconnect()");
                break;
            }
            ++cur.reconnects;
            this.logger.trace("doReconnect() incremented cur.reconnects: {}", (Object)cur);
            this.logger.trace("doReconnect: trying createConn() for {}", (Object)cur);
            try {
                this.createConn();
                this.logger.trace("doReconnect: createConn() successful for {}", (Object)cur);
            }
            catch (Exception e) {
                // Socket could not be opened: clear the last error and try the next server.
                this.conn.teardown();
                this.logger.trace("doReconnect: createConn() failed for {}", (Object)cur);
                this.logger.trace("createConn failed", (Throwable)e);
                this.setLastError(null);
                continue;
            }
            this.stats.incrementReconnects();
            try {
                this.processConnectInit();
            }
            catch (IOException e) {
                // Handshake failed: processConnectInit() changed the status, so restore it.
                this.conn.teardown();
                this.logger.trace("doReconnect: processConnectInit FAILED for {}", (Object)cur, (Object)e);
                this.setLastError(e);
                this.status = Constants.ConnState.RECONNECTING;
                continue;
            }
            this.logger.trace("Successful reconnect; Resetting reconnects for {}", (Object)cur);
            cur.reconnects = 0;
            this.resendSubscriptions();
            this.flushReconnectPendingItems();
            try {
                this.bw.flush();
            }
            catch (IOException e) {
                this.setLastError(e);
                this.status = Constants.ConnState.RECONNECTING;
                continue;
            }
            this.setPending(null);
            this.status = Constants.ConnState.CONNECTED;
            if (this.opts.getReconnectedCallback() != null) {
                this.logger.trace("Spawning reconnectedCB from doReconnect()");
                this.cbexec.execute(new Runnable(){

                    @Override
                    public void run() {
                        ConnectionImpl.this.opts.getReconnectedCallback().onReconnect(new ConnectionEvent(ConnectionImpl.this.nc));
                    }
                });
                this.logger.trace("Spawned reconnectedCB from doReconnect()");
            }
            // Success path: drop the lock before flush(); any failure of that flush is ignored.
            this.mu.unlock();
            try {
                this.flush();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.logger.trace("doReconnect reconnected successfully!");
            return;
        }
        // Failure path: make sure some error is recorded, then close the connection.
        this.logger.trace("Reconnect FAILED");
        if (this.getLastException() == null) {
            this.setLastError(new IOException("nats: no servers available for connection"));
        }
        this.mu.unlock();
        this.logger.trace("Calling   close() from doReconnect()");
        this.close();
        this.logger.trace("Completed close() from doReconnect()");
    }

    /**
     * Reports, under the connection lock, whether the connection is in the CONNECTING state.
     *
     * @return {@code true} if the status is CONNECTING
     */
    private boolean isConnecting() {
        mu.lock();
        try {
            return Constants.ConnState.CONNECTING == status;
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Normalizes a server error line: removes the first {@code -ERR} token together with the
     * whitespace after it, lower-cases the result, and strips one single quote from the start
     * and from the end if present.
     *
     * @param error the raw error line
     * @return the normalized error text
     */
    static String normalizeErr(String error) {
        String withoutOp = error.replaceFirst("-ERR\\s+", _EMPTY_);
        String lowered = withoutOp.toLowerCase();
        return lowered.replaceAll("^'|'$", _EMPTY_);
    }

    /**
     * Converts the buffer to a string, trims it and normalizes it like
     * {@link #normalizeErr(String)}.
     *
     * @param error buffer holding the raw error text
     * @return the normalized error text
     */
    static String normalizeErr(ByteBuffer error) {
        return normalizeErr(Parser.bufToString(error).trim());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a {@code -ERR} from the server. A stale-connection error is routed through
     * processOpError(); any other error is recorded as a {@code NATSException} (under the
     * connection lock) and the connection is closed.
     *
     * @param error buffer holding the raw error text
     */
    protected void processErr(ByteBuffer error) {
        String err = normalizeErr(error);
        logger.trace("processErr(error={})", err);

        if (STALE_CONNECTION.equalsIgnoreCase(err)) {
            processOpError(new IOException("nats: stale connection"));
            return;
        }

        NATSException ex = new NATSException("nats: " + err);
        ex.setConnection(this);
        mu.lock();
        try {
            setLastError(ex);
        }
        finally {
            mu.unlock();
        }
        close();
    }

    /**
     * Sends CONNECT followed by PING and waits for the server's PONG. In verbose mode the
     * server's {@code +OK} reply to CONNECT is consumed first. On success the status becomes
     * CONNECTED.
     *
     * @throws IOException if writing or reading fails, or if the server answers with anything
     *         other than the expected replies (a {@code -ERR} reply is reported with its
     *         normalized text)
     */
    protected void sendConnect() throws IOException {
        String line = null;
        this.logger.trace("sendConnect()");
        // Build the CONNECT line once and reuse it for both the write and the trace output.
        // NOTE(review): this line may carry user/password/token from the URL, and it is logged
        // at trace level -- consider redacting.
        String connectLine = this.connectProto();
        this.bw.write(connectLine.getBytes());
        this.logger.trace("=> {}", connectLine.trim());
        this.bw.flush();
        if (this.opts.isVerbose()) {
            line = this.readLine();
            if (!_OK_OP_.equals(line)) {
                throw new IOException(String.format("nats: expected '%s', got '%s'", _OK_OP_, line));
            }
        }
        this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
        this.logger.trace("=> {}", new String(this.pingProtoBytes).trim());
        this.bw.flush();
        try {
            this.logger.trace("Awaiting PONG...");
            line = this.readLine();
        }
        catch (IOException e) {
            throw new IOException("nats: connection read error", e);
        }
        if (!PONG_PROTO.trim().equals(line)) {
            if (line.startsWith(_ERR_OP_)) {
                line = ConnectionImpl.normalizeErr(line);
                throw new IOException("nats: " + line);
            }
            throw new IOException(String.format("nats: expected '%s', got '%s'", _PONG_OP_, line));
        }
        this.status = Constants.ConnState.CONNECTED;
    }

    /**
     * Reads one line from the transport's buffered reader.
     *
     * @return the line that was read, never {@code null}
     * @throws EOFException if the end of the stream has been reached
     * @throws IOException if reading fails
     */
    protected String readLine() throws IOException {
        BufferedReader reader = conn.getBufferedReader();
        logger.trace("readLine() Reading from input stream");
        String received = reader.readLine();
        if (received == null) {
            throw new EOFException("nats: connection closed");
        }
        logger.trace("<= {}", received.trim());
        return received;
    }

    /**
     * Writes the first {@code length} bytes of {@code value} to the output stream under the
     * connection lock and then kicks the flusher.
     *
     * @param value the protocol bytes
     * @param length the number of bytes of {@code value} to write
     * @throws IOException if the write fails
     */
    protected void sendProto(byte[] value, int length) throws IOException {
        logger.trace("in sendProto()");
        mu.lock();
        try {
            logger.trace("in sendProto(), writing");
            bw.write(value, 0, length);
            String shown = new String(value).trim();
            logger.trace("=> {}", shown);
            kickFlusher();
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Builds the CONNECT protocol line for the current URL and options.
     *
     * <p>Credentials come from the URL's user-info part, which is split at the first
     * {@code ':'} only, so a password may itself contain colons. {@code "name"} alone is sent
     * as a token; {@code "user:pass"} is sent as user and password ({@code "user:"} yields an
     * empty password). User-info with an empty first part, such as {@code ":"} or
     * {@code ":pass"}, contributes no credentials.
     *
     * @return the formatted CONNECT line, including the trailing CRLF
     */
    private String connectProto() {
        String userInfo = this.url.getUserInfo();
        String user = null;
        String pass = null;
        String token = null;
        if (userInfo != null) {
            // With a limit of 2 the array always has at least one element, so ":" can no
            // longer produce an empty array, and colons inside the password are preserved.
            String[] userpass = userInfo.split(":", 2);
            if (userpass[0].length() > 0) {
                if (userpass.length == 1) {
                    token = userpass[0];
                } else {
                    user = userpass[0];
                    pass = userpass[1];
                }
            }
        }
        ConnectInfo connectInfo = new ConnectInfo(this.opts.isVerbose(), this.opts.isPedantic(), user, pass, token, this.opts.isSecure(), this.opts.getConnectionName());
        return String.format(CONN_PROTO, connectInfo.toJson());
    }

    /**
     * Reads one protocol line and wraps it in a {@code Control}.
     *
     * @return the control built from the line
     * @throws IOException if the line cannot be read
     */
    protected Control readOp() throws IOException {
        Control parsed = new Control(readLine());
        logger.trace("readOp returning: " + parsed);
        return parsed;
    }

    /**
     * Signals the flusher to stop, wakes it up, then registers with the phaser and keeps
     * arriving-and-deregistering until the phaser reports that it has terminated.
     */
    private void waitForExits() {
        logger.trace("waitForExits()");
        setFlusherDone(true);
        kickFlusher();

        phaser.register();
        int registered = phaser.getRegisteredParties();
        logger.trace("Num registered parties: {}", registered);

        while (!phaser.isTerminated()) {
            phaser.arriveAndDeregister();
        }
        logger.trace("Done waiting: Num registered parties: {}", phaser.getRegisteredParties());
    }

    /**
     * Starts one new thread per task. A local phaser acts as a start gate: every thread first
     * waits on it, runs its task, and deregisters when the task returns. The calling thread is
     * one of the registered parties and deregisters after all threads have been started; it
     * does not wait for the tasks to finish.
     *
     * @param tasks the tasks to run, each on its own thread
     */
    void runTasks(List<Runnable> tasks) {
        final Phaser startGate = new Phaser(1);
        for (final Runnable job : tasks) {
            startGate.register();
            Thread worker = new Thread(){

                @Override
                public void run() {
                    startGate.arriveAndAwaitAdvance();
                    job.run();
                    startGate.arriveAndDeregister();
                }
            };
            worker.start();
        }
        startGate.arriveAndDeregister();
    }

    /**
     * Starts the read loop and the flusher on their own threads and then resets the ping
     * timer. If the connection phaser is past phase 0, previous workers are waited for first.
     */
    protected void spinUpSocketWatchers() {
        this.logger.trace("Spinning up threads");
        if (this.phaser.getPhase() != 0) {
            this.waitForExits();
        }
        Runnable reader = new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.logger.trace("READLOOP STARTING");
                Thread.currentThread().setName("readloop");
                ConnectionImpl.this.readLoop();
                ConnectionImpl.this.logger.trace("READLOOP EXITING");
            }
        };
        Runnable flushWorker = new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.logger.trace("FLUSHER STARTING");
                Thread.currentThread().setName("flusher");
                ConnectionImpl.this.flusher();
                ConnectionImpl.this.logger.trace("FLUSHER EXITING");
            }
        };
        List<Runnable> workers = new ArrayList<Runnable>();
        workers.add(reader);
        workers.add(flushWorker);
        this.runTasks(workers);
        this.resetPingTimer();
    }

    /**
     * Starts {@code task} on a new {@code NATSThread}, optionally tracking it with a phaser.
     *
     * <p>When {@code ph} is non-null the thread registers with it, waits for the phase to
     * advance before running the task, and arrives-and-deregisters afterwards. When
     * {@code ph} is null the task simply runs untracked.
     *
     * @param task  the work to run
     * @param name  thread name, also used in trace output
     * @param group group label used only in trace output
     * @param ph    phaser to track the thread with, or {@code null} for none
     * @return the started thread
     */
    protected Thread go(final Runnable task, final String name, final String group, final Phaser ph) {
        // Debug flag is switched on only while the thread is created and started.
        NATSThread.setDebug(true);
        NATSThread t = new NATSThread(task, name){

            @Override
            public void run() {
                if (ph != null) {
                    // Register from inside the new thread, then wait for the phase to advance.
                    ph.register();
                    ConnectionImpl.this.logger.trace("{} registered in group {}. # registered for phase {} = {}", new Object[]{name, group, ph.getPhase(), ph.getRegisteredParties()});
                    ConnectionImpl.this.logger.trace(name + " starting");
                    ph.arriveAndAwaitAdvance();
                } else {
                    ConnectionImpl.this.logger.trace("Untracked thread " + name + " starting.");
                }
                task.run();
                if (ph != null) {
                    // Task finished: report state, then leave the phaser.
                    int oldPhase = ph.getPhase();
                    ConnectionImpl.this.logger.trace(name + " arrive and deregister for phase {}", (Object)ph.getPhase());
                    ConnectionImpl.this.logger.trace("{} (group {}) ending phase {}: Registered = {}, Arrived = {}, Unarrived={}", new Object[]{name, group, oldPhase, ph.getRegisteredParties(), ph.getArrivedParties(), ph.getUnarrivedParties()});
                    int phase = ph.arriveAndDeregister();
                    ConnectionImpl.this.logger.trace(name + " deregistered going into phase {}", (Object)phase);
                } else {
                    ConnectionImpl.this.logger.trace("Untracked thread " + name + " completed.");
                }
            }
        };
        t.start();
        NATSThread.setDebug(false);
        return t;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads bytes from the input stream and feeds them to the parser until the connection
     * is closed or reconnecting, the TCP connection is gone, or a read/parse error occurs.
     * A read returning -1 is treated as a stale connection. On exit the connection's parse
     * state reference is cleared.
     */
    protected void readLoop() {
        Parser activeParser;
        this.mu.lock();
        try {
            activeParser = this.parser;
            this.ps = activeParser.ps;
        }
        finally {
            this.mu.unlock();
        }
        final int chunkSize = 65536;
        byte[] chunk = new byte[chunkSize];
        for (;;) {
            boolean stopping;
            TCPConnection current;
            this.mu.lock();
            try {
                stopping = this._isClosed() || this._isReconnecting();
                if (stopping) {
                    // Start from a fresh parse state when we are about to stop.
                    this.ps = new Parser.ParseState(activeParser);
                }
                current = this.conn;
            }
            finally {
                this.mu.unlock();
            }
            if (stopping || current == null) {
                break;
            }
            try {
                int count = this.br.read(chunk, 0, chunkSize);
                if (count == -1) {
                    throw new IOException("nats: stale connection");
                }
                activeParser.parse(chunk, count);
            }
            catch (IOException | ParseException e) {
                this.logger.trace("Exception in readLoop(): ConnState was {}", (Object)this.status, (Object)e);
                // Errors on an already closed connection are not reported further.
                if (this.status != Constants.ConnState.CLOSED) {
                    this.processOpError(e);
                }
                break;
            }
        }
        this.mu.lock();
        this.ps = null;
        this.mu.unlock();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes messages from {@code ch} and hands each to its subscription until the channel
     * yields {@code null}. Returns immediately if the connection is already closed.
     *
     * <p>When a subscription's {@code processMsg} returns {@code false}, the subscription
     * is removed under the connection lock and delivery continues with the next message.
     *
     * @param ch the channel to take messages from
     */
    protected void deliverMsgs(Channel<Message> ch) {
        this.mu.lock();
        try {
            if (this._isClosed()) {
                return;
            }
        }
        finally {
            this.mu.unlock();
        }
        Message msg;
        while ((msg = ch.get()) != null) {
            if (msg.sub.processMsg(msg)) {
                continue;
            }
            this.mu.lock();
            try {
                this.removeSub(msg.sub);
            }
            finally {
                // Only release the lock here; a jump out of finally would discard
                // any exception thrown by removeSub.
                this.mu.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Accounts for an incoming message and queues it on the subscription it belongs to.
     *
     * <p>Statistics are updated first (byte count taken from {@code this.parser.ps}); the
     * subscription is looked up by the sid in {@code this.ps}. Messages for unknown sids
     * are dropped. If tallying the message reports that the subscription's limit was
     * reached, the message is not queued and the subscription is removed after the
     * connection lock has been released.
     *
     * @param data   buffer holding the payload
     * @param offset start of the payload within {@code data}
     * @param length payload length
     */
    protected void processMsg(byte[] data, int offset, int length) {
        SubscriptionImpl target;
        boolean limitHit = false;
        this.mu.lock();
        try {
            this.stats.incrementInMsgs();
            this.stats.incrementInBytes(this.parser.ps.ma.size);
            target = this.subs.get(this.ps.ma.sid);
            if (target == null) {
                return;
            }
            target.mu.lock();
            try {
                limitHit = target.tallyMessage(this.ps.ma.size);
                if (!limitHit) {
                    target.addMessage(new Message(this.ps.ma, target, data, offset, length));
                }
            }
            finally {
                target.mu.unlock();
            }
        }
        finally {
            this.mu.unlock();
        }
        if (limitHit) {
            this.removeSub(target);
        }
    }

    /**
     * Drops {@code sub} from the subscription map and, under the subscription's own lock,
     * closes and clears its message channel (if any), detaches it from this connection and
     * marks it closed.
     *
     * @param sub the subscription to remove
     */
    void removeSub(SubscriptionImpl sub) {
        this.subs.remove(sub.getSid());
        sub.getLock().lock();
        try {
            boolean hasChannel = sub.getChannel() != null;
            if (hasChannel) {
                sub.mch.close();
                sub.mch = null;
            }
            sub.setConnection(null);
            sub.closed = true;
        }
        finally {
            sub.getLock().unlock();
        }
    }

    /**
     * Records a slow-consumer error as the last error and flags {@code sub} as a slow
     * consumer. If an exception handler is configured and the subscription was not already
     * flagged, the handler is invoked on the callback executor.
     *
     * @param sub the subscription that dropped messages
     */
    void processSlowConsumer(SubscriptionImpl sub) {
        IOException cause = new IOException("nats: slow consumer, messages dropped");
        final NATSException wrapped = new NATSException(cause, this, sub);
        this.setLastError(cause);
        if (this.opts.getExceptionHandler() != null) {
            if (!sub.isSlowConsumer()) {
                Runnable notifier = new Runnable(){

                    @Override
                    public void run() {
                        ConnectionImpl.this.opts.getExceptionHandler().onException(wrapped);
                    }
                };
                this.cbexec.execute(notifier);
            }
        }
        sub.setSlowConsumer(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the given pong channel from the pending list, closing it first.
     *
     * @param ch the channel to look for (compared by identity)
     * @return {@code true} if the channel was found and removed, {@code false} otherwise
     */
    protected boolean removeFlushEntry(Channel<Boolean> ch) {
        this.logger.trace("removeFlushEntry: trying to acquire lock");
        this.mu.lock();
        this.logger.trace("removeFlushEntry: acquired lock");
        try {
            boolean removed = false;
            if (!this.pongs.isEmpty()) {
                for (Channel<Boolean> candidate : this.pongs) {
                    if (candidate == ch) {
                        candidate.close();
                        this.pongs.remove(candidate);
                        removed = true;
                        // Stop iterating right away: the list was just modified.
                        break;
                    }
                }
            }
            return removed;
        }
        finally {
            this.mu.unlock();
            this.logger.trace("removeFlushEntry: released lock");
        }
    }

    /**
     * Writes a PING to the output stream and flushes it. A non-null {@code ch} is first
     * appended to the list of pending pong channels. An I/O failure is stored as the last
     * error rather than thrown.
     *
     * @param ch channel to queue for the matching PONG, or {@code null} for none
     */
    protected void sendPing(Channel<Boolean> ch) {
        if (ch != null) {
            this.pongs.add(ch);
        }
        try {
            byte[] ping = this.pingProtoBytes;
            this.bw.write(ping, 0, this.pingProtoBytesLen);
            this.logger.trace("=> {}", (Object)new String(ping).trim());
            this.bw.flush();
        }
        catch (IOException ioe) {
            this.setLastError(ioe);
        }
    }

    /**
     * (Re)starts the periodic ping task. Does nothing when the configured ping interval
     * is not positive; otherwise any existing timer is shut down and a new single-threaded
     * scheduler runs {@code processPingTimer()} at the configured interval (milliseconds).
     */
    protected void resetPingTimer() {
        this.mu.lock();
        try {
            long interval = this.opts.getPingInterval();
            if (interval <= 0L) {
                return;
            }
            if (this.ptmr != null) {
                this.ptmr.shutdownNow();
            }
            Runnable pingTask = new Runnable(){

                @Override
                public void run() {
                    ConnectionImpl.this.processPingTimer();
                }
            };
            this.ptmr = Executors.newSingleThreadScheduledExecutor(new NATSThreadFactory("pinger"));
            this.ptmr.scheduleAtFixedRate(pingTask, interval, interval, TimeUnit.MILLISECONDS);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Ping timer callback. When connected, counts one more outstanding ping; if the count
     * exceeds the configured maximum the connection is treated as stale and
     * {@code processOpError} is invoked, otherwise a PING is sent.
     *
     * <p>The connection lock is released in a {@code finally} block so that a failure while
     * logging or sending cannot leave it held. {@code processOpError} is still called
     * after the lock has been released, as before.
     */
    protected void processPingTimer() {
        boolean stale = false;
        this.mu.lock();
        try {
            if (this.status != Constants.ConnState.CONNECTED) {
                return;
            }
            ++this.pout;
            if (this.pout > this.opts.getMaxPingsOut()) {
                stale = true;
            } else {
                this.logger.trace("Sending PING after {} seconds.", (Object)TimeUnit.MILLISECONDS.toSeconds(this.opts.getPingInterval()));
                this.sendPing(null);
            }
        }
        finally {
            this.mu.unlock();
        }
        if (stale) {
            this.processOpError(new IOException("nats: stale connection"));
        }
    }

    /**
     * Convenience overload that widens {@code max} to {@code long} and delegates.
     *
     * @param sub the subscription to unsubscribe
     * @param max message limit passed on to the {@code long} overload
     * @throws IOException if the delegate fails to write
     */
    protected void unsubscribe(SubscriptionImpl sub, int max) throws IOException {
        long widened = max;
        this.unsubscribe(sub, widened);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unsubscribes {@code sub}, either immediately or after {@code max} messages.
     *
     * <p>A positive {@code max} is stored on the subscription; otherwise the subscription
     * is removed right away. Unless the connection is reconnecting, the UNSUB protocol line
     * is written to the output stream. Unknown subscriptions are ignored. The flusher is
     * kicked on every exit path.
     *
     * @param sub the subscription to unsubscribe
     * @param max message limit; values {@code <= 0} mean "unsubscribe now"
     * @throws IOException           if writing the protocol line fails
     * @throws IllegalStateException if the connection is closed
     */
    protected void unsubscribe(SubscriptionImpl sub, long max) throws IOException {
        this.mu.lock();
        try {
            if (this.isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            if (!this.subs.containsKey(sub.getSid())) {
                return;
            }
            boolean limited = max > 0L;
            if (limited) {
                sub.setMax(max);
            } else {
                this.removeSub(sub);
            }
            if (this._isReconnecting()) {
                return;
            }
            String maxArg = limited ? Long.toString(max) : _EMPTY_;
            // Collapse the trailing blank(s) left by an empty max argument.
            String proto = String.format(UNSUB_PROTO, sub.getSid(), maxArg).replaceAll(" +\r\n", _CRLF_);
            this.bw.write(proto.getBytes());
            this.logger.trace("=> {}", (Object)proto.trim());
        }
        finally {
            this.kickFlusher();
            this.mu.unlock();
        }
    }

    /**
     * Puts a {@code true} token on the flush channel when an output stream exists and the
     * channel currently holds nothing.
     */
    protected void kickFlusher() {
        if (this.bw == null) {
            return;
        }
        if (this.fch.getCount() == 0) {
            this.fch.add(true);
        }
    }

    /**
     * Sets the flusher-done flag under the flusher lock; setting it to {@code true} also
     * kicks the flusher.
     *
     * @param value the new flag value
     */
    private void setFlusherDone(boolean value) {
        this.flusherLock.lock();
        try {
            this.flusherDone = value;
            if (value) {
                this.kickFlusher();
            }
        }
        finally {
            this.flusherLock.unlock();
        }
    }

    /**
     * Reads the flusher-done flag under the flusher lock.
     *
     * @return the current value of the flag
     */
    private boolean isFlusherDone() {
        this.flusherLock.lock();
        try {
            return this.flusherDone;
        }
        finally {
            this.flusherLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flusher loop: waits for tokens on the flush channel and flushes the output stream
     * for each one, until the flusher-done flag is set, a {@code false} token is received,
     * or the connection/stream it started with is no longer current.
     */
    protected void flusher() {
        OutputStream bw = null;
        TCPConnection conn = null;
        Channel<Boolean> fch = null;
        this.setFlusherDone(false);
        // Snapshot the stream, connection and channel this run is bound to.
        this.mu.lock();
        bw = this.bw;
        conn = this.conn;
        fch = this.fch;
        this.mu.unlock();
        if (conn == null || bw == null || !conn.isConnected()) {
            return;
        }
        while (!this.isFlusherDone()) {
            // NOTE(review): presumably get() blocks until kickFlusher() adds a token — confirm in Channel.
            if (!fch.get().booleanValue()) {
                return;
            }
            try {
                // Short pause before flushing.
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                this.logger.debug("flusher loop interrupted", (Throwable)e);
            }
            this.mu.lock();
            try {
                // Stop if the connection state changed or the stream/connection was replaced.
                if (!this.isConnected() || this.isConnecting() || bw != this.bw || conn != this.conn) {
                    return;
                }
                bw.flush();
                this.stats.incrementFlushes();
            }
            catch (IOException e) {
                // Flush failures are logged and the loop keeps going.
                this.logger.error("I/O eception encountered during flush", (Throwable)e);
            }
            finally {
                this.mu.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a PING and waits up to {@code timeout} milliseconds for the matching reply on
     * a dedicated channel.
     *
     * @param timeout maximum wait in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code timeout <= 0}
     * @throws IllegalStateException    if the connection is closed, or a {@code false}
     *                                  value is received on the channel
     * @throws TimeoutException         if the wait times out
     * @throws Exception                declared by the interface signature
     */
    @Override
    public void flush(int timeout) throws Exception {
        Exception err = null;
        if (timeout <= 0) {
            throw new IllegalArgumentException("nats: timeout invalid");
        }
        long t0 = System.nanoTime();
        long elapsed = 0L;
        Channel<Boolean> ch = null;
        this.mu.lock();
        try {
            this.logger.trace("flush(int timeout) acquired lock");
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            // One-slot channel on which the reply to this PING is expected.
            ch = new Channel<Boolean>(1);
            this.sendPing(ch);
        }
        finally {
            this.mu.unlock();
        }
        this.logger.trace("flush(int timeout): prior to polling PONG channel");
        Boolean rv = null;
        boolean timedOut = false;
        // Poll until a value arrives, the channel is closed, or the deadline passes.
        while (!timedOut && !ch.isClosed() && rv == null) {
            elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
            timedOut = elapsed >= (long)timeout;
            rv = ch.poll();
        }
        this.logger.trace("flush(int timeout): after polling PONG channel");
        if (timedOut) {
            err = new TimeoutException("nats: timeout");
        }
        // A false value takes precedence over a timeout recorded above.
        if (rv != null && !rv.booleanValue()) {
            err = new IllegalStateException("nats: connection closed");
        } else {
            ch.close();
        }
        if (err != null) {
            // Drop the channel from the pending pong list before reporting the failure.
            this.logger.trace("flush(int timeout): before removeFlushEntry");
            this.removeFlushEntry(ch);
            this.logger.trace("flush(int timeout): before throw");
            throw err;
        }
        this.logger.trace("flush(int timeout): returning without error");
    }

    /**
     * Flushes with a fixed timeout of 60000 milliseconds.
     *
     * @throws Exception whatever {@code flush(int)} throws
     */
    @Override
    public void flush() throws Exception {
        final int defaultTimeoutMillis = 60000;
        this.logger.trace("FLUSH");
        this.flush(defaultTimeoutMillis);
        this.logger.trace("FLUSHED!");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-establishes every known subscription on the server.
     *
     * <p>Async subscriptions are started first. For a subscription with a message limit,
     * the remaining allowance ({@code max - delivered}) is computed under the
     * subscription lock: if nothing remains the subscription is unsubscribed and skipped;
     * otherwise the subscription is re-sent followed by an unsubscribe carrying the
     * remaining allowance. Subscriptions without a limit are simply re-sent.
     *
     * <p>The remaining allowance is tracked per subscription so one subscription's limit
     * can never be applied to the next one. Unsubscribe failures are logged and do not
     * stop the loop.
     */
    protected void resendSubscriptions() {
        for (Long key : this.subs.keySet()) {
            SubscriptionImpl sub = this.subs.get(key);
            if (sub instanceof AsyncSubscription) {
                ((AsyncSubscriptionImpl)sub).start();
            }
            this.logger.trace("Resending subscriptions:");
            long adjustedMax = 0L;
            sub.mu.lock();
            try {
                this.logger.trace("Sub = {}", (Object)sub);
                if (sub.max > 0L) {
                    long delivered = sub.delivered.get();
                    if (delivered < sub.max) {
                        adjustedMax = sub.max - delivered;
                    }
                    if (adjustedMax == 0L) {
                        // Limit already reached: drop the subscription instead of re-sending it.
                        try {
                            this.unsubscribe(sub, 0);
                        }
                        catch (Exception e) {
                            this.logger.debug("unsubscribe failed while resending subscriptions", (Throwable)e);
                        }
                        continue;
                    }
                }
            }
            finally {
                sub.mu.unlock();
            }
            this.sendSubscriptionMessage(sub);
            if (adjustedMax > 0L) {
                try {
                    this.unsubscribe(sub, adjustedMax);
                }
                catch (Exception e) {
                    this.logger.debug("auto-unsubscribe failed while resending subscriptions", (Throwable)e);
                }
            }
        }
    }

    /**
     * Subscribes to {@code subject} without a queue group, delivering to {@code cb}.
     *
     * @param subject the subject to subscribe to
     * @param cb      the message handler
     * @return the subscription, cast to {@code AsyncSubscription}
     */
    @Override
    public AsyncSubscription subscribe(String subject, MessageHandler cb) {
        Subscription created = this.subscribe(subject, null, cb);
        return (AsyncSubscription)created;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates and registers a subscription. A non-null handler yields an async
     * subscription, a null handler a sync one. Unless the connection is reconnecting, an
     * async subscription is started and a sync subscription is announced to the server.
     *
     * @param subj  the subject to subscribe to
     * @param queue the queue group, may be {@code null}
     * @param cb    the message handler, or {@code null} for a sync subscription
     * @return the new subscription
     * @throws IllegalStateException if the connection is closed
     */
    @Override
    public Subscription subscribe(String subj, String queue, MessageHandler cb) {
        final boolean async = cb != null;
        this.mu.lock();
        try {
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            SubscriptionImpl created;
            if (async) {
                created = new AsyncSubscriptionImpl(this, subj, queue, cb, this.opts.getMaxPendingMsgs(), this.opts.getMaxPendingBytes());
            } else {
                created = new SyncSubscriptionImpl(this, subj, queue, this.opts.getMaxPendingMsgs(), this.opts.getMaxPendingBytes());
            }
            this.addSubscription(created);
            if (!this._isReconnecting()) {
                if (async) {
                    ((AsyncSubscriptionImpl)created).start();
                } else {
                    this.sendSubscriptionMessage(created);
                }
            }
            this.kickFlusher();
            return created;
        }
        finally {
            this.mu.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates and registers an async subscription. The handler is attached and the
     * subscription started only when {@code cb} is non-null.
     *
     * @param subject the subject to subscribe to
     * @param queue   the queue group, may be {@code null}
     * @param cb      the message handler, may be {@code null}
     * @return the new async subscription
     * @throws IllegalStateException if the connection is closed
     */
    @Override
    public AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb) {
        this.mu.lock();
        try {
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            AsyncSubscriptionImpl created = new AsyncSubscriptionImpl(this, subject, queue, null, this.opts.getMaxPendingMsgs(), this.opts.getMaxPendingBytes());
            this.addSubscription(created);
            if (cb != null) {
                created.setMessageHandler(cb);
                created.start();
            }
            return created;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Creates an async subscription with a queue group and no handler.
     *
     * @param subj  the subject to subscribe to
     * @param queue the queue group
     * @return the new async subscription
     */
    @Override
    public AsyncSubscription subscribeAsync(String subj, String queue) {
        final MessageHandler noHandler = null;
        return this.subscribeAsync(subj, queue, noHandler);
    }

    /**
     * Creates an async subscription with a handler and no queue group.
     *
     * @param subj the subject to subscribe to
     * @param cb   the message handler
     * @return the new async subscription
     */
    @Override
    public AsyncSubscription subscribeAsync(String subj, MessageHandler cb) {
        final String noQueue = null;
        return this.subscribeAsync(subj, noQueue, cb);
    }

    /**
     * Creates an async subscription with neither queue group nor handler.
     *
     * @param subj the subject to subscribe to
     * @return the new async subscription
     */
    @Override
    public AsyncSubscription subscribeAsync(String subj) {
        final String noQueue = null;
        final MessageHandler noHandler = null;
        return this.subscribeAsync(subj, noQueue, noHandler);
    }

    /**
     * Assigns the next subscription id to {@code sub} and stores it in the subscription
     * map under that id.
     *
     * @param sub the subscription to register
     */
    private void addSubscription(SubscriptionImpl sub) {
        long nextSid = this.sidCounter.incrementAndGet();
        sub.setSid(nextSid);
        this.subs.put(sub.getSid(), sub);
        this.logger.trace("Successfully added subscription to {} [{}]", (Object)sub.getSubject(), (Object)sub.getSid());
    }

    /**
     * Creates a sync subscription with a queue group.
     *
     * @param subject the subject to subscribe to
     * @param queue   the queue group
     * @return the subscription, cast to {@code SyncSubscription}
     */
    @Override
    public SyncSubscription subscribeSync(String subject, String queue) {
        Subscription created = this.subscribe(subject, queue, null);
        return (SyncSubscription)created;
    }

    /**
     * Creates a sync subscription without a queue group.
     *
     * @param subject the subject to subscribe to
     * @return the subscription, cast to {@code SyncSubscription}
     */
    @Override
    public SyncSubscription subscribeSync(String subject) {
        Subscription created = this.subscribe(subject, null, null);
        return (SyncSubscription)created;
    }

    /**
     * Appends "subject [reply] size CRLF" to the connection's publish protocol buffer.
     *
     * <p>Note that the {@code buffer} argument is not used; all writes go to
     * {@code this.pubProtoBuf}. The size is rendered as decimal ASCII; a size that is not
     * positive is written as a single zero digit.
     *
     * @param buffer  unused
     * @param subject subject bytes
     * @param reply   reply subject bytes, or {@code null} for none
     * @param msgSize payload size to encode
     */
    private void writePublishProto(ByteBuffer buffer, byte[] subject, byte[] reply, int msgSize) {
        ByteBuffer out = this.pubProtoBuf;
        out.put(subject, 0, subject.length);
        if (reply != null) {
            out.put((byte)32);
            out.put(reply, 0, reply.length);
        }
        out.put((byte)32);
        // Fill the scratch array from the right with the decimal digits of msgSize.
        byte[] scratch = new byte[12];
        int pos = scratch.length;
        int remaining = msgSize;
        if (remaining <= 0) {
            scratch[--pos] = digits[0];
        }
        while (remaining > 0) {
            scratch[--pos] = digits[remaining % 10];
            remaining /= 10;
        }
        out.put(scratch, pos, scratch.length - pos);
        out.put(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes a PUB protocol message (header, payload, CRLF) to the output stream.
     *
     * <p>While reconnecting, the stream is flushed first and the publish is rejected once
     * the pending buffer has reached the configured reconnect buffer size. If the protocol
     * buffer overflows while the header is built, it is rebuilt larger and the header is
     * written again. An I/O failure while writing is stored as the last error and the
     * method returns without updating statistics.
     *
     * <p>The connection lock is released exactly once, by the {@code finally} block, on
     * every path. The protocol buffer is rewound before any I/O so a failed write cannot
     * leave a stale header behind for the next publish.
     *
     * @param subject subject bytes
     * @param reply   reply subject bytes, or {@code null}
     * @param data    payload, or {@code null} for an empty message
     * @throws IOException              if the reconnect buffer limit is exceeded
     * @throws IllegalArgumentException if the payload exceeds the server's maximum
     * @throws IllegalStateException    if the connection is closed
     */
    void _publish(byte[] subject, byte[] reply, byte[] data) throws IOException {
        int msgSize = data != null ? data.length : 0;
        this.mu.lock();
        try {
            if ((long)msgSize > this.info.getMaxPayload()) {
                throw new IllegalArgumentException("nats: maximum payload exceeded");
            }
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            if (this._isReconnecting()) {
                try {
                    this.bw.flush();
                }
                catch (IOException e) {
                    this.logger.error("I/O exception during flush", (Throwable)e);
                }
                if (this.pending.size() >= this.opts.getReconnectBufSize()) {
                    throw new IOException("nats: outbound buffer limit exceeded");
                }
            }
            try {
                this.writePublishProto(this.pubProtoBuf, subject, reply, msgSize);
            }
            catch (BufferOverflowException e) {
                int resizeAmount = 1024 + subject.length + (reply != null ? reply.length : 0);
                this.buildPublishProtocolBuffer(resizeAmount);
                this.writePublishProto(this.pubProtoBuf, subject, reply, msgSize);
            }
            // Remember the header length and rewind right away, before any write can fail.
            int protoLen = this.pubProtoBuf.position();
            this.pubProtoBuf.position(this.pubPrimBytesLen);
            try {
                this.bw.write(this.pubProtoBuf.array(), 0, protoLen);
                if (msgSize > 0) {
                    this.bw.write(data, 0, msgSize);
                }
                this.bw.write(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
            }
            catch (IOException e) {
                this.setLastError(e);
                // The finally block below releases the lock; unlocking here as well
                // would release it twice.
                return;
            }
            this.kickFlusher();
            this.stats.incrementOutMsgs();
            this.stats.incrementOutBytes(msgSize);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Publishes {@code data} to {@code subject} with an optional reply subject.
     *
     * @param subject the subject; must be non-null and non-empty
     * @param reply   the reply subject, may be {@code null}
     * @param data    the payload, may be {@code null}
     * @throws NullPointerException     if {@code subject} is {@code null}
     * @throws IllegalArgumentException if {@code subject} is empty
     * @throws IOException              if publishing fails
     */
    @Override
    public void publish(String subject, String reply, byte[] data) throws IOException {
        if (subject == null) {
            throw new NullPointerException("nats: invalid subject");
        }
        if (subject.isEmpty()) {
            throw new IllegalArgumentException("nats: invalid subject");
        }
        byte[] replyBytes = reply != null ? reply.getBytes() : null;
        this._publish(subject.getBytes(), replyBytes, data);
    }

    /**
     * Publishes {@code data} to {@code subject} without a reply subject.
     *
     * @param subject the subject
     * @param data    the payload
     * @throws IOException if publishing fails
     */
    @Override
    public void publish(String subject, byte[] data) throws IOException {
        final String noReply = null;
        this.publish(subject, noReply, data);
    }

    /**
     * Publishes a message using its own subject, reply subject and payload.
     *
     * @param msg the message to publish
     * @throws IOException if publishing fails
     */
    @Override
    public void publish(Message msg) throws IOException {
        byte[] subjectBytes = msg.getSubjectBytes();
        byte[] replyBytes = msg.getReplyToBytes();
        this._publish(subjectBytes, replyBytes, msg.getData());
    }

    /**
     * Performs a request/reply exchange: subscribes to a fresh inbox limited to one
     * message, publishes the request with that inbox as reply subject, and waits for
     * the reply.
     *
     * <p>The inbox subscription is closed on every path, including when publishing fails
     * or the wait times out, so failed requests do not leave subscriptions behind.
     *
     * @param subject the subject to send the request to
     * @param data    the request payload
     * @param timeout how long to wait for the reply, passed to {@code nextMessage}
     * @param unit    unit of {@code timeout}
     * @return the reply message
     * @throws TimeoutException if no reply arrives in time
     * @throws IOException      if subscribing or publishing fails
     */
    private Message _request(String subject, byte[] data, long timeout, TimeUnit unit) throws TimeoutException, IOException {
        String inbox = this.newInbox();
        SyncSubscription sub = this.subscribeSync(inbox, null);
        try {
            sub.autoUnsubscribe(1);
            this.publish(subject, inbox, data);
            return sub.nextMessage(timeout, unit);
        }
        finally {
            sub.close();
        }
    }

    /**
     * Sends a request and waits up to {@code timeout} milliseconds for the reply.
     *
     * @param subject the subject to send the request to
     * @param data    the request payload
     * @param timeout wait time in milliseconds
     * @return the reply message
     * @throws TimeoutException if no reply arrives in time
     * @throws IOException      if the request cannot be sent
     */
    @Override
    public Message request(String subject, byte[] data, long timeout) throws TimeoutException, IOException {
        final TimeUnit millis = TimeUnit.MILLISECONDS;
        return this.request(subject, data, timeout, millis);
    }

    /**
     * Sends a request and waits up to the given time for the reply.
     *
     * @param subject the subject to send the request to
     * @param data    the request payload
     * @param timeout wait time; must be positive
     * @param unit    unit of {@code timeout}
     * @return the reply message
     * @throws IllegalArgumentException if {@code timeout <= 0}
     * @throws TimeoutException         if no reply arrives in time
     * @throws IOException              if the request cannot be sent
     */
    @Override
    public Message request(String subject, byte[] data, long timeout, TimeUnit unit) throws TimeoutException, IOException {
        if (timeout > 0L) {
            return this._request(subject, data, timeout, unit);
        }
        throw new IllegalArgumentException("Timeout must be greater that 0.");
    }

    /**
     * Sends a request, passing a timeout of -1 milliseconds to the internal request
     * routine.
     *
     * @param subject the subject to send the request to
     * @param data    the request payload
     * @return the reply message
     * @throws IOException      if the request cannot be sent
     * @throws TimeoutException declared by the underlying call
     */
    @Override
    public Message request(String subject, byte[] data) throws IOException, TimeoutException {
        final long noTimeout = -1L;
        return this._request(subject, data, noTimeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Generates a new inbox subject: the inbox prefix followed by the next global NUID.
     *
     * @return the inbox subject
     */
    @Override
    public String newInbox() {
        return String.format("%s%s", inboxPrefix, NUID.nextGlobal());
    }

    /**
     * Returns a copy of the connection statistics.
     *
     * @return a new {@code Statistics} built from the current counters
     */
    @Override
    public synchronized Statistics getStats() {
        Statistics snapshot = new Statistics(this.stats);
        return snapshot;
    }

    /**
     * Clears the connection statistics.
     */
    @Override
    public synchronized void resetStats() {
        stats.clear();
    }

    /**
     * Returns the maximum payload size reported in the current server info.
     *
     * @return the maximum payload
     */
    @Override
    public synchronized long getMaxPayload() {
        long limit = this.info.getMaxPayload();
        return limit;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes the SUB protocol line for {@code sub} to the output stream and kicks the
     * flusher. Nothing is written while the connection is reconnecting.
     *
     * <p>A write failure is still non-fatal, but is now logged instead of being dropped
     * silently.
     *
     * @param sub the subscription to announce to the server
     */
    protected void sendSubscriptionMessage(SubscriptionImpl sub) {
        this.mu.lock();
        try {
            if (!this._isReconnecting()) {
                String queue = sub.getQueue();
                String subLine = String.format(SUB_PROTO, sub.getSubject(), queue != null && !queue.isEmpty() ? _SPC_ + queue : _EMPTY_, sub.getSid());
                try {
                    this.bw.write(Utilities.stringToBytesASCII(subLine));
                    this.kickFlusher();
                }
                catch (IOException e) {
                    this.logger.debug("I/O exception while sending subscription message", (Throwable)e);
                }
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the closed callback held by the connection options.
     *
     * @return the closed callback
     */
    @Override
    public ClosedCallback getClosedCallback() {
        return opts.getClosedCallback();
    }

    /**
     * Stores the closed callback in the connection options, under the connection lock.
     *
     * @param cb the callback to store
     */
    @Override
    public void setClosedCallback(ClosedCallback cb) {
        this.mu.lock();
        try {
            Options current = this.opts;
            current.setClosedCallback(cb);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the disconnected callback held by the connection options.
     *
     * @return the disconnected callback
     */
    @Override
    public DisconnectedCallback getDisconnectedCallback() {
        return opts.getDisconnectedCallback();
    }

    /**
     * Stores the disconnected callback in the connection options, under the connection
     * lock.
     *
     * @param cb the callback to store
     */
    @Override
    public void setDisconnectedCallback(DisconnectedCallback cb) {
        this.mu.lock();
        try {
            Options current = this.opts;
            current.setDisconnectedCallback(cb);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the reconnected callback held by the connection options.
     *
     * @return the reconnected callback
     */
    @Override
    public ReconnectedCallback getReconnectedCallback() {
        return opts.getReconnectedCallback();
    }

    /**
     * Stores the reconnected callback in the connection options, under the connection
     * lock.
     *
     * @param cb the callback to store
     */
    @Override
    public void setReconnectedCallback(ReconnectedCallback cb) {
        this.mu.lock();
        try {
            Options current = this.opts;
            current.setReconnectedCallback(cb);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the exception handler held by the connection options.
     *
     * @return the exception handler
     */
    @Override
    public ExceptionHandler getExceptionHandler() {
        return opts.getExceptionHandler();
    }

    /**
     * Stores the exception handler in the connection options, under the connection lock.
     *
     * @param exceptionHandler the handler to store
     */
    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.mu.lock();
        try {
            Options current = this.opts;
            current.setExceptionHandler(exceptionHandler);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the URL of the server this connection is connected to.
     *
     * @return the URL as a string, or {@code null} when the state is not CONNECTED
     */
    @Override
    public String getConnectedUrl() {
        this.mu.lock();
        try {
            boolean connected = this.status == Constants.ConnState.CONNECTED;
            return connected ? this.url.toString() : null;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the id of the server this connection is connected to.
     *
     * @return the id from the server info, or {@code null} when the state is not CONNECTED
     */
    @Override
    public String getConnectedServerId() {
        this.mu.lock();
        try {
            boolean connected = this.status == Constants.ConnState.CONNECTED;
            return connected ? this.info.getId() : null;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Reads the connection state under the connection lock.
     *
     * @return the current connection state
     */
    @Override
    public Constants.ConnState getState() {
        this.mu.lock();
        try {
            return this.status;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the stored server info.
     *
     * @return the server info object
     */
    @Override
    public ServerInfo getConnectedServerInfo() {
        return info;
    }

    /**
     * Replaces the stored server info.
     *
     * @param info the new server info
     */
    void setConnectedServerInfo(ServerInfo info) {
        this.info = info;
    }

    /**
     * Hands an INFO string to {@code processInfo}.
     *
     * @param info the server info string
     */
    void setConnectedServerInfo(String info) {
        processInfo(info);
    }

    /**
     * Returns the most recently recorded exception.
     *
     * @return the last recorded exception
     */
    @Override
    public Exception getLastException() {
        return lastEx;
    }

    /**
     * Records {@code err} as the last exception seen by this connection.
     *
     * @param err the exception to record
     */
    private void setLastError(Exception err) {
        lastEx = err;
    }

    /**
     * Returns the options object used by this connection.
     *
     * @return the connection options
     */
    protected Options getOptions() {
        return opts;
    }

    /**
     * Replaces the pending output buffer.
     *
     * @param pending the new pending buffer
     */
    void setPending(ByteArrayOutputStream pending) {
        this.pending = pending;
    }

    /**
     * Returns the pending output buffer.
     *
     * @return the pending buffer, as stored
     */
    ByteArrayOutputStream getPending() {
        return pending;
    }

    /**
     * Sleeps for {@code msec} milliseconds, tracing before and after. An interruption
     * ends the sleep early and is otherwise ignored.
     *
     * @param msec how long to sleep, in milliseconds
     */
    protected void sleepMsec(long msec) {
        try {
            this.logger.trace("Sleeping for {} ms", (Object)msec);
            Thread.sleep(msec);
            this.logger.trace("Slept    for {} ms", (Object)msec);
        }
        catch (InterruptedException ignored) {
            // Interrupted: return without completing the sleep.
        }
    }

    /**
     * Replaces the output stream under the connection lock.
     *
     * @param out the new output stream
     */
    void setOutputStream(OutputStream out) {
        this.mu.lock();
        try {
            bw = out;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the output stream currently in use.
     *
     * @return the output stream
     */
    OutputStream getOutputStream() {
        return bw;
    }

    /**
     * Replaces the input stream under the connection lock.
     *
     * @param in the new input stream
     */
    void setInputStream(BufferedInputStream in) {
        this.mu.lock();
        try {
            br = in;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the input stream currently in use.
     *
     * @return the input stream
     */
    BufferedInputStream getInputStream() {
        return br;
    }

    /**
     * Returns the list of pending pong channels (the live list, not a copy).
     *
     * @return the pong channel list
     */
    protected ArrayList<Channel<Boolean>> getPongs() {
        return pongs;
    }

    /**
     * Replaces the list of pending pong channels.
     *
     * @param pongs the new list
     */
    protected void setPongs(ArrayList<Channel<Boolean>> pongs) {
        this.pongs = pongs;
    }

    /**
     * Returns the subscription map keyed by subscription id (the live map, not a copy).
     *
     * @return the subscription map
     */
    protected Map<Long, SubscriptionImpl> getSubs() {
        return subs;
    }

    /**
     * Replaces the subscription map.
     *
     * @param subs the new map, keyed by subscription id
     */
    protected void setSubs(Map<Long, SubscriptionImpl> subs) {
        this.subs = subs;
    }

    /**
     * Returns the server pool (the live list, not a copy).
     *
     * @return the list of known servers
     */
    protected List<Srv> getServerPool() {
        return srvPool;
    }

    /**
     * Replaces the server pool.
     *
     * @param pool the new list of servers
     */
    protected void setServerPool(List<Srv> pool) {
        srvPool = pool;
    }

    /**
     * Returns the number of bytes held in the pending buffer.
     *
     * @return the pending buffer size, or 0 when there is no pending buffer
     */
    @Override
    public int getPendingByteCount() {
        if (this.getPending() == null) {
            return 0;
        }
        return this.getPending().size();
    }

    /**
     * Replaces the flush channel.
     *
     * @param fch the new flush channel
     */
    protected void setFlushChannel(Channel<Boolean> fch) {
        this.fch = fch;
    }

    /**
     * Returns the flush channel.
     *
     * @return the flush channel
     */
    protected Channel<Boolean> getFlushChannel() {
        return fch;
    }

    /**
     * Replaces the underlying TCP connection object.
     *
     * @param conn the new TCP connection
     */
    protected void setTcpConnection(TCPConnection conn) {
        this.conn = conn;
    }

    /**
     * Returns the underlying TCP connection object.
     *
     * @return the TCP connection
     */
    protected TCPConnection getTcpConnection() {
        return conn;
    }

    /**
     * One entry of the server pool: a server URL plus bookkeeping about connection
     * attempts made to it.
     */
    class Srv {
        URI url = null;
        int reconnects = 0;
        // Wall-clock time of the last attempt (for display) and the matching
        // System.nanoTime() reading (for measuring elapsed time).
        long lastAttempt = 0L;
        long lastAttemptNanos = 0L;
        boolean secure = false;

        /**
         * Creates an entry for {@code url}. The entry is marked secure when the URL
         * scheme is {@code tls}; a URL without a scheme is simply not secure.
         *
         * @param url the server URL
         */
        protected Srv(URI url) {
            this.url = url;
            // Constant-first comparison: getScheme() is null for scheme-less URIs.
            if ("tls".equals(url.getScheme())) {
                this.secure = true;
            }
        }

        /** Records "now" as the time of the last connection attempt. */
        void updateLastAttempt() {
            this.lastAttemptNanos = System.nanoTime();
            this.lastAttempt = System.currentTimeMillis();
        }

        /**
         * @return milliseconds elapsed since the recorded {@code System.nanoTime()}
         *         reading of the last attempt
         */
        long timeSinceLastAttempt() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastAttemptNanos);
        }

        @Override
        public String toString() {
            SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy hh:mm:ss");
            String dateToStr = format.format(new Date(this.lastAttempt));
            return String.format("{url=%s, reconnects=%d, lastAttempt=%s, timeSinceLastAttempt=%dms}", this.url.toString(), this.reconnects, dateToStr, this.timeSinceLastAttempt());
        }
    }

    /**
     * Holds the options sent to the server in the CONNECT message and renders them as
     * JSON.
     */
    class ConnectInfo {
        private Boolean verbose;
        private Boolean pedantic;
        private String user;
        private String pass;
        private String token;
        private Boolean ssl;
        private String name;
        private String lang = "java";
        private String version;

        /**
         * @param verbose        value for the {@code verbose} field
         * @param pedantic       value for the {@code pedantic} field
         * @param username       user name, or {@code null} for none
         * @param password       password, or {@code null} for none
         * @param token          auth token, or {@code null} for none
         * @param secure         value for the {@code ssl_required} field
         * @param connectionName connection name, or {@code null} for an empty name
         */
        public ConnectInfo(boolean verbose, boolean pedantic, String username, String password, String token, boolean secure, String connectionName) {
            this.version = ConnectionImpl.this.version;
            this.verbose = Boolean.valueOf(verbose);
            this.pedantic = Boolean.valueOf(pedantic);
            this.user = username;
            this.pass = password;
            this.token = token;
            this.ssl = Boolean.valueOf(secure);
            this.name = connectionName;
        }

        /**
         * Renders the connect options as a JSON object. The password is only emitted
         * together with a user. User, password, token and name are JSON-escaped so that
         * quotes, backslashes or control characters in them cannot break the document.
         *
         * @return the JSON text
         */
        public String toJson() {
            StringBuilder sb = new StringBuilder(1024);
            sb.append(String.format("{\"verbose\":%s,\"pedantic\":%s,", this.verbose.toString(), this.pedantic.toString()));
            if (this.user != null) {
                sb.append(String.format("\"user\":\"%s\",", this.escapeJson(this.user)));
                if (this.pass != null) {
                    sb.append(String.format("\"pass\":\"%s\",", this.escapeJson(this.pass)));
                }
            }
            if (this.token != null) {
                sb.append(String.format("\"auth_token\":\"%s\",", this.escapeJson(this.token)));
            }
            sb.append(String.format("\"ssl_required\":%s,\"name\":\"%s\",\"lang\":\"%s\",\"version\":\"%s\"}", this.ssl.toString(), this.name != null ? this.escapeJson(this.name) : ConnectionImpl._EMPTY_, this.lang, this.version));
            return sb.toString();
        }

        /** Escapes {@code raw} for use inside a JSON string literal. */
        private String escapeJson(String raw) {
            StringBuilder out = new StringBuilder(raw.length() + 8);
            for (int i = 0; i < raw.length(); ++i) {
                char c = raw.charAt(i);
                switch (c) {
                    case '"': {
                        out.append("\\\"");
                        break;
                    }
                    case '\\': {
                        out.append("\\\\");
                        break;
                    }
                    case '\n': {
                        out.append("\\n");
                        break;
                    }
                    case '\r': {
                        out.append("\\r");
                        break;
                    }
                    case '\t': {
                        out.append("\\t");
                        break;
                    }
                    case '\b': {
                        out.append("\\b");
                        break;
                    }
                    case '\f': {
                        out.append("\\f");
                        break;
                    }
                    default: {
                        if (c < ' ') {
                            // Remaining control characters must be written as \\uXXXX.
                            out.append(String.format("\\u%04x", (int)c));
                        } else {
                            out.append(c);
                        }
                    }
                }
            }
            return out.toString();
        }
    }

    /**
     * A control line split into an operation name and an optional argument string.
     */
    protected class Control {
        String op;
        String args;

        /**
         * Splits {@code s} into at most two pieces around the first match of
         * {@code ConnectionImpl._SPC_}. The first piece, trimmed, becomes {@code op}; the
         * second piece, trimmed, becomes {@code args} unless it is empty, in which case
         * {@code args} stays {@code null}. A {@code null} input leaves both fields
         * {@code null}.
         *
         * @param s the raw control line, or {@code null}
         */
        protected Control(String s) {
            if (s == null) {
                return;
            }
            String[] tokens = s.split(ConnectionImpl._SPC_, 2);
            int count = tokens.length;
            if (count == 1 || count == 2) {
                this.op = tokens[0].trim();
            }
            if (count == 2) {
                String remainder = tokens[1].trim();
                // An all-whitespace remainder is treated the same as no arguments.
                this.args = remainder.isEmpty() ? null : remainder;
            }
        }

        /**
         * @return the op and args rendered as {@code {op=..., args=...}}
         */
        public String toString() {
            return String.format("{op=%s, args=%s}", this.op, this.args);
        }
    }
}

