/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.AuthenticationException;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.Consumer;
import io.nats.client.ConsumerContext;
import io.nats.client.Dispatcher;
import io.nats.client.ErrorListener;
import io.nats.client.ForceReconnectOptions;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamOptions;
import io.nats.client.JetStreamStatusException;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueManagement;
import io.nats.client.KeyValueOptions;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.NUID;
import io.nats.client.NatsSystemClock;
import io.nats.client.ObjectStore;
import io.nats.client.ObjectStoreManagement;
import io.nats.client.ObjectStoreOptions;
import io.nats.client.Options;
import io.nats.client.ReconnectDelayHandler;
import io.nats.client.ServerPool;
import io.nats.client.Statistics;
import io.nats.client.StatisticsCollector;
import io.nats.client.StreamContext;
import io.nats.client.Subscription;
import io.nats.client.TimeTraceLogger;
import io.nats.client.api.ServerInfo;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.DispatcherFactory;
import io.nats.client.impl.Headers;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnectionReader;
import io.nats.client.impl.NatsConnectionWriter;
import io.nats.client.impl.NatsConsumer;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsJetStreamManagement;
import io.nats.client.impl.NatsKeyValue;
import io.nats.client.impl.NatsKeyValueManagement;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsObjectStore;
import io.nats.client.impl.NatsObjectStoreManagement;
import io.nats.client.impl.NatsPublishableMessage;
import io.nats.client.impl.NatsServerPool;
import io.nats.client.impl.NatsStatistics;
import io.nats.client.impl.NatsStreamContext;
import io.nats.client.impl.NatsSubscription;
import io.nats.client.impl.NatsSubscriptionFactory;
import io.nats.client.impl.ProtocolMessage;
import io.nats.client.support.ByteArrayBuilder;
import io.nats.client.support.NatsConstants;
import io.nats.client.support.NatsInetAddress;
import io.nats.client.support.NatsRequestCompletableFuture;
import io.nats.client.support.NatsUri;
import io.nats.client.support.ScheduledTask;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

