/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import io.nats.client.AsyncSubscription;
import io.nats.client.AsyncSubscriptionImpl;
import io.nats.client.ClosedCallback;
import io.nats.client.Connection;
import io.nats.client.ConnectionEvent;
import io.nats.client.Constants;
import io.nats.client.DisconnectedCallback;
import io.nats.client.ExceptionHandler;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.NATSException;
import io.nats.client.NUID;
import io.nats.client.NatsThreadFactory;
import io.nats.client.Options;
import io.nats.client.Parser;
import io.nats.client.ReconnectedCallback;
import io.nats.client.ServerInfo;
import io.nats.client.Statistics;
import io.nats.client.SubscriptionImpl;
import io.nats.client.SyncSubscription;
import io.nats.client.SyncSubscriptionImpl;
import io.nats.client.TcpConnection;
import io.nats.client.TcpConnectionFactory;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionImpl
implements Connection {
    /** Logger used by this connection and by its anonymous callback tasks. */
    final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class);
    /** Client version; read from the "client.version" property by the two-argument constructor. */
    String version = null;
    private static final String inboxPrefix = "_INBOX.";
    /** Current connection state; the methods of this class change it while holding {@link #mu}. */
    public Constants.ConnState status = Constants.ConnState.DISCONNECTED;
    protected static final String STALE_CONNECTION = "Stale Connection";
    protected static final String NATS_POOL = "jnats-pool";
    protected static final String SUB_POOL = "jnats-sub-pool";
    protected static final String LANG_STRING = "java";
    protected static final int DEFAULT_BUF_SIZE = 65536;
    protected static final int DEFAULT_STREAM_BUF_SIZE = 65536;
    protected static final int FLUSH_CHAN_SIZE = 1;
    protected long flushTimerInterval = 1L;
    protected TimeUnit flushTimerUnit = TimeUnit.MICROSECONDS;

    // Protocol tokens and printf-style protocol line templates.
    public static final String _CRLF_ = "\r\n";
    public static final String _EMPTY_ = "";
    public static final String _SPC_ = " ";
    public static final String _PUB_P_ = "PUB ";
    public static final String _OK_OP_ = "+OK";
    public static final String _ERR_OP_ = "-ERR";
    public static final String _MSG_OP_ = "MSG";
    public static final String _PING_OP_ = "PING";
    public static final String _PONG_OP_ = "PONG";
    public static final String _INFO_OP_ = "INFO";
    public static final String CONN_PROTO = "CONNECT %s\r\n";
    public static final String PING_PROTO = "PING\r\n";
    public static final String PONG_PROTO = "PONG\r\n";
    public static final String PUB_PROTO = "PUB %s %s %d\r\n";
    public static final String SUB_PROTO = "SUB %s%s %d\r\n";
    public static final String UNSUB_PROTO = "UNSUB %d %s\r\n";
    public static final String OK_PROTO = "+OK\r\n";

    /** Self reference; set to {@code this} by the two-argument constructor. */
    private ConnectionImpl nc = null;
    /** Lock taken by the methods of this class around connection-state changes. */
    protected final Lock mu = new ReentrantLock();
    private AtomicLong sidCounter = new AtomicLong();
    private URI url = null;
    protected Options opts = null;
    private TcpConnectionFactory tcf = null;
    TcpConnection conn = null;
    ByteBuffer pubProtoBuf = null;
    /** Output stream for protocol writes; redirected to {@link #pending} while reconnecting. */
    private OutputStream bw = null;
    private InputStream br = null;
    /** Buffer that collects outbound bytes while a reconnect is in progress; null otherwise. */
    private ByteArrayOutputStream pending = null;
    protected Map<Long, SubscriptionImpl> subs = new ConcurrentHashMap<Long, SubscriptionImpl>();
    /** Ordered list of candidate servers; index 0 is tried first. */
    protected List<Srv> srvPool = null;
    /** Known server URIs keyed by authority (host:port). */
    protected Map<String, URI> urls = null;
    private Exception lastEx = null;
    private ServerInfo info = null;
    private int pout;
    protected Parser parser = new Parser(this);
    protected Parser.ParseState ps = new Parser.ParseState(this.parser);

    // Pre-encoded protocol fragments and their lengths, filled in by the constructor.
    protected byte[] pingProtoBytes = null;
    protected int pingProtoBytesLen = 0;
    protected byte[] pongProtoBytes = null;
    protected int pongProtoBytesLen = 0;
    protected byte[] pubPrimBytes = null;
    protected int pubPrimBytesLen = 0;
    protected byte[] crlfProtoBytes = null;
    protected int crlfProtoBytesLen = 0;
    protected Statistics stats = null;
    /** Channels signalled by processPong (true) or clearPendingFlushCalls (false). */
    private ArrayList<BlockingQueue<Boolean>> pongs;
    ExecutorService cbexec;
    ExecutorService exec;
    ExecutorService subexec;
    ScheduledExecutorService scheduler;
    private ScheduledFuture<?> ftmr = null;
    private ScheduledFuture<?> ptmr = null;
    // Parameterized instead of the raw ArrayList to avoid an unchecked conversion.
    private List<Future<?>> tasks = new ArrayList<Future<?>>();
    private static final int NUM_WATCHER_THREADS = 2;
    // Sized from the constant so the latch count cannot drift from the watcher count.
    private CountDownLatch socketWatchersStartLatch = new CountDownLatch(NUM_WATCHER_THREADS);
    private CountDownLatch socketWatchersDoneLatch = null;
    private BlockingQueue<Boolean> fch;
    /** ASCII codes of the decimal digits '0' through '9'. */
    static final byte[] digits = new byte[]{48, 49, 50, 51, 52, 53, 54, 55, 56, 57};

    /**
     * Creates an instance without options, connection factory, or server pool;
     * only the field initializers run.
     */
    ConnectionImpl() {
    }

    /**
     * Creates a connection for {@code opts}, delegating to the two-argument
     * constructor with no explicit connection factory.
     *
     * @param opts connection options
     */
    ConnectionImpl(Options opts) {
        this(opts, (TcpConnectionFactory)null);
    }

    /**
     * Creates a connection object for {@code opts}: loads the client version,
     * pre-encodes the fixed protocol fragments, allocates the publish buffer and
     * builds the server pool. No network connection is opened here.
     *
     * @param opts    connection options
     * @param connFac factory for TCP connections; a new {@link TcpConnectionFactory}
     *                is used when this is null
     */
    ConnectionImpl(Options opts, TcpConnectionFactory connFac) {
        // getProperties returns null when the resource is missing or cannot be read,
        // so the version is left null in that case instead of failing construction.
        Properties props = this.getProperties("jnats.properties");
        this.version = props != null ? props.getProperty("client.version") : null;
        this.nc = this;
        this.opts = opts;
        this.stats = new Statistics();
        this.tcf = connFac != null ? connFac : new TcpConnectionFactory();
        this.setTcpConnection(this.tcf.createConnection());
        this.sidCounter.set(0L);
        this.pingProtoBytes = PING_PROTO.getBytes();
        this.pingProtoBytesLen = this.pingProtoBytes.length;
        this.pongProtoBytes = PONG_PROTO.getBytes();
        this.pongProtoBytesLen = this.pongProtoBytes.length;
        this.pubPrimBytes = _PUB_P_.getBytes();
        this.pubPrimBytesLen = this.pubPrimBytes.length;
        this.crlfProtoBytes = _CRLF_.getBytes();
        this.crlfProtoBytesLen = this.crlfProtoBytes.length;
        this.buildPublishProtocolBuffer(1024);
        this.setupServerPool();
    }

    /**
     * Creates fresh executors, the flush channel and the pong list, and empties
     * the subscription map.
     */
    void setup() {
        scheduler = Executors.newScheduledThreadPool(2);

        NatsThreadFactory callbackThreads = new NatsThreadFactory("cbexec");
        cbexec = Executors.newSingleThreadExecutor(callbackThreads);

        NatsThreadFactory workerThreads = new NatsThreadFactory(NATS_POOL);
        exec = Executors.newCachedThreadPool(workerThreads);

        NatsThreadFactory subscriptionThreads = new NatsThreadFactory(SUB_POOL);
        subexec = Executors.newCachedThreadPool(subscriptionThreads);

        fch = createFlushChannel();
        pongs = createPongs();
        subs.clear();
    }

    /**
     * Loads properties from the given stream.
     *
     * @param inputStream stream to read; may be null
     * @return the loaded properties, or null when {@code inputStream} is null or
     *         reading fails with an {@link IOException} (which is logged)
     */
    protected Properties getProperties(InputStream inputStream) {
        if (inputStream == null) {
            return null;
        }
        Properties loaded = new Properties();
        try {
            loaded.load(inputStream);
        } catch (IOException e) {
            this.logger.warn("nats: error loading properties from InputStream", (Throwable)e);
            return null;
        }
        return loaded;
    }

    /**
     * Loads properties from a classpath resource resolved through this class's
     * class loader.
     *
     * @param resourceName name of the resource to load
     * @return the loaded properties, or null when the resource does not exist or
     *         cannot be read
     */
    protected Properties getProperties(String resourceName) {
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(resourceName);
        try {
            return this.getProperties(is);
        } finally {
            // The stream was opened here, so it is closed here; previously it leaked.
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    this.logger.debug("nats: error closing resource {}", (Object)resourceName, (Object)e);
                }
            }
        }
    }

    /**
     * Allocates the publish buffer, writes the "PUB " prefix bytes into it and
     * marks the position just after the prefix.
     *
     * @param size capacity of the new buffer in bytes
     */
    private void buildPublishProtocolBuffer(int size) {
        ByteBuffer fresh = ByteBuffer.allocate(size);
        this.pubProtoBuf = fresh;
        fresh.put(this.pubPrimBytes, 0, this.pubPrimBytesLen);
        fresh.mark();
    }

    /**
     * Rebuilds the server pool from the options: the configured server list
     * (shuffled unless randomization is disabled), with the primary URL placed
     * first. Falls back to nats://localhost:4222 when nothing is configured, and
     * makes the first pool entry the current URL.
     */
    protected void setupServerPool() {
        URI primary = this.opts.getUrl();
        List<URI> configured = this.opts.getServers();
        this.srvPool = new ArrayList<Srv>();
        this.urls = new ConcurrentHashMap<String, URI>();

        if (configured != null) {
            for (URI server : configured) {
                this.addUrlToPool(server);
            }
        }

        boolean randomize = !this.opts.isNoRandomize();
        if (randomize) {
            Collections.shuffle(this.srvPool, new Random(System.nanoTime()));
        }

        // The explicitly configured URL always goes to the front, after shuffling.
        if (primary != null) {
            this.srvPool.add(0, new Srv(primary));
            this.urls.put(primary.getAuthority(), primary);
        }

        if (this.srvPool.isEmpty()) {
            this.addUrlToPool("nats://localhost:4222");
        }

        Srv first = this.srvPool.get(0);
        this.setUrl(first.url);
    }

    /**
     * Parses {@code srvUrl} and appends it to the server pool, also recording it
     * in the URL map under its authority.
     *
     * @param srvUrl server URL string
     */
    void addUrlToPool(String srvUrl) {
        URI parsed = URI.create(srvUrl);
        Srv entry = new Srv(parsed);
        this.srvPool.add(entry);
        this.urls.put(parsed.getAuthority(), parsed);
    }

    /**
     * Appends {@code uri} to the server pool and records it in the URL map under
     * its authority.
     *
     * @param uri server URI
     */
    void addUrlToPool(URI uri) {
        Srv entry = new Srv(uri);
        this.srvPool.add(entry);
        String authority = uri.getAuthority();
        this.urls.put(authority, uri);
    }

    /**
     * Finds the pool entry whose URL equals the current URL.
     *
     * @return the first matching entry, or null when none matches
     */
    protected Srv currentServer() {
        for (Srv candidate : this.srvPool) {
            if (candidate.url.equals(this.getUrl())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Rotates the server pool: the current server is removed and, if it still has
     * reconnect attempts left (or attempts are unlimited), re-added at the end.
     *
     * @return the entry now at the head of the pool
     * @throws IOException when there is no current server or the pool ends up empty
     */
    protected Srv selectNextServer() throws IOException {
        this.logger.trace("In selectNextServer()");
        Srv current = this.currentServer();
        if (current == null) {
            throw new IOException("nats: no servers available for connection");
        }

        this.logger.trace("selectNextServer, removing {}", (Object)current);
        this.srvPool.remove(current);

        int maxReconnect = this.opts.getMaxReconnect();
        // A negative limit means unlimited reconnect attempts.
        boolean mayRetry = maxReconnect < 0 || current.reconnects < maxReconnect;
        if (mayRetry) {
            this.logger.trace("selectNextServer: adding {}, maxReconnect: {}", (Object)current, (Object)maxReconnect);
            this.srvPool.add(current);
        }

        if (!this.srvPool.isEmpty()) {
            return this.srvPool.get(0);
        }
        this.setUrl(null);
        throw new IOException("nats: no servers available for connection");
    }

    /**
     * Tries each server in the pool in order until one completes the connect
     * handshake.
     *
     * <p>Holds {@link #mu} for the whole attempt, releasing it only around the
     * internal {@code close} call made after a failed handshake.
     *
     * @throws IOException      the last handshake error, or "no servers available"
     *                          when no server could be connected
     * @throws TimeoutException declared for callers; not thrown directly here
     */
    protected void connect() throws IOException, TimeoutException {
        IOException returnedErr = null;
        this.mu.lock();
        try {
            for (int i = 0; i < this.srvPool.size(); ++i) {
                Srv srv = this.srvPool.get(i);
                this.setUrl(srv.url);
                try {
                    this.logger.debug("Connecting to {}", (Object)this.getUrl());
                    this.createConn();
                    this.logger.debug("Connected to {}", (Object)this.getUrl());
                    this.setup();
                    try {
                        this.processConnectInit();
                        this.logger.trace("connect() Resetting reconnects for {}", (Object)srv);
                        srv.reconnects = 0;
                        returnedErr = null;
                        break;
                    } catch (IOException e) {
                        returnedErr = e;
                        this.logger.trace("{} Exception: {}", (Object)this.getUrl(), (Object)e.getMessage());
                        // close() takes the lock itself. Re-acquire in a finally so the
                        // outer finally never unlocks a lock this thread does not hold.
                        this.mu.unlock();
                        try {
                            this.close(Constants.ConnState.DISCONNECTED, false);
                        } finally {
                            this.mu.lock();
                        }
                        this.setUrl(null);
                        continue;
                    }
                } catch (IOException e) {
                    // Socket could not be opened: a refused connection clears the last
                    // error; any other failure just moves on to the next server.
                    boolean refused = e instanceof SocketException
                            && e.getMessage() != null
                            && e.getMessage().contains("Connection refused");
                    if (refused) {
                        this.setLastError(null);
                    }
                }
            }
            if (returnedErr == null && this.status != Constants.ConnState.CONNECTED) {
                returnedErr = new IOException("nats: no servers available for connection");
            }
            if (returnedErr != null) {
                throw returnedErr;
            }
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Opens a new TCP connection to the current server and replaces the input and
     * output streams with ones obtained from it.
     *
     * <p>If a pending buffer exists, the old output stream is flushed first; a
     * failure of that flush is only logged.
     *
     * @throws IOException when there is no current server or the socket cannot be opened
     */
    protected void createConn() throws IOException {
        Srv target = this.currentServer();
        if (target == null) {
            throw new IOException("nats: no servers available for connection");
        }
        target.updateLastAttempt();
        this.logger.trace("createConn(): {}", (Object)target.url);

        try {
            this.logger.trace("Opening {}", (Object)target.url);
            this.conn = this.tcf.createConnection();
            this.conn.open(target.url.getHost(), target.url.getPort(), this.opts.getConnectionTimeout());
            this.logger.trace("Opened {}", (Object)target.url);
        } catch (IOException e) {
            this.logger.debug("Couldn't establish connection to {}: {}", (Object)target.url, (Object)e.getMessage());
            throw e;
        }

        boolean haveOldOutput = this.pending != null && this.bw != null;
        if (haveOldOutput) {
            this.logger.trace("Flushing old outputstream to pending");
            try {
                this.bw.flush();
            } catch (IOException e) {
                this.logger.warn("nats: i/o exception when flushing output stream");
            }
        }

        this.bw = this.conn.getOutputStream(65536);
        this.br = this.conn.getInputStream(65536);
    }

    /**
     * Creates a message queue with the largest capacity the sized overload accepts.
     *
     * @return a new queue bounded at {@link Integer#MAX_VALUE}
     */
    BlockingQueue<Message> createMsgChannel() {
        final int capacity = Integer.MAX_VALUE;
        return this.createMsgChannel(capacity);
    }

    /**
     * Creates a bounded message queue.
     *
     * @param size requested capacity; values of zero or less are treated as 1
     * @return a new, empty queue
     */
    BlockingQueue<Message> createMsgChannel(int size) {
        int capacity = size > 0 ? size : 1;
        return new LinkedBlockingQueue<Message>(capacity);
    }

    /**
     * Creates a boolean queue with the default capacity of
     * {@link LinkedBlockingQueue}.
     *
     * @return a new, empty queue
     */
    BlockingQueue<Boolean> createBooleanChannel() {
        BlockingQueue<Boolean> channel = new LinkedBlockingQueue<Boolean>();
        return channel;
    }

    /**
     * Creates a bounded boolean queue.
     *
     * @param size requested capacity; values of zero or less are treated as 1
     * @return a new, empty queue
     */
    BlockingQueue<Boolean> createBooleanChannel(int size) {
        int capacity = Math.max(size, 1);
        return new LinkedBlockingQueue<Boolean>(capacity);
    }

    /**
     * Creates the flush channel: a boolean queue that holds at most one element.
     *
     * @return a new, empty queue of capacity 1
     */
    BlockingQueue<Boolean> createFlushChannel() {
        BlockingQueue<Boolean> channel = new LinkedBlockingQueue<Boolean>(1);
        return channel;
    }

    /**
     * Signals {@code false} on every outstanding pong channel, then discards the
     * pong list. Does nothing when the list is already null.
     */
    void clearPendingFlushCalls() {
        if (this.pongs != null) {
            for (BlockingQueue<Boolean> waiter : this.pongs) {
                if (waiter != null) {
                    // Drop any stale value so the false signal is the only element.
                    waiter.clear();
                    waiter.add(false);
                }
            }
            this.pongs.clear();
            this.pongs = null;
        }
    }

    /**
     * Closes the connection, leaving it in the CLOSED state and invoking the
     * configured callbacks.
     */
    @Override
    public void close() {
        final boolean invokeCallbacks = true;
        this.close(Constants.ConnState.CLOSED, invokeCallbacks);
    }

    /**
     * Tears the connection down and leaves it in {@code closeState}.
     *
     * <p>Runs in two locked phases: the first marks the connection CLOSED and
     * kicks the flusher; the second cancels timers, flushes the output stream,
     * closes all subscriptions, optionally schedules callbacks, closes the socket
     * and shuts down the worker executors.
     *
     * @param closeState state to store in {@code status} when finished
     * @param doCBs      whether to schedule the disconnected and closed callbacks
     */
    private void close(Constants.ConnState closeState, boolean doCBs) {
        this.logger.debug("close({}, {})", (Object)closeState, (Object)String.valueOf(doCBs));
        final ConnectionImpl nc = this;
        // Phase 1: if already closed just record the requested state, otherwise mark CLOSED.
        this.mu.lock();
        try {
            if (this._isClosed()) {
                this.status = closeState;
                return;
            }
            this.status = Constants.ConnState.CLOSED;
            this.kickFlusher();
        }
        finally {
            this.mu.unlock();
        }
        // Phase 2: release resources. The lock is dropped and re-taken between the phases.
        this.mu.lock();
        try {
            // Wake anything waiting on a pong with a false result.
            this.clearPendingFlushCalls();
            if (this.ptmr != null) {
                this.ptmr.cancel(true);
            }
            if (this.ftmr != null) {
                this.ftmr.cancel(true);
            }
            if (this.conn != null) {
                try {
                    if (this.bw != null) {
                        this.bw.flush();
                    }
                }
                catch (IOException iOException) {
                    // Best-effort flush: a failure here is deliberately ignored.
                }
            }
            this.logger.trace("Closing subscriptions");
            for (Map.Entry<Long, SubscriptionImpl> entry : this.subs.entrySet()) {
                SubscriptionImpl sub = entry.getValue();
                sub.lock();
                try {
                    sub.closeChannel();
                    sub.closed = true;
                    sub.connClosed = true;
                    sub.close();
                }
                finally {
                    sub.unlock();
                }
            }
            this.subs.clear();
            if (doCBs) {
                // Callbacks are handed to cbexec rather than run on the calling thread.
                if (this.opts.getDisconnectedCallback() != null && this.conn != null) {
                    this.cbexec.execute(new Runnable(){

                        @Override
                        public void run() {
                            ConnectionImpl.this.opts.getDisconnectedCallback().onDisconnect(new ConnectionEvent(nc));
                            ConnectionImpl.this.logger.trace("executed DisconnectedCB");
                        }
                    });
                }
                if (this.opts.getClosedCallback() != null) {
                    this.cbexec.execute(new Runnable(){

                        @Override
                        public void run() {
                            ConnectionImpl.this.opts.getClosedCallback().onClose(new ConnectionEvent(nc));
                            ConnectionImpl.this.logger.trace("executed ClosedCB");
                        }
                    });
                }
            }
            // Only now does the status take the caller-requested value.
            this.status = closeState;
            if (this.conn != null) {
                this.conn.close();
            }
            // NOTE(review): scheduler and cbexec are not shut down in this method —
            // confirm whether that happens elsewhere.
            if (this.exec != null) {
                this.exec.shutdownNow();
            }
            if (this.subexec != null) {
                this.subexec.shutdownNow();
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Runs the connect handshake on a freshly opened socket: sets the state to
     * CONNECTING, reads the expected INFO, sends CONNECT, resets the outstanding
     * ping count and starts the socket watchers.
     *
     * @throws IOException when a handshake step fails
     */
    protected void processConnectInit() throws IOException {
        logger.trace("processConnectInit(): {}", (Object)getUrl());
        status = Constants.ConnState.CONNECTING;

        processExpectedInfo();
        sendConnect();

        setActualPingsOutstanding(0);
        spinUpSocketWatchers();
    }

    /**
     * Checks that the client's and the server's TLS requirements agree and
     * upgrades the connection to TLS when the options ask for it or the URL
     * scheme is "tls".
     *
     * @throws IOException when exactly one side requires TLS, or the upgrade fails
     */
    private void checkForSecure() throws IOException {
        if (this.opts.isSecure()) {
            if (!this.info.isTlsRequired()) {
                throw new IOException("nats: secure connection not available");
            }
        }
        if (this.info.isTlsRequired()) {
            if (!this.opts.isSecure()) {
                throw new IOException("nats: secure connection required");
            }
        }
        boolean upgrade = this.opts.isSecure() || "tls".equals(this.getUrl().getScheme());
        if (upgrade) {
            this.makeTLSConn();
        }
    }

    /**
     * Upgrades the current connection to TLS with the SSL context from the
     * options, then re-obtains the input and output streams from the connection.
     *
     * @throws IOException when the upgrade or obtaining a stream fails
     */
    void makeTLSConn() throws IOException {
        boolean tlsDebug = this.opts.isTlsDebug();
        this.conn.setTlsDebug(tlsDebug);
        this.conn.makeTLS(this.opts.getSslContext());

        // Streams are fetched again after makeTLS.
        this.bw = this.conn.getOutputStream(65536);
        this.br = this.conn.getInputStream(65536);
    }

    /**
     * Reads the first protocol operation from the server, requires it to be INFO,
     * processes its arguments and then checks the TLS requirements.
     *
     * <p>A read failure is passed to {@code processOpError} and this method then
     * returns normally.
     *
     * @throws IOException when the first operation is not INFO or the TLS check fails
     */
    protected void processExpectedInfo() throws IOException {
        Control first;
        try {
            first = this.readOp();
        } catch (IOException e) {
            this.processOpError(e);
            return;
        }

        boolean isInfo = first.op.equals(_INFO_OP_);
        if (!isInfo) {
            throw new IOException("nats: protocol exception, INFO not received");
        }

        this.processInfo(first.args);
        this.checkForSecure();
    }

    /**
     * Answers a server PING by sending the pre-encoded PONG; an I/O failure is
     * recorded as the last error instead of being thrown.
     */
    protected void processPing() {
        try {
            sendProto(pongProtoBytes, pongProtoBytesLen);
        } catch (IOException sendFailure) {
            setLastError(sendFailure);
        }
    }

    /**
     * Handles a server PONG: takes the oldest pong channel (if any), resets the
     * outstanding ping count, and signals {@code true} on the channel.
     */
    protected void processPong() {
        this.logger.trace("Processing PONG");
        // Used only when no channel is queued; the signal then goes nowhere.
        BlockingQueue<Boolean> waiter = this.createBooleanChannel(1);
        this.mu.lock();
        try {
            if (this.pongs != null && !this.pongs.isEmpty()) {
                waiter = this.pongs.remove(0);
            }
            this.setActualPingsOutstanding(0);
        } finally {
            this.mu.unlock();
        }

        // Signal outside the lock because put() may block.
        if (waiter != null) {
            try {
                waiter.put(true);
            } catch (InterruptedException e) {
                this.logger.warn("processPong interrupted", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
        this.logger.trace("Processed PONG");
    }

    /** Handles a server "+OK"; intentionally does nothing. */
    protected void processOk() {
    }

    /**
     * Parses a server INFO payload, stores it, and adds any advertised connect
     * URLs that are not yet known to the server pool. The pool is reshuffled when
     * something was added and randomization is enabled.
     *
     * @param infoString INFO arguments; null or empty is ignored
     */
    protected void processInfo(String infoString) {
        if (infoString == null || infoString.isEmpty()) {
            return;
        }
        this.info = ServerInfo.createFromWire(infoString);
        if (this.info.getConnectUrls() == null) {
            return;
        }

        boolean poolGrew = false;
        for (String hostPort : this.info.getConnectUrls()) {
            if (!this.urls.containsKey(hostPort)) {
                this.addUrlToPool(String.format("nats://%s", hostPort));
                poolGrew = true;
            }
        }

        if (poolGrew && !this.opts.isNoRandomize()) {
            Collections.shuffle(this.srvPool);
        }
    }

    /**
     * Processes an INFO payload while holding the connection lock.
     *
     * @param asyncInfoString INFO arguments, passed through to {@code processInfo}
     */
    void processAsyncInfo(String asyncInfoString) {
        mu.lock();
        try {
            processInfo(asyncInfoString);
        } finally {
            mu.unlock();
        }
    }

    /**
     * Reacts to an operational error on the connection.
     *
     * <p>Ignored while connecting, reconnecting or closed. When reconnecting is
     * allowed and the connection is CONNECTED, switches to RECONNECTING, redirects
     * output to a pending buffer and starts {@code doReconnect} on the worker
     * pool. Otherwise records the error and closes the connection.
     *
     * @param err the error that occurred
     */
    void processOpError(Exception err) {
        this.mu.lock();
        try {
            boolean ignorable = this.isConnecting() || this._isClosed() || this._isReconnecting();
            if (ignorable) {
                return;
            }

            boolean reconnect = this.opts.isReconnectAllowed() && this.status == Constants.ConnState.CONNECTED;
            if (!reconnect) {
                this.logger.trace("\t\tcalling processDisconnect() in state {}", (Object)this.status);
                this.processDisconnect();
                this.setLastError(err);
                this.close();
                return;
            }

            this.status = Constants.ConnState.RECONNECTING;
            if (this.ptmr != null) {
                this.ptmr.cancel(true);
            }
            if (this.ftmr != null) {
                this.ftmr.cancel(true);
            }
            if (this.conn != null) {
                try {
                    this.bw.flush();
                } catch (IOException flushFailure) {
                    this.logger.error("I/O error during flush", (Throwable)flushFailure);
                }
                this.conn.close();
            }

            // From here on writes accumulate in memory until the reconnect succeeds.
            this.logger.trace("processOpError: redirecting output to pending buffer");
            this.setPending(new ByteArrayOutputStream(this.opts.getReconnectBufSize()));
            this.setOutputStream(this.getPending());

            this.logger.trace("\t\tspawning doReconnect() in state {}", (Object)this.status);
            if (this.exec.isShutdown()) {
                this.exec = Executors.newCachedThreadPool(new NatsThreadFactory(NATS_POOL));
            }
            this.exec.submit(new Runnable(){

                @Override
                public void run() {
                    Thread.currentThread().setName("reconnect");
                    ConnectionImpl.this.doReconnect();
                }
            });
            if (this.cbexec.isShutdown()) {
                this.cbexec = Executors.newSingleThreadExecutor(new NatsThreadFactory(NATS_POOL));
            }
            this.logger.trace("\t\tspawned doReconnect() in state {}", (Object)this.status);
        } finally {
            this.mu.unlock();
        }
    }

    /** Logs the call and sets the connection state to DISCONNECTED. */
    protected void processDisconnect() {
        logger.trace("processDisconnect()");
        status = Constants.ConnState.DISCONNECTED;
    }

    /**
     * Reports, under the connection lock, whether the state is RECONNECTING.
     *
     * @return true when the connection is reconnecting
     */
    @Override
    public boolean isReconnecting() {
        this.mu.lock();
        try {
            return this._isReconnecting();
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Lock-free check used by callers that already hold the connection lock.
     *
     * @return true when the state is RECONNECTING
     */
    boolean _isReconnecting() {
        return Constants.ConnState.RECONNECTING == this.status;
    }

    /**
     * Reports, under the connection lock, whether the state is CONNECTING or
     * CONNECTED.
     *
     * @return true when the connection counts as connected
     */
    @Override
    public boolean isConnected() {
        this.mu.lock();
        try {
            return this._isConnected();
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Lock-free check; note that CONNECTING also counts as connected here.
     *
     * @return true when the state is CONNECTING or CONNECTED
     */
    private boolean _isConnected() {
        if (this.status == Constants.ConnState.CONNECTING) {
            return true;
        }
        return this.status == Constants.ConnState.CONNECTED;
    }

    /**
     * Reports, under the connection lock, whether the state is CLOSED.
     *
     * @return true when the connection is closed
     */
    @Override
    public boolean isClosed() {
        this.mu.lock();
        try {
            return this._isClosed();
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Lock-free check used by callers that already hold the connection lock.
     *
     * @return true when the state is CLOSED
     */
    boolean _isClosed() {
        return Constants.ConnState.CLOSED == this.status;
    }

    /**
     * Writes whatever accumulated in the pending buffer to the output stream and
     * then drops the buffer. A write or flush failure is logged; the buffer is
     * dropped either way.
     */
    protected void flushReconnectPendingItems() {
        this.logger.trace("flushReconnectPendingItems()");
        if (this.pending != null) {
            if (this.pending.size() > 0) {
                try {
                    this.logger.trace("flushReconnectPendingItems() writing {} bytes.", (Object)this.pending.size());
                    byte[] buffered = this.pending.toByteArray();
                    this.bw.write(buffered, 0, this.pending.size());
                    this.bw.flush();
                } catch (IOException e) {
                    this.logger.error("Error flushing pending items", (Throwable)e);
                }
            }
            this.pending = null;
            this.logger.trace("flushReconnectPendingItems() DONE");
        }
    }

    /**
     * Reconnect loop, started from {@code processOpError} on the worker pool.
     *
     * <p>Waits for the previous socket tasks to exit, schedules the disconnected
     * callback, then keeps rotating through the server pool until one server
     * accepts the connection or the pool is exhausted. On success it resends
     * subscriptions, replays the pending buffer, marks the connection CONNECTED
     * and schedules the reconnected callback. On failure it records an error and
     * closes the connection.
     *
     * <p>The connection lock is held throughout, except while sleeping between
     * attempts and during the final {@code flush()}.
     */
    void doReconnect() {
        this.logger.trace("doReconnect()");
        this.waitForExits();
        this.mu.lock();
        try {
            // Fail any flush waiters and start with a clean error slate.
            this.nc.clearPendingFlushCalls();
            this.setLastError(null);
            if (this.opts.getDisconnectedCallback() != null) {
                this.logger.trace("Spawning disconnectCB from doReconnect()");
                this.cbexec.execute(new Runnable(){

                    @Override
                    public void run() {
                        ConnectionImpl.this.opts.getDisconnectedCallback().onDisconnect(new ConnectionEvent(ConnectionImpl.this.nc));
                    }
                });
                this.logger.trace("Spawned disconnectCB from doReconnect()");
            }
            while (!this.srvPool.isEmpty()) {
                Srv cur = null;
                try {
                    // Rotates the pool; throws once no server is left to try.
                    cur = this.selectNextServer();
                    this.setUrl(cur.url);
                }
                catch (IOException nse) {
                    this.logger.trace("doReconnect() calling setLastError({})", (Object)nse.getMessage());
                    this.setLastError(nse);
                    break;
                }
                // Wait out the remainder of the reconnect interval for this server.
                long elapsedMillis = cur.timeSinceLastAttempt();
                if (elapsedMillis < this.opts.getReconnectWait()) {
                    long sleepTime = this.opts.getReconnectWait() - elapsedMillis;
                    // Sleep without the lock; the finally re-acquires it on every path.
                    this.mu.unlock();
                    try {
                        this.sleepInterval((int)sleepTime, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        // NOTE(review): the interrupt flag is not restored here.
                        this.logger.debug("Interrupted while in doReconnect()");
                        break;
                    }
                    finally {
                        this.mu.lock();
                    }
                }
                if (this.isClosed()) {
                    this.logger.debug("Connection has been closed while in doReconnect()");
                    break;
                }
                ++cur.reconnects;
                this.logger.trace("doReconnect() incremented cur.reconnects: {}", (Object)cur);
                this.logger.trace("doReconnect: trying createConn() for {}", (Object)cur);
                try {
                    this.conn.teardown();
                    this.createConn();
                    this.logger.trace("doReconnect: createConn() successful for {}", (Object)cur);
                }
                catch (Exception e) {
                    // Could not open the socket: clear the error and try the next server.
                    this.conn.teardown();
                    this.logger.trace("doReconnect: createConn() failed for {}", (Object)cur);
                    this.logger.trace("createConn failed", (Throwable)e);
                    this.setLastError(null);
                    continue;
                }
                this.stats.incrementReconnects();
                try {
                    this.processConnectInit();
                }
                catch (IOException e) {
                    // Handshake failed: restore RECONNECTING (processConnectInit set CONNECTING).
                    this.conn.teardown();
                    this.logger.warn("doReconnect: processConnectInit FAILED for {}", (Object)cur, (Object)e);
                    this.setLastError(e);
                    this.status = Constants.ConnState.RECONNECTING;
                    continue;
                }
                this.logger.trace("Successful reconnect; Resetting reconnects for {}", (Object)cur);
                cur.reconnects = 0;
                // Re-establish server-side state, then replay output buffered while down.
                this.resendSubscriptions();
                this.flushReconnectPendingItems();
                this.logger.debug("just called flushReconnectPendingItems");
                try {
                    this.getOutputStream().flush();
                }
                catch (IOException e) {
                    this.logger.debug("Error flushing output stream");
                    this.setLastError(e);
                    this.status = Constants.ConnState.RECONNECTING;
                    continue;
                }
                this.setPending(null);
                this.status = Constants.ConnState.CONNECTED;
                if (this.opts.getReconnectedCallback() != null) {
                    this.logger.trace("Spawning reconnectedCb from doReconnect()");
                    this.cbexec.execute(new Runnable(){

                        @Override
                        public void run() {
                            ConnectionImpl.this.opts.getReconnectedCallback().onReconnect(new ConnectionEvent(ConnectionImpl.this.nc));
                        }
                    });
                    this.logger.trace("Spawned reconnectedCb from doReconnect()");
                }
                // flush() is called without the lock; the lock is re-taken so the outer
                // finally can release it.
                this.mu.unlock();
                try {
                    this.flush();
                }
                catch (Exception e) {
                    this.logger.warn("Error flushing connection", (Throwable)e);
                }
                finally {
                    this.mu.lock();
                }
                this.logger.trace("doReconnect reconnected successfully!");
                return;
            }
            // Loop ended without a connection: make sure an error is recorded.
            this.logger.trace("Reconnect FAILED");
            if (this.getLastException() == null) {
                this.setLastError(new IOException("nats: no servers available for connection"));
            }
        }
        finally {
            this.mu.unlock();
        }
        this.logger.trace("Calling   close() from doReconnect()");
        this.close();
        this.logger.trace("Completed close() from doReconnect()");
    }

    /**
     * Checks the state without taking the connection lock.
     *
     * @return true when the state is CONNECTING
     */
    boolean isConnecting() {
        return Constants.ConnState.CONNECTING == this.status;
    }

    /**
     * Normalizes a server error line: strips the first "-ERR" prefix with its
     * trailing whitespace, lower-cases the text, and removes one leading and one
     * trailing single quote.
     *
     * <p>Lower-casing uses {@code Locale.ROOT} so the result does not depend on
     * the JVM's default locale (under a Turkish locale "I" would otherwise become
     * a dotless "ı" and break later string comparisons).
     *
     * @param error raw error text; must not be null
     * @return the normalized, lower-case error message
     */
    static String normalizeErr(String error) {
        String withoutOp = error.replaceFirst("-ERR\\s+", "");
        String lowered = withoutOp.toLowerCase(java.util.Locale.ROOT);
        return lowered.replaceAll("^'|'$", "");
    }

    /**
     * Converts the buffer to a string with {@code Parser.bufToString}, trims it,
     * and normalizes it like the String overload.
     *
     * @param error buffer holding the raw error text
     * @return the normalized error message
     */
    static String normalizeErr(ByteBuffer error) {
        String raw = Parser.bufToString(error);
        return ConnectionImpl.normalizeErr(raw.trim());
    }

    /**
     * Handles a "-ERR" from the server. A stale-connection error goes through
     * {@code processOpError}, a permissions violation through
     * {@code processPermissionsViolation}; anything else is recorded as the last
     * error and the connection is closed.
     *
     * @param error buffer holding the raw error text
     */
    protected void processErr(ByteBuffer error) {
        String err = ConnectionImpl.normalizeErr(error);
        this.logger.trace("processErr(error={})", (Object)err);

        if (STALE_CONNECTION.equalsIgnoreCase(err)) {
            this.processOpError(new IOException("nats: stale connection"));
            return;
        }
        if (err.startsWith("permissions violation")) {
            this.processPermissionsViolation(err);
            return;
        }

        NATSException fatal = new NATSException("nats: " + err);
        fatal.setConnection(this);
        this.mu.lock();
        try {
            this.setLastError(fatal);
        } finally {
            this.mu.unlock();
        }
        this.close();
    }

    /**
     * Performs the initial protocol handshake on the already-open stream:
     * writes the CONNECT line, optionally waits for the OK acknowledgement
     * (verbose mode), writes a PING and waits for the matching PONG. On
     * success the status is set to {@code CONNECTED}.
     *
     * @throws IOException if a write or read fails, if the acknowledgement or
     *         PONG is not what was expected, or if the server answered with
     *         an error line
     */
    protected void sendConnect() throws IOException {
        String line = null;
        this.logger.trace("sendConnect()");
        // Build the CONNECT line once and reuse it for both the write and the log.
        final String connect = this.connectProto();
        this.bw.write(connect.getBytes());
        // NOTE(review): the CONNECT line may carry user/pass/token, so this trace
        // statement can expose credentials when trace logging is enabled.
        this.logger.trace("=> {}", (Object)connect.trim());
        this.bw.flush();
        if (this.opts.isVerbose() && !_OK_OP_.equals(line = this.readLine())) {
            throw new IOException(String.format("nats: expected '%s', got '%s'", _OK_OP_, line));
        }
        this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
        this.logger.trace("=> {}", (Object)new String(this.pingProtoBytes).trim());
        this.bw.flush();
        try {
            this.logger.trace("Awaiting PONG...");
            line = this.readLine();
        }
        catch (IOException e) {
            throw new IOException("nats: connection read error", e);
        }
        if (!PONG_PROTO.trim().equals(line)) {
            if (line.startsWith(_ERR_OP_)) {
                line = ConnectionImpl.normalizeErr(line);
                throw new IOException("nats: " + line);
            }
            throw new IOException(String.format("nats: expected '%s', got '%s'", _PONG_OP_, line));
        }
        this.status = Constants.ConnState.CONNECTED;
    }

    /**
     * Reads one line from the connection's buffered reader.
     *
     * @return the line that was read (never {@code null})
     * @throws EOFException if the reader reports end of stream
     * @throws IOException if the underlying read fails
     */
    protected String readLine() throws IOException {
        final BufferedReader reader = this.conn.getBufferedReader();
        this.logger.trace("readLine() Reading from input stream");
        final String line = reader.readLine();
        if (line == null) {
            throw new EOFException("nats: connection closed");
        }
        this.logger.trace("<= {}", (Object)line.trim());
        return line;
    }

    /**
     * Writes the first {@code length} bytes of {@code value} to the output
     * stream while holding {@code mu}, then kicks the flusher.
     *
     * @param value  bytes to send
     * @param length number of bytes of {@code value} to write
     * @throws IOException if the write fails
     */
    protected void sendProto(byte[] value, int length) throws IOException {
        this.logger.trace("in sendProto()");
        this.mu.lock();
        try {
            this.logger.trace("in sendProto(), writing");
            this.bw.write(value, 0, length);
            // The trace shows the whole array, not only the first `length` bytes.
            final String shown = new String(value).trim();
            this.logger.trace("=> {}", (Object)shown);
            this.kickFlusher();
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Builds the CONNECT protocol line for the current server URL.
     * <p>
     * Credentials come from the URL's user-info when present, otherwise from
     * the options. User-info is interpreted as:
     * <ul>
     *   <li>{@code token} (no colon) - an auth token;</li>
     *   <li>{@code user:pass} - user name and password;</li>
     *   <li>more than one colon - everything after the first colon is the
     *       password (previously such credentials were silently dropped).</li>
     * </ul>
     * User-info with an empty first component (for example {@code ":"} or
     * {@code ":pass"}) yields no credentials.
     *
     * @return the formatted CONNECT line
     */
    String connectProto() {
        String userInfo = this.getUrl().getUserInfo();
        String user = null;
        String pass = null;
        String token = null;
        if (userInfo != null) {
            String[] userpass = userInfo.split(":");
            // split() drops trailing empty strings, so ":" produces an empty array;
            // guard the index to avoid ArrayIndexOutOfBoundsException.
            if (userpass.length > 0 && userpass[0].length() > 0) {
                switch (userpass.length) {
                    case 1: {
                        token = userpass[0];
                        break;
                    }
                    case 2: {
                        user = userpass[0];
                        pass = userpass[1];
                        break;
                    }
                    default: {
                        // Password itself contains ':' - keep it intact.
                        user = userpass[0];
                        pass = userInfo.substring(userpass[0].length() + 1);
                        break;
                    }
                }
            }
        } else {
            user = this.opts.getUsername();
            pass = this.opts.getPassword();
            token = this.opts.getToken();
        }
        ConnectInfo info = new ConnectInfo(this.opts.isVerbose(), this.opts.isPedantic(), user, pass, token, this.opts.isSecure(), this.opts.getConnectionName(), LANG_STRING, this.version, ClientProto.CLIENT_PROTO_INFO);
        return String.format(CONN_PROTO, info);
    }

    /**
     * Reads one line from the connection and wraps it in a {@code Control}.
     *
     * @return the control object built from the line
     * @throws IOException if reading the line fails
     */
    protected Control readOp() throws IOException {
        final Control op = new Control(this.readLine());
        this.logger.trace("readOp returning: " + op);
        return op;
    }

    /**
     * Kicks the flusher and then blocks until the socket-watcher "done" latch
     * reaches zero. Does not wait when no latch has been created yet. If the
     * wait is interrupted, a warning is logged and the interrupt flag is
     * restored.
     */
    private void waitForExits() {
        this.kickFlusher();
        if (this.socketWatchersDoneLatch == null) {
            return;
        }
        try {
            this.socketWatchersDoneLatch.await();
        }
        catch (InterruptedException e) {
            this.logger.warn("nats: interrupted waiting for threads to exit");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the two socket-watcher tasks ("readloop" and "flusher") on
     * {@code exec} after waiting for any previous watchers to exit, then
     * resets the ping timer.
     * <p>
     * Each task counts down the start latch, waits on it, runs its loop and
     * finally counts down the done latch. The done-latch count-down is in a
     * {@code finally} so that an unchecked exception escaping
     * {@code readLoop()} or {@code flusher()} cannot leave a later
     * {@code waitForExits()} blocked forever.
     */
    protected void spinUpSocketWatchers() {
        this.logger.trace("Spinning up threads");
        this.waitForExits();
        this.socketWatchersDoneLatch = new CountDownLatch(2);
        this.socketWatchersStartLatch = new CountDownLatch(2);
        Future<?> task = this.exec.submit(new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.logger.debug("readloop starting...");
                Thread.currentThread().setName("readloop");
                ConnectionImpl.this.socketWatchersStartLatch.countDown();
                try {
                    ConnectionImpl.this.socketWatchersStartLatch.await();
                    ConnectionImpl.this.readLoop();
                }
                catch (InterruptedException e) {
                    ConnectionImpl.this.logger.debug("Interrupted", (Throwable)e);
                }
                finally {
                    // Always signal exit, even if readLoop() threw.
                    ConnectionImpl.this.socketWatchersDoneLatch.countDown();
                }
                ConnectionImpl.this.logger.debug("readloop exiting");
            }
        });
        this.tasks.add(task);
        task = this.exec.submit(new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.logger.debug("flusher starting...");
                Thread.currentThread().setName("flusher");
                ConnectionImpl.this.socketWatchersStartLatch.countDown();
                try {
                    ConnectionImpl.this.socketWatchersStartLatch.await();
                    ConnectionImpl.this.flusher();
                }
                catch (InterruptedException e) {
                    ConnectionImpl.this.logger.debug("Interrupted", (Throwable)e);
                }
                finally {
                    // Always signal exit, even if flusher() threw.
                    ConnectionImpl.this.socketWatchersDoneLatch.countDown();
                }
                ConnectionImpl.this.logger.debug("flusher exiting");
            }
        });
        this.tasks.add(task);
        this.socketWatchersStartLatch.countDown();
        this.resetPingTimer();
    }

    /**
     * Reads bytes from {@code this.br} and feeds them to the parser until the
     * connection is closed or reconnecting, the connection object is
     * {@code null}, or a read/parse error occurs.
     * <p>
     * On entry the parser's state is published as {@code this.ps}; on exit
     * {@code this.ps} is reset to {@code null}. A read returning {@code -1}
     * is treated as a stale connection. Errors are passed to
     * {@code processOpError} unless the status is already {@code CLOSED}.
     */
    protected void readLoop() {
        Parser parser = null;
        TcpConnection conn = null;
        // Snapshot the parser and expose its parse state under the lock.
        this.mu.lock();
        parser = this.parser;
        this.ps = parser.ps;
        this.mu.unlock();
        byte[] buffer = new byte[65536];
        while (true) {
            boolean sb;
            this.mu.lock();
            try {
                // sb == true means the loop must stop: closed or reconnecting.
                boolean bl = sb = this._isClosed() || this._isReconnecting();
                if (sb) {
                    // Replace the parse state with a fresh one before leaving.
                    this.ps = new Parser.ParseState(parser);
                }
                conn = this.conn;
            }
            finally {
                this.mu.unlock();
            }
            if (sb || conn == null) break;
            try {
                int len = this.br.read(buffer);
                if (len == -1) {
                    // End of stream is reported as a stale connection.
                    throw new IOException("nats: stale connection");
                }
                parser.parse(buffer, len);
            }
            catch (IOException | ParseException e) {
                this.logger.debug("Exception in readloop(): '{}' (state: {})", (Object)e.getMessage(), (Object)this.status);
                // Already closed: nothing to report, just leave the loop.
                if (this.status == Constants.ConnState.CLOSED) break;
                this.processOpError(e);
                break;
            }
        }
        this.mu.lock();
        this.ps = null;
        this.mu.unlock();
    }

    /**
     * Delivery loop for an asynchronous subscription.
     * <p>
     * Repeatedly waits (on {@code sub.pCond}) until the subscription's channel
     * has a message or the subscription is closed, takes one message, updates
     * the pending counters and the delivered count, and invokes the message
     * handler outside the subscription lock. The loop returns immediately
     * when the subscription is found closed. When a positive {@code max} has
     * been reached, the subscription is removed under {@code mu}.
     *
     * @param sub the asynchronous subscription to service
     * @throws InterruptedException if interrupted while waiting for a message
     */
    protected void waitForMsgs(AsyncSubscriptionImpl sub) throws InterruptedException {
        long limit;
        long deliveredCount = 0L;
        do {
            boolean subClosed;
            MessageHandler handler;
            Message next;
            sub.lock();
            try {
                BlockingQueue<Message> queue = sub.getChannel();
                while (queue.size() == 0 && !sub.isClosed()) {
                    sub.pCond.await();
                }
                next = queue.poll();
                if (next != null) {
                    --sub.pMsgs;
                    sub.pBytes = sub.pBytes - (next.getData() == null ? 0 : next.getData().length);
                }
                handler = sub.msgHandler;
                limit = sub.max;
                subClosed = sub.isClosed();
                if (!subClosed) {
                    deliveredCount = ++sub.delivered;
                }
            }
            finally {
                sub.unlock();
            }
            if (subClosed) {
                // Closed subscriptions exit without the removeSub() below.
                return;
            }
            boolean pastLimit = limit > 0L && deliveredCount > limit;
            if (next == null || pastLimit) {
                continue;
            }
            handler.onMessage(next);
        } while (limit <= 0L || deliveredCount < limit);

        this.mu.lock();
        try {
            this.removeSub(sub);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Handles one inbound message while holding {@code mu}.
     * <p>
     * Updates the inbound statistics, looks up the subscription by the sid in
     * the current parse state and, if found, builds a {@code Message} and
     * (under the subscription lock) updates the pending counters and their
     * maxima. If a pending limit is exceeded, or the channel refuses the
     * message, the slow-consumer path is taken. Otherwise the message is
     * added to the channel and the subscription's condition is signalled.
     * Messages for unknown sids or for subscriptions without a channel are
     * dropped silently.
     *
     * @param data   buffer containing the payload
     * @param offset start of the payload within {@code data}
     * @param length payload length in bytes
     */
    protected void processMsg(byte[] data, int offset, int length) {
        this.mu.lock();
        try {
            this.stats.incrementInMsgs();
            this.stats.incrementInBytes(length);
            SubscriptionImpl target = this.subs.get(this.ps.ma.sid);
            if (target == null) {
                return;
            }
            Message incoming = new Message(this.ps.ma, target, data, offset, length);
            target.lock();
            try {
                ++target.pMsgs;
                if (target.pMsgs > target.pMsgsMax) {
                    target.pMsgsMax = target.pMsgs;
                }
                target.pBytes = target.pBytes + (incoming.getData() == null ? 0 : incoming.getData().length);
                if (target.pBytes > target.pBytesMax) {
                    target.pBytesMax = target.pBytes;
                }

                boolean tooManyMsgs = target.pMsgsLimit > 0 && target.pMsgs > target.pMsgsLimit;
                boolean tooManyBytes = target.pBytesLimit > 0 && target.pBytes > target.pBytesLimit;
                if (tooManyMsgs || tooManyBytes) {
                    this.handleSlowConsumer(target, incoming);
                    return;
                }
                if (target.getChannel() == null) {
                    return;
                }
                if (target.getChannel().add(incoming)) {
                    target.pCond.signal();
                    target.setSlowConsumer(false);
                    return;
                }
                this.handleSlowConsumer(target, incoming);
            }
            finally {
                target.unlock();
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Accounts for a message that could not be queued: bumps the dropped
     * counter, runs the slow-consumer notification, and backs the message out
     * of the pending message/byte counters.
     *
     * @param sub the subscription that could not accept the message
     * @param msg the message being dropped
     */
    protected void handleSlowConsumer(SubscriptionImpl sub, Message msg) {
        ++sub.dropped;
        this.processSlowConsumer(sub);
        --sub.pMsgs;
        final byte[] payload = msg.getData();
        if (payload == null) {
            return;
        }
        sub.pBytes -= payload.length;
    }

    /**
     * Removes the subscription from the sid map and, under the subscription
     * lock, clears and drops its channel (if any), detaches it from this
     * connection and marks it closed.
     *
     * @param sub the subscription to remove
     */
    void removeSub(SubscriptionImpl sub) {
        this.subs.remove(sub.getSid());
        sub.lock();
        try {
            final boolean hasChannel = sub.getChannel() != null;
            if (hasChannel) {
                sub.mch.clear();
                sub.mch = null;
            }
            sub.setConnection(null);
            sub.closed = true;
        }
        finally {
            sub.unlock();
        }
    }

    /**
     * Records a slow-consumer error as the last error and, if an exception
     * handler is configured and the subscription was not already flagged as
     * a slow consumer, schedules the handler on {@code cbexec}. The
     * subscription is flagged as a slow consumer in every case.
     *
     * @param sub the subscription that dropped messages
     */
    void processSlowConsumer(SubscriptionImpl sub) {
        final IOException cause = new IOException("nats: slow consumer, messages dropped");
        final NATSException nex = new NATSException(cause, this, sub);
        this.setLastError(cause);

        final boolean notify = this.opts.getExceptionHandler() != null && !sub.isSlowConsumer();
        if (notify) {
            this.cbexec.execute(new Runnable(){

                @Override
                public void run() {
                    ConnectionImpl.this.opts.getExceptionHandler().onException(nex);
                }
            });
        }
        sub.setSlowConsumer(true);
    }

    /**
     * Records a permissions-violation error as the last error and, when an
     * exception handler is configured, schedules it on {@code cbexec} with a
     * {@code NATSException} bound to this connection.
     *
     * @param err the normalized error text from the server
     */
    void processPermissionsViolation(String err) {
        final IOException violation = new IOException("nats: " + err);
        final NATSException nex = new NATSException(violation);
        nex.setConnection(this);
        this.setLastError(violation);

        if (this.opts.getExceptionHandler() == null) {
            return;
        }
        this.cbexec.execute(new Runnable(){

            @Override
            public void run() {
                ConnectionImpl.this.opts.getExceptionHandler().onException(nex);
            }
        });
    }

    /**
     * Removes the given channel from the pending-pong list, matching by
     * identity, while holding {@code mu}. The channel is cleared before
     * removal.
     *
     * @param ch the channel to remove
     * @return {@code true} if the channel was found and removed;
     *         {@code false} if there is no pong list or no match
     */
    protected boolean removeFlushEntry(BlockingQueue<Boolean> ch) {
        this.mu.lock();
        try {
            if (this.pongs == null) {
                return false;
            }
            for (BlockingQueue<Boolean> candidate : this.pongs) {
                if (candidate == ch) {
                    candidate.clear();
                    this.pongs.remove(candidate);
                    return true;
                }
            }
            return false;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Registers {@code ch} (when non-null) in the pending-pong list, creating
     * the list on first use, then writes a PING and flushes the stream. An
     * {@code IOException} is recorded as the last error instead of being
     * thrown.
     *
     * @param ch channel to register for the PONG, or {@code null}
     */
    protected void sendPing(BlockingQueue<Boolean> ch) {
        if (this.pongs == null) {
            this.pongs = this.createPongs();
        }
        if (ch != null) {
            this.pongs.add(ch);
        }
        try {
            this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
            final String shown = new String(this.pingProtoBytes).trim();
            this.logger.trace("=> {}", (Object)shown);
            this.bw.flush();
        }
        catch (IOException ioe) {
            this.setLastError(ioe);
        }
    }

    /**
     * Creates a new, empty list for pending-pong channels.
     *
     * @return a fresh empty {@code ArrayList}
     */
    ArrayList<BlockingQueue<Boolean>> createPongs() {
        final ArrayList<BlockingQueue<Boolean>> fresh = new ArrayList<BlockingQueue<Boolean>>();
        return fresh;
    }

    /**
     * Schedules a new {@code PingTimerTask} on {@code scheduler} with a fixed
     * delay; both the initial delay and the period are the configured ping
     * interval in milliseconds.
     *
     * @return the future controlling the scheduled task
     */
    ScheduledFuture<?> createPingTimer() {
        return this.scheduler.scheduleWithFixedDelay(
                new PingTimerTask(),
                this.opts.getPingInterval(),
                this.opts.getPingInterval(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Under {@code mu}, cancels the current ping timer (if any, interrupting
     * it) and creates a new one when the configured ping interval is
     * positive.
     */
    protected void resetPingTimer() {
        this.mu.lock();
        try {
            final boolean hadTimer = this.ptmr != null;
            if (hadTimer) {
                this.ptmr.cancel(true);
            }
            final boolean pingEnabled = this.opts.getPingInterval() > 0L;
            if (pingEnabled) {
                this.ptmr = this.createPingTimer();
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Formats and writes an UNSUB line for the subscription. The max value is
     * included only when positive; any run of spaces directly before the line
     * terminator is collapsed away.
     *
     * @param sub the subscription being unsubscribed
     * @param max auto-unsubscribe limit, or a non-positive value for none
     * @throws IOException if the write fails
     */
    protected void writeUnsubProto(SubscriptionImpl sub, long max) throws IOException {
        final String maxField = max > 0L ? Long.toString(max) : _EMPTY_;
        final String line = String.format(UNSUB_PROTO, sub.getSid(), maxField).replaceAll(" +\r\n", _CRLF_);
        this.bw.write(line.getBytes());
        this.logger.trace("=> {}", (Object)line.trim());
    }

    /**
     * Convenience overload that widens {@code max} to {@code long} and
     * delegates to {@link #unsubscribe(SubscriptionImpl, long)}.
     *
     * @param sub the subscription to unsubscribe
     * @param max auto-unsubscribe limit, or a non-positive value to remove now
     * @throws IOException if writing the UNSUB line fails
     */
    protected void unsubscribe(SubscriptionImpl sub, int max) throws IOException {
        final long widened = max;
        this.unsubscribe(sub, widened);
    }

    /**
     * Unsubscribes under {@code mu}. With a positive {@code max} the limit is
     * stored on the registered subscription; otherwise the subscription is
     * removed immediately. The UNSUB line is written unless the connection is
     * reconnecting, and the flusher is kicked. A subscription that is not
     * registered is ignored.
     *
     * @param sub the subscription to unsubscribe
     * @param max auto-unsubscribe limit, or a non-positive value to remove now
     * @throws IOException if writing the UNSUB line fails
     * @throws IllegalStateException if the connection is closed
     */
    protected void unsubscribe(SubscriptionImpl sub, long max) throws IOException {
        this.mu.lock();
        try {
            if (this.isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            final SubscriptionImpl registered = this.subs.get(sub.getSid());
            if (registered == null) {
                return;
            }
            if (max <= 0L) {
                this.removeSub(registered);
            } else {
                registered.setMax(max);
            }
            if (!this._isReconnecting()) {
                this.writeUnsubProto(registered, max);
            }
            this.kickFlusher();
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Offers a {@code true} token to the flusher channel, provided both the
     * output stream and the channel exist.
     */
    protected void kickFlusher() {
        if (this.bw == null || this.fch == null) {
            return;
        }
        this.fch.offer(true);
    }

    /**
     * Performs at most one flush of the output stream if a flush token is
     * pending.
     * <p>
     * The stream, connection and flusher channel are snapshotted under
     * {@code mu}. Nothing happens when the stream or connection is missing or
     * no token is queued. The flush itself runs under {@code mu} and only if
     * the connection is connected, not connecting, and the snapshot still
     * matches the current stream and connection. An {@code IOException} is
     * recorded as the last error.
     */
    protected void flushSocket() {
        BlockingQueue<Boolean> kicks;
        TcpConnection link;
        OutputStream out;
        this.mu.lock();
        try {
            out = this.bw;
            link = this.conn;
            kicks = this.fch;
        }
        finally {
            this.mu.unlock();
        }
        if (link == null || out == null) {
            return;
        }
        if (kicks.poll() == null) {
            return;
        }
        this.mu.lock();
        try {
            boolean usable = this._isConnected() && !this.isConnecting() && out == this.bw && link == this.conn;
            if (!usable) {
                return;
            }
            out.flush();
            this.stats.incrementFlushes();
        }
        catch (IOException ioe) {
            this.logger.debug("I/O exception encountered during flush");
            this.setLastError(ioe);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Flusher loop: blocks on the flusher channel and flushes the output
     * stream each time a {@code true} token is taken.
     * <p>
     * The stream, connection and channel are snapshotted once under
     * {@code mu}. The loop returns when the stream or connection is missing
     * at start, when a {@code false} token is taken, or when the connection
     * is no longer connected / is connecting / no longer matches the
     * snapshot. After each pass the thread sleeps for the configured flush
     * timer interval.
     *
     * @throws InterruptedException if interrupted while waiting on the channel
     */
    protected void flusher() throws InterruptedException {
        this.mu.lock();
        OutputStream bw = this.bw;
        TcpConnection conn = this.conn;
        BlockingQueue<Boolean> fch = this.fch;
        this.mu.unlock();
        if (conn == null || bw == null) {
            return;
        }
        this.logger.trace("entering flusher loop...");
        // A false token ends the loop.
        while (fch.take().booleanValue()) {
            this.mu.lock();
            try {
                // Stop if the connection state or the stream/connection identity changed.
                if (!this._isConnected() || this.isConnecting() || bw != this.bw || conn != this.conn) {
                    return;
                }
                bw.flush();
                this.stats.incrementFlushes();
            }
            catch (IOException e) {
                // Flush failures are recorded, not thrown; the loop keeps running.
                this.logger.debug("I/O exception encountered during flush");
                this.setLastError(e);
            }
            finally {
                this.mu.unlock();
            }
            this.sleepInterval(this.flushTimerInterval, this.flushTimerUnit);
        }
    }

    /**
     * Sends a PING and waits up to {@code timeout} milliseconds for the
     * matching PONG.
     * <p>
     * If the calling thread is (or becomes) interrupted, the method returns
     * without throwing and leaves the interrupt flag set. In that case, and
     * on timeout or close, the pong channel is removed from the pending list
     * so it does not linger there.
     *
     * @param timeout maximum wait in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code timeout <= 0}
     * @throws IllegalStateException if the connection is closed, either on
     *         entry or as signalled through the pong channel
     * @throws TimeoutException if no PONG arrives within {@code timeout}
     * @throws Exception declared by the interface
     */
    @Override
    public void flush(int timeout) throws Exception {
        if (timeout <= 0) {
            throw new IllegalArgumentException("nats: timeout invalid");
        }
        BlockingQueue<Boolean> ch = null;
        this.mu.lock();
        try {
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            ch = this.createBooleanChannel(1);
            this.sendPing(ch);
        }
        finally {
            this.mu.unlock();
        }

        Exception err = null;
        boolean interrupted = Thread.currentThread().isInterrupted();
        if (!interrupted) {
            try {
                Boolean rv = ch.poll(timeout, TimeUnit.MILLISECONDS);
                if (rv == null) {
                    err = new TimeoutException("nats: timeout");
                } else if (rv.booleanValue()) {
                    ch.clear();
                } else {
                    err = new IllegalStateException("nats: connection closed");
                }
            }
            catch (InterruptedException e) {
                this.logger.debug("flush was interrupted while waiting for PONG", (Throwable)e);
                Thread.currentThread().interrupt();
                interrupted = true;
            }
        }

        // No PONG was consumed: drop our entry so it does not stay queued in pongs.
        if (err != null || interrupted) {
            this.removeFlushEntry(ch);
        }
        if (err != null) {
            throw err;
        }
    }

    /**
     * Flushes with a fixed timeout of 60000 milliseconds.
     *
     * @throws Exception as thrown by {@link #flush(int)}
     */
    @Override
    public void flush() throws Exception {
        final int defaultTimeoutMillis = 60000;
        this.flush(defaultTimeoutMillis);
    }

    /**
     * Re-sends every registered subscription to the server.
     * <p>
     * For each subscription:
     * <ul>
     *   <li>asynchronous subscriptions are (re)started;</li>
     *   <li>if an auto-unsubscribe limit is set and has already been reached,
     *       the subscription is unsubscribed instead of re-sent;</li>
     *   <li>otherwise the SUB line is sent, followed by an UNSUB line with
     *       the remaining count when a limit is set.</li>
     * </ul>
     * The remaining count is computed per subscription so that one
     * subscription's limit never leaks into the next one.
     */
    protected void resendSubscriptions() {
        for (Long key : this.subs.keySet()) {
            SubscriptionImpl sub = this.subs.get(key);
            if (sub == null) {
                // Removed between keySet() iteration and lookup.
                continue;
            }
            if (sub instanceof AsyncSubscription) {
                ((AsyncSubscriptionImpl)sub).start();
            }
            this.logger.trace("Resending subscriptions:");

            long adjustedMax = 0L;
            boolean exhausted = false;
            sub.lock();
            try {
                this.logger.trace("Sub = {}", (Object)sub);
                if (sub.max > 0L) {
                    if (sub.delivered < sub.max) {
                        adjustedMax = sub.max - sub.delivered;
                    }
                    // Zero remaining means the limit was already reached.
                    exhausted = adjustedMax == 0L;
                }
            }
            finally {
                sub.unlock();
            }

            if (exhausted) {
                try {
                    this.unsubscribe(sub, 0);
                }
                catch (Exception e) {
                    this.logger.debug("nats: exception while unsubscribing exhausted subscription", (Throwable)e);
                }
                continue;
            }

            this.sendSubscriptionMessage(sub);
            if (adjustedMax <= 0L) {
                continue;
            }
            try {
                // Only the protocol line is written here; the subscription stays registered.
                this.writeUnsubProto(sub, adjustedMax);
            }
            catch (Exception e) {
                this.logger.debug("nats: exception while writing UNSUB proto");
            }
        }
    }

    /**
     * Core subscribe routine, executed under {@code mu}.
     * <p>
     * With a handler, an asynchronous subscription is created and its
     * delivery loop is submitted to {@code subexec}. Without one, a
     * synchronous subscription is created around {@code ch}. The subscription
     * is registered, the SUB line is sent unless the connection is
     * reconnecting, and the flusher is kicked.
     *
     * @param subject subject to subscribe to
     * @param queue   queue group name, or {@code null}
     * @param cb      message handler for asynchronous delivery, or {@code null}
     * @param ch      channel for synchronous delivery, or {@code null}
     * @return the newly registered subscription
     * @throws IllegalStateException if the connection is closed
     * @throws IllegalArgumentException if both {@code cb} and {@code ch} are
     *         {@code null}
     */
    SubscriptionImpl subscribe(String subject, String queue, MessageHandler cb, BlockingQueue<Message> ch) {
        this.mu.lock();
        try {
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            if (cb == null && ch == null) {
                throw new IllegalArgumentException("nats: invalid subscription");
            }

            final SubscriptionImpl created;
            if (cb == null) {
                created = new SyncSubscriptionImpl(this, subject, queue);
                created.setChannel(ch);
            } else {
                final AsyncSubscriptionImpl async = new AsyncSubscriptionImpl(this, subject, queue, cb);
                this.logger.debug("Starting subscription for subject '{}'", (Object)subject);
                this.subexec.submit(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            ConnectionImpl.this.waitForMsgs(async);
                        }
                        catch (InterruptedException e) {
                            ConnectionImpl.this.logger.debug("Interrupted in waitForMsgs");
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                created = async;
            }

            this.addSubscription(created);
            if (!this._isReconnecting()) {
                this.sendSubscriptionMessage(created);
            }
            this.kickFlusher();
            return created;
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Creates a synchronous subscription on {@code subject} with no queue
     * group.
     *
     * @param subject subject to subscribe to
     * @return the synchronous subscription
     */
    @Override
    public SyncSubscription subscribe(String subject) {
        final String noQueue = null;
        return this.subscribeSync(subject, noQueue);
    }

    /**
     * Creates a synchronous subscription on {@code subject} in the given
     * queue group.
     *
     * @param subject subject to subscribe to
     * @param queue   queue group name
     * @return the synchronous subscription
     */
    @Override
    public SyncSubscription subscribe(String subject, String queue) {
        final SyncSubscription result = this.subscribeSync(subject, queue);
        return result;
    }

    /**
     * Creates an asynchronous subscription on {@code subject} with no queue
     * group.
     *
     * @param subject subject to subscribe to
     * @param cb      handler invoked for each message
     * @return the asynchronous subscription
     */
    @Override
    public AsyncSubscription subscribe(String subject, MessageHandler cb) {
        final AsyncSubscriptionImpl result = (AsyncSubscriptionImpl)this.subscribe(subject, null, cb);
        return result;
    }

    /**
     * Creates an asynchronous subscription on {@code subj} in the given queue
     * group.
     *
     * @param subj  subject to subscribe to
     * @param queue queue group name, or {@code null}
     * @param cb    handler invoked for each message
     * @return the asynchronous subscription
     */
    @Override
    public AsyncSubscription subscribe(String subj, String queue, MessageHandler cb) {
        final SubscriptionImpl created = this.subscribe(subj, queue, cb, null);
        return (AsyncSubscriptionImpl)created;
    }

    /**
     * Deprecated variant that creates an asynchronous subscription in the
     * given queue group by calling the four-argument {@code subscribe}.
     *
     * @param subject subject to subscribe to
     * @param queue   queue group name, or {@code null}
     * @param cb      handler invoked for each message
     * @return the asynchronous subscription
     */
    @Override
    @Deprecated
    public AsyncSubscription subscribeAsync(String subject, String queue, MessageHandler cb) {
        final SubscriptionImpl created = this.subscribe(subject, queue, cb, null);
        return (AsyncSubscriptionImpl)created;
    }

    /**
     * Deprecated variant that creates an asynchronous subscription with no
     * queue group.
     *
     * @param subj subject to subscribe to
     * @param cb   handler invoked for each message
     * @return the asynchronous subscription
     */
    @Override
    @Deprecated
    public AsyncSubscription subscribeAsync(String subj, MessageHandler cb) {
        final AsyncSubscriptionImpl result = (AsyncSubscriptionImpl)this.subscribe(subj, null, cb);
        return result;
    }

    /**
     * Assigns the next sid from {@code sidCounter} to the subscription and
     * registers it in the sid map.
     *
     * @param sub the subscription to register
     */
    private void addSubscription(SubscriptionImpl sub) {
        sub.setSid(this.sidCounter.incrementAndGet());
        this.subs.put(sub.getSid(), sub);
        this.logger.trace(
                "Successfully added subscription to {} [{}]",
                (Object)sub.getSubject(),
                (Object)sub.getSid());
    }

    /**
     * Creates a synchronous subscription backed by a newly created message
     * channel.
     *
     * @param subject subject to subscribe to
     * @param queue   queue group name, or {@code null}
     * @return the synchronous subscription
     */
    @Override
    public SyncSubscription subscribeSync(String subject, String queue) {
        final Object created = this.subscribe(subject, queue, null, this.createMsgChannel());
        return (SyncSubscription)created;
    }

    /**
     * Creates a synchronous subscription with no queue group, backed by a
     * newly created message channel.
     *
     * @param subject subject to subscribe to
     * @return the synchronous subscription
     */
    @Override
    public SyncSubscription subscribeSync(String subject) {
        final Object created = this.subscribe(subject, null, null, this.createMsgChannel());
        return (SyncSubscription)created;
    }

    /**
     * Appends the variable part of a PUB line to {@code this.pubProtoBuf}:
     * subject, optional reply subject, the payload size in decimal, and the
     * CRLF terminator. Fields are separated by a single space (byte 32).
     * A non-positive size is written as {@code 0}.
     * <p>
     * Note: the {@code buffer} parameter is not used by this method; all
     * writes go to the {@code pubProtoBuf} field.
     *
     * @param buffer  unused
     * @param subject subject bytes
     * @param reply   reply-subject bytes, or {@code null}
     * @param msgSize payload size in bytes
     */
    void writePublishProto(ByteBuffer buffer, byte[] subject, byte[] reply, int msgSize) {
        this.pubProtoBuf.put(subject, 0, subject.length);
        if (reply != null) {
            this.pubProtoBuf.put((byte)32);
            this.pubProtoBuf.put(reply, 0, reply.length);
        }
        this.pubProtoBuf.put((byte)32);

        // Render msgSize right-to-left into a scratch array.
        final byte[] scratch = new byte[12];
        int cursor = scratch.length;
        int remaining = msgSize;
        if (remaining <= 0) {
            scratch[--cursor] = digits[0];
        } else {
            while (remaining > 0) {
                scratch[--cursor] = digits[remaining % 10];
                remaining /= 10;
            }
        }
        this.pubProtoBuf.put(scratch, cursor, scratch.length - cursor);
        this.pubProtoBuf.put(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
    }

    /**
     * Writes a PUB message to the output stream while holding {@code mu}.
     * <p>
     * While reconnecting, the stream is flushed first and the publish is
     * rejected if the pending buffer has reached the configured reconnect
     * buffer size. If the protocol buffer overflows, it is rebuilt with extra
     * room and the header is written again. An {@code IOException} while
     * writing the message is recorded as the last error and the method
     * returns normally; the lock is released exactly once by the outer
     * {@code finally}.
     *
     * @param subject    subject bytes
     * @param reply      reply-subject bytes, or {@code null}
     * @param data       payload, or {@code null} for an empty message
     * @param forceFlush {@code true} to flush the stream immediately
     * @throws IllegalArgumentException if the payload exceeds the server's
     *         maximum payload
     * @throws IllegalStateException if the connection is closed
     * @throws IOException if the reconnect buffer limit is exceeded or the
     *         forced flush fails
     */
    void publish(byte[] subject, byte[] reply, byte[] data, boolean forceFlush) throws IOException {
        int msgSize = data != null ? data.length : 0;
        this.mu.lock();
        try {
            if ((long)msgSize > this.info.getMaxPayload()) {
                throw new IllegalArgumentException("nats: maximum payload exceeded");
            }
            if (this._isClosed()) {
                throw new IllegalStateException("nats: connection closed");
            }
            if (this._isReconnecting()) {
                try {
                    this.bw.flush();
                }
                catch (IOException e) {
                    this.logger.error("I/O exception during flush", (Throwable)e);
                }
                if (this.pending.size() >= this.opts.getReconnectBufSize()) {
                    throw new IOException("nats: outbound buffer limit exceeded");
                }
            }
            try {
                this.writePublishProto(this.pubProtoBuf, subject, reply, msgSize);
            }
            catch (BufferOverflowException e) {
                this.logger.warn("nats: reallocating publish buffer due to overflow");
                int resizeAmount = 1024 + subject.length + (reply != null ? reply.length : 0);
                this.buildPublishProtocolBuffer(resizeAmount);
                this.writePublishProto(this.pubProtoBuf, subject, reply, msgSize);
            }
            try {
                this.bw.write(this.pubProtoBuf.array(), 0, this.pubProtoBuf.position());
                this.pubProtoBuf.position(this.pubPrimBytesLen);
                if (msgSize > 0) {
                    this.bw.write(data, 0, msgSize);
                }
                this.bw.write(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
            }
            catch (IOException e) {
                // Rewind the header buffer so the next publish does not append to a stale header.
                this.pubProtoBuf.position(this.pubPrimBytesLen);
                this.setLastError(e);
                // Do NOT unlock here: the outer finally releases mu, and a second
                // unlock would throw IllegalMonitorStateException.
                return;
            }
            this.stats.incrementOutMsgs();
            this.stats.incrementOutBytes(msgSize);
            if (forceFlush) {
                this.bw.flush();
                this.stats.incrementFlushes();
            } else if (this.fch.isEmpty()) {
                this.kickFlusher();
            }
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Publishes {@code data} to {@code subject} with an optional reply
     * subject, converting both subjects to bytes with the platform default
     * charset.
     *
     * @param subject destination subject; must be non-null and non-empty
     * @param reply   reply subject, or {@code null}
     * @param data    payload, or {@code null}
     * @param flush   {@code true} to flush immediately
     * @throws NullPointerException if {@code subject} is {@code null}
     * @throws IllegalArgumentException if {@code subject} is empty
     * @throws IOException as thrown by the byte-array {@code publish}
     */
    @Override
    public void publish(String subject, String reply, byte[] data, boolean flush) throws IOException {
        if (subject == null) {
            throw new NullPointerException("nats: invalid subject");
        }
        if (subject.isEmpty()) {
            throw new IllegalArgumentException("nats: invalid subject");
        }
        final byte[] subjBytes = subject.getBytes();
        final byte[] replyBytes = reply != null ? reply.getBytes() : null;
        this.publish(subjBytes, replyBytes, data, flush);
    }

    /**
     * Publishes without forcing a flush.
     *
     * @param subject destination subject
     * @param reply   reply subject, or {@code null}
     * @param data    payload, or {@code null}
     * @throws IOException as thrown by the four-argument {@code publish}
     */
    @Override
    public void publish(String subject, String reply, byte[] data) throws IOException {
        final boolean flushNow = false;
        this.publish(subject, reply, data, flushNow);
    }

    /**
     * Publishes with no reply subject.
     *
     * @param subject destination subject
     * @param data    payload, or {@code null}
     * @throws IOException as thrown by the three-argument {@code publish}
     */
    @Override
    public void publish(String subject, byte[] data) throws IOException {
        final String noReply = null;
        this.publish(subject, noReply, data);
    }

    /**
     * Publishes a message using its subject bytes, reply-to bytes and data,
     * without forcing a flush.
     *
     * @param msg the message to publish
     * @throws IOException as thrown by the byte-array {@code publish}
     */
    @Override
    public void publish(Message msg) throws IOException {
        final byte[] subjectBytes = msg.getSubjectBytes();
        final byte[] replyBytes = msg.getReplyToBytes();
        this.publish(subjectBytes, replyBytes, msg.getData(), false);
    }

    /**
     * Sends a request and waits for a single reply.
     * <p>
     * A fresh inbox is subscribed synchronously with auto-unsubscribe after
     * one message, the request is published with that inbox as reply subject,
     * and the next message is awaited. Any pending interrupt on the calling
     * thread is cleared before starting. If the wait is interrupted, the
     * interrupt is cleared and {@code null} is returned. The subscription is
     * closed when the method exits.
     *
     * @param subject request subject
     * @param data    request payload
     * @param timeout how long to wait for the reply
     * @param unit    unit of {@code timeout}
     * @return the reply, or {@code null} if the wait was interrupted
     * @throws TimeoutException declared for the reply wait
     * @throws IOException if publishing fails
     */
    @Override
    public Message request(String subject, byte[] data, long timeout, TimeUnit unit) throws TimeoutException, IOException {
        final String replySubject = this.newInbox();
        final BlockingQueue<Message> replies = this.createMsgChannel(8);
        if (Thread.currentThread().isInterrupted()) {
            Thread.interrupted();
        }
        try (SyncSubscription inboxSub = (SyncSubscription)((Object)this.subscribe(replySubject, null, null, replies));){
            inboxSub.autoUnsubscribe(1);
            this.publish(subject, replySubject, data);
            try {
                return inboxSub.nextMessage(timeout, unit);
            }
            catch (InterruptedException e) {
                this.logger.debug("request() interrupted (and cleared)", (Throwable)e);
                Thread.interrupted();
                return null;
            }
        }
    }

    /**
     * Sends a request with the timeout expressed in milliseconds.
     *
     * @param subject request subject
     * @param data    request payload
     * @param timeout timeout in milliseconds
     * @return the reply as returned by the four-argument {@code request}
     * @throws TimeoutException as thrown by the four-argument {@code request}
     * @throws IOException as thrown by the four-argument {@code request}
     */
    @Override
    public Message request(String subject, byte[] data, long timeout) throws TimeoutException, IOException {
        final TimeUnit millis = TimeUnit.MILLISECONDS;
        return this.request(subject, data, timeout, millis);
    }

    /**
     * Sends a request passing a timeout of {@code -1} milliseconds to the
     * four-argument {@code request}.
     *
     * @param subject request subject
     * @param data    request payload
     * @return the reply as returned by the four-argument {@code request}
     * @throws IOException as thrown by the four-argument {@code request}
     * @throws TimeoutException as thrown by the four-argument {@code request}
     */
    @Override
    public Message request(String subject, byte[] data) throws IOException, TimeoutException {
        final long noTimeout = -1L;
        return this.request(subject, data, noTimeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds a new inbox subject by appending the next global NUID to the
     * inbox prefix.
     *
     * @return the new inbox subject
     */
    @Override
    public String newInbox() {
        return String.format("%s%s", inboxPrefix, NUID.nextGlobal());
    }

    /**
     * Returns a copy of the current statistics.
     *
     * @return a new {@code Statistics} constructed from the live counters
     */
    @Override
    public synchronized Statistics getStats() {
        final Statistics snapshot = new Statistics(this.stats);
        return snapshot;
    }

    /**
     * Clears the connection statistics.
     */
    @Override
    public synchronized void resetStats() {
        final Statistics live = this.stats;
        live.clear();
    }

    /**
     * Returns the maximum payload size reported by the server info.
     *
     * @return the maximum payload in bytes
     */
    @Override
    public synchronized long getMaxPayload() {
        final long limit = this.info.getMaxPayload();
        return limit;
    }

    /**
     * Writes the SUB line for the subscription to the output stream. The
     * queue group is included (preceded by a space) only when it is non-null
     * and non-empty. A write failure is logged as a warning, together with
     * its cause, and is not propagated.
     *
     * @param sub the subscription to announce to the server
     */
    protected void sendSubscriptionMessage(SubscriptionImpl sub) {
        String queue = sub.getQueue();
        String subLine = String.format(SUB_PROTO, sub.getSubject(), queue != null && !queue.isEmpty() ? _SPC_ + queue : _EMPTY_, sub.getSid());
        try {
            this.bw.write(subLine.getBytes());
        }
        catch (IOException e) {
            // Keep the best-effort behaviour but do not lose the cause.
            this.logger.warn("nats: I/O exception while sending subscription message", (Throwable)e);
        }
    }

    /**
     * Returns the closed callback stored in the options.
     *
     * @return the closed callback as returned by the options
     */
    @Override
    public ClosedCallback getClosedCallback() {
        final ClosedCallback configured = this.opts.getClosedCallback();
        return configured;
    }

    /**
     * Stores the closed callback in the options while holding {@code mu}.
     *
     * @param cb the callback to store
     */
    @Override
    public void setClosedCallback(ClosedCallback cb) {
        this.mu.lock();
        try {
            final Options target = this.opts;
            target.setClosedCallback(cb);
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Returns the disconnected callback currently stored in this connection's options.
     *
     * @return the value of {@code opts.getDisconnectedCallback()}
     */
    @Override
    public DisconnectedCallback getDisconnectedCallback() {
        return opts.getDisconnectedCallback();
    }

    /**
     * Stores the given disconnected callback in this connection's options while holding
     * the connection lock {@code mu}.
     *
     * @param cb the callback handed to {@code opts.setDisconnectedCallback}
     */
    @Override
    public void setDisconnectedCallback(DisconnectedCallback cb) {
        mu.lock();
        try {
            opts.setDisconnectedCallback(cb);
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the reconnected callback currently stored in this connection's options.
     *
     * @return the value of {@code opts.getReconnectedCallback()}
     */
    @Override
    public ReconnectedCallback getReconnectedCallback() {
        return opts.getReconnectedCallback();
    }

    /**
     * Stores the given reconnected callback in this connection's options while holding
     * the connection lock {@code mu}.
     *
     * @param cb the callback handed to {@code opts.setReconnectedCallback}
     */
    @Override
    public void setReconnectedCallback(ReconnectedCallback cb) {
        mu.lock();
        try {
            opts.setReconnectedCallback(cb);
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the exception handler currently stored in this connection's options.
     *
     * @return the value of {@code opts.getExceptionHandler()}
     */
    @Override
    public ExceptionHandler getExceptionHandler() {
        return opts.getExceptionHandler();
    }

    /**
     * Stores the given exception handler in this connection's options while holding the
     * connection lock {@code mu}.
     *
     * @param exceptionHandler the handler handed to {@code opts.setExceptionHandler}
     */
    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        mu.lock();
        try {
            opts.setExceptionHandler(exceptionHandler);
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the string form of the current server URL, but only while the connection
     * status is {@code CONNECTED}. The status check and URL read happen under {@code mu}.
     *
     * @return {@code getUrl().toString()} when connected, otherwise {@code null}
     */
    @Override
    public String getConnectedUrl() {
        mu.lock();
        try {
            return status == Constants.ConnState.CONNECTED ? getUrl().toString() : null;
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the id from the current {@code ServerInfo}, but only while the connection
     * status is {@code CONNECTED}. The status check and id read happen under {@code mu}.
     *
     * @return {@code info.getId()} when connected, otherwise {@code null}
     */
    @Override
    public String getConnectedServerId() {
        mu.lock();
        try {
            return status == Constants.ConnState.CONNECTED ? info.getId() : null;
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Reads the connection status while holding the connection lock {@code mu}.
     *
     * @return the current value of {@code status}
     */
    @Override
    public Constants.ConnState getState() {
        mu.lock();
        try {
            return status;
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the {@code ServerInfo} currently held by this connection. The field is read
     * without taking {@code mu}.
     *
     * @return the current value of {@code info}
     */
    @Override
    public ServerInfo getConnectedServerInfo() {
        return info;
    }

    /**
     * Replaces the {@code ServerInfo} held by this connection. No lock is taken.
     *
     * @param info the new value for the {@code info} field
     */
    void setConnectedServerInfo(ServerInfo info) {
        this.info = info;
    }

    /**
     * Hands the given string to {@code processInfo}.
     *
     * <p>NOTE(review): presumably {@code processInfo} parses the string and updates the
     * stored {@code ServerInfo}; confirm against its definition.
     *
     * @param info the string forwarded to {@code processInfo}
     */
    void setConnectedServerInfo(String info) {
        processInfo(info);
    }

    /**
     * Returns the exception most recently stored in {@code lastEx}. The field is read
     * without taking {@code mu}.
     *
     * @return the current value of {@code lastEx}, which may be {@code null}
     */
    @Override
    public Exception getLastException() {
        return lastEx;
    }

    /**
     * Records the given exception in {@code lastEx}, the field returned by
     * {@code getLastException()}. No lock is taken.
     *
     * @param err the exception to store
     */
    void setLastError(Exception err) {
        lastEx = err;
    }

    /**
     * Returns the {@code Options} object this connection holds (the same instance, not a
     * copy).
     *
     * @return the current value of {@code opts}
     */
    protected Options getOptions() {
        return opts;
    }

    /**
     * Replaces the {@code pending} byte buffer held by this connection. No lock is taken.
     *
     * @param pending the new value for the {@code pending} field
     */
    void setPending(ByteArrayOutputStream pending) {
        this.pending = pending;
    }

    /**
     * Returns the {@code pending} byte buffer held by this connection (the same instance,
     * not a copy).
     *
     * @return the current value of {@code pending}, which may be {@code null}
     */
    ByteArrayOutputStream getPending() {
        return pending;
    }

    /**
     * Sleeps the calling thread for the given duration via {@link TimeUnit#sleep(long)}.
     *
     * @param timeout how long to sleep, expressed in {@code unit}
     * @param unit the unit of {@code timeout}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void sleepInterval(long timeout, TimeUnit unit) throws InterruptedException {
        unit.sleep(timeout);
    }

    /**
     * Replaces the connection's output stream {@code bw} while holding the connection
     * lock {@code mu}.
     *
     * @param out the stream to store in {@code bw}
     */
    void setOutputStream(OutputStream out) {
        mu.lock();
        try {
            bw = out;
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the connection's output stream {@code bw}. Unlike the matching setter, the
     * field is read without taking {@code mu}.
     *
     * @return the current value of {@code bw}
     */
    OutputStream getOutputStream() {
        return bw;
    }

    /**
     * Replaces the connection's input stream {@code br} while holding the connection
     * lock {@code mu}.
     *
     * @param in the stream to store in {@code br}
     */
    void setInputStream(InputStream in) {
        mu.lock();
        try {
            br = in;
        }
        finally {
            mu.unlock();
        }
    }

    /**
     * Returns the connection's input stream {@code br}. Unlike the matching setter, the
     * field is read without taking {@code mu}.
     *
     * @return the current value of {@code br}
     */
    InputStream getInputStream() {
        return br;
    }

    /**
     * Returns the list of pong queues held by this connection (the same instance, not a
     * copy).
     *
     * @return the current value of {@code pongs}
     */
    protected ArrayList<BlockingQueue<Boolean>> getPongs() {
        return pongs;
    }

    /**
     * Replaces the list of pong queues held by this connection. No lock is taken.
     *
     * @param pongs the new value for the {@code pongs} field
     */
    protected void setPongs(ArrayList<BlockingQueue<Boolean>> pongs) {
        this.pongs = pongs;
    }

    /**
     * Returns the subscription map held by this connection (the same instance, not a
     * copy).
     *
     * @return the current value of {@code subs}
     */
    protected Map<Long, SubscriptionImpl> getSubs() {
        return subs;
    }

    /**
     * Replaces the subscription map held by this connection. No lock is taken.
     *
     * @param subs the new value for the {@code subs} field
     */
    protected void setSubs(Map<Long, SubscriptionImpl> subs) {
        this.subs = subs;
    }

    /**
     * Returns the server pool list held by this connection (the same instance, not a
     * copy).
     *
     * @return the current value of {@code srvPool}
     */
    protected List<Srv> getServerPool() {
        return srvPool;
    }

    /**
     * Replaces the server pool list held by this connection. No lock is taken.
     *
     * @param pool the new value for the {@code srvPool} field
     */
    protected void setServerPool(List<Srv> pool) {
        this.srvPool = pool;
    }

    /**
     * Reports how many bytes are currently held in the pending buffer.
     *
     * <p>{@code getPending()} is read exactly once into a local so that the null check and
     * the {@code size()} call observe the same buffer; calling it twice would allow the
     * buffer to be replaced with {@code null} between the check and the dereference.
     *
     * @return {@code size()} of the pending buffer, or {@code 0} when there is none
     */
    @Override
    public int getPendingByteCount() {
        final ByteArrayOutputStream buffer = this.getPending();
        return buffer != null ? buffer.size() : 0;
    }

    /**
     * Replaces the flush channel queue held by this connection. No lock is taken.
     *
     * @param fch the new value for the {@code fch} field
     */
    protected void setFlushChannel(BlockingQueue<Boolean> fch) {
        this.fch = fch;
    }

    /**
     * Returns the flush channel queue held by this connection (the same instance, not a
     * copy).
     *
     * @return the current value of {@code fch}
     */
    protected BlockingQueue<Boolean> getFlushChannel() {
        return fch;
    }

    /**
     * Replaces the {@code TcpConnection} held by this connection. No lock is taken.
     *
     * @param conn the new value for the {@code conn} field
     */
    protected void setTcpConnection(TcpConnection conn) {
        this.conn = conn;
    }

    /**
     * Returns the {@code TcpConnection} held by this connection.
     *
     * @return the current value of {@code conn}
     */
    protected TcpConnection getTcpConnection() {
        return conn;
    }

    /**
     * Replaces the {@code TcpConnectionFactory} held in {@code tcf}. No lock is taken.
     *
     * @param factory the factory to store
     */
    protected void setTcpConnectionFactory(TcpConnectionFactory factory) {
        tcf = factory;
    }

    /**
     * Returns the {@code TcpConnectionFactory} held in {@code tcf}.
     *
     * @return the current value of {@code tcf}
     */
    protected TcpConnectionFactory getTcpConnectionFactory() {
        return tcf;
    }

    /**
     * Returns the URL stored in this connection's {@code url} field.
     *
     * @return the current value of {@code url}
     */
    URI getUrl() {
        return url;
    }

    /**
     * Replaces the URL stored in this connection's {@code url} field. No lock is taken.
     *
     * @param url the new value for the {@code url} field
     */
    void setUrl(URI url) {
        this.url = url;
    }

    /**
     * Returns the outstanding-ping counter {@code pout}.
     *
     * @return the current value of {@code pout}
     */
    int getActualPingsOutstanding() {
        return pout;
    }

    /**
     * Overwrites the outstanding-ping counter {@code pout}. This method takes no lock
     * itself.
     *
     * @param pout the new counter value
     */
    void setActualPingsOutstanding(int pout) {
        this.pout = pout;
    }

    /**
     * Returns the scheduled future stored in {@code ptmr}.
     *
     * @return the current value of {@code ptmr}
     */
    ScheduledFuture<?> getPingTimer() {
        return ptmr;
    }

    /**
     * Replaces the scheduled future stored in {@code ptmr}. No lock is taken.
     *
     * @param ptmr the new value for the {@code ptmr} field
     */
    void setPingTimer(ScheduledFuture<?> ptmr) {
        this.ptmr = ptmr;
    }

    /**
     * Timer task whose only action is to invoke {@code flushSocket()} on the enclosing
     * connection.
     */
    class FlushTimerTask extends TimerTask {
        /** Delegates to the enclosing connection's {@code flushSocket()}. */
        @Override
        public void run() {
            ConnectionImpl.this.flushSocket();
        }
    }

    /**
     * Timer task that sends a ping on the enclosing connection and detects a stale
     * connection when too many pings are outstanding.
     */
    class PingTimerTask
    extends TimerTask {
        PingTimerTask() {
        }

        /**
         * Under the connection lock {@code mu}: does nothing unless the status is
         * {@code CONNECTED}; otherwise increments the outstanding-ping counter and either
         * sends a ping or, when the counter exceeds {@code opts.getMaxPingsOut()}, flags
         * the connection as stale.
         *
         * <p>The lock is released in a {@code finally} block so that an unexpected runtime
         * exception cannot leave {@code mu} held. As in the original flow,
         * {@code processOpError} is invoked only after the lock has been released.
         */
        @Override
        public void run() {
            boolean stale = false;
            ConnectionImpl.this.mu.lock();
            try {
                if (ConnectionImpl.this.status != Constants.ConnState.CONNECTED) {
                    return;
                }
                ConnectionImpl.this.setActualPingsOutstanding(ConnectionImpl.this.getActualPingsOutstanding() + 1);
                if (ConnectionImpl.this.getActualPingsOutstanding() > ConnectionImpl.this.opts.getMaxPingsOut()) {
                    stale = true;
                } else {
                    ConnectionImpl.this.sendPing(null);
                }
            }
            finally {
                ConnectionImpl.this.mu.unlock();
            }
            if (stale) {
                // Must run outside the lock, matching the original unlock-then-report order.
                ConnectionImpl.this.processOpError(new IOException("nats: stale connection"));
            }
        }
    }

    /**
     * One entry of the server pool: a server URL plus reconnect bookkeeping.
     */
    class Srv {
        /** URL of this server. */
        URI url = null;
        /** Reconnect counter; this class only initializes and prints it. */
        int reconnects = 0;
        /** Wall-clock time (epoch millis) recorded by {@link #updateLastAttempt()}. */
        long lastAttempt = 0L;
        /** {@code System.nanoTime()} reading recorded by {@link #updateLastAttempt()}. */
        long lastAttemptNanos = 0L;
        /** {@code true} when the URL scheme is exactly {@code "tls"}. */
        boolean secure = false;

        /**
         * Creates an entry for the given URL and derives {@link #secure} from its scheme.
         *
         * @param url the server URL; must not be {@code null}
         */
        protected Srv(URI url) {
            this.url = url;
            // Constant-first comparison: URI.getScheme() returns null for scheme-less
            // URIs (e.g. "//host:4222"), which previously caused a NullPointerException.
            this.secure = "tls".equals(url.getScheme());
        }

        /** Records the current time, both as a monotonic nano reading and as epoch millis. */
        void updateLastAttempt() {
            this.lastAttemptNanos = System.nanoTime();
            this.lastAttempt = System.currentTimeMillis();
        }

        /**
         * Returns the milliseconds elapsed since {@link #updateLastAttempt()} was last
         * called, measured with {@code System.nanoTime()}.
         *
         * @return elapsed milliseconds relative to {@link #lastAttemptNanos}
         */
        long timeSinceLastAttempt() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastAttemptNanos);
        }

        /**
         * Renders the URL, reconnect count, last-attempt timestamp and elapsed time.
         * The timestamp uses a 24-hour clock ({@code HH}); the 12-hour {@code hh} pattern
         * without an AM/PM marker made morning and afternoon times indistinguishable.
         */
        @Override
        public String toString() {
            SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
            String dateToStr = format.format(new Date(this.lastAttempt));
            return String.format("{url=%s, reconnects=%d, lastAttempt=%s, timeSinceLastAttempt=%dms}", this.url.toString(), this.reconnects, dateToStr, this.timeSinceLastAttempt());
        }
    }

    /**
     * Value holder for connect options, serialized to JSON by {@link #toString()} using
     * Gson and the {@code @SerializedName} keys declared on each field.
     */
    class ConnectInfo {
        @SerializedName(value="verbose")
        private Boolean verbose;
        @SerializedName(value="pedantic")
        private Boolean pedantic;
        @SerializedName(value="user")
        private String user;
        @SerializedName(value="pass")
        private String pass;
        @SerializedName(value="auth_token")
        private String token;
        @SerializedName(value="tls_required")
        private Boolean tlsRequired;
        @SerializedName(value="name")
        private String name;
        @SerializedName(value="lang")
        private String lang = "java";
        @SerializedName(value="version")
        private String version;
        @SerializedName(value="protocol")
        private int protocol;
        // transient: the serializer itself is not part of the JSON payload.
        private transient Gson gson;

        /**
         * Captures the supplied values verbatim into the corresponding fields.
         *
         * @param verbose value for the {@code verbose} key
         * @param pedantic value for the {@code pedantic} key
         * @param username value for the {@code user} key
         * @param password value for the {@code pass} key
         * @param token value for the {@code auth_token} key
         * @param secure value for the {@code tls_required} key
         * @param connectionName value for the {@code name} key
         * @param lang value for the {@code lang} key (replaces the field default "java")
         * @param version value for the {@code version} key
         * @param proto protocol level; its {@code getValue()} is stored under {@code protocol}
         */
        public ConnectInfo(boolean verbose, boolean pedantic, String username, String password, String token, boolean secure, String connectionName, String lang, String version, ClientProto proto) {
            this.gson = new GsonBuilder().create();
            // Boolean.valueOf reuses the cached TRUE/FALSE instances; the Boolean(boolean)
            // constructor is deprecated.
            this.verbose = Boolean.valueOf(verbose);
            this.pedantic = Boolean.valueOf(pedantic);
            this.user = username;
            this.pass = password;
            this.token = token;
            this.tlsRequired = Boolean.valueOf(secure);
            this.name = connectionName;
            this.lang = lang;
            this.version = version;
            this.protocol = proto.getValue();
        }

        /**
         * Serializes this object to JSON with Gson.
         *
         * @return the JSON text produced by {@code gson.toJson(this)}
         */
        @Override
        public String toString() {
            return this.gson.toJson((Object)this);
        }
    }

    /**
     * A control line split into an operation token and an optional argument string.
     */
    protected class Control {
        /** First token of the line, trimmed; stays {@code null} when the input was {@code null}. */
        String op = null;
        /** Trimmed remainder after the first separator, or {@code null} when absent or blank. */
        String args = null;

        /**
         * Splits {@code s} on {@code ConnectionImpl._SPC_} into at most two pieces.
         * The first piece becomes {@link #op}; the second, if present and non-empty after
         * trimming, becomes {@link #args}.
         *
         * @param s the raw line; {@code null} leaves both fields {@code null}
         */
        protected Control(String s) {
            if (s != null) {
                // With a limit of 2, split yields either one or two tokens.
                String[] tokens = s.split(ConnectionImpl._SPC_, 2);
                this.op = tokens[0].trim();
                if (tokens.length > 1) {
                    String remainder = tokens[1].trim();
                    this.args = remainder.isEmpty() ? null : remainder;
                }
            }
        }

        /** Renders both fields as {@code {op=..., args=...}}. */
        public String toString() {
            StringBuilder text = new StringBuilder("{op=");
            text.append(this.op).append(", args=").append(this.args).append("}");
            return text.toString();
        }
    }

    /**
     * Client protocol levels. Each constant carries the integer that {@code ConnectInfo}
     * stores in its {@code protocol} field.
     */
    protected static enum ClientProto {
        /** Protocol level 0. */
        CLIENT_PROTO_ZERO(0),
        /** Protocol level 1. */
        CLIENT_PROTO_INFO(1);

        private final int code;

        // Enum constructors are implicitly private.
        ClientProto(int code) {
            this.code = code;
        }

        /**
         * @return the integer associated with this constant
         */
        public int getValue() {
            return code;
        }
    }
}

