/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.Consumer;
import io.nats.client.Dispatcher;
import io.nats.client.ErrorListener;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.NUID;
import io.nats.client.Options;
import io.nats.client.Statistics;
import io.nats.client.Subscription;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnectionReader;
import io.nats.client.impl.NatsConnectionWriter;
import io.nats.client.impl.NatsConsumer;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsServerInfo;
import io.nats.client.impl.NatsStatistics;
import io.nats.client.impl.NatsSubscription;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

class NatsConnection
implements Connection {
    static final byte[] EMPTY_BODY = new byte[0];
    static final String INBOX_PREFIX = "_INBOX.";
    static final byte CR = 13;
    static final byte LF = 10;
    static final byte[] CRLF = new byte[]{13, 10};
    static final String OP_CONNECT = "CONNECT";
    static final String OP_INFO = "INFO";
    static final String OP_SUB = "SUB";
    static final String OP_PUB = "PUB";
    static final String OP_UNSUB = "UNSUB";
    static final String OP_MSG = "MSG";
    static final String OP_PING = "PING";
    static final String OP_PONG = "PONG";
    static final String OP_OK = "+OK";
    static final String OP_ERR = "-ERR";
    private Options options;
    private NatsStatistics statistics;
    private boolean connecting;
    private boolean disconnecting;
    private boolean closing;
    private Exception exceptionDuringConnectChange;
    private Connection.Status status;
    private ReentrantLock statusLock;
    private Condition statusChanged;
    private CompletableFuture<DataPort> dataPortFuture;
    private DataPort dataPort;
    private String currentServerURI;
    private CompletableFuture<Boolean> reconnectWaiter;
    private NatsConnectionReader reader;
    private NatsConnectionWriter writer;
    private AtomicReference<NatsServerInfo> serverInfo;
    private Map<String, NatsSubscription> subscribers;
    private Map<String, NatsDispatcher> dispatchers;
    private Map<String, CompletableFuture<Message>> responses;
    private ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
    private String mainInbox;
    private AtomicReference<NatsDispatcher> inboxDispatcher;
    private Timer timer;
    private AtomicLong nextSid;
    private NUID nuid;
    private AtomicReference<String> lastError;
    private AtomicReference<CompletableFuture<Boolean>> draining;
    private AtomicBoolean blockPublishForDrain;
    private ExecutorService callbackRunner;
    static final int RESP_INBOX_PREFIX_LEN = "_INBOX.".length() + 22 + 1;

    NatsConnection(Options options) {
        this.options = options;
        this.statistics = new NatsStatistics(this.options.isTrackAdvancedStats());
        this.statusLock = new ReentrantLock();
        this.statusChanged = this.statusLock.newCondition();
        this.status = Connection.Status.DISCONNECTED;
        this.reconnectWaiter = new CompletableFuture();
        this.reconnectWaiter.complete(Boolean.TRUE);
        this.dispatchers = new ConcurrentHashMap<String, NatsDispatcher>();
        this.subscribers = new ConcurrentHashMap<String, NatsSubscription>();
        this.responses = new ConcurrentHashMap<String, CompletableFuture<Message>>();
        this.nextSid = new AtomicLong(1L);
        this.nuid = new NUID();
        this.mainInbox = this.createInbox() + ".*";
        this.lastError = new AtomicReference();
        this.serverInfo = new AtomicReference();
        this.inboxDispatcher = new AtomicReference();
        this.pongQueue = new ConcurrentLinkedDeque();
        this.draining = new AtomicReference();
        this.blockPublishForDrain = new AtomicBoolean();
        this.reader = new NatsConnectionReader(this);
        this.writer = new NatsConnectionWriter(this);
        this.callbackRunner = Executors.newSingleThreadExecutor();
    }

    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
        if (this.options.getServers().size() == 0) {
            throw new IllegalArgumentException("No servers provided in options");
        }
        for (String serverURI : this.getServers()) {
            if (this.isClosed()) break;
            this.updateStatus(Connection.Status.CONNECTING);
            this.tryToConnect(serverURI);
            if (this.isConnected()) break;
            this.updateStatus(Connection.Status.DISCONNECTED);
        }
        if (!this.isConnected() && !this.isClosed()) {
            if (reconnectOnConnect) {
                this.reconnect();
            } else {
                this.close();
                throw new IOException("Unable to connect to gnatsd server.");
            }
        }
    }

    void reconnect() throws InterruptedException {
        long maxTries = this.options.getMaxReconnect();
        long tries = 0L;
        String lastServer = null;
        if (this.isClosed()) {
            return;
        }
        if (maxTries == 0L) {
            this.close();
            return;
        }
        while (!(this.isConnected() || this.isClosed() || this.isClosing())) {
            Collection<String> serversToTry = this.buildReconnectList();
            for (String server : serversToTry) {
                if (this.isClosed()) break;
                if (server.equals(lastServer)) {
                    this.reconnectWaiter = new CompletableFuture();
                    this.waitForReconnectTimeout();
                }
                if (this.isDisconnectingOrClosed() || this.isClosing()) break;
                this.updateStatus(Connection.Status.RECONNECTING);
                this.tryToConnect(server);
                lastServer = server;
                if (maxTries > 0L && ++tries >= maxTries) break;
                if (!this.isConnected()) continue;
                this.statistics.incrementReconnects();
                break;
            }
            if (maxTries <= 0L || tries < maxTries) continue;
            break;
        }
        if (!this.isConnected()) {
            this.close();
            return;
        }
        this.subscribers.forEach((sid, sub) -> {
            if (!sub.isDraining()) {
                this.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName());
            }
        });
        this.dispatchers.forEach((nuid, d) -> {
            if (!d.isDraining()) {
                d.resendSubscriptions();
            }
        });
        try {
            this.flush(this.options.getConnectionTimeout());
        }
        catch (Exception exp) {
            this.processException(exp);
        }
        this.processConnectionEvent(ConnectionListener.Events.RESUBSCRIBED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void tryToConnect(String serverURI) {
        try {
            this.statusLock.lock();
            try {
                if (this.connecting) {
                    return;
                }
                this.connecting = true;
                this.statusChanged.signalAll();
            }
            finally {
                this.statusLock.unlock();
            }
            Duration connectTimeout = this.options.getConnectionTimeout();
            this.dataPortFuture = new CompletableFuture();
            this.reader.stop().get();
            this.writer.stop().get();
            this.cleanUpPongQueue();
            DataPort newDataPort = this.options.buildDataPort();
            newDataPort.connect(serverURI, this);
            this.dataPort = newDataPort;
            this.dataPortFuture.complete(this.dataPort);
            this.readInitialInfo();
            this.checkVersionRequirements();
            this.upgradeToSecureIfNeeded();
            this.reader.start(this.dataPortFuture);
            this.writer.start(this.dataPortFuture);
            this.sendConnect(serverURI);
            CompletableFuture<Boolean> pongFuture = this.sendPing();
            if (pongFuture != null) {
                pongFuture.get(connectTimeout.toNanos(), TimeUnit.NANOSECONDS);
            }
            if (this.timer == null) {
                long cleanMillis;
                this.timer = new Timer("Nats Connection Timer");
                long pingMillis = this.options.getPingInterval().toMillis();
                if (pingMillis > 0L) {
                    this.timer.schedule(new TimerTask(){

                        @Override
                        public void run() {
                            if (NatsConnection.this.isConnected()) {
                                NatsConnection.this.sendPing();
                            }
                        }
                    }, pingMillis, pingMillis);
                }
                if ((cleanMillis = this.options.getRequestCleanupInterval().toMillis()) > 0L) {
                    this.timer.schedule(new TimerTask(){

                        @Override
                        public void run() {
                            NatsConnection.this.cleanResponses(false);
                        }
                    }, cleanMillis, cleanMillis);
                }
            }
            this.statusLock.lock();
            try {
                this.connecting = false;
                if (this.exceptionDuringConnectChange != null) {
                    throw this.exceptionDuringConnectChange;
                }
                this.currentServerURI = serverURI;
                this.updateStatus(Connection.Status.CONNECTED);
            }
            finally {
                this.statusLock.unlock();
            }
        }
        catch (RuntimeException exp) {
            this.processException(exp);
            throw exp;
        }
        catch (Exception exp) {
            this.processException(exp);
            try {
                this.closeSocket(false);
            }
            catch (InterruptedException e) {
                this.processException(e);
            }
        }
        finally {
            this.statusLock.lock();
            try {
                this.connecting = false;
                this.statusChanged.signalAll();
            }
            finally {
                this.statusLock.unlock();
            }
        }
    }

    void checkVersionRequirements() throws IOException {
        Options opts = this.getOptions();
        NatsServerInfo info = this.getInfo();
        if (opts.isNoEcho() && info.getProtocolVersion() < 1) {
            throw new IOException("Server does not support no echo.");
        }
    }

    void upgradeToSecureIfNeeded() throws IOException {
        Options opts = this.getOptions();
        NatsServerInfo info = this.getInfo();
        if (opts.isTLSRequired() && !info.isTLSRequired()) {
            throw new IOException("SSL connection wanted by client.");
        }
        if (!opts.isTLSRequired() && info.isTLSRequired()) {
            throw new IOException("SSL required by server.");
        }
        if (opts.isTLSRequired()) {
            this.dataPort.upgradeToSecure();
        }
    }

    void handleCommunicationIssue(Exception io) {
        this.statusLock.lock();
        try {
            if (this.connecting || this.disconnecting || this.status == Connection.Status.CLOSED) {
                this.exceptionDuringConnectChange = io;
                return;
            }
        }
        finally {
            this.statusLock.unlock();
        }
        this.processException(io);
        String name = this.getOptions().getConnectionName() != null ? this.getOptions().getConnectionName() : "Nats Connection";
        Thread t = new Thread(() -> {
            try {
                this.closeSocket(true);
            }
            catch (InterruptedException e) {
                this.processException(e);
            }
        }, name + " Reconnect");
        t.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void closeSocket(boolean tryReconnectIfConnected) throws InterruptedException {
        boolean wasConnected = false;
        this.statusLock.lock();
        try {
            if (this.isDisconnectingOrClosed()) {
                this.waitForDisconnectOrClose(this.options.getConnectionTimeout());
                return;
            }
            this.disconnecting = true;
            this.exceptionDuringConnectChange = null;
            wasConnected = this.status == Connection.Status.CONNECTED;
            this.statusChanged.signalAll();
        }
        finally {
            this.statusLock.unlock();
        }
        this.closeSocketImpl();
        this.statusLock.lock();
        try {
            this.updateStatus(Connection.Status.DISCONNECTED);
            this.disconnecting = false;
            this.statusChanged.signalAll();
        }
        finally {
            this.statusLock.unlock();
        }
        if (this.isClosing()) {
            this.close();
        } else if (wasConnected && tryReconnectIfConnected) {
            this.reconnect();
        }
    }

    @Override
    public void close() throws InterruptedException {
        this.close(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void close(boolean checkDrainStatus) throws InterruptedException {
        this.statusLock.lock();
        try {
            if (checkDrainStatus && this.isDraining()) {
                this.waitForDisconnectOrClose(this.options.getConnectionTimeout());
                return;
            }
            this.closing = true;
            if (this.isDisconnectingOrClosed()) {
                this.waitForDisconnectOrClose(this.options.getConnectionTimeout());
                return;
            }
            this.disconnecting = true;
            this.exceptionDuringConnectChange = null;
            this.statusChanged.signalAll();
        }
        finally {
            this.statusLock.unlock();
        }
        if (this.reconnectWaiter != null) {
            this.reconnectWaiter.cancel(true);
        }
        this.closeSocketImpl();
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
        this.dispatchers.clear();
        this.subscribers.clear();
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
        this.cleanResponses(true);
        this.cleanUpPongQueue();
        this.statusLock.lock();
        try {
            this.updateStatus(Connection.Status.CLOSED);
            if (this.exceptionDuringConnectChange != null) {
                this.processException(this.exceptionDuringConnectChange);
                this.exceptionDuringConnectChange = null;
            }
        }
        finally {
            this.statusLock.unlock();
        }
        this.callbackRunner.shutdown();
        try {
            this.callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
        }
        finally {
            this.callbackRunner.shutdownNow();
        }
        this.statusLock.lock();
        try {
            this.disconnecting = false;
            this.statusChanged.signalAll();
        }
        finally {
            this.statusLock.unlock();
        }
    }

    void closeSocketImpl() {
        this.currentServerURI = null;
        this.reader.stop();
        this.writer.stop();
        this.dataPortFuture.cancel(true);
        try {
            if (this.dataPort != null) {
                this.dataPort.close();
            }
        }
        catch (IOException ex) {
            this.processException(ex);
        }
        this.cleanUpPongQueue();
        try {
            this.reader.stop().get(10L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            this.processException(ex);
        }
        try {
            this.writer.stop().get(10L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            this.processException(ex);
        }
    }

    void cleanUpPongQueue() {
        Future b;
        while ((b = (Future)this.pongQueue.poll()) != null) {
            try {
                b.cancel(true);
            }
            catch (CancellationException e) {
                this.processException(e);
            }
        }
    }

    @Override
    public void publish(String subject, byte[] body) {
        this.publish(subject, null, body);
    }

    @Override
    public void publish(String subject, String replyTo, byte[] body) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.blockPublishForDrain.get()) {
            throw new IllegalStateException("Connection is Draining");
        }
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in publish");
        }
        if (replyTo != null && replyTo.length() == 0) {
            throw new IllegalArgumentException("ReplyTo cannot be the empty string");
        }
        if (body == null) {
            body = EMPTY_BODY;
        } else if ((long)body.length > this.getMaxPayload() && this.getMaxPayload() > 0L) {
            throw new IllegalArgumentException("Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
        }
        NatsMessage msg = new NatsMessage(subject, replyTo, body);
        if (!(this.status != Connection.Status.RECONNECTING && this.status != Connection.Status.DISCONNECTED || this.writer.canQueue(msg, this.options.getReconnectBufferSize()))) {
            throw new IllegalStateException("Unable to queue any more messages during reconnect, max buffer is " + this.getMaxPayload());
        }
        this.queueOutgoing(msg);
    }

    @Override
    public Subscription subscribe(String subject) {
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in subscribe");
        }
        return this.createSubscription(subject, null, null);
    }

    @Override
    public Subscription subscribe(String subject, String queueName) {
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in subscribe");
        }
        if (queueName == null || queueName.length() == 0) {
            throw new IllegalArgumentException("QueueName is required in subscribe");
        }
        return this.createSubscription(subject, queueName, null);
    }

    void invalidate(NatsSubscription sub) {
        String sid = sub.getSID();
        this.subscribers.remove(sid);
        if (sub.getNatsDispatcher() != null) {
            sub.getNatsDispatcher().remove(sub);
        }
        sub.invalidate();
    }

    void unsubscribe(NatsSubscription sub, int after) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (after <= 0) {
            this.invalidate(sub);
        } else {
            sub.setUnsubLimit(after);
            if (sub.reachedUnsubLimit()) {
                sub.invalidate();
            }
        }
        if (!this.isConnected()) {
            return;
        }
        this.sendUnsub(sub, after);
    }

    void sendUnsub(NatsSubscription sub, int after) {
        String sid = sub.getSID();
        StringBuilder protocolBuilder = new StringBuilder();
        protocolBuilder.append(OP_UNSUB);
        protocolBuilder.append(" ");
        protocolBuilder.append(sid);
        if (after > 0) {
            protocolBuilder.append(" ");
            protocolBuilder.append(String.valueOf(after));
        }
        NatsMessage unsubMsg = new NatsMessage(protocolBuilder.toString());
        this.queueOutgoing(unsubMsg);
    }

    NatsSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.isDraining() && (dispatcher == null || dispatcher != this.inboxDispatcher.get())) {
            throw new IllegalStateException("Connection is Draining");
        }
        NatsSubscription sub = null;
        long sidL = this.nextSid.getAndIncrement();
        String sid = String.valueOf(sidL);
        sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
        this.subscribers.put(sid, sub);
        this.sendSubscriptionMessage(sid, subject, queueName);
        return sub;
    }

    void sendSubscriptionMessage(CharSequence sid, String subject, String queueName) {
        if (!this.isConnected()) {
            return;
        }
        StringBuilder protocolBuilder = new StringBuilder();
        protocolBuilder.append(OP_SUB);
        protocolBuilder.append(" ");
        protocolBuilder.append(subject);
        if (queueName != null) {
            protocolBuilder.append(" ");
            protocolBuilder.append(queueName);
        }
        protocolBuilder.append(" ");
        protocolBuilder.append(sid);
        NatsMessage subMsg = new NatsMessage(protocolBuilder.toString());
        this.queueOutgoing(subMsg);
    }

    String createInbox() {
        StringBuilder builder = new StringBuilder();
        builder.append(INBOX_PREFIX);
        builder.append(this.nuid.next());
        return builder.toString();
    }

    String createResponseInbox(String inbox) {
        StringBuilder builder = new StringBuilder();
        builder.append(inbox.substring(0, RESP_INBOX_PREFIX_LEN));
        builder.append(this.nuid.next());
        return builder.toString();
    }

    String getResponseToken(String responseInbox) {
        if (responseInbox.length() <= RESP_INBOX_PREFIX_LEN) {
            return responseInbox;
        }
        return responseInbox.substring(RESP_INBOX_PREFIX_LEN);
    }

    void cleanResponses(boolean cancelIfRunning) {
        ArrayList toRemove = new ArrayList();
        this.responses.forEach((token, f) -> {
            if (f.isDone() || cancelIfRunning) {
                try {
                    f.cancel(true);
                }
                catch (CancellationException cancellationException) {
                    // empty catch block
                }
                toRemove.add(token);
                this.statistics.decrementOutstandingRequests();
            }
        });
        for (String token2 : toRemove) {
            this.responses.remove(token2);
        }
    }

    @Override
    public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
        Message reply = null;
        CompletableFuture<Message> incoming = this.request(subject, body);
        try {
            reply = (Message)incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
        }
        catch (ExecutionException | TimeoutException e) {
            reply = null;
        }
        return reply;
    }

    @Override
    public CompletableFuture<Message> request(String subject, byte[] body) {
        NatsDispatcher d;
        String responseInbox = null;
        boolean oldStyle = this.options.isOldRequestStyle();
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.isDraining()) {
            throw new IllegalStateException("Connection is Draining");
        }
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in publish");
        }
        if (body == null) {
            body = EMPTY_BODY;
        } else if ((long)body.length > this.getMaxPayload() && this.getMaxPayload() > 0L) {
            throw new IllegalArgumentException("Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
        }
        if (this.inboxDispatcher.get() == null && this.inboxDispatcher.compareAndSet(null, d = new NatsDispatcher(this, msg -> this.deliverReply(msg)))) {
            String id = this.nuid.next();
            this.dispatchers.put(id, d);
            d.start(id);
            d.subscribe(this.mainInbox);
        }
        responseInbox = oldStyle ? this.createInbox() : this.createResponseInbox(this.mainInbox);
        String responseToken = this.getResponseToken(responseInbox);
        CompletableFuture<Message> future = new CompletableFuture<Message>();
        this.responses.put(responseToken, future);
        this.statistics.incrementOutstandingRequests();
        if (oldStyle) {
            this.inboxDispatcher.get().subscribe(responseInbox).unsubscribe(responseInbox, 1);
        }
        this.publish(subject, responseInbox, body);
        this.statistics.incrementRequestsSent();
        return future;
    }

    void deliverReply(Message msg) {
        String subject = msg.getSubject();
        String token = this.getResponseToken(subject);
        CompletableFuture<Message> f = null;
        f = this.responses.remove(token);
        if (f != null) {
            this.statistics.decrementOutstandingRequests();
            f.complete(msg);
            this.statistics.incrementRepliesReceived();
        }
    }

    @Override
    public Dispatcher createDispatcher(MessageHandler handler) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.isDraining()) {
            throw new IllegalStateException("Connection is Draining");
        }
        NatsDispatcher dispatcher = new NatsDispatcher(this, handler);
        String id = this.nuid.next();
        this.dispatchers.put(id, dispatcher);
        dispatcher.start(id);
        return dispatcher;
    }

    @Override
    public void closeDispatcher(Dispatcher d) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (!(d instanceof NatsDispatcher)) {
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
        }
        NatsDispatcher nd = (NatsDispatcher)d;
        if (nd.isDraining()) {
            return;
        }
        if (!this.dispatchers.containsKey(nd.getId())) {
            throw new IllegalArgumentException("Dispatcher is already closed.");
        }
        this.cleanupDispatcher(nd);
    }

    void cleanupDispatcher(NatsDispatcher nd) {
        nd.stop(true);
        this.dispatchers.remove(nd.getId());
    }

    @Override
    public void flush(Duration timeout) throws TimeoutException, InterruptedException {
        Instant start = Instant.now();
        this.waitForConnectOrClose(timeout);
        if (this.isClosed()) {
            throw new TimeoutException("Attempted to flush while closed");
        }
        if (timeout == null) {
            timeout = Duration.ZERO;
        }
        Instant now = Instant.now();
        Duration waitTime = Duration.between(start, now);
        if (!timeout.equals(Duration.ZERO) && waitTime.compareTo(timeout) >= 0) {
            throw new TimeoutException("Timeout out waiting for connection before flush.");
        }
        try {
            CompletableFuture<Boolean> waitForIt = this.sendPing();
            if (waitForIt == null) {
                return;
            }
            long nanos = timeout.toNanos();
            if (nanos > 0L) {
                if ((nanos -= waitTime.toNanos()) <= 0L) {
                    nanos = 1L;
                }
                waitForIt.get(nanos, TimeUnit.NANOSECONDS);
            } else {
                waitForIt.get();
            }
            this.statistics.incrementFlushCounter();
        }
        catch (CancellationException | ExecutionException e) {
            throw new TimeoutException(e.getMessage());
        }
    }

    void sendConnect(String serverURI) {
        NatsServerInfo info = this.serverInfo.get();
        StringBuilder connectString = new StringBuilder();
        connectString.append(OP_CONNECT);
        connectString.append(" ");
        String connectOptions = this.options.buildProtocolConnectOptionsString(serverURI, info.isAuthRequired());
        connectString.append(connectOptions);
        NatsMessage msg = new NatsMessage(connectString.toString());
        this.queueOutgoing(msg);
    }

    CompletableFuture<Boolean> sendPing() {
        int max = this.options.getMaxPingsOut();
        if (!this.isConnectedOrConnecting()) {
            CompletableFuture<Boolean> retVal = new CompletableFuture<Boolean>();
            retVal.complete(Boolean.FALSE);
            return retVal;
        }
        if (max > 0 && this.pongQueue.size() + 1 > max) {
            this.handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
            return null;
        }
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<Boolean>();
        NatsMessage msg = new NatsMessage(OP_PING);
        this.pongQueue.add(pongFuture);
        this.queueOutgoing(msg);
        this.statistics.incrementPingCount();
        return pongFuture;
    }

    void sendPong() {
        NatsMessage msg = new NatsMessage(OP_PONG);
        this.queueOutgoing(msg);
    }

    void handlePong() {
        CompletableFuture<Boolean> pongFuture = this.pongQueue.pollFirst();
        if (pongFuture != null) {
            pongFuture.complete(Boolean.TRUE);
        }
    }

    void readInitialInfo() throws IOException {
        byte[] readBuffer = new byte[this.options.getBufferSize()];
        ByteBuffer protocolBuffer = ByteBuffer.allocate(this.options.getBufferSize());
        boolean gotCRLF = false;
        boolean gotCR = false;
        int read = 0;
        while (!gotCRLF && (read = this.dataPort.read(readBuffer, 0, readBuffer.length)) >= 0) {
            int i = 0;
            while (i < read) {
                byte b = readBuffer[i++];
                if (gotCR) {
                    if (b != 10) {
                        throw new IOException("Missed LF after CR waiting for INFO.");
                    }
                    if (i < read) {
                        throw new IOException("Read past initial info message.");
                    }
                    gotCRLF = true;
                    break;
                }
                if (b == 13) {
                    gotCR = true;
                    continue;
                }
                if (!protocolBuffer.hasRemaining()) {
                    protocolBuffer = this.enlargeBuffer(protocolBuffer, 0);
                }
                protocolBuffer.put(b);
            }
            if (!gotCRLF) continue;
            break;
        }
        if (!gotCRLF) {
            throw new IOException("Failed to read initial info message.");
        }
        protocolBuffer.flip();
        String infoJson = StandardCharsets.UTF_8.decode(protocolBuffer).toString();
        infoJson = infoJson.trim();
        String[] msg = infoJson.split("\\s");
        String op = msg[0].toUpperCase();
        if (!OP_INFO.equals(op)) {
            throw new IOException("Received non-info initial message.");
        }
        this.handleInfo(infoJson);
    }

    void handleInfo(String infoJson) {
        NatsServerInfo serverInfo = new NatsServerInfo(infoJson);
        this.serverInfo.set(serverInfo);
        String[] urls = this.serverInfo.get().getConnectURLs();
        if (urls != null && urls.length > 0) {
            this.processConnectionEvent(ConnectionListener.Events.DISCOVERED_SERVERS);
        }
    }

    void queueOutgoing(NatsMessage msg) {
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
            throw new IllegalArgumentException("Control line is too long");
        }
        this.writer.queue(msg);
    }

    void deliverMessage(NatsMessage msg) {
        this.statistics.incrementInMsgs();
        this.statistics.incrementInBytes(msg.getSizeInBytes());
        NatsSubscription sub = this.subscribers.get(msg.getSID());
        if (sub != null) {
            MessageQueue q;
            msg.setSubscription(sub);
            NatsDispatcher d = sub.getNatsDispatcher();
            NatsConsumer c = d == null ? sub : d;
            MessageQueue messageQueue = q = d == null ? sub.getMessageQueue() : d.getMessageQueue();
            if (c.hasReachedPendingLimits()) {
                this.statistics.incrementDroppedCount();
                c.incrementDroppedCount();
                if (!c.isMarkedSlow()) {
                    c.markSlow();
                    this.processSlowConsumer(c);
                }
            } else if (q != null) {
                c.markNotSlow();
                q.push(msg);
            }
        }
    }

    void processOK() {
        this.statistics.incrementOkCount();
    }

    void processSlowConsumer(Consumer consumer) {
        ErrorListener handler = this.options.getErrorListener();
        if (handler != null && !this.callbackRunner.isShutdown()) {
            try {
                this.callbackRunner.execute(() -> {
                    try {
                        handler.slowConsumerDetected(this, consumer);
                    }
                    catch (Exception ex) {
                        this.statistics.incrementExceptionCount();
                    }
                });
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }
    }

    void processException(Exception exp) {
        ErrorListener handler = this.options.getErrorListener();
        this.statistics.incrementExceptionCount();
        if (handler != null && !this.callbackRunner.isShutdown()) {
            try {
                this.callbackRunner.execute(() -> {
                    try {
                        handler.exceptionOccurred(this, exp);
                    }
                    catch (Exception ex) {
                        this.statistics.incrementExceptionCount();
                    }
                });
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }
    }

    void processError(String errorText) {
        ErrorListener handler = this.options.getErrorListener();
        this.statistics.incrementErrCount();
        this.lastError.set(errorText);
        if (handler != null && !this.callbackRunner.isShutdown()) {
            try {
                this.callbackRunner.execute(() -> {
                    try {
                        handler.errorOccurred(this, errorText);
                    }
                    catch (Exception ex) {
                        this.statistics.incrementExceptionCount();
                    }
                });
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }
    }

    void processConnectionEvent(ConnectionListener.Events type) {
        ConnectionListener handler = this.options.getConnectionListener();
        if (handler != null && !this.callbackRunner.isShutdown()) {
            try {
                this.callbackRunner.execute(() -> {
                    try {
                        handler.connectionEvent(this, type);
                    }
                    catch (Exception ex) {
                        this.statistics.incrementExceptionCount();
                    }
                });
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }
    }

    NatsServerInfo getInfo() {
        return this.serverInfo.get();
    }

    @Override
    public Options getOptions() {
        return this.options;
    }

    @Override
    public Statistics getStatistics() {
        return this.statistics;
    }

    NatsStatistics getNatsStatistics() {
        return this.statistics;
    }

    DataPort getDataPort() {
        return this.dataPort;
    }

    int getConsumerCount() {
        return this.subscribers.size() + this.dispatchers.size();
    }

    @Override
    public long getMaxPayload() {
        NatsServerInfo info = this.serverInfo.get();
        if (info == null) {
            return -1L;
        }
        return info.getMaxPayload();
    }

    @Override
    public Collection<String> getServers() {
        NatsServerInfo info = this.serverInfo.get();
        ArrayList<String> servers = new ArrayList<String>();
        this.options.getServers().stream().forEach(x -> servers.add(x.toString()));
        if (info != null && info.getConnectURLs() != null) {
            servers.addAll(Arrays.asList(info.getConnectURLs()));
        }
        return servers;
    }

    @Override
    public String getConnectedUrl() {
        return this.currentServerURI;
    }

    @Override
    public Connection.Status getStatus() {
        return this.status;
    }

    @Override
    public String getLastError() {
        return this.lastError.get();
    }

    void updateStatus(Connection.Status newStatus) {
        Connection.Status oldStatus = this.status;
        this.statusLock.lock();
        try {
            if (oldStatus == Connection.Status.CLOSED) {
                return;
            }
            this.status = newStatus;
        }
        finally {
            this.statusChanged.signalAll();
            this.statusLock.unlock();
        }
        if (this.status == Connection.Status.DISCONNECTED) {
            this.processConnectionEvent(ConnectionListener.Events.DISCONNECTED);
        } else if (this.status == Connection.Status.CLOSED) {
            this.processConnectionEvent(ConnectionListener.Events.CLOSED);
        } else if (oldStatus == Connection.Status.RECONNECTING && this.status == Connection.Status.CONNECTED) {
            this.processConnectionEvent(ConnectionListener.Events.RECONNECTED);
        } else if (this.status == Connection.Status.CONNECTED) {
            this.processConnectionEvent(ConnectionListener.Events.CONNECTED);
        }
    }

    boolean isClosing() {
        return this.closing;
    }

    boolean isClosed() {
        return this.status == Connection.Status.CLOSED;
    }

    boolean isConnected() {
        return this.status == Connection.Status.CONNECTED;
    }

    boolean isConnectedOrConnecting() {
        this.statusLock.lock();
        try {
            boolean bl = this.status == Connection.Status.CONNECTED || this.connecting;
            return bl;
        }
        finally {
            this.statusLock.unlock();
        }
    }

    boolean isDisconnectingOrClosed() {
        this.statusLock.lock();
        try {
            boolean bl = this.status == Connection.Status.CLOSED || this.disconnecting;
            return bl;
        }
        finally {
            this.statusLock.unlock();
        }
    }

    boolean isDisconnecting() {
        this.statusLock.lock();
        try {
            boolean bl = this.disconnecting;
            return bl;
        }
        finally {
            this.statusLock.unlock();
        }
    }

    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
        this.waitFor(timeout, Void2 -> this.isDisconnecting() && !this.isClosed());
    }

    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
        this.waitFor(timeout, Void2 -> !this.isConnected() && !this.isClosed());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void waitFor(Duration timeout, Predicate<Void> test) throws InterruptedException {
        this.statusLock.lock();
        try {
            long currentWaitNanos = timeout != null ? timeout.toNanos() : -1L;
            long start = System.nanoTime();
            while (currentWaitNanos >= 0L && test.test(null)) {
                if (currentWaitNanos > 0L) {
                    this.statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS);
                    long now = System.nanoTime();
                    if ((currentWaitNanos -= now - (start = now)) > 0L) continue;
                    break;
                }
                this.statusChanged.await();
            }
        }
        finally {
            this.statusLock.unlock();
        }
    }

    void waitForReconnectTimeout() {
        long now;
        long start = System.nanoTime();
        for (long currentWaitNanos = (waitTime = this.options.getReconnectWait()) != null ? waitTime.toNanos() : -1L; !(currentWaitNanos <= 0L || this.isDisconnectingOrClosed() || this.isConnected() || this.reconnectWaiter.isDone()); currentWaitNanos -= now - start) {
            try {
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
            }
            catch (Exception exception) {
                // empty catch block
            }
            now = System.nanoTime();
            start = now;
        }
        this.reconnectWaiter.complete(Boolean.TRUE);
    }

    Collection<String> buildReconnectList() {
        ArrayList<String> reconnectList = new ArrayList<String>();
        reconnectList.addAll(this.getServers());
        if (this.options.isNoRandomize()) {
            return reconnectList;
        }
        Collections.shuffle(reconnectList);
        return reconnectList;
    }

    ByteBuffer enlargeBuffer(ByteBuffer buffer, int atLeast) {
        int current = buffer.capacity();
        int newSize = Math.max(current * 2, atLeast);
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
        buffer.flip();
        newBuffer.put(buffer);
        return newBuffer;
    }

    NatsConnectionReader getReader() {
        return this.reader;
    }

    boolean isDraining() {
        return this.draining.get() != null;
    }

    boolean isDrained() {
        CompletableFuture<Boolean> tracker = this.draining.get();
        try {
            if (tracker != null && tracker.getNow(false).booleanValue()) {
                return true;
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        return false;
    }

    @Override
    public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutException, InterruptedException {
        if (this.isClosing() || this.isClosed()) {
            throw new IllegalStateException("A connection can't be drained during close.");
        }
        this.statusLock.lock();
        try {
            if (this.isDraining()) {
                CompletableFuture<Boolean> completableFuture = this.draining.get();
                return completableFuture;
            }
            this.draining.set(new CompletableFuture());
        }
        finally {
            this.statusLock.unlock();
        }
        CompletableFuture<Boolean> tracker = this.draining.get();
        Instant start = Instant.now();
        HashSet<NatsSubscription> pureSubscribers = new HashSet<NatsSubscription>();
        pureSubscribers.addAll(this.subscribers.values());
        pureSubscribers.removeIf(s -> s.getDispatcher() != null);
        HashSet<NatsConsumer> consumers = new HashSet<NatsConsumer>();
        consumers.addAll(pureSubscribers);
        consumers.addAll(this.dispatchers.values());
        NatsDispatcher inboxer = this.inboxDispatcher.get();
        if (inboxer != null) {
            consumers.add(inboxer);
        }
        consumers.forEach(cons -> {
            cons.markDraining(tracker);
            cons.sendUnsubForDrain();
        });
        this.flush(timeout);
        consumers.forEach(cons -> cons.markUnsubedForDrain());
        Thread t = new Thread(() -> {
            try {
                Instant now = Instant.now();
                while (timeout == null || timeout.equals(Duration.ZERO) || Duration.between(start, now).compareTo(timeout) < 0) {
                    Iterator i = consumers.iterator();
                    while (i.hasNext()) {
                        NatsConsumer cons = (NatsConsumer)i.next();
                        if (!cons.isDrained()) continue;
                        i.remove();
                    }
                    if (consumers.size() == 0) break;
                    Thread.sleep(1L);
                    now = Instant.now();
                }
                this.blockPublishForDrain.set(true);
                if (timeout == null || timeout.equals(Duration.ZERO)) {
                    this.flush(Duration.ZERO);
                } else {
                    now = Instant.now();
                    Duration passed = Duration.between(start, now);
                    Duration newTimeout = timeout.minus(passed);
                    if (newTimeout.toNanos() > 0L) {
                        this.flush(newTimeout);
                    }
                }
                this.close(false);
                tracker.complete(consumers.size() == 0);
            }
            catch (InterruptedException | TimeoutException e) {
                this.processException(e);
            }
            finally {
                try {
                    this.close();
                }
                catch (InterruptedException e) {
                    this.processException(e);
                }
                tracker.complete(false);
            }
        });
        t.start();
        return tracker;
    }
}