class NatsConnection
implements Connection {
    /** Nanoseconds in one second (1.0E9). */
    public static final double NANOS_PER_SECOND = 1.0E9;
    private final Options options;
    final boolean forceFlushOnRequest;
    private final StatisticsCollector statistics;

    // ---- connection state; the three flags below are written while holding statusLock ----
    // True while tryToConnect is running an attempt.
    private boolean connecting;
    // True while closeSocket or close(boolean, boolean) is tearing the socket down.
    private boolean disconnecting;
    // Set to true by close(boolean, boolean); never reset in the code visible here.
    private boolean closing;
    // Exception stashed by handleCommunicationIssue while a connect/disconnect is in progress;
    // tryToConnect rethrows it just before it would otherwise mark the connection CONNECTED.
    private Exception exceptionDuringConnectChange;
    // Held by closeSocket and forceReconnectImpl for the duration of their socket teardown.
    final ReentrantLock closeSocketLock;
    private Connection.Status status;
    private final ReentrantLock statusLock;
    // Condition created from statusLock; signalled whenever one of the state flags changes.
    private final Condition statusChanged;

    // ---- transport ----
    private CompletableFuture<DataPort> dataPortFuture;
    private DataPort dataPort;
    // Server the connection is currently attached to; null while disconnected or mid-attempt.
    private NatsUri currentServer;
    private CompletableFuture<Boolean> reconnectWaiter;
    // Most recent authentication error text recorded per resolved server URI.
    private final ConcurrentHashMap<NatsUri, String> serverAuthErrors;
    private NatsConnectionReader reader;
    private NatsConnectionWriter writer;
    private final AtomicReference<ServerInfo> serverInfo;

    // ---- subscriptions, dispatchers, listeners and request tracking ----
    private final Map<String, NatsSubscription> subscribers;
    private final Map<String, NatsDispatcher> dispatchers;
    private final Collection<ConnectionListener> connectionListeners;
    private final Map<String, NatsRequestCompletableFuture> responsesAwaiting;
    private final Map<String, NatsRequestCompletableFuture> responsesRespondedTo;
    private final ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
    // Inbox subject produced by createInbox() with ".*" appended (see constructor).
    private final String mainInbox;
    private final AtomicReference<NatsDispatcher> inboxDispatcher;
    private final ReentrantLock inboxDispatcherLock;

    // ---- periodic tasks; created in tryToConnect when pingTask is null, shut down in close ----
    private ScheduledTask pingTask;
    private ScheduledTask cleanupTask;
    private final AtomicBoolean needPing;
    private final AtomicLong nextSid;
    private final NUID nuid;
    private final AtomicReference<String> connectError;
    private final AtomicReference<String> lastError;
    private final AtomicReference<CompletableFuture<Boolean>> draining;
    private final AtomicBoolean blockPublishForDrain;
    // Guard flag set by connect, reconnect, forceReconnect and the communication-issue task while they run.
    private final AtomicBoolean tryingToConnect;

    // ---- executors, all obtained from Options in the constructor ----
    private final ExecutorService callbackRunner;
    private final ExecutorService executor;
    private final ExecutorService connectExecutor;
    private final ScheduledExecutorService scheduledExecutor;
    private final boolean advancedTracking;
    private final ServerPool serverPool;
    private final DispatcherFactory dispatcherFactory;
    // REPORT when options.isReportNoResponders() is true, otherwise CANCEL (see constructor).
    private final @NonNull NatsRequestCompletableFuture.CancelAction cancelAction;
    // Cached value of options.isTraceConnection().
    private final boolean trace;
    private final TimeTraceLogger timeTraceLogger;
    // Protocol messages built once from the PING / PONG operation bytes.
    private static final ProtocolMessage PING_PROTO = new ProtocolMessage(NatsConstants.OP_PING_BYTES);
    private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(NatsConstants.OP_PONG_BYTES);

    /**
     * Creates the connection object and all of its bookkeeping state from {@code options}.
     * No connection attempt is made here; the status starts as {@code DISCONNECTED}.
     *
     * <p>The statement order matters: {@code this} is handed to the reader and writer, and
     * {@code createInbox()} is called after {@code nuid} has been assigned.
     *
     * @param options the configuration this connection is built from
     */
    NatsConnection(@NonNull Options options) {
        this.trace = options.isTraceConnection();
        this.timeTraceLogger = options.getTimeTraceLogger();
        this.timeTraceLogger.trace("creating connection object", new Object[0]);
        this.options = options;
        this.forceFlushOnRequest = options.forceFlushOnRequest();
        this.advancedTracking = options.isTrackAdvancedStats();
        // Fall back to the built-in statistics collector when the options do not supply one.
        this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
        this.statistics.setAdvancedTracking(this.advancedTracking);
        this.closeSocketLock = new ReentrantLock();
        this.statusLock = new ReentrantLock();
        this.statusChanged = this.statusLock.newCondition();
        this.status = Connection.Status.DISCONNECTED;
        // The reconnect waiter starts out already completed.
        this.reconnectWaiter = new CompletableFuture();
        this.reconnectWaiter.complete(Boolean.TRUE);
        this.connectionListeners = ConcurrentHashMap.newKeySet();
        if (options.getConnectionListener() != null) {
            this.addConnectionListener(options.getConnectionListener());
        }
        this.dispatchers = new ConcurrentHashMap<String, NatsDispatcher>();
        this.subscribers = new ConcurrentHashMap<String, NatsSubscription>();
        this.responsesAwaiting = new ConcurrentHashMap<String, NatsRequestCompletableFuture>();
        this.responsesRespondedTo = new ConcurrentHashMap<String, NatsRequestCompletableFuture>();
        this.serverAuthErrors = new ConcurrentHashMap();
        // Subscription ids start at 1.
        this.nextSid = new AtomicLong(1L);
        this.timeTraceLogger.trace("creating NUID", new Object[0]);
        this.nuid = new NUID();
        // Wildcard subject built from a freshly created inbox.
        this.mainInbox = this.createInbox() + ".*";
        this.lastError = new AtomicReference();
        this.connectError = new AtomicReference();
        this.serverInfo = new AtomicReference<ServerInfo>(ServerInfo.EMPTY_INFO);
        this.inboxDispatcher = new AtomicReference();
        this.inboxDispatcherLock = new ReentrantLock();
        this.pongQueue = new ConcurrentLinkedDeque();
        this.draining = new AtomicReference();
        this.blockPublishForDrain = new AtomicBoolean();
        this.tryingToConnect = new AtomicBoolean();
        this.timeTraceLogger.trace("creating executors", new Object[0]);
        this.executor = options.getExecutor();
        this.callbackRunner = options.getCallbackExecutor();
        this.connectExecutor = options.getConnectExecutor();
        this.scheduledExecutor = options.getScheduledExecutor();
        this.timeTraceLogger.trace("creating reader and writer", new Object[0]);
        this.reader = new NatsConnectionReader(this);
        // The first writer has no predecessor, hence the null second argument.
        this.writer = new NatsConnectionWriter(this, null);
        this.needPing = new AtomicBoolean(true);
        // Default server pool and dispatcher factory are used when the options do not supply them.
        this.serverPool = options.getServerPool() == null ? new NatsServerPool() : options.getServerPool();
        this.serverPool.initialize(options);
        this.dispatcherFactory = options.getDispatcherFactory() == null ? new DispatcherFactory() : options.getDispatcherFactory();
        this.cancelAction = options.isReportNoResponders() ? NatsRequestCompletableFuture.CancelAction.REPORT : NatsRequestCompletableFuture.CancelAction.CANCEL;
        this.timeTraceLogger.trace("connection object created", new Object[0]);
    }

    /**
     * Runs the initial connect sequence unless another connect/reconnect is already in progress.
     *
     * <p>The {@code tryingToConnect} guard is claimed with a single atomic
     * {@code compareAndSet(false, true)}, so two threads cannot both pass the check.
     * When the guard is already held this call returns immediately without doing anything.
     *
     * @param reconnectOnConnect passed through to {@code connectImpl}; when true a failed
     *                           initial connect falls back to the reconnect logic
     * @throws InterruptedException propagated from {@code connectImpl}
     * @throws IOException          propagated from {@code connectImpl}
     */
    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
        if (!this.tryingToConnect.compareAndSet(false, true)) {
            // Someone else owns the guard; only the owner may clear it.
            return;
        }
        try {
            this.connectImpl(reconnectOnConnect);
        }
        finally {
            this.tryingToConnect.set(false);
        }
    }

    /*
     * Enabled aggressive block sorting
     */
    /**
     * Walks the server pool once, trying each resolved address until one connects.
     *
     * <p>Each failed server is reported to the pool and collected for the error message;
     * authentication errors are additionally remembered per resolved address. If nothing
     * connected and the connection is not closed, either the reconnect logic takes over
     * ({@code reconnectOnConnect}) or the connection is closed and an exception is thrown.
     *
     * @param reconnectOnConnect when true, a failed pass falls back to {@code reconnectImpl()}
     * @throws IllegalArgumentException if the options contain no servers
     * @throws AuthenticationException  if the last connect error was an authentication error
     * @throws IOException              if no server could be reached for any other reason
     * @throws InterruptedException     propagated from {@code reconnectImpl()} or {@code close()}
     */
    void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException {
        if (this.options.getServers().isEmpty()) {
            throw new IllegalArgumentException("No servers provided in options");
        }
        final boolean traceEnabled = this.options.isTraceConnection();
        final long startNanos = NatsSystemClock.nanoTime();
        this.lastError.set("");
        this.timeTraceLogger.trace("starting connect loop", new Object[0]);

        final HashSet<NatsUri> failedServers = new HashSet<NatsUri>();
        NatsUri firstSeen = null;
        boolean finished = false;
        while (!finished) {
            NatsUri candidate = this.serverPool.peekNextServer();
            if (candidate == null) {
                break;
            }
            if (firstSeen == null) {
                firstSeen = candidate;
            } else if (candidate.equals(firstSeen)) {
                // Back at the first server: the initial connect makes only one pass.
                break;
            }
            // Consume the server that was only peeked at above.
            this.serverPool.nextServer();

            for (NatsUri resolved : this.resolveHost(candidate)) {
                if (this.isClosed()) {
                    finished = true;
                    break;
                }
                this.connectError.set("");
                this.timeTraceLogger.trace("setting status to connecting", new Object[0]);
                this.updateStatus(Connection.Status.CONNECTING);
                this.timeTraceLogger.trace("trying to connect to %s", candidate);
                this.tryToConnect(candidate, resolved, NatsSystemClock.nanoTime());
                if (this.isConnected()) {
                    this.serverPool.connectSucceeded(candidate);
                    finished = true;
                    break;
                }
                this.timeTraceLogger.trace("setting status to disconnected", new Object[0]);
                this.updateStatus(Connection.Status.DISCONNECTED);
                failedServers.add(candidate);
                this.serverPool.connectFailed(candidate);
                String attemptError = this.connectError.get();
                if (this.isAuthenticationError(attemptError)) {
                    this.serverAuthErrors.put(resolved, attemptError);
                }
            }
        }

        if (this.isConnected() || this.isClosed()) {
            if (traceEnabled) {
                long endNanos = NatsSystemClock.nanoTime();
                double seconds = (double)(endNanos - startNanos) / 1.0E9;
                this.timeTraceLogger.trace("connect complete in %.3f seconds", seconds);
            }
            return;
        }

        if (reconnectOnConnect) {
            this.timeTraceLogger.trace("trying to reconnect on connect", new Object[0]);
            // Call the impl directly; the guarded reconnect() would see tryingToConnect already set.
            this.reconnectImpl();
            return;
        }

        this.timeTraceLogger.trace("connection failed, closing to cleanup", new Object[0]);
        this.close();
        String finalError = this.connectError.get();
        if (this.isAuthenticationError(finalError)) {
            throw new AuthenticationException("Authentication error connecting to NATS server: " + finalError);
        }
        throw new IOException("Unable to connect to NATS servers: " + failedServers);
    }

    /**
     * Forces a reconnect using {@link ForceReconnectOptions#DEFAULT_INSTANCE}.
     *
     * @throws IOException          declared by the delegated overload
     * @throws InterruptedException declared by the delegated overload
     */
    @Override
    public void forceReconnect() throws IOException, InterruptedException {
        final ForceReconnectOptions defaults = ForceReconnectOptions.DEFAULT_INSTANCE;
        this.forceReconnect(defaults);
    }

    /**
     * Forces a reconnect with the given options unless another connect/reconnect is in progress.
     *
     * <p>The {@code tryingToConnect} guard is claimed with a single atomic
     * {@code compareAndSet(false, true)}, so two concurrent callers cannot both proceed.
     * When the guard is already held this call returns immediately without doing anything.
     *
     * @param options reconnect options; {@code null} means {@link ForceReconnectOptions#DEFAULT_INSTANCE}
     * @throws IOException          declared by the interface contract
     * @throws InterruptedException propagated from {@code forceReconnectImpl}
     */
    @Override
    public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
        if (!this.tryingToConnect.compareAndSet(false, true)) {
            // Someone else owns the guard; only the owner may clear it.
            return;
        }
        try {
            this.forceReconnectImpl(options == null ? ForceReconnectOptions.DEFAULT_INSTANCE : options);
        }
        finally {
            this.tryingToConnect.set(false);
        }
    }

    /**
     * Drops the current socket and immediately runs the reconnect logic.
     *
     * <p>Steps: optionally flush (bounded by {@code frOpts.getFlushWait()}), then, while holding
     * {@code closeSocketLock}: mark the status DISCONNECTED, cancel the data port future, hand the
     * current data port to the executor for closing, stop the reader and writer (each wait bounded
     * to 100 ms) and replace both with new instances. After the lock is released
     * {@code reconnectImpl()} runs and the writer is taken out of reconnect mode.
     *
     * @param frOpts options controlling the optional flush and whether the data port is force-closed
     * @throws InterruptedException propagated from {@code flush} or {@code reconnectImpl()}
     */
    void forceReconnectImpl(@NonNull ForceReconnectOptions frOpts) throws InterruptedException {
        if (frOpts.getFlushWait() != null) {
            try {
                this.flush(frOpts.getFlushWait());
            }
            catch (TimeoutException timeoutException) {
                // Deliberately ignored: a flush timeout does not stop the forced reconnect.
            }
        }
        this.closeSocketLock.lock();
        try {
            this.updateStatus(Connection.Status.DISCONNECTED);
            // Cancel and forget the future that hands the data port to the reader/writer.
            if (this.dataPortFuture != null) {
                this.dataPortFuture.cancel(true);
                this.dataPortFuture = null;
            }
            if (this.dataPort != null) {
                // Detach the port first, then close it on the executor so this thread does not wait on the close.
                DataPort dataPortToClose = this.dataPort;
                this.dataPort = null;
                this.executor.submit(() -> {
                    try {
                        if (frOpts.isForceClose()) {
                            dataPortToClose.forceClose();
                        } else {
                            dataPortToClose.close();
                        }
                    }
                    catch (IOException iOException) {
                        // Deliberately ignored: the port is being discarded anyway.
                    }
                });
            }
            // Stop I/O; each wait is capped at 100 ms and failures are only reported.
            // NOTE(review): catch (Exception) also absorbs InterruptedException without restoring
            // the interrupt flag — confirm this is intended.
            try {
                this.reader.stop(false).get(100L, TimeUnit.MILLISECONDS);
            }
            catch (Exception ex) {
                this.processException(ex);
            }
            try {
                this.writer.stop().get(100L, TimeUnit.MILLISECONDS);
            }
            catch (Exception ex) {
                this.processException(ex);
            }
            // Fresh reader and writer; the new writer is constructed from the old one.
            this.reader = new NatsConnectionReader(this);
            this.writer = new NatsConnectionWriter(this, this.writer);
        }
        finally {
            this.closeSocketLock.unlock();
        }
        // Call the impl directly: the caller already holds the tryingToConnect guard.
        this.reconnectImpl();
        this.writer.setReconnectMode(false);
    }

    /**
     * Runs the reconnect logic unless another connect/reconnect is already in progress.
     *
     * <p>The {@code tryingToConnect} guard is claimed with a single atomic
     * {@code compareAndSet(false, true)}, so two threads cannot both pass the check.
     * When the guard is already held this call returns immediately without doing anything.
     *
     * @throws InterruptedException propagated from {@code reconnectImpl()}
     */
    void reconnect() throws InterruptedException {
        if (!this.tryingToConnect.compareAndSet(false, true)) {
            // Someone else owns the guard; only the owner may clear it.
            return;
        }
        try {
            this.reconnectImpl();
        }
        finally {
            this.tryingToConnect.set(false);
        }
    }

    /**
     * Cycles through the server pool until a connection is re-established, the pool returns
     * no server, or the connection is being closed; then re-sends all subscriptions.
     *
     * <p>Does nothing when already closed, and closes the connection outright when the maximum
     * reconnect count in the options is 0. Each time the pool hands back the first server again,
     * the reconnect delay handler is invoked with the number of completed rounds. The loop also
     * stops when a resolved address reports the same authentication error twice in a row.
     * If no connection results, the connection is closed.
     *
     * @throws InterruptedException propagated from {@code close()} or the delay handler
     */
    void reconnectImpl() throws InterruptedException {
        if (this.isClosed()) {
            return;
        }
        if (this.options.getMaxReconnect() == 0) {
            this.close();
            return;
        }
        this.writer.setReconnectMode(true);

        if (!this.isConnected() && !this.isClosed() && !this.isClosing()) {
            NatsUri firstTried = null;
            int completedRounds = 0;
            boolean done = false;
            while (!done) {
                NatsUri candidate = this.serverPool.nextServer();
                if (candidate == null) {
                    break;
                }
                if (firstTried == null) {
                    firstTried = candidate;
                } else if (firstTried.equals(candidate)) {
                    // The pool has wrapped around once more.
                    completedRounds++;
                    this.invokeReconnectDelayHandler(completedRounds);
                }

                for (NatsUri resolved : this.resolveHost(candidate)) {
                    if (this.isClosed()) {
                        done = true;
                        break;
                    }
                    this.connectError.set("");
                    if (this.isDisconnectingOrClosed() || this.isClosing()) {
                        done = true;
                        break;
                    }
                    this.updateStatus(Connection.Status.RECONNECTING);
                    this.timeTraceLogger.trace("reconnecting to server %s", candidate);
                    this.tryToConnect(candidate, resolved, NatsSystemClock.nanoTime());
                    if (this.isConnected()) {
                        this.serverPool.connectSucceeded(candidate);
                        this.statistics.incrementReconnects();
                        done = true;
                        break;
                    }
                    this.serverPool.connectFailed(candidate);
                    String attemptError = this.connectError.get();
                    if (this.isAuthenticationError(attemptError)) {
                        if (attemptError.equals(this.serverAuthErrors.get(resolved))) {
                            // Same authentication error as last time for this address: give up.
                            done = true;
                            break;
                        }
                        this.serverAuthErrors.put(resolved, attemptError);
                    }
                }
            }
        }

        if (!this.isConnected()) {
            this.close();
            return;
        }

        // Subscriptions without a dispatcher are re-sent here; dispatchers re-send their own.
        this.subscribers.forEach((id, subscription) -> {
            if (subscription.getDispatcher() != null || subscription.isDraining()) {
                return;
            }
            this.sendSubscriptionMessage(subscription.getSID(), subscription.getSubject(), subscription.getQueueName(), true);
        });
        this.dispatchers.forEach((id, dispatcher) -> {
            if (dispatcher.isDraining()) {
                return;
            }
            dispatcher.resendSubscriptions();
        });

        try {
            this.flush(this.options.getConnectionTimeout());
        }
        catch (Exception flushFailure) {
            this.processException(flushFailure);
        }
        this.processConnectionEvent(ConnectionListener.Events.RESUBSCRIBED);
        this.writer.setReconnectMode(false);
    }

    /**
     * Computes how much time is left before {@code endNanos} and fails once the deadline has passed.
     *
     * @param endNanos deadline, on the same scale as {@code NatsSystemClock.nanoTime()}
     * @param message  description of the step being checked, used only for tracing
     * @return the remaining nanoseconds (zero or positive)
     * @throws TimeoutException if the deadline has already passed
     */
    long timeCheck(long endNanos, String message) throws TimeoutException {
        final long nanosLeft = endNanos - NatsSystemClock.nanoTime();
        if (this.trace) {
            this.traceTimeCheck(message, nanosLeft);
        }
        if (nanosLeft >= 0L) {
            return nanosLeft;
        }
        throw new TimeoutException("connection timed out");
    }

    /**
     * Writes a trace line made of {@code message} plus a description of the time budget.
     *
     * <p>The amount is printed in nanoseconds below one millisecond, in whole milliseconds below
     * one second, and otherwise in seconds with three decimals. Negative values are reported as
     * "beyond timeout", non-negative values as "remaining".
     *
     * @param message        description of the step being traced
     * @param remainingNanos nanoseconds left before the deadline; negative once it has passed
     */
    void traceTimeCheck(String message, long remainingNanos) {
        final String detail;
        if (remainingNanos >= 0L) {
            if (remainingNanos < 1000000L) {
                detail = String.format(", %d (ns) remaining", remainingNanos);
            } else if (remainingNanos < 1000000000L) {
                detail = String.format(", %d (ms) remaining", remainingNanos / 1000000L);
            } else {
                detail = String.format(", %.3f (s) remaining", (double)remainingNanos / 1.0E9);
            }
        } else if (remainingNanos > -1000000L) {
            detail = String.format(", %d (ns) beyond timeout", -remainingNanos);
        } else if (remainingNanos > -1000000000L) {
            detail = String.format(", %d (ms) beyond timeout", -remainingNanos / 1000000L);
        } else {
            detail = String.format(", %.3f (s) beyond timeout", (double)(-remainingNanos) / 1.0E9);
        }
        this.timeTraceLogger.trace(message + detail, new Object[0]);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Makes a single connection attempt against one resolved address.
     *
     * <p>The whole attempt is bounded by a deadline of {@code now} plus the configured connection
     * timeout; {@code timeCheck} is called before each stage and throws once the deadline passes.
     * Stages, in order: stop a running reader/writer, clean the pong queue, connect a new data
     * port, read the server info / check version / upgrade to TLS on the connect executor, start
     * the reader and writer, send CONNECT, send a ping and wait for its pong, create the ping and
     * cleanup timers if they do not exist yet, and finally set the status to CONNECTED.
     *
     * <p>No exception escapes: any failure is passed to {@code processException} and the socket
     * is closed via {@code closeSocket(false, true)}. Callers in this class check
     * {@code isConnected()} afterwards to learn the outcome.
     *
     * @param cur      the server as handed out by the server pool; stored as the current server on success
     * @param resolved the resolved address actually connected to
     * @param now      start time of the attempt, on the same scale as {@code NatsSystemClock.nanoTime()}
     */
    void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
        this.currentServer = null;
        try {
            Duration connectTimeout = this.options.getConnectionTimeout();
            boolean trace = this.options.isTraceConnection();
            // Absolute deadline for the whole attempt; every timeCheck below measures against it.
            long end = now + connectTimeout.toNanos();
            this.timeCheck(end, "starting connection attempt");
            this.statusLock.lock();
            try {
                if (this.connecting) {
                    // Another attempt is already marked as in progress.
                    // NOTE(review): this return still runs the outer finally, which sets connecting back to false.
                    return;
                }
                this.connecting = true;
                this.statusChanged.signalAll();
            }
            finally {
                this.statusLock.unlock();
            }
            // The reader and writer are started with this future and receive the data port through it.
            this.dataPortFuture = new CompletableFuture();
            long timeoutNanos = this.timeCheck(end, "waiting for reader");
            if (this.reader.isRunning()) {
                this.reader.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
            }
            timeoutNanos = this.timeCheck(end, "waiting for writer");
            if (this.writer.isRunning()) {
                this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
            }
            this.timeCheck(end, "cleaning pong queue");
            this.cleanUpPongQueue();
            timeoutNanos = this.timeCheck(end, "connecting data port");
            DataPort newDataPort = this.options.buildDataPort();
            newDataPort.connect(resolved.toString(), this, timeoutNanos);
            this.dataPort = newDataPort;
            this.dataPortFuture.complete(this.dataPort);
            // Handshake task: with TLS-first the upgrade happens before the server info is read,
            // otherwise the info is read (and the version checked) before the upgrade.
            Callable<Object> connectTask = () -> {
                if (!this.options.isTlsFirst()) {
                    this.readInitialInfo();
                    this.checkVersionRequirements();
                }
                long start = NatsSystemClock.nanoTime();
                this.upgradeToSecureIfNeeded(resolved);
                if (trace && this.options.isTLSRequired()) {
                    this.timeTraceLogger.trace("TLS upgrade took: %.3f (s)", (double)(NatsSystemClock.nanoTime() - start) / 1.0E9);
                }
                if (this.options.isTlsFirst()) {
                    this.readInitialInfo();
                    this.checkVersionRequirements();
                }
                return null;
            };
            timeoutNanos = this.timeCheck(end, "reading info, version and upgrading to secure if necessary");
            // Run the handshake on the connect executor so the wait can be bounded by the remaining time.
            Future<Object> future = this.connectExecutor.submit(connectTask);
            try {
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
            }
            finally {
                // Cancel unconditionally; this interrupts the task if it is still running after a timeout.
                future.cancel(true);
            }
            this.timeCheck(end, "starting reader");
            this.reader.start(this.dataPortFuture);
            this.timeCheck(end, "starting writer");
            this.writer.start(this.dataPortFuture);
            this.timeCheck(end, "sending connect message");
            this.sendConnect(resolved);
            timeoutNanos = this.timeCheck(end, "sending initial ping");
            // Wait for the pong of the first ping before declaring the connection usable.
            CompletableFuture<Boolean> pongFuture = this.sendPing();
            if (pongFuture != null) {
                pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
            }
            // Timers are created only when no ping task exists yet.
            if (this.pingTask == null) {
                long cleanMillis;
                this.timeCheck(end, "starting ping and cleanup timers");
                long pingMillis = this.options.getPingInterval().toMillis();
                if (pingMillis > 0L) {
                    this.pingTask = new ScheduledTask(this.scheduledExecutor, pingMillis, () -> {
                        if (this.isConnected() && !this.isClosing()) {
                            try {
                                this.softPing();
                            }
                            catch (Exception exception) {
                                // Deliberately ignored: a failed periodic ping must not kill the timer task.
                            }
                        }
                    });
                }
                if ((cleanMillis = this.options.getRequestCleanupInterval().toMillis()) > 0L) {
                    this.cleanupTask = new ScheduledTask(this.scheduledExecutor, cleanMillis, () -> this.cleanResponses(false));
                }
            }
            this.timeCheck(end, "updating status to connected");
            this.statusLock.lock();
            try {
                this.connecting = false;
                // A communication issue reported while connecting was stashed; surface it now so the attempt fails.
                if (this.exceptionDuringConnectChange != null) {
                    throw this.exceptionDuringConnectChange;
                }
                this.currentServer = cur;
                this.serverAuthErrors.clear();
                this.updateStatus(Connection.Status.CONNECTED);
            }
            finally {
                this.statusLock.unlock();
            }
            this.timeTraceLogger.trace("status updated", new Object[0]);
        }
        catch (Exception exp) {
            // Any failure: report it and close the socket without attempting a reconnect from here.
            this.processException(exp);
            try {
                this.closeSocket(false, true);
            }
            catch (InterruptedException e) {
                this.processException(e);
                Thread.currentThread().interrupt();
            }
        }
        finally {
            // Always clear the connecting flag and signal statusChanged.
            this.statusLock.lock();
            try {
                this.connecting = false;
                this.statusChanged.signalAll();
            }
            finally {
                this.statusLock.unlock();
            }
        }
    }

    /**
     * Verifies that the server can honour the options requested by the client.
     *
     * <p>The only check made here: when no-echo is requested, the server's protocol version
     * must be at least 1.
     *
     * @throws IOException if no-echo is requested but the server's protocol version is below 1
     */
    void checkVersionRequirements() throws IOException {
        final Options clientOptions = this.getOptions();
        final ServerInfo currentInfo = this.getServerInfo();
        if (!clientOptions.isNoEcho()) {
            return;
        }
        if (currentInfo.getProtocolVersion() >= 1) {
            return;
        }
        throw new IOException("Server does not support no echo.");
    }

    /**
     * Upgrades the data port to a secure connection when the client and server settings call for it.
     *
     * <p>Nothing is done for websocket URIs. With TLS-first the upgrade happens unconditionally.
     * Otherwise the client's TLS requirement is compared with what the server info reports.
     *
     * @param nuri the server URI being connected to
     * @throws IOException if the client requires TLS but the server neither requires nor offers it,
     *                     if the server requires TLS but the client does not, or if the upgrade fails
     */
    void upgradeToSecureIfNeeded(NatsUri nuri) throws IOException {
        if (nuri.isWebsocket()) {
            return;
        }
        if (this.options.isTlsFirst()) {
            this.dataPort.upgradeToSecure();
            return;
        }

        final ServerInfo info = this.getServerInfo();
        if (!this.options.isTLSRequired()) {
            if (info.isTLSRequired()) {
                throw new IOException("SSL required by server.");
            }
            return;
        }

        final boolean serverSupportsTls = info.isTLSRequired() || info.isTLSAvailable();
        if (!serverSupportsTls) {
            throw new IOException("SSL connection wanted by client.");
        }
        this.dataPort.upgradeToSecure();
    }

    /**
     * Reacts to an I/O level failure reported for the connection.
     *
     * <p>If a connect or disconnect is in progress, the connection is CLOSED, or it is draining,
     * the exception is only stored in {@code exceptionDuringConnectChange}. Otherwise it is
     * reported via {@code processException} and a task is submitted to the executor that closes
     * the socket and, if the connection was connected, reconnects.
     *
     * <p>The submitted task claims the {@code tryingToConnect} guard with a single atomic
     * {@code compareAndSet(false, true)}, so it cannot run concurrently with another
     * connect/reconnect that passed the guard at the same moment. When the guard is already
     * held the task does nothing.
     *
     * @param io the exception describing the communication problem
     */
    void handleCommunicationIssue(Exception io) {
        this.statusLock.lock();
        try {
            if (this.connecting || this.disconnecting || this.status == Connection.Status.CLOSED || this.isDraining()) {
                this.exceptionDuringConnectChange = io;
                return;
            }
        }
        finally {
            this.statusLock.unlock();
        }
        this.processException(io);
        // Close and reconnect on the executor rather than on the calling thread.
        this.executor.submit(() -> {
            if (!this.tryingToConnect.compareAndSet(false, true)) {
                // Someone else owns the guard; only the owner may clear it.
                return;
            }
            try {
                this.closeSocket(true, true);
            }
            catch (InterruptedException e) {
                this.processException(e);
                Thread.currentThread().interrupt();
            }
            finally {
                this.tryingToConnect.set(false);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the current socket and then either finishes a pending close or reconnects.
     *
     * <p>Runs entirely under {@code closeSocketLock}. If a disconnect or close is already in
     * progress (or done), this call only waits for it, bounded by the connection timeout.
     * Otherwise it marks the connection as disconnecting, closes the socket, sets the status to
     * DISCONNECTED, and finally calls {@code close()} when a close is pending, or
     * {@code reconnectImpl()} when the status was CONNECTED on entry and
     * {@code tryReconnectIfConnected} is true.
     *
     * @param tryReconnectIfConnected whether to reconnect afterwards if the status was CONNECTED on entry
     * @param forceClose              passed through to {@code closeSocketImpl}
     * @throws InterruptedException propagated from the wait, {@code close()} or {@code reconnectImpl()}
     */
    void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
        this.closeSocketLock.lock();
        try {
            boolean wasConnected;
            this.statusLock.lock();
            try {
                if (this.isDisconnectingOrClosed()) {
                    // Someone else is already tearing down; wait for that instead of doing it twice.
                    this.waitForDisconnectOrClose(this.options.getConnectionTimeout());
                    return;
                }
                this.disconnecting = true;
                this.exceptionDuringConnectChange = null;
                // Remember the status before teardown; it decides whether to reconnect below.
                wasConnected = this.status == Connection.Status.CONNECTED;
                this.statusChanged.signalAll();
            }
            finally {
                this.statusLock.unlock();
            }
            // The actual teardown happens without holding statusLock.
            this.closeSocketImpl(forceClose);
            this.statusLock.lock();
            try {
                this.updateStatus(Connection.Status.DISCONNECTED);
                // Discard anything handleCommunicationIssue stashed during the teardown.
                this.exceptionDuringConnectChange = null;
                this.disconnecting = false;
                this.statusChanged.signalAll();
            }
            finally {
                this.statusLock.unlock();
            }
            if (this.isClosing()) {
                this.close();
            } else if (wasConnected && tryReconnectIfConnected) {
                this.reconnectImpl();
            }
        }
        finally {
            this.closeSocketLock.unlock();
        }
    }

    /**
     * Closes the connection, honouring an in-progress drain and without force-closing the data port.
     *
     * @throws InterruptedException propagated from {@code close(boolean, boolean)}
     */
    @Override
    public void close() throws InterruptedException {
        final boolean checkDrainStatus = true;
        final boolean forceClose = false;
        this.close(checkDrainStatus, forceClose);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the connection down for good and moves it to the CLOSED status.
     *
     * <p>If {@code checkDrainStatus} is true and a drain is in progress, or if a disconnect/close
     * is already under way, the call only waits (bounded by the connection timeout) and returns.
     * Otherwise it: cancels the reconnect waiter, closes the socket, stops dispatchers and
     * invalidates subscriptions, shuts down the ping and cleanup tasks, cleans pending responses
     * and the pong queue, sets the status to CLOSED, and shuts down the executors. The callback
     * executor is given up to the connection timeout to finish; the main and scheduled executors
     * are shut down only when the options report them as internal.
     *
     * @param checkDrainStatus whether an in-progress drain should make this call wait instead of closing
     * @param forceClose       passed through to {@code closeSocketImpl}
     * @throws InterruptedException if interrupted while waiting
     */
    void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
        this.statusLock.lock();
        try {
            if (checkDrainStatus && this.isDraining()) {
                this.waitForDisconnectOrClose(this.options.getConnectionTimeout());
                return;
            }
            // Set before the disconnecting check so a concurrent closeSocket sees the pending close.
            this.closing = true;
            if (this.isDisconnectingOrClosed()) {
                this.waitForDisconnectOrClose(this.options.getConnectionTimeout());
                return;
            }
            this.disconnecting = true;
            this.exceptionDuringConnectChange = null;
            this.statusChanged.signalAll();
        }
        finally {
            this.statusLock.unlock();
        }
        if (this.reconnectWaiter != null) {
            this.reconnectWaiter.cancel(true);
        }
        this.closeSocketImpl(forceClose);
        // Stop consumers before dropping the references to them.
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
        this.dispatchers.clear();
        this.subscribers.clear();
        if (this.pingTask != null) {
            this.pingTask.shutdown();
            this.pingTask = null;
        }
        if (this.cleanupTask != null) {
            this.cleanupTask.shutdown();
            this.cleanupTask = null;
        }
        this.cleanResponses(true);
        this.cleanUpPongQueue();
        this.statusLock.lock();
        try {
            this.updateStatus(Connection.Status.CLOSED);
        }
        finally {
            this.statusLock.unlock();
        }
        // Let queued callbacks finish for up to the connection timeout, then stop the executor regardless.
        this.callbackRunner.shutdown();
        try {
            this.callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
        }
        finally {
            this.callbackRunner.shutdownNow();
        }
        this.connectExecutor.shutdownNow();
        // Executors supplied from outside are left running.
        if (this.options.executorIsInternal()) {
            this.executor.shutdownNow();
        }
        if (this.options.scheduledExecutorIsInternal()) {
            this.scheduledExecutor.shutdownNow();
        }
        this.statusLock.lock();
        try {
            this.disconnecting = false;
            this.statusChanged.signalAll();
        }
        finally {
            this.statusLock.unlock();
        }
    }

    /** Returns {@code true} when the callback runner is {@code null} or reports that it is shut down. */
    boolean callbackRunnerIsShutdown() {
        return this.callbackRunner == null || this.callbackRunner.isShutdown();
    }

    /** Returns {@code true} when the executor is {@code null} or reports that it is shut down. */
    boolean executorIsShutdown() {
        return this.executor == null || this.executor.isShutdown();
    }

    /** Returns {@code true} when the connect executor is {@code null} or reports that it is shut down. */
    boolean connectExecutorIsShutdown() {
        return this.connectExecutor == null || this.connectExecutor.isShutdown();
    }

    /** Returns {@code true} when the scheduled executor is {@code null} or reports that it is shut down. */
    boolean scheduledExecutorIsShutdown() {
        return this.scheduledExecutor == null || this.scheduledExecutor.isShutdown();
    }

    /**
     * Stops the reader and writer, closes the data port and cancels outstanding pong futures.
     *
     * @param forceClose when {@code true} the data port is closed with {@code forceClose()},
     *                   otherwise with {@code close()}
     */
    void closeSocketImpl(boolean forceClose) {
        this.currentServer = null;
        // Ask both the reader and the writer to stop, then give each up to one second.
        Future<Boolean> readStop = this.reader.stop();
        Future<Boolean> writeStop = this.writer.stop();
        try {
            readStop.get(1L, TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            // Deliberately ignored: the stop is retried with a longer wait below.
        }
        try {
            writeStop.get(1L, TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            // Deliberately ignored: the stop is retried with a longer wait below.
        }
        if (this.dataPortFuture != null) {
            this.dataPortFuture.cancel(true);
            this.dataPortFuture = null;
        }
        try {
            if (this.dataPort != null) {
                if (forceClose) {
                    this.dataPort.forceClose();
                } else {
                    this.dataPort.close();
                }
            }
        }
        catch (IOException ex) {
            // A failure to close is reported through the exception callback, not thrown.
            this.processException(ex);
        }
        this.cleanUpPongQueue();
        // Second stop request, now waiting up to ten seconds; failures are reported, not thrown.
        try {
            this.reader.stop().get(10L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            this.processException(ex);
        }
        try {
            this.writer.stop().get(10L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            this.processException(ex);
        }
    }

    /**
     * Empties the queue of outstanding pong futures, cancelling each one as it is removed.
     * Uses a parameterized {@code Future<Boolean>} instead of a raw type, which also removes
     * the need for a cast on the polled element.
     */
    void cleanUpPongQueue() {
        Future<Boolean> pending;
        while ((pending = this.pongQueue.poll()) != null) {
            pending.cancel(true);
        }
    }

    /**
     * Publishes {@code body} to {@code subject} with no reply-to and no headers.
     * Subject validation is requested and no immediate flush is requested.
     */
    @Override
    public void publish(@NonNull String subject, byte @Nullable [] body) {
        this.publishInternal(subject, null, null, body, true, false);
    }

    /**
     * Publishes {@code body} with {@code headers} to {@code subject} with no reply-to.
     * Subject validation is requested and no immediate flush is requested.
     */
    @Override
    public void publish(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body) {
        this.publishInternal(subject, null, headers, body, true, false);
    }

    /**
     * Publishes {@code body} to {@code subject} with the given {@code replyTo} and no headers.
     * Subject/reply-to validation is requested and no immediate flush is requested.
     */
    @Override
    public void publish(@NonNull String subject, @Nullable String replyTo, byte @Nullable [] body) {
        this.publishInternal(subject, replyTo, null, body, true, false);
    }

    /**
     * Publishes {@code body} with {@code headers} to {@code subject} with the given {@code replyTo}.
     * Subject/reply-to validation is requested and no immediate flush is requested.
     */
    @Override
    public void publish(@NonNull String subject, @Nullable String replyTo, @Nullable Headers headers, byte @Nullable [] body) {
        this.publishInternal(subject, replyTo, headers, body, true, false);
    }

    /**
     * Publishes an already-built message. The message itself is checked for {@code null};
     * its subject and reply-to are passed on without requesting validation, and no
     * immediate flush is requested.
     */
    @Override
    public void publish(@NonNull Message message) {
        Validator.validateNotNull(message, "Message");
        String subject = message.getSubject();
        String replyTo = message.getReplyTo();
        Headers headers = message.getHeaders();
        byte[] data = message.getData();
        this.publishInternal(subject, replyTo, headers, data, false, false);
    }

    /**
     * Builds a publishable message and queues it on the writer.
     *
     * @param subject                     target subject
     * @param replyTo                     optional reply-to subject
     * @param headers                     optional headers
     * @param data                        optional payload
     * @param validateSubjectAndReplyTo   forwarded to the {@code NatsPublishableMessage} constructor
     * @param flushImmediatelyAfterPublish forwarded to the {@code NatsPublishableMessage} constructor
     * @throws IllegalArgumentException if headers are used but the server does not support them
     *                                  (or if the payload check fails)
     * @throws IllegalStateException    if the connection is closed, publishing is blocked for a
     *                                  drain, or the reconnect buffer cannot take the message
     */
    void publishInternal(@NonNull String subject, @Nullable String replyTo, @Nullable Headers headers, byte @Nullable [] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
        this.checkPayloadSize(data);
        NatsPublishableMessage outgoing = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
        if (outgoing.hasHeaders && !this.serverInfo.get().isHeadersSupported()) {
            throw new IllegalArgumentException("Headers are not supported by the server, version: " + this.serverInfo.get().getVersion());
        }
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.blockPublishForDrain.get()) {
            throw new IllegalStateException("Connection is Draining");
        }
        // While not connected, the writer decides whether the message still fits in the reconnect buffer.
        boolean offline = this.status == Connection.Status.RECONNECTING || this.status == Connection.Status.DISCONNECTED;
        if (offline && !this.writer.canQueueDuringReconnect(outgoing)) {
            throw new IllegalStateException("Unable to queue any more messages during reconnect, max buffer is " + this.options.getReconnectBufferSize());
        }
        this.queueOutgoing(outgoing);
    }

    /**
     * Rejects payloads larger than the server's advertised maximum when client-side limit
     * checks are enabled. A non-positive maximum disables the check.
     *
     * <p>The maximum is read once so that the comparison and the error message always use
     * the same value, even if the server info is replaced concurrently.
     *
     * @param body the payload to check; {@code null} is always accepted
     * @throws IllegalArgumentException if the payload exceeds the maximum
     */
    private void checkPayloadSize(byte @Nullable [] body) {
        if (body == null || !this.options.clientSideLimitChecks()) {
            return;
        }
        long maxPayload = this.getMaxPayload();
        if (maxPayload > 0L && (long)body.length > maxPayload) {
            throw new IllegalArgumentException("Message payload size exceed server configuration " + body.length + " vs " + maxPayload);
        }
    }

    /**
     * Creates a subscription on {@code subject} with no queue name, dispatcher or factory.
     * The subject is validated first with {@code Validator.validateSubject(subject, true)}.
     */
    @Override
    public @NonNull Subscription subscribe(@NonNull String subject) {
        Validator.validateSubject(subject, true);
        return this.createSubscription(subject, null, null, null);
    }

    /**
     * Creates a subscription on {@code subject} with the given {@code queueName} and no
     * dispatcher or factory. Both the subject and the queue name are validated first.
     */
    @Override
    public @NonNull Subscription subscribe(@NonNull String subject, @NonNull String queueName) {
        Validator.validateSubject(subject, true);
        Validator.validateQueueName(queueName, true);
        return this.createSubscription(subject, queueName, null, null);
    }

    /** Removes {@code sub} from this connection's bookkeeping, then invalidates it. */
    void invalidate(NatsSubscription sub) {
        this.remove(sub);
        sub.invalidate();
    }

    /**
     * Drops {@code sub} from the subscriber map (keyed by its SID) and, when it belongs to a
     * dispatcher, from that dispatcher as well.
     */
    void remove(NatsSubscription sub) {
        this.subscribers.remove(sub.getSID());
        NatsDispatcher owner = sub.getNatsDispatcher();
        if (owner == null) {
            return;
        }
        owner.remove(sub);
    }

    /**
     * Unsubscribes {@code sub}, either immediately or after a message limit.
     *
     * @param sub   the subscription to unsubscribe
     * @param after when positive, the unsubscribe limit to set on the subscription;
     *              otherwise the subscription is invalidated right away
     * @throws IllegalStateException if the connection is closed
     */
    void unsubscribe(NatsSubscription sub, int after) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (after > 0) {
            sub.setUnsubLimit(after);
            // The limit may already have been reached by messages delivered earlier.
            if (sub.reachedUnsubLimit()) {
                sub.invalidate();
            }
        } else {
            this.invalidate(sub);
        }
        // The UNSUB is only sent while connected.
        if (this.isConnected()) {
            this.sendUnsub(sub, after);
        }
    }

    /**
     * Queues an UNSUB protocol line for {@code sub}; a positive {@code after} is appended
     * to the line after a single space.
     */
    void sendUnsub(@NonNull NatsSubscription sub, int after) {
        ByteArrayBuilder line = new ByteArrayBuilder();
        line.append(NatsConstants.UNSUB_SP_BYTES);
        line.append(sub.getSID());
        if (after > 0) {
            line.append((byte) ' ');
            line.append(after);
        }
        ProtocolMessage unsub = new ProtocolMessage(line, true);
        this.queueOutgoing(unsub);
    }

    /**
     * Creates a subscription under a fresh SID, registers it and sends the SUB to the server.
     *
     * @param subject    subject to subscribe to
     * @param queueName  optional queue name
     * @param dispatcher optional owning dispatcher
     * @param factory    optional factory used instead of the plain {@code NatsSubscription} constructor
     * @return the registered subscription
     * @throws IllegalStateException if the connection is closed, or is draining and the
     *                               dispatcher is not the inbox dispatcher
     */
    @NonNull NatsSubscription createSubscription(@NonNull String subject, @Nullable String queueName, @Nullable NatsDispatcher dispatcher, @Nullable NatsSubscriptionFactory factory) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.isDraining()) {
            // Only the inbox dispatcher may still subscribe while draining.
            boolean viaInboxDispatcher = dispatcher != null && dispatcher == this.inboxDispatcher.get();
            if (!viaInboxDispatcher) {
                throw new IllegalStateException("Connection is Draining");
            }
        }
        String sid = this.getNextSid();
        NatsSubscription created;
        if (factory != null) {
            created = factory.createNatsSubscription(sid, subject, queueName, this, dispatcher);
        } else {
            created = new NatsSubscription(sid, subject, queueName, this, dispatcher);
        }
        this.subscribers.put(sid, created);
        this.sendSubscriptionMessage(sid, subject, queueName, false);
        return created;
    }

    /** Returns the current value of the SID counter as a decimal string and increments the counter. */
    String getNextSid() {
        return Long.toString(this.nextSid.getAndIncrement());
    }

    /**
     * Re-issues a subscription under a fresh SID.
     *
     * <p>The subscription is registered before the SUB is queued, matching
     * {@code createSubscription}. Otherwise a message delivered under the new SID before the
     * map entry exists would find no subscriber in {@code deliverMessage} and be dropped.
     *
     * @param sub       the subscription to register under the new SID
     * @param subject   subject for the SUB line
     * @param queueName optional queue name for the SUB line
     * @return the newly allocated SID
     */
    String reSubscribe(NatsSubscription sub, String subject, String queueName) {
        String sid = this.getNextSid();
        this.subscribers.put(sid, sub);
        this.sendSubscriptionMessage(sid, subject, queueName, false);
        return sid;
    }

    /**
     * Builds a SUB protocol line ({@code subject [queueName] sid}, space separated) and queues it.
     * Does nothing when the connection is not connected.
     *
     * @param treatAsInternal when {@code true} the line goes through the internal outgoing queue
     */
    void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
        if (!this.isConnected()) {
            return;
        }
        ByteArrayBuilder line = new ByteArrayBuilder(StandardCharsets.UTF_8);
        line.append(NatsConstants.SUB_SP_BYTES);
        line.append(subject);
        if (queueName != null) {
            line.append((byte) ' ');
            line.append(queueName);
        }
        line.append((byte) ' ');
        line.append(sid);
        ProtocolMessage subscribe = new ProtocolMessage(line, true);
        if (!treatAsInternal) {
            this.queueOutgoing(subscribe);
            return;
        }
        this.queueInternalOutgoing(subscribe);
    }

    /** Returns a new inbox subject: the configured inbox prefix followed by the next NUID value. */
    @Override
    public @NonNull String createInbox() {
        return this.options.getInboxPrefix() + this.nuid.next();
    }

    /**
     * Returns the length of the fixed leading part of a response inbox: the inbox prefix
     * length plus 23.
     * NOTE(review): the 22 is presumably the NUID length and the 1 a separator - confirm.
     */
    int getRespInboxLength() {
        return this.options.getInboxPrefix().length() + 22 + 1;
    }

    /**
     * Builds a response inbox from the first {@code getRespInboxLength()} characters of
     * {@code inbox} followed by the next NUID value.
     */
    String createResponseInbox(String inbox) {
        return inbox.substring(0, this.getRespInboxLength()) + this.nuid.next();
    }

    /**
     * Extracts the token that follows the fixed leading part of a response inbox.
     *
     * @param responseInbox the full response inbox subject
     * @return the part after {@code getRespInboxLength()} characters, or the input unchanged
     *         when it is not longer than that
     */
    String getResponseToken(String responseInbox) {
        int prefixLength = this.getRespInboxLength();
        return responseInbox.length() > prefixLength ? responseInbox.substring(prefixLength) : responseInbox;
    }

    /**
     * Removes request futures that no longer need tracking.
     *
     * <p>An awaiting future is removed when it has exceeded its timeout (and is then cancelled
     * as timed out), when {@code closing} is set (cancelled as closing), or when it is already
     * done. With advanced tracking enabled, already-answered futures that have exceeded their
     * timeout are purged as well.
     *
     * <p>The outstanding-request counter is decremented only when this method actually removes
     * the entry. {@code deliverReply} removes and decrements for the same key, so an
     * unconditional decrement here would count one request twice when both run concurrently.
     *
     * @param closing {@code true} when the connection is closing
     */
    void cleanResponses(boolean closing) {
        ArrayList<String> toRemove = new ArrayList<String>();
        boolean wasInterrupted = false;
        for (Map.Entry<String, NatsRequestCompletableFuture> entry : this.responsesAwaiting.entrySet()) {
            boolean remove = false;
            NatsRequestCompletableFuture future = entry.getValue();
            if (future.hasExceededTimeout()) {
                remove = true;
                future.cancelTimedOut();
            } else if (closing) {
                remove = true;
                future.cancelClosing();
            } else if (future.isDone()) {
                remove = true;
                try {
                    future.get();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    wasInterrupted = true;
                    break;
                }
                catch (Throwable ignored) {
                    // The outcome of a finished request is irrelevant here; it only needs removing.
                }
            }
            if (remove) {
                toRemove.add(entry.getKey());
            }
        }
        for (String key : toRemove) {
            // A null result means another thread already removed (and accounted for) this request.
            if (this.responsesAwaiting.remove(key) != null) {
                this.statistics.decrementOutstandingRequests();
            }
        }
        if (this.advancedTracking && !wasInterrupted) {
            toRemove.clear();
            for (Map.Entry<String, NatsRequestCompletableFuture> entry : this.responsesRespondedTo.entrySet()) {
                NatsRequestCompletableFuture future = entry.getValue();
                if (!future.hasExceededTimeout()) {
                    continue;
                }
                toRemove.add(entry.getKey());
                future.cancelTimedOut();
            }
            for (String token : toRemove) {
                this.responsesRespondedTo.remove(token);
            }
        }
    }

    /**
     * Sends a request without headers and blocks for the reply via {@code requestInternal},
     * using this connection's cancel action and flush-on-request setting; subject validation
     * is requested.
     */
    @Override
    public @Nullable Message request(@NonNull String subject, byte @Nullable [] body, @Nullable Duration timeout) throws InterruptedException {
        return this.requestInternal(subject, null, body, timeout, this.cancelAction, true, this.forceFlushOnRequest);
    }

    /**
     * Sends a request with headers and blocks for the reply via {@code requestInternal},
     * using this connection's cancel action and flush-on-request setting; subject validation
     * is requested.
     */
    @Override
    public @Nullable Message request(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, @Nullable Duration timeout) throws InterruptedException {
        return this.requestInternal(subject, headers, body, timeout, this.cancelAction, true, this.forceFlushOnRequest);
    }

    /**
     * Sends the given message as a request and blocks for the reply via {@code requestInternal}.
     * The message is checked for {@code null}; subject validation is not requested.
     */
    @Override
    public @Nullable Message request(@NonNull Message message, @Nullable Duration timeout) throws InterruptedException {
        Validator.validateNotNull(message, "Message");
        return this.requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, this.cancelAction, false, this.forceFlushOnRequest);
    }

    /**
     * Sends a request and waits for its reply.
     *
     * @param timeout how long to wait; when {@code null} the connection timeout from the
     *                options is used for the wait
     * @return the reply, or {@code null} when the wait timed out, the future was cancelled
     *         or it completed exceptionally
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Nullable Message requestInternal(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] data, @Nullable Duration timeout, @NonNull NatsRequestCompletableFuture.CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws InterruptedException {
        CompletableFuture<Message> reply = this.requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
        try {
            Duration wait = timeout != null ? timeout : this.getOptions().getConnectionTimeout();
            return reply.get(wait.toNanos(), TimeUnit.NANOSECONDS);
        }
        catch (CancellationException | ExecutionException | TimeoutException e) {
            // All three outcomes are reported to the caller as "no reply".
            return null;
        }
    }

    /**
     * Sends a request without headers and returns the future for its reply. No explicit
     * future timeout is passed ({@code null}); subject validation is requested.
     */
    @Override
    public @NonNull CompletableFuture<Message> request(@NonNull String subject, byte @Nullable [] body) {
        return this.requestFutureInternal(subject, null, body, null, this.cancelAction, true, this.forceFlushOnRequest);
    }

    /**
     * Sends a request with headers and returns the future for its reply. No explicit
     * future timeout is passed ({@code null}); subject validation is requested.
     */
    @Override
    public @NonNull CompletableFuture<Message> request(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body) {
        return this.requestFutureInternal(subject, headers, body, null, this.cancelAction, true, this.forceFlushOnRequest);
    }

    /**
     * Sends a request without headers and returns the future for its reply, passing
     * {@code timeout} as the future timeout; subject validation is requested.
     */
    @Override
    public @NonNull CompletableFuture<Message> requestWithTimeout(@NonNull String subject, byte @Nullable [] body, @Nullable Duration timeout) {
        return this.requestFutureInternal(subject, null, body, timeout, this.cancelAction, true, this.forceFlushOnRequest);
    }

    /**
     * Sends a request with headers and returns the future for its reply, passing
     * {@code timeout} as the future timeout; subject validation is requested.
     */
    @Override
    public @NonNull CompletableFuture<Message> requestWithTimeout(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, Duration timeout) {
        return this.requestFutureInternal(subject, headers, body, timeout, this.cancelAction, true, this.forceFlushOnRequest);
    }

    /**
     * Sends the given message as a request and returns the future for its reply, passing
     * {@code timeout} as the future timeout. The message is checked for {@code null};
     * subject validation is not requested.
     */
    @Override
    public @NonNull CompletableFuture<Message> requestWithTimeout(@NonNull Message message, @Nullable Duration timeout) {
        Validator.validateNotNull(message, "Message");
        return this.requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, this.cancelAction, false, this.forceFlushOnRequest);
    }

    /**
     * Sends the given message as a request and returns the future for its reply. No explicit
     * future timeout is passed ({@code null}). The message is checked for {@code null};
     * subject validation is not requested.
     */
    @Override
    public @NonNull CompletableFuture<Message> request(@NonNull Message message) {
        Validator.validateNotNull(message, "Message");
        return this.requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, this.cancelAction, false, this.forceFlushOnRequest);
    }

    /**
     * Registers a future for a reply, publishes the request and returns the future.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so the try/catch structure of the original source may differ.
     *
     * @param futureTimeout timeout handed to the future; when {@code null} the request
     *                      cleanup interval from the options is used
     * @param cancelAction  action stored on the future (consulted in {@code deliverReply})
     * @return the future that {@code deliverReply} completes when the reply arrives
     * @throws IllegalStateException if the connection is closed or draining
     */
    @NonNull CompletableFuture<Message> requestFutureInternal(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, @Nullable Duration futureTimeout, @NonNull NatsRequestCompletableFuture.CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
        boolean oldStyle;
        this.checkPayloadSize(body);
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.isDraining()) {
            throw new IllegalStateException("Connection is Draining");
        }
        // Create the shared inbox dispatcher on first use; the null check is repeated under the
        // lock so only one thread creates it.
        if (this.inboxDispatcher.get() == null) {
            this.inboxDispatcherLock.lock();
            try {
                if (this.inboxDispatcher.get() == null) {
                    NatsDispatcher d = this.dispatcherFactory.createDispatcher(this, this::deliverReply);
                    String id = this.nuid.next();
                    this.dispatchers.put(id, d);
                    d.start(id);
                    d.subscribe(this.mainInbox);
                    this.inboxDispatcher.set(d);
                }
            }
            finally {
                this.inboxDispatcherLock.unlock();
            }
        }
        // Old style uses a brand-new inbox per request; otherwise the inbox is derived from the main inbox.
        String responseInbox = (oldStyle = this.options.isOldRequestStyle()) ? this.createInbox() : this.createResponseInbox(this.mainInbox);
        String responseToken = this.getResponseToken(responseInbox);
        NatsRequestCompletableFuture future = new NatsRequestCompletableFuture(cancelAction, futureTimeout == null ? this.options.getRequestCleanupInterval() : futureTimeout, this.options.useTimeoutException());
        // New style: the future is keyed by the response token.
        if (!oldStyle) {
            this.responsesAwaiting.put(responseToken, future);
        }
        this.statistics.incrementOutstandingRequests();
        if (oldStyle) {
            // Old style: subscribe to the per-request inbox, limit it to one message, and key the
            // future by the subscription's SID.
            NatsDispatcher dispatcher = this.inboxDispatcher.get();
            NatsSubscription sub = dispatcher.subscribeReturningSubscription(responseInbox);
            dispatcher.unsubscribe(responseInbox, 1);
            // If the future ends with a cancellation, drop the per-request subscription.
            future.whenComplete((msg, exception) -> {
                if (exception instanceof CancellationException) {
                    dispatcher.unsubscribe(responseInbox);
                }
            });
            this.responsesAwaiting.put(sub.getSID(), future);
        }
        // The response inbox is sent as the reply-to of the request.
        this.publishInternal(subject, responseInbox, headers, body, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
        this.statistics.incrementRequestsSent();
        return future;
    }

    /**
     * Completes the request future that matches an incoming reply.
     *
     * <p>The lookup key is the message SID in old request style, otherwise the response token
     * taken from the subject. A status message with code 503 is handled according to the
     * future's cancel action; any other message completes the future normally. When no future
     * matches and advanced tracking applies, the reply is counted as a duplicate or an orphan.
     */
    void deliverReply(Message msg) {
        boolean oldStyle = this.options.isOldRequestStyle();
        String subject = msg.getSubject();
        String token = this.getResponseToken(subject);
        String key;
        if (oldStyle) {
            key = msg.getSID();
        } else {
            key = token;
        }
        NatsRequestCompletableFuture pending = this.responsesAwaiting.remove(key);
        if (pending == null) {
            boolean track = !oldStyle && !subject.startsWith(this.mainInbox) && this.advancedTracking;
            if (track) {
                if (this.responsesRespondedTo.get(key) == null) {
                    this.statistics.incrementOrphanRepliesReceived();
                } else {
                    this.statistics.incrementDuplicateRepliesReceived();
                }
            }
            return;
        }
        if (this.advancedTracking) {
            // Remember answered requests so later replies can be recognised as duplicates.
            this.responsesRespondedTo.put(key, pending);
        }
        this.statistics.decrementOutstandingRequests();
        boolean status503 = msg.isStatusMessage() && msg.getStatus().getCode() == 503;
        if (!status503) {
            pending.complete(msg);
        } else {
            switch (pending.getCancelAction()) {
                case COMPLETE: {
                    pending.complete(msg);
                    break;
                }
                case REPORT: {
                    pending.completeExceptionally(new JetStreamStatusException(msg.getStatus()));
                    break;
                }
                default: {
                    pending.cancel(true);
                    break;
                }
            }
        }
        this.statistics.incrementRepliesReceived();
    }

    /** Creates a dispatcher with no default message handler by calling {@code createDispatcher(null)}. */
    @Override
    public @NonNull Dispatcher createDispatcher() {
        return this.createDispatcher(null);
    }

    /**
     * Creates, registers and starts a dispatcher.
     *
     * @param handler optional handler handed to the dispatcher factory
     * @return the started dispatcher, registered under a fresh NUID
     * @throws IllegalStateException if the connection is closed or draining
     */
    @Override
    public @NonNull Dispatcher createDispatcher(@Nullable MessageHandler handler) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (this.isDraining()) {
            throw new IllegalStateException("Connection is Draining");
        }
        NatsDispatcher created = this.dispatcherFactory.createDispatcher(this, handler);
        String dispatcherId = this.nuid.next();
        // Register first, then start, so the dispatcher is tracked by the time it runs.
        this.dispatchers.put(dispatcherId, created);
        created.start(dispatcherId);
        return created;
    }

    /**
     * Stops and unregisters a dispatcher created by this connection. A dispatcher that is
     * currently draining is left alone.
     *
     * @throws IllegalStateException    if the connection is closed
     * @throws IllegalArgumentException if {@code d} is not a {@code NatsDispatcher} or is no
     *                                  longer registered
     */
    @Override
    public void closeDispatcher(@NonNull Dispatcher d) {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is Closed");
        }
        if (!(d instanceof NatsDispatcher)) {
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
        }
        NatsDispatcher target = (NatsDispatcher)d;
        if (target.isDraining()) {
            return;
        }
        if (this.dispatchers.containsKey(target.getId())) {
            this.cleanupDispatcher(target);
            return;
        }
        throw new IllegalArgumentException("Dispatcher is already closed.");
    }

    /** Stops {@code nd} with {@code stop(true)} and removes it from the dispatcher map by its id. */
    void cleanupDispatcher(NatsDispatcher nd) {
        nd.stop(true);
        this.dispatchers.remove(nd.getId());
    }

    /** Returns an unmodifiable view of the dispatcher map. */
    Map<String, Dispatcher> getDispatchers() {
        return Collections.unmodifiableMap(this.dispatchers);
    }

    /** Adds {@code connectionListener} to the listeners notified by {@code processConnectionEvent}. */
    @Override
    public void addConnectionListener(@NonNull ConnectionListener connectionListener) {
        this.connectionListeners.add(connectionListener);
    }

    /** Removes {@code connectionListener} from the registered connection listeners. */
    @Override
    public void removeConnectionListener(@NonNull ConnectionListener connectionListener) {
        this.connectionListeners.remove(connectionListener);
    }

    /**
     * Sends a ping and waits for the matching pong.
     *
     * @param timeout maximum total time to wait; {@code null} or a negative value is treated
     *                as zero, and zero makes the wait for the pong unbounded
     * @throws TimeoutException     if the connection is closed, the timeout elapsed while
     *                              waiting for the connection or the pong, or the pong future
     *                              was cancelled or failed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public void flush(@Nullable Duration timeout) throws TimeoutException, InterruptedException {
        Instant begin = Instant.now();
        this.waitForConnectOrClose(timeout);
        if (this.isClosed()) {
            throw new TimeoutException("Attempted to flush while closed");
        }
        Duration limit = (timeout == null || timeout.isNegative()) ? Duration.ZERO : timeout;
        // Time already spent waiting for the connection counts against the limit.
        Duration elapsed = Duration.between(begin, Instant.now());
        if (!limit.isZero() && elapsed.compareTo(limit) >= 0) {
            throw new TimeoutException("Timeout out waiting for connection before flush.");
        }
        try {
            CompletableFuture<Boolean> pong = this.sendPing();
            if (pong == null) {
                return;
            }
            long limitNanos = limit.toNanos();
            if (limitNanos > 0L) {
                // Always wait at least one nanosecond, even if the budget is already used up.
                long remaining = Math.max(1L, limitNanos - elapsed.toNanos());
                pong.get(remaining, TimeUnit.NANOSECONDS);
            } else {
                pong.get();
            }
            this.statistics.incrementFlushCounter();
        }
        catch (CancellationException | ExecutionException e) {
            throw new TimeoutException(e.toString());
        }
    }

    /**
     * Builds the CONNECT protocol line for {@code nuri}, using the nonce from the current
     * server info, and queues it on the internal outgoing queue.
     *
     * @throws IOException wrapping any exception raised while building or queueing the line
     */
    void sendConnect(NatsUri nuri) throws IOException {
        try {
            ServerInfo current = this.serverInfo.get();
            CharBuffer connectOptions = this.options.buildProtocolConnectOptionsString(nuri.toString(), true, current.getNonce());
            int capacity = NatsConstants.OP_CONNECT_SP_LEN + connectOptions.limit();
            ByteArrayBuilder line = new ByteArrayBuilder(capacity, StandardCharsets.UTF_8);
            line.append(NatsConstants.CONNECT_SP_BYTES);
            line.append(connectOptions);
            this.queueInternalOutgoing(new ProtocolMessage(line, false));
        }
        catch (Exception exp) {
            throw new IOException("Error sending connect string", exp);
        }
    }

    /** Sends a ping through the internal queue by calling {@code sendPing(true)}; see that method for the result. */
    CompletableFuture<Boolean> sendPing() {
        return this.sendPing(true);
    }

    /** Calls {@code sendPing(false)} and discards the returned future. */
    void softPing() {
        this.sendPing(false);
    }

    /**
     * Measures the round-trip time of one PING/PONG exchange with the server.
     *
     * <p>The pong future is declared as {@code CompletableFuture<Boolean>} rather than a raw
     * type, matching the element type used elsewhere for the pong queue.
     *
     * @return the time between queueing the PING and completion of the pong future
     * @throws IOException if not connected, if the pong does not arrive within the connection
     *                     timeout, if the pong future fails, or if the thread is interrupted
     *                     (the interrupt flag is restored)
     */
    @Override
    public @NonNull Duration RTT() throws IOException {
        if (!this.isConnected()) {
            throw new IOException("Must be connected to do RTT.");
        }
        long timeout = this.options.getConnectionTimeout().toMillis();
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<Boolean>();
        this.pongQueue.add(pongFuture);
        try {
            long time = NatsSystemClock.nanoTime();
            this.writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
            pongFuture.get(timeout, TimeUnit.MILLISECONDS);
            return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
        }
        catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
        catch (TimeoutException e) {
            throw new IOException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Queues a PING and returns a future that {@code handlePong} completes.
     *
     * @param treatAsInternal {@code true} to use the internal outgoing queue; {@code false}
     *                        to use the regular queue and to skip the ping entirely when the
     *                        need-ping flag is not set
     * @return a future already completed with {@code FALSE} when neither connected nor
     *         connecting; a future already completed with {@code TRUE} when the ping was
     *         skipped; {@code null} when the maximum number of outstanding pings is
     *         exceeded; otherwise the pending pong future
     */
    @Nullable CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
        if (!this.isConnectedOrConnecting()) {
            return CompletableFuture.completedFuture(Boolean.FALSE);
        }
        if (!treatAsInternal && !this.needPing.get()) {
            // Skip this ping but make sure the next one is sent.
            this.needPing.set(true);
            return CompletableFuture.completedFuture(Boolean.TRUE);
        }
        int maxPingsOut = this.options.getMaxPingsOut();
        if (maxPingsOut > 0 && this.pongQueue.size() + 1 > maxPingsOut) {
            this.handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
            return null;
        }
        CompletableFuture<Boolean> pong = new CompletableFuture<Boolean>();
        this.pongQueue.add(pong);
        ProtocolMessage ping = new ProtocolMessage(PING_PROTO);
        if (treatAsInternal) {
            this.queueInternalOutgoing(ping);
        } else {
            this.queueOutgoing(ping);
        }
        this.needPing.set(true);
        this.statistics.incrementPingCount();
        return pong;
    }

    /** Queues a PONG protocol message on the internal outgoing queue. */
    void sendPong() {
        this.queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
    }

    /**
     * Takes the first future from the pong queue, if there is one, and completes it with
     * {@code TRUE}.
     */
    void handlePong() {
        CompletableFuture<Boolean> pongFuture = this.pongQueue.pollFirst();
        if (pongFuture != null) {
            pongFuture.complete(Boolean.TRUE);
        }
    }

    /**
     * Reads the server's first protocol line from the data port, checks that it is an INFO
     * message and passes it to {@code handleInfo}.
     *
     * <p>The line must end in CR LF, and nothing may follow the LF in the same read. The
     * operation name is upper-cased with {@code Locale.ROOT} so the comparison with
     * {@code "INFO"} does not depend on the JVM's default locale (for example, a Turkish
     * locale upper-cases {@code i} to a dotted capital I).
     *
     * @throws IOException if the stream ends before a full line is read, the line is
     *                     malformed, extra bytes follow it, or it is not an INFO message
     */
    void readInitialInfo() throws IOException {
        byte[] readBuffer = new byte[this.options.getBufferSize()];
        ByteBuffer protocolBuffer = ByteBuffer.allocate(this.options.getBufferSize());
        boolean gotCRLF = false;
        boolean gotCR = false;
        int read;
        while (!gotCRLF && (read = this.dataPort.read(readBuffer, 0, readBuffer.length)) >= 0) {
            int i = 0;
            while (i < read) {
                byte b = readBuffer[i++];
                if (gotCR) {
                    if (b != '\n') {
                        throw new IOException("Missed LF after CR waiting for INFO.");
                    }
                    if (i < read) {
                        throw new IOException("Read past initial info message.");
                    }
                    // Line complete; the outer loop condition ends the reading.
                    gotCRLF = true;
                    break;
                }
                if (b == '\r') {
                    gotCR = true;
                    continue;
                }
                if (!protocolBuffer.hasRemaining()) {
                    protocolBuffer = this.enlargeBuffer(protocolBuffer);
                }
                protocolBuffer.put(b);
            }
        }
        if (!gotCRLF) {
            throw new IOException("Failed to read initial info message.");
        }
        protocolBuffer.flip();
        String infoJson = StandardCharsets.UTF_8.decode(protocolBuffer).toString().trim();
        String[] msg = infoJson.split("\\s");
        String op = msg[0].toUpperCase(java.util.Locale.ROOT);
        if (!"INFO".equals(op)) {
            throw new IOException("Received non-info initial message.");
        }
        this.handleInfo(infoJson);
    }

    /**
     * Parses an INFO payload, stores it as the current server info and raises the
     * connection events that follow from it: {@code DISCOVERED_SERVERS} when the server pool
     * accepts a non-empty list of connect URLs, and {@code LAME_DUCK} when the info reports
     * lame duck mode.
     */
    void handleInfo(String infoJson) {
        ServerInfo parsed = new ServerInfo(infoJson);
        this.serverInfo.set(parsed);
        List<String> discovered = this.serverInfo.get().getConnectURLs();
        if (!discovered.isEmpty()) {
            boolean accepted = this.serverPool.acceptDiscoveredUrls(discovered);
            if (accepted) {
                this.processConnectionEvent(ConnectionListener.Events.DISCOVERED_SERVERS);
            }
        }
        if (parsed.isLameDuckMode()) {
            this.processConnectionEvent(ConnectionListener.Events.LAME_DUCK);
        }
    }

    /**
     * Hands a message to the writer's regular queue. If the writer does not accept it, the
     * error listener is told that the message was discarded.
     *
     * @throws IllegalArgumentException if the control line exceeds the configured maximum
     */
    void queueOutgoing(NatsMessage msg) {
        int maxControlLine = this.options.getMaxControlLine();
        if (msg.getControlLineLength() > maxControlLine) {
            throw new IllegalArgumentException("Control line is too long");
        }
        boolean accepted = this.writer.queue(msg);
        if (accepted) {
            return;
        }
        this.options.getErrorListener().messageDiscarded(this, msg);
    }

    /**
     * Hands a message to the writer's internal queue.
     *
     * @throws IllegalArgumentException if the control line exceeds the configured maximum
     */
    void queueInternalOutgoing(NatsMessage msg) {
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
            throw new IllegalArgumentException("Control line is too long");
        }
        this.writer.queueInternalMessage(msg);
    }

    /**
     * Routes an incoming message to the queue of its subscription, or of the subscription's
     * dispatcher when it has one.
     *
     * <p>Inbound statistics are always updated and the need-ping flag is cleared. Messages
     * with no matching subscription are ignored. When the consumer has reached its pending
     * limits the message is counted as dropped and the slow-consumer callback is raised
     * the first time the consumer is marked slow.
     */
    void deliverMessage(NatsMessage msg) {
        this.needPing.set(false);
        this.statistics.incrementInMsgs();
        this.statistics.incrementInBytes(msg.getSizeInBytes());
        NatsSubscription sub = this.subscribers.get(msg.getSID());
        if (sub == null) {
            return;
        }
        msg.setSubscription(sub);
        NatsDispatcher owner = sub.getNatsDispatcher();
        NatsConsumer consumer;
        MessageQueue target;
        if (owner == null) {
            consumer = sub;
            target = sub.getMessageQueue();
        } else {
            consumer = owner;
            target = owner.getMessageQueue();
        }
        if (consumer.hasReachedPendingLimits()) {
            this.statistics.incrementDroppedCount();
            consumer.incrementDroppedCount();
            if (!consumer.isMarkedSlow()) {
                consumer.markSlow();
                this.processSlowConsumer(consumer);
            }
            return;
        }
        if (target == null) {
            return;
        }
        consumer.markNotSlow();
        // The subscription's before-queue processor decides whether the message is queued.
        if (sub.getBeforeQueueProcessor().apply(msg).booleanValue()) {
            target.push(msg);
        }
    }

    /** Increments the OK counter in the statistics. */
    void processOK() {
        this.statistics.incrementOkCount();
    }

    /**
     * Schedules a slow-consumer notification to the error listener on the callback runner.
     * Nothing is scheduled once the runner is shut down; an exception thrown by the listener
     * is counted in the statistics.
     */
    void processSlowConsumer(Consumer consumer) {
        if (this.callbackRunner.isShutdown()) {
            return;
        }
        Runnable notification = () -> {
            try {
                this.options.getErrorListener().slowConsumerDetected(this, consumer);
            }
            catch (Exception ex) {
                this.statistics.incrementExceptionCount();
            }
        };
        try {
            this.callbackRunner.execute(notification);
        }
        catch (RejectedExecutionException ignored) {
            // The runner was shut down between the check and the submit; nothing to do.
        }
    }

    /**
     * Counts an exception and schedules a notification to the error listener on the callback
     * runner. Nothing is scheduled once the runner is shut down; an exception thrown by the
     * listener is counted as well.
     */
    void processException(Exception exp) {
        this.statistics.incrementExceptionCount();
        if (this.callbackRunner.isShutdown()) {
            return;
        }
        Runnable notification = () -> {
            try {
                this.options.getErrorListener().exceptionOccurred(this, exp);
            }
            catch (Exception ex) {
                this.statistics.incrementExceptionCount();
            }
        };
        try {
            this.callbackRunner.execute(notification);
        }
        catch (RejectedExecutionException ignored) {
            // The runner was shut down between the check and the submit; nothing to do.
        }
    }

    /**
     * Records an error text and schedules a notification to the error listener.
     *
     * <p>The text becomes both the last error and the connect error. When connected, with a
     * known current server, an authentication error is also remembered for that server.
     * Nothing is scheduled once the callback runner is shut down; an exception thrown by the
     * listener is counted in the statistics.
     */
    void processError(String errorText) {
        this.statistics.incrementErrCount();
        this.lastError.set(errorText);
        this.connectError.set(errorText);
        if (this.isConnected() && this.isAuthenticationError(errorText) && this.currentServer != null) {
            this.serverAuthErrors.put(this.currentServer, errorText);
        }
        if (this.callbackRunner.isShutdown()) {
            return;
        }
        Runnable notification = () -> {
            try {
                this.options.getErrorListener().errorOccurred(this, errorText);
            }
            catch (Exception ex) {
                this.statistics.incrementExceptionCount();
            }
        };
        try {
            this.callbackRunner.execute(notification);
        }
        catch (RejectedExecutionException ignored) {
            // The runner was shut down between the check and the submit; nothing to do.
        }
    }

    /**
     * Schedules an arbitrary error-listener call on the callback runner.
     *
     * <p>An exception thrown by the call is caught and counted in the statistics, as the
     * other callback helpers in this class do, instead of escaping into the runner's thread.
     * Nothing is scheduled once the runner is shut down.
     *
     * @param elc the call to make with this connection and the configured error listener
     */
    void executeCallback(ErrorListenerCaller elc) {
        if (this.callbackRunner.isShutdown()) {
            return;
        }
        try {
            this.callbackRunner.execute(() -> {
                try {
                    elc.call(this, this.options.getErrorListener());
                }
                catch (Exception ex) {
                    this.statistics.incrementExceptionCount();
                }
            });
        }
        catch (RejectedExecutionException ignored) {
            // The runner was shut down between the check and the submit; nothing to do.
        }
    }

    /**
     * Schedules one notification per registered connection listener on the callback runner.
     * Nothing is scheduled once the runner is shut down; if the runner rejects a task the
     * remaining listeners are skipped. An exception thrown by a listener is counted in the
     * statistics.
     */
    void processConnectionEvent(ConnectionListener.Events type) {
        if (this.callbackRunner.isShutdown()) {
            return;
        }
        try {
            for (ConnectionListener target : this.connectionListeners) {
                Runnable notification = () -> {
                    try {
                        target.connectionEvent(this, type);
                    }
                    catch (Exception ex) {
                        this.statistics.incrementExceptionCount();
                    }
                };
                this.callbackRunner.execute(notification);
            }
        }
        catch (RejectedExecutionException ignored) {
            // The runner was shut down while submitting; nothing more can be delivered.
        }
    }

    /**
     * Returns the server info currently held by this connection.
     *
     * @return the current value of the {@code serverInfo} reference
     */
    @Override
    public @NonNull ServerInfo getServerInfo() {
        return serverInfo.get();
    }

    /**
     * Resolves the client IP reported in the server info to an address.
     *
     * @return the resolved address, or {@code null} when the server info is
     *         {@code ServerInfo.EMPTY_INFO} or the lookup throws
     */
    @Override
    public @Nullable InetAddress getClientInetAddress() {
        try {
            ServerInfo info = getServerInfo();
            if (info == ServerInfo.EMPTY_INFO) {
                return null;
            }
            return NatsInetAddress.getByName(info.getClientIp());
        }
        catch (Exception e) {
            return null;
        }
    }

    /**
     * Returns the options this connection holds.
     *
     * @return the {@code options} field
     */
    @Override
    public @NonNull Options getOptions() {
        return options;
    }

    /**
     * Returns the statistics object of this connection through the public {@code Statistics} type.
     *
     * @return the {@code statistics} field
     */
    @Override
    public @NonNull Statistics getStatistics() {
        return statistics;
    }

    /**
     * Returns the same statistics object as {@code getStatistics()}, typed as a collector.
     *
     * @return the {@code statistics} field
     */
    StatisticsCollector getStatisticsCollector() {
        return statistics;
    }

    /**
     * Returns the data port currently held by this connection.
     *
     * @return the {@code dataPort} field
     */
    DataPort getDataPort() {
        return dataPort;
    }

    /**
     * Counts the consumers tracked by this connection.
     *
     * @return the number of entries in {@code subscribers} plus the number in {@code dispatchers}
     */
    int getConsumerCount() {
        int subscriberCount = subscribers.size();
        int dispatcherCount = dispatchers.size();
        return subscriberCount + dispatcherCount;
    }

    /**
     * Returns the maximum payload reported by the server info.
     *
     * @return the server's max payload, or {@code -1} when no server info is held
     */
    @Override
    public long getMaxPayload() {
        ServerInfo current = serverInfo.get();
        return current == null ? -1L : current.getMaxPayload();
    }

    /**
     * Returns the server list held by the server pool.
     *
     * @return the result of {@code serverPool.getServerList()}
     */
    @Override
    public @NonNull Collection<String> getServers() {
        return serverPool.getServerList();
    }

    /**
     * Expands a server URI into one URI per IP address its host resolves to.
     * No lookup is attempted when the host is already an IP address, the URI is a websocket
     * URI, or fast fallback is enabled in the options.
     *
     * @param nuri the URI to expand
     * @return the re-hosted URIs, or a list containing only {@code nuri} when nothing was resolved
     */
    protected List<NatsUri> resolveHost(NatsUri nuri) {
        List<NatsUri> resolved = new ArrayList<>();

        boolean shouldLookup = !nuri.hostIsIpAddress()
            && !nuri.isWebsocket()
            && !options.isEnableFastFallback();
        if (shouldLookup) {
            List<String> addresses = serverPool.resolveHostToIps(nuri.getHost());
            if (addresses != null) {
                for (String address : addresses) {
                    try {
                        resolved.add(nuri.reHost(address));
                    }
                    catch (URISyntaxException ignored) {
                        // an address that cannot form a valid URI is skipped
                    }
                }
            }
        }

        if (resolved.isEmpty()) {
            resolved.add(nuri);
        }
        return resolved;
    }

    /**
     * Returns the string form of the current server.
     * The field is read once into a local so that a concurrent reset to {@code null}
     * between the null check and the dereference cannot cause a NullPointerException.
     *
     * @return {@code currentServer.toString()}, or {@code null} when no current server is set
     */
    @Override
    public @Nullable String getConnectedUrl() {
        // only toString() is needed, so the snapshot is held as a plain Object
        final Object server = this.currentServer;
        return server == null ? null : server.toString();
    }

    /**
     * Returns the current connection status.
     *
     * @return the {@code status} field
     */
    @Override
    public @NonNull Connection.Status getStatus() {
        return status;
    }

    /**
     * Returns the most recently recorded error text.
     *
     * @return the current value of {@code lastError}, which is {@code null} after {@code clearLastError()}
     */
    @Override
    public @Nullable String getLastError() {
        return lastError.get();
    }

    /**
     * Resets the recorded last error to {@code null}.
     */
    @Override
    public void clearLastError() {
        lastError.set(null);
    }

    /**
     * Returns the executor service held by this connection.
     *
     * @return the {@code executor} field
     */
    ExecutorService getExecutor() {
        return executor;
    }

    /**
     * Returns the scheduled executor service held by this connection.
     *
     * @return the {@code scheduledExecutor} field
     */
    ScheduledExecutorService getScheduledExecutor() {
        return scheduledExecutor;
    }

    /**
     * Moves the connection to a new status and fires the matching connection event.
     * The update is ignored when the connection is already CLOSED or already in
     * {@code newStatus}. Threads waiting on {@code statusChanged} are signalled in every case.
     *
     * <p>The previous status is read while holding {@code statusLock}, so a concurrent
     * transition (in particular to CLOSED) cannot be overwritten by a stale comparison.
     * The event is chosen from the values this call actually applied, not from a later
     * re-read of the field.
     *
     * @param newStatus the status to move to
     */
    void updateStatus(Connection.Status newStatus) {
        Connection.Status oldStatus;
        this.statusLock.lock();
        try {
            oldStatus = this.status;
            if (oldStatus == Connection.Status.CLOSED || newStatus == oldStatus) {
                return;
            }
            this.status = newStatus;
        }
        finally {
            this.statusChanged.signalAll();
            this.statusLock.unlock();
        }
        // listeners are notified outside the lock
        if (newStatus == Connection.Status.DISCONNECTED) {
            this.processConnectionEvent(ConnectionListener.Events.DISCONNECTED);
        } else if (newStatus == Connection.Status.CLOSED) {
            this.processConnectionEvent(ConnectionListener.Events.CLOSED);
        } else if (oldStatus == Connection.Status.RECONNECTING && newStatus == Connection.Status.CONNECTED) {
            this.processConnectionEvent(ConnectionListener.Events.RECONNECTED);
        } else if (newStatus == Connection.Status.CONNECTED) {
            this.processConnectionEvent(ConnectionListener.Events.CONNECTED);
        }
    }

    /**
     * Reports the value of the {@code closing} flag.
     *
     * @return {@code true} when the {@code closing} flag is set
     */
    boolean isClosing() {
        return closing;
    }

    /**
     * Reports whether the current status is CLOSED. The status is read without taking the status lock.
     *
     * @return {@code true} when the status is {@code Connection.Status.CLOSED}
     */
    boolean isClosed() {
        return Connection.Status.CLOSED == status;
    }

    /**
     * Reports whether the current status is CONNECTED. The status is read without taking the status lock.
     *
     * @return {@code true} when the status is {@code Connection.Status.CONNECTED}
     */
    boolean isConnected() {
        return Connection.Status.CONNECTED == status;
    }

    /**
     * Reports whether the current status is DISCONNECTED. The status is read without taking the status lock.
     *
     * @return {@code true} when the status is {@code Connection.Status.DISCONNECTED}
     */
    boolean isDisconnected() {
        return Connection.Status.DISCONNECTED == status;
    }

    /**
     * Reports, while holding the status lock, whether the connection is connected or a connect is in progress.
     *
     * @return {@code true} when the status is CONNECTED or the {@code connecting} flag is set
     */
    boolean isConnectedOrConnecting() {
        statusLock.lock();
        try {
            return status == Connection.Status.CONNECTED || connecting;
        }
        finally {
            statusLock.unlock();
        }
    }

    /**
     * Reports, while holding the status lock, whether the connection is closed or a disconnect is in progress.
     *
     * @return {@code true} when the status is CLOSED or the {@code disconnecting} flag is set
     */
    boolean isDisconnectingOrClosed() {
        statusLock.lock();
        try {
            return status == Connection.Status.CLOSED || disconnecting;
        }
        finally {
            statusLock.unlock();
        }
    }

    /**
     * Reads the {@code disconnecting} flag while holding the status lock.
     *
     * @return the value of the {@code disconnecting} flag
     */
    boolean isDisconnecting() {
        statusLock.lock();
        try {
            return disconnecting;
        }
        finally {
            statusLock.unlock();
        }
    }

    /**
     * Blocks, via {@code waitWhile}, for as long as a disconnect is in progress and the connection is not closed.
     *
     * @param timeout the wait limit handed to {@code waitWhile}
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
        Predicate<Void> stillDisconnecting = ignored -> isDisconnecting() && !isClosed();
        waitWhile(timeout, stillDisconnecting);
    }

    /**
     * Blocks, via {@code waitWhile}, for as long as the connection is neither connected nor closed.
     *
     * @param timeout the wait limit handed to {@code waitWhile}
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
        Predicate<Void> notYetSettled = ignored -> !(isConnected() || isClosed());
        waitWhile(timeout, notYetSettled);
    }

    /**
     * Blocks on the {@code statusChanged} condition while a predicate stays true.
     *
     * <p>Timeout handling, as implemented here:
     * <ul>
     *   <li>{@code null} - the loop is never entered, so the method returns without waiting;</li>
     *   <li>zero - waits without a time limit until the predicate turns false;</li>
     *   <li>positive - waits until the predicate turns false or the time has elapsed.</li>
     * </ul>
     *
     * <p>After every wake-up the elapsed time is subtracted from the remaining budget, so
     * repeated signals or spurious wake-ups cannot extend the wait beyond {@code timeout}.
     *
     * @param timeout the wait limit, interpreted as described above
     * @param waitWhileTrue tested with a {@code null} argument; waiting continues while it returns true
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    void waitWhile(Duration timeout, Predicate<Void> waitWhileTrue) throws InterruptedException {
        this.statusLock.lock();
        try {
            long currentWaitNanos = timeout != null ? timeout.toNanos() : -1L;
            long start = NatsSystemClock.nanoTime();
            while (currentWaitNanos >= 0L && waitWhileTrue.test(null)) {
                if (currentWaitNanos == 0L) {
                    // zero means "no limit": wait for the next signal and re-test
                    this.statusChanged.await();
                    continue;
                }
                if (this.statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS) && !waitWhileTrue.test(null)) {
                    break;
                }
                // charge the time spent in this wait against the remaining budget
                long now = NatsSystemClock.nanoTime();
                currentWaitNanos -= now - start;
                start = now;
                if (currentWaitNanos <= 0L) {
                    break;
                }
            }
        }
        finally {
            this.statusLock.unlock();
        }
    }

    /**
     * Pauses before the next reconnect attempt.
     *
     * <p>When the options carry a reconnect delay handler, the wait is whatever it returns for
     * {@code totalRounds} (no wait when it returns {@code null}). Otherwise the wait is the
     * configured reconnect wait plus a random jitter; the TLS jitter is used when the server
     * pool reports a secure server. A jitter of zero or less adds nothing.
     *
     * <p>The pause ends early when {@code reconnectWaiter} is completed, or when the connection
     * is found to be disconnecting, closed or connected.
     *
     * @param totalRounds value passed to the delay handler's {@code getWaitTime}
     */
    void invokeReconnectDelayHandler(long totalRounds) {
        long currentWaitNanos = 0L;
        ReconnectDelayHandler handler = this.options.getReconnectDelayHandler();
        if (handler == null) {
            Duration wait = this.options.getReconnectWait();
            if (wait != null) {
                currentWaitNanos = wait.toNanos();
                Duration jitter = this.serverPool.hasSecureServer() ? this.options.getReconnectJitterTls() : this.options.getReconnectJitter();
                if (jitter != null) {
                    long jitterNanos = jitter.toNanos();
                    // nextLong(bound) rejects a bound that is not positive
                    if (jitterNanos > 0L) {
                        currentWaitNanos += ThreadLocalRandom.current().nextLong(jitterNanos);
                    }
                }
            }
        } else {
            Duration waitTime = handler.getWaitTime(totalRounds);
            if (waitTime != null) {
                currentWaitNanos = waitTime.toNanos();
            }
        }
        this.reconnectWaiter = new CompletableFuture<>();
        long start = NatsSystemClock.nanoTime();
        while (!(currentWaitNanos <= 0L || this.isDisconnectingOrClosed() || this.isConnected() || this.reconnectWaiter.isDone())) {
            try {
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
            }
            catch (Exception ignored) {
                // timeout or interruption: fall through, recompute the remaining wait and re-check
            }
            long now = NatsSystemClock.nanoTime();
            currentWaitNanos -= now - start;
            start = now;
        }
        this.reconnectWaiter.complete(Boolean.TRUE);
    }

    /**
     * Copies a buffer's written content into a new heap buffer of twice the capacity.
     * A zero-capacity buffer grows to capacity 2, and the doubled size is capped at
     * {@code Integer.MAX_VALUE} so the size computation cannot overflow.
     *
     * <p>The source buffer is flipped as a side effect. The returned buffer is positioned
     * just past the copied bytes.
     *
     * @param buffer the buffer to grow; its content from index 0 up to its position is copied
     * @return a new, larger buffer holding the copied content
     */
    ByteBuffer enlargeBuffer(ByteBuffer buffer) {
        int current = buffer.capacity();
        // compute in long so that doubling a large capacity cannot wrap negative
        long doubled = Math.max(1L, (long) current) * 2L;
        int newSize = (int) Math.min(doubled, (long) Integer.MAX_VALUE);
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
        buffer.flip();
        newBuffer.put(buffer);
        return newBuffer;
    }

    /**
     * Returns the connection reader held by this connection.
     *
     * @return the {@code reader} field
     */
    NatsConnectionReader getReader() {
        return reader;
    }

    /**
     * Returns the connection writer held by this connection.
     *
     * @return the {@code writer} field
     */
    NatsConnectionWriter getWriter() {
        return writer;
    }

    /**
     * Returns the future held for the data port.
     *
     * @return the {@code dataPortFuture} field
     */
    Future<DataPort> getDataPortFuture() {
        return dataPortFuture;
    }

    /**
     * Reports whether a drain has been started, i.e. a drain tracker has been set.
     *
     * @return {@code true} when {@code draining} holds a non-null future
     */
    boolean isDraining() {
        return null != draining.get();
    }

    /**
     * Reports whether the drain tracker has completed with {@code true}.
     *
     * @return {@code true} only when a tracker exists and its current value is {@code true};
     *         {@code false} when there is no tracker, it is not complete, or reading it throws
     */
    boolean isDrained() {
        CompletableFuture<Boolean> tracker = draining.get();
        if (tracker == null) {
            return false;
        }
        try {
            return tracker.getNow(false);
        }
        catch (Exception ignored) {
            // a tracker that cannot be read counts as not drained
            return false;
        }
    }

    /**
     * Drains the connection: unsubscribes every consumer, waits for them to finish, then closes.
     *
     * <p>Steps, as implemented:
     * <ol>
     *   <li>Under the status lock, return the existing tracker if a drain is already running,
     *       otherwise install a new one.</li>
     *   <li>Collect subscriptions without a dispatcher, all dispatchers and the inbox dispatcher
     *       (if any); mark each as draining and send its unsubscribe.</li>
     *   <li>Flush. If the flush fails, the connection is closed and the exception rethrown.</li>
     *   <li>On the executor: poll until every consumer reports drained, the timeout elapses or
     *       the thread is interrupted; block further publishing, flush again, close, and complete
     *       the tracker.</li>
     * </ol>
     *
     * <p>The polling loop measures elapsed time on {@code NatsSystemClock} for both the start
     * and the current reading.
     *
     * @param timeout limit for the drain; {@code null} or a non-positive duration lets the
     *                polling loop run without a time limit
     * @return the drain tracker; completed with {@code true} when all consumers drained,
     *         {@code false} otherwise
     * @throws TimeoutException if the initial flush times out
     * @throws InterruptedException if the calling thread is interrupted during the initial flush or close
     * @throws IllegalStateException if the connection is closing or closed
     */
    @Override
    public @NonNull CompletableFuture<Boolean> drain(@Nullable Duration timeout) throws TimeoutException, InterruptedException {
        if (this.isClosing() || this.isClosed()) {
            throw new IllegalStateException("A connection can't be drained during close.");
        }
        this.statusLock.lock();
        try {
            if (this.isDraining()) {
                return this.draining.get();
            }
            this.draining.set(new CompletableFuture<>());
        }
        finally {
            this.statusLock.unlock();
        }
        CompletableFuture<Boolean> tracker = this.draining.get();
        Instant start = Instant.now();
        // subscriptions owned by a dispatcher are drained through that dispatcher
        HashSet<NatsSubscription> pureSubscribers = new HashSet<NatsSubscription>(this.subscribers.values());
        pureSubscribers.removeIf(s -> s.getDispatcher() != null);
        HashSet<NatsConsumer> consumers = new HashSet<NatsConsumer>();
        consumers.addAll(pureSubscribers);
        consumers.addAll(this.dispatchers.values());
        NatsDispatcher inboxer = this.inboxDispatcher.get();
        if (inboxer != null) {
            consumers.add(inboxer);
        }
        consumers.forEach(cons -> {
            cons.markDraining(tracker);
            cons.sendUnsubForDrain();
        });
        try {
            this.flush(timeout);
        }
        catch (Exception e) {
            this.close(false, false);
            throw e;
        }
        consumers.forEach(NatsConsumer::markUnsubedForDrain);
        this.executor.submit(() -> {
            try {
                long timeoutNanos = timeout == null || timeout.toNanos() <= 0L ? Long.MAX_VALUE : timeout.toNanos();
                // same clock as the loop condition below
                long startTime = NatsSystemClock.nanoTime();
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos && !Thread.interrupted()) {
                    consumers.removeIf(NatsConsumer::isDrained);
                    if (consumers.isEmpty()) break;
                    Thread.sleep(1L);
                }
                this.blockPublishForDrain.set(true);
                if (timeout == null || timeout.equals(Duration.ZERO)) {
                    this.flush(Duration.ZERO);
                } else {
                    // flush only with whatever is left of the overall timeout
                    Instant now = Instant.now();
                    Duration passed = Duration.between(start, now);
                    Duration newTimeout = timeout.minus(passed);
                    if (newTimeout.toNanos() > 0L) {
                        this.flush(newTimeout);
                    }
                }
                this.close(false, false);
                tracker.complete(consumers.isEmpty());
            }
            catch (TimeoutException e) {
                this.processException(e);
            }
            catch (InterruptedException e) {
                this.processException(e);
                Thread.currentThread().interrupt();
            }
            finally {
                try {
                    this.close(false, false);
                }
                catch (InterruptedException e) {
                    this.processException(e);
                    Thread.currentThread().interrupt();
                }
                // no effect when the tracker was already completed above
                tracker.complete(false);
            }
        });
        return tracker;
    }

    /**
     * Reports whether an error text is one of the recognised authentication errors.
     * Matching ignores case and uses locale-independent lower-casing, so the result does not
     * depend on the JVM's default locale.
     *
     * @param err the error text; may be {@code null}
     * @return {@code true} when the text starts with "user authentication", contains
     *         "authorization violation", or starts with "account authentication expired";
     *         {@code false} otherwise, including for {@code null}
     */
    boolean isAuthenticationError(String err) {
        if (err == null) {
            return false;
        }
        // Locale.ROOT: the default locale may map 'I' to a non-ASCII letter (e.g. Turkish)
        String lowered = err.toLowerCase(java.util.Locale.ROOT);
        return lowered.startsWith("user authentication")
            || lowered.contains("authorization violation")
            || lowered.startsWith("account authentication expired");
    }

    /**
     * Asks the writer to flush its buffer.
     *
     * @throws IOException declared by the writer's {@code flushBuffer}
     * @throws IllegalStateException if the connection status is not CONNECTED
     */
    @Override
    public void flushBuffer() throws IOException {
        if (isConnected()) {
            writer.flushBuffer();
            return;
        }
        throw new IllegalStateException("Connection is not active.");
    }

    /**
     * Creates a stream context for a stream, without JetStream options.
     *
     * @param streamName the stream name; validated before use
     * @return a new {@code NatsStreamContext} bound to this connection
     * @throws IOException if the connection is closing or closed
     * @throws JetStreamApiException declared for the context creation
     */
    @Override
    public @NonNull StreamContext getStreamContext(@NonNull String streamName) throws IOException, JetStreamApiException {
        Validator.validateStreamName(streamName, true);
        ensureNotClosing();
        return new NatsStreamContext(streamName, null, this, null);
    }

    /**
     * Creates a stream context for a stream, using the given JetStream options.
     *
     * @param streamName the stream name; validated before use
     * @param options the JetStream options passed to the context; may be {@code null}
     * @return a new {@code NatsStreamContext} bound to this connection
     * @throws IOException if the connection is closing or closed
     * @throws JetStreamApiException declared for the context creation
     */
    @Override
    public @NonNull StreamContext getStreamContext(@NonNull String streamName, @Nullable JetStreamOptions options) throws IOException, JetStreamApiException {
        Validator.validateStreamName(streamName, true);
        ensureNotClosing();
        return new NatsStreamContext(streamName, null, this, options);
    }

    /**
     * Obtains a consumer context by first creating the stream context, then asking it for the consumer.
     *
     * @param streamName the stream name
     * @param consumerName the consumer name
     * @return the consumer context returned by the stream context
     * @throws IOException propagated from the stream or consumer context lookup
     * @throws JetStreamApiException propagated from the stream or consumer context lookup
     */
    @Override
    public @NonNull ConsumerContext getConsumerContext(@NonNull String streamName, @NonNull String consumerName) throws IOException, JetStreamApiException {
        StreamContext streamContext = getStreamContext(streamName);
        return streamContext.getConsumerContext(consumerName);
    }

    /**
     * Obtains a consumer context by first creating the stream context with the given options,
     * then asking it for the consumer.
     *
     * @param streamName the stream name
     * @param consumerName the consumer name
     * @param options the JetStream options passed to the stream context; may be {@code null}
     * @return the consumer context returned by the stream context
     * @throws IOException propagated from the stream or consumer context lookup
     * @throws JetStreamApiException propagated from the stream or consumer context lookup
     */
    @Override
    public @NonNull ConsumerContext getConsumerContext(@NonNull String streamName, @NonNull String consumerName, @Nullable JetStreamOptions options) throws IOException, JetStreamApiException {
        StreamContext streamContext = getStreamContext(streamName, options);
        return streamContext.getConsumerContext(consumerName);
    }

    /**
     * Creates a JetStream context without options; delegates to {@code jetStream(null)}.
     *
     * @return the JetStream context
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull JetStream jetStream() throws IOException {
        return jetStream(null);
    }

    /**
     * Creates a JetStream context with the given options.
     *
     * @param options the JetStream options passed to the new context
     * @return a new {@code NatsJetStream} bound to this connection
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull JetStream jetStream(JetStreamOptions options) throws IOException {
        ensureNotClosing();
        return new NatsJetStream(this, options);
    }

    /**
     * Creates a JetStream management context without options; delegates to {@code jetStreamManagement(null)}.
     *
     * @return the JetStream management context
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull JetStreamManagement jetStreamManagement() throws IOException {
        return jetStreamManagement(null);
    }

    /**
     * Creates a JetStream management context with the given options.
     *
     * @param options the JetStream options passed to the new context
     * @return a new {@code NatsJetStreamManagement} bound to this connection
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException {
        ensureNotClosing();
        return new NatsJetStreamManagement(this, options);
    }

    /**
     * Creates a key-value context for a bucket without options; delegates to the two-argument overload.
     *
     * @param bucketName the bucket name
     * @return the key-value context
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull KeyValue keyValue(@NonNull String bucketName) throws IOException {
        return keyValue(bucketName, null);
    }

    /**
     * Creates a key-value context for a bucket with the given options.
     *
     * @param bucketName the bucket name; validated before use
     * @param options the key-value options passed to the new context; may be {@code null}
     * @return a new {@code NatsKeyValue} bound to this connection
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull KeyValue keyValue(@NonNull String bucketName, @Nullable KeyValueOptions options) throws IOException {
        Validator.validateBucketName(bucketName, true);
        ensureNotClosing();
        return new NatsKeyValue(this, bucketName, options);
    }

    /**
     * Creates a key-value management context without options; delegates to {@code keyValueManagement(null)}.
     *
     * @return the key-value management context
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull KeyValueManagement keyValueManagement() throws IOException {
        return keyValueManagement(null);
    }

    /**
     * Creates a key-value management context with the given options.
     *
     * @param options the key-value options passed to the new context; may be {@code null}
     * @return a new {@code NatsKeyValueManagement} bound to this connection
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull KeyValueManagement keyValueManagement(@Nullable KeyValueOptions options) throws IOException {
        ensureNotClosing();
        return new NatsKeyValueManagement(this, options);
    }

    /**
     * Creates an object store context for a bucket without options; delegates to the two-argument overload.
     *
     * @param bucketName the bucket name
     * @return the object store context
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull ObjectStore objectStore(@NonNull String bucketName) throws IOException {
        return objectStore(bucketName, null);
    }

    /**
     * Creates an object store context for a bucket with the given options.
     *
     * @param bucketName the bucket name; validated before use
     * @param options the object store options passed to the new context; may be {@code null}
     * @return a new {@code NatsObjectStore} bound to this connection
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull ObjectStore objectStore(@NonNull String bucketName, @Nullable ObjectStoreOptions options) throws IOException {
        Validator.validateBucketName(bucketName, true);
        ensureNotClosing();
        return new NatsObjectStore(this, bucketName, options);
    }

    /**
     * Creates an object store management context without options.
     *
     * @return a new {@code NatsObjectStoreManagement} bound to this connection, built with {@code null} options
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull ObjectStoreManagement objectStoreManagement() throws IOException {
        ensureNotClosing();
        return new NatsObjectStoreManagement(this, null);
    }

    /**
     * Creates an object store management context with the given options.
     *
     * @param options the object store options passed to the new context; may be {@code null}
     * @return a new {@code NatsObjectStoreManagement} bound to this connection
     * @throws IOException if the connection is closing or closed
     */
    @Override
    public @NonNull ObjectStoreManagement objectStoreManagement(@Nullable ObjectStoreOptions options) throws IOException {
        ensureNotClosing();
        return new NatsObjectStoreManagement(this, options);
    }

    /**
     * Guard used before creating JetStream, key-value and object store contexts.
     *
     * @throws IOException if the connection is closing or closed
     */
    private void ensureNotClosing() throws IOException {
        boolean shuttingDown = isClosing() || isClosed();
        if (shuttingDown) {
            throw new IOException("A JetStream context can't be established during close.");
        }
    }

    /**
     * Returns the writer's count of outgoing pending messages.
     *
     * @return the result of {@code writer.outgoingPendingMessageCount()}
     */
    @Override
    public long outgoingPendingMessageCount() {
        return writer.outgoingPendingMessageCount();
    }

    /**
     * Returns the writer's count of outgoing pending bytes.
     *
     * @return the result of {@code writer.outgoingPendingBytes()}
     */
    @Override
    public long outgoingPendingBytes() {
        return writer.outgoingPendingBytes();
    }

    /**
     * Shape of the callbacks accepted by {@code executeCallback}: given the connection and the
     * configured error listener, invoke whichever listener method applies.
     */
    interface ErrorListenerCaller {
        /**
         * Invokes the error listener.
         *
         * @param connection the connection the callback is running for
         * @param errorListener the listener to invoke
         */
        void call(Connection connection, ErrorListener errorListener);
    }
}

