/*
 * Decompiled with CFR 0.152.
 */
package io.nats.streaming;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.ErrorListener;
import io.nats.client.NUID;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.streaming.AckHandler;
import io.nats.streaming.ConnectionLostHandler;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionImpl;
import io.nats.streaming.SubscriptionOptions;
import io.nats.streaming.protobuf.Ack;
import io.nats.streaming.protobuf.CloseRequest;
import io.nats.streaming.protobuf.CloseResponse;
import io.nats.streaming.protobuf.ConnectRequest;
import io.nats.streaming.protobuf.ConnectResponse;
import io.nats.streaming.protobuf.MsgProto;
import io.nats.streaming.protobuf.Ping;
import io.nats.streaming.protobuf.PingResponse;
import io.nats.streaming.protobuf.PubAck;
import io.nats.streaming.protobuf.PubMsg;
import io.nats.streaming.protobuf.SubscriptionRequest;
import io.nats.streaming.protobuf.SubscriptionResponse;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class StreamingConnectionImpl
implements StreamingConnection,
io.nats.client.MessageHandler {
    /** Error message constant for manual acks attempted in auto-ack mode. */
    static final String ERR_MANUAL_ACK = "stan: cannot manually ack in auto-ack mode";
    /** Prefix prepended to a fresh NUID by {@link #newInbox()}. */
    static final String INBOX_PREFIX = "_INBOX.";
    // Guards connection state; lock()/unlock() use the write lock, rLock()/rUnlock() the read lock.
    private final ReadWriteLock mu = new ReentrantReadWriteLock();
    private String clientId;
    private String clusterId;
    // Generated from nuid in the constructor; sent as ConnID in connect and ping requests.
    private String connectionId;
    // The next five subjects are copied from the server's ConnectResponse in connect().
    String pubPrefix;
    String subRequests;
    String unsubRequests;
    String subCloseRequests;
    String closeRequests;
    // Subject the ack dispatcher subscribes to; passed as reply subject on every publish.
    String ackSubject;
    // Inbox advertised as HeartbeatInbox in the connect request.
    String hbSubject;
    // Inbox used as reply subject for pings sent by pingServer().
    String pingInbox;
    // Ping settings taken from the ConnectResponse (only set when the response enables pings).
    Duration pingInterval;
    int pingMaxOut;
    // Serialized Ping message carrying this connection's id.
    byte[] pingBytes;
    String pingRequests;
    // Incremented by pingServer(), reset to 0 by processPing().
    int pingsOut;
    Timer pingTimer;
    // Subscription inbox -> subscription; consulted by processMsg().
    Map<String, Subscription> subMap;
    // Publish guid -> pending ack bookkeeping.
    Map<String, AckClosure> pubAckMap;
    // Bounded queue (capacity opts.getMaxPubAcksInFlight()); publish() puts one element per message.
    private BlockingQueue<PubAck> pubAckChan;
    Options opts;
    // Underlying NATS connection; set to null by close(boolean).
    Connection nc;
    // Dispatcher subscribed to ackSubject; delivers to processAck().
    Dispatcher ackDispatcher;
    // Default dispatcher for subscription messages; created lazily in getDispatcherByName().
    Dispatcher messageDispatcher;
    // Dispatcher name -> dispatcher, for subscriptions that request a named dispatcher.
    Map<String, Dispatcher> customDispatchers;
    // Dispatcher carrying the heartbeat and ping-response subscriptions.
    Dispatcher dispatcher;
    io.nats.client.Subscription pingSub;
    io.nats.client.Subscription hbSub;
    NUID nuid;
    // Daemon timer that runs the per-publish ack timeout tasks.
    final Timer ackTimer = new Timer("jnats-streaming ack timeout thread", true);
    // True when this object created the NATS connection; close(boolean) then also closes it.
    boolean ncOwned = false;

    /**
     * Creates an unconnected streaming connection object.
     *
     * @param clusterId id of the streaming cluster to connect to
     * @param clientId  client id presented to the server
     * @param opts      connection options; when {@code null}, default options are built
     */
    StreamingConnectionImpl(String clusterId, String clientId, Options opts) {
        this.clusterId = clusterId;
        this.clientId = clientId;
        this.nuid = new NUID();
        this.connectionId = this.nuid.next();
        // Store the defaulted options in the field: assigning only the parameter left
        // this.opts null and made the getNatsConn() call below throw a NullPointerException.
        this.opts = opts != null ? opts : new Options.Builder().build();
        if (this.opts.getNatsConn() != null) {
            // Caller supplied an existing NATS connection; reuse it.
            this.setNatsConnection(this.opts.getNatsConn());
        }
    }

    /**
     * Creates an unconnected streaming connection, taking the cluster id and client id
     * from {@code opts}.
     *
     * @param opts connection options; dereferenced immediately, so it must not be {@code null}
     */
    StreamingConnectionImpl(Options opts) {
        this(opts.getClusterId(), opts.getClientId(), opts);
    }

    /**
     * Prints a time-stamped connect-trace line to standard output when tracing is on.
     *
     * @param trace  whether tracing is enabled; nothing is printed when {@code false}
     * @param format {@code printf}-style format of the trace message
     * @param args   arguments for {@code format}
     */
    void timeTrace(boolean trace, String format, Object ... args) {
        if (!trace) {
            return;
        }
        String stamp = DateTimeFormatter.ISO_TIME.format(LocalDateTime.now());
        System.out.printf("[%s] connect trace: ", stamp);
        System.out.printf(format, args);
        System.out.println();
    }

    /**
     * Establishes the streaming session: obtains (or creates) the NATS connection, sets up
     * the ack, heartbeat and ping subscriptions, sends the connect request to the cluster's
     * discover subject and records the subjects returned by the server.
     *
     * <p>If any step after the NATS connection is available fails - with an
     * {@link IOException}, an {@link InterruptedException} or a runtime exception -
     * {@link #close()} is attempted (its own failures are ignored) before the original
     * exception is rethrown, so a half-initialised connection does not leak.
     *
     * @return this connection
     * @throws IOException if a supplied NATS connection is not connected, the connect
     *         request times out, or the server answers with an error
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    StreamingConnectionImpl connect() throws IOException, InterruptedException {
        boolean trace = this.opts.isTraceConnection();
        this.timeTrace(trace, "starting connection to streaming cluster %s as %s", this.clusterId, this.clientId);
        Connection nc = this.getNatsConnection();
        if (nc == null) {
            nc = this.createNatsConnection();
            this.setNatsConnection(nc);
            this.ncOwned = true;
        } else {
            if (nc.getStatus() != Connection.Status.CONNECTED) {
                throw new IOException("stan: invalid connection");
            }
            this.timeTrace(trace, "skipped NATS connection, using existing one");
        }
        try {
            this.timeTrace(trace, "creating inboxes");
            this.hbSubject = this.newInbox();
            this.pingInbox = this.newInbox();
            this.ackSubject = String.format("%s.%s", "_STAN.acks", this.nuid.next());
            this.timeTrace(trace, "creating ack dispatcher");
            this.ackDispatcher = nc.createDispatcher(msg -> this.processAck(msg));
            this.ackDispatcher.subscribe(this.ackSubject);
            this.timeTrace(trace, "creating hb/ping dispatcher and subscribing");
            this.dispatcher = nc.createDispatcher(msg -> {});
            this.hbSub = this.dispatcher.subscribe(this.hbSubject, msg -> this.processHeartBeat(msg));
            this.pingSub = this.dispatcher.subscribe(this.pingInbox, msg -> this.processPing(msg));
            this.timeTrace(trace, "setting pending limits on dispatchers");
            this.dispatcher.setPendingLimits(-1L, -1L);
            this.ackDispatcher.setPendingLimits(-1L, -1L);
            this.customDispatchers = new HashMap<String, Dispatcher>();
            String discoverSubject = String.format("%s.%s", this.opts.getDiscoverPrefix(), this.clusterId);
            // Wire encoding of the ping interval: whole seconds, or the negated millisecond
            // count when the configured interval is below one second.
            long pingMillis = this.opts.getPingInterval().toMillis();
            long wireInterval = pingMillis < 1000L ? -pingMillis : pingMillis / 1000L;
            this.timeTrace(trace, "sending connection request");
            ConnectRequest req = ConnectRequest.newBuilder()
                    .setClientID(this.clientId)
                    .setConnID(ByteString.copyFromUtf8(this.connectionId))
                    .setHeartbeatInbox(this.hbSubject)
                    .setProtocol(1)
                    .setPingInterval((int)wireInterval)
                    .setPingMaxOut(this.opts.getMaxPingsOut())
                    .build();
            byte[] bytes = req.toByteArray();
            io.nats.client.Message reply = nc.request(discoverSubject, bytes, this.opts.getConnectTimeout());
            if (reply == null) {
                throw new IOException("stan: connect request timeout");
            }
            this.timeTrace(trace, "received connection request");
            ConnectResponse cr = ConnectResponse.parseFrom(reply.getData());
            if (!cr.getError().isEmpty()) {
                throw new IOException(cr.getError());
            }
            this.pubPrefix = cr.getPubPrefix();
            this.subRequests = cr.getSubRequests();
            this.unsubRequests = cr.getUnsubRequests();
            this.subCloseRequests = cr.getSubCloseRequests();
            this.closeRequests = cr.getCloseRequests();
            boolean unsubPings = true;
            if (cr.getProtocol() >= 1) {
                this.timeTrace(trace, "setting up server ping");
                if (cr.getPingInterval() != 0) {
                    unsubPings = false;
                    this.pingRequests = cr.getPingRequests();
                    // Same encoding as the request: negative means milliseconds.
                    this.pingInterval = cr.getPingInterval() < 0 ? Duration.ofMillis(-cr.getPingInterval()) : Duration.ofSeconds(cr.getPingInterval());
                    this.pingMaxOut = cr.getPingMaxOut();
                    this.pingBytes = Ping.newBuilder().setConnID(ByteString.copyFromUtf8(this.connectionId)).build().toByteArray();
                    this.pingTimer = new Timer("jnats streaming ping timer", true);
                    this.pingTimer.schedule(new TimerTask(){

                        @Override
                        public void run() {
                            try {
                                StreamingConnectionImpl.this.pingServer();
                            }
                            catch (Exception e) {
                                // Stop pinging once a ping attempt fails unexpectedly.
                                this.cancel();
                            }
                        }
                    }, this.pingInterval.toMillis(), this.pingInterval.toMillis());
                }
            }
            if (unsubPings) {
                this.timeTrace(trace, "removing ping subscriber, not supported by server");
                this.dispatcher.unsubscribe(this.pingSub);
                this.pingSub = null;
            }
            this.pubAckMap = new HashMap<String, AckClosure>();
            this.subMap = new HashMap<String, Subscription>();
            this.pubAckChan = new LinkedBlockingQueue<PubAck>(this.opts.getMaxPubAcksInFlight());
            this.timeTrace(trace, "connection complete");
            return this;
        }
        catch (IOException | InterruptedException | RuntimeException e) {
            // Release dispatchers and, if owned, the NATS connection on every failure path,
            // not only on IOException.
            try {
                this.close();
            }
            catch (Exception ignored) {
                // Nothing useful can be done if cleanup fails; report the original error.
            }
            throw e;
        }
    }

    /**
     * Creates the underlying NATS connection when none is set yet and marks it as owned by
     * this object.
     *
     * <p>When the streaming options carry a NATS URL, the connection is built with this
     * client's id as connection name plus the configured error and connection listeners;
     * otherwise {@code Nats.connect()} is called with no arguments.
     *
     * @return the new NATS connection, or {@code null} if a connection is already set
     * @throws IOException if the NATS connection cannot be established
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    Connection createNatsConnection() throws IOException, InterruptedException {
        if (this.getNatsConnection() != null) {
            return null;
        }
        Connection created;
        if (this.opts.getNatsUrl() != null) {
            // Fully qualified: the unqualified name Options also denotes the streaming
            // options class used for this.opts, while this builder must be the NATS client one.
            io.nats.client.Options.Builder natsOpts = new io.nats.client.Options.Builder()
                    .connectionName(this.clientId)
                    .errorListener(this.opts.getErrorListener())
                    .connectionListener(this.opts.getConnectionListener())
                    .server(this.opts.getNatsUrl());
            if (this.opts.isTraceConnection()) {
                natsOpts.traceConnection();
            }
            created = Nats.connect(natsOpts.build());
        } else {
            created = Nats.connect();
        }
        this.ncOwned = true;
        return created;
    }

    /**
     * Closes the streaming connection in non-silent mode, i.e. delegates to
     * {@code close(false)}, which also sends a close request to the server.
     *
     * @throws IOException if the close request times out or the server reports an error
     * @throws InterruptedException if the thread is interrupted while closing
     */
    @Override
    public void close() throws IOException, InterruptedException {
        this.close(false);
    }

    /**
     * Tears the streaming connection down while holding the connection write lock.
     *
     * <p>Does nothing when the connection is already closed. Otherwise it clears the NATS
     * connection reference, cancels the ping and ack timers, cancels pending ack timeout
     * tasks, closes all active dispatchers and - unless {@code silent} - sends a close
     * request to the server. The NATS connection itself is closed only when it was created
     * by this object.
     *
     * @param silent when {@code true}, no close request is sent to the server
     * @throws IOException if the close request times out or the server reports an error
     * @throws InterruptedException if the thread is interrupted during the close request
     */
    void close(boolean silent) throws IOException, InterruptedException {
        this.lock();
        try {
            Connection nc = this.getNatsConnection();
            if (nc == null) {
                return;
            }
            try {
                this.setNatsConnection(null);
                if (this.pingTimer != null) {
                    this.pingTimer.cancel();
                }
                if (this.pubAckMap != null) {
                    for (AckClosure ac : this.pubAckMap.values()) {
                        // The timeout task is attached only after the message has been sent,
                        // so it may still be null here.
                        if (ac.ackTask != null) {
                            ac.ackTask.cancel();
                        }
                        // Async publishes carry no error channel. poll() drains at most one
                        // element without blocking while the lock is held.
                        if (ac.ch != null) {
                            ac.ch.poll();
                        }
                    }
                }
                this.ackTimer.cancel();
                if (this.messageDispatcher != null && this.messageDispatcher.isActive()) {
                    nc.closeDispatcher(this.messageDispatcher);
                }
                // customDispatchers is created in connect(); it is null if connect() never ran.
                if (this.customDispatchers != null) {
                    for (Dispatcher d : this.customDispatchers.values()) {
                        if (d.isActive()) {
                            nc.closeDispatcher(d);
                        }
                    }
                }
                if (this.ackDispatcher != null && this.ackDispatcher.isActive()) {
                    nc.closeDispatcher(this.ackDispatcher);
                }
                if (this.dispatcher != null && this.dispatcher.isActive()) {
                    nc.closeDispatcher(this.dispatcher);
                }
                // closeRequests is only known after a successful connect handshake; without
                // it there is no subject to send the close request to.
                if (!silent && this.closeRequests != null) {
                    CloseRequest req = CloseRequest.newBuilder().setClientID(this.clientId).build();
                    io.nats.client.Message reply = nc.request(this.closeRequests, req.toByteArray(), this.opts.getConnectTimeout());
                    if (reply == null) {
                        throw new IOException("stan: close request timeout");
                    }
                    if (reply.getData() != null) {
                        CloseResponse cr = CloseResponse.parseFrom(reply.getData());
                        if (!cr.getError().isEmpty()) {
                            throw new IOException(cr.getError());
                        }
                    }
                }
            }
            finally {
                if (this.ncOwned) {
                    try {
                        nc.close();
                    }
                    catch (Exception ignored) {
                        // Best effort: the streaming connection is already torn down.
                    }
                }
            }
        }
        finally {
            this.unlock();
        }
    }

    /**
     * Factory for {@link SubscriptionImpl}; simply forwards all arguments to its constructor.
     */
    private SubscriptionImpl createSubscription(String subject, String qgroup, MessageHandler cb, StreamingConnectionImpl conn, SubscriptionOptions opts) {
        return new SubscriptionImpl(subject, qgroup, cb, conn, opts);
    }

    /**
     * Answers a heartbeat message by publishing an empty message to its reply subject.
     * Nothing is sent when the connection has already been closed.
     *
     * @param msg the heartbeat message received on the heartbeat inbox
     */
    void processHeartBeat(io.nats.client.Message msg) {
        Connection current;
        this.rLock();
        current = this.nc;
        this.rUnlock();
        if (current == null) {
            return;
        }
        current.publish(msg.getReplyTo(), null);
    }

    /**
     * Creates the unbounded queue on which a synchronous publish waits for its ack result
     * (an empty string for success, otherwise the error text).
     */
    BlockingQueue<String> createErrorChannel() {
        return new LinkedBlockingQueue<String>();
    }

    /**
     * Publishes a message and blocks until the server acknowledges it.
     *
     * <p>Waits up to twice the configured ack timeout for the result.
     *
     * @param subject subject to publish to
     * @param data    message payload, may be {@code null}
     * @throws IOException if the ack carries a non-empty error text
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws TimeoutException if no ack result arrives within the wait period
     */
    @Override
    public void publish(String subject, byte[] data) throws IOException, InterruptedException, TimeoutException {
        BlockingQueue<String> results = this.createErrorChannel();
        Duration timeout = this.opts.getAckTimeout();
        this.publish(subject, data, null, results);
        String outcome = results.poll(2L * timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (outcome == null) {
            throw new TimeoutException("stan: publish ack timeout");
        }
        if (outcome.isEmpty()) {
            // An empty string signals a successful ack.
            return;
        }
        throw new IOException(outcome);
    }

    /**
     * Publishes a message asynchronously; the ack (or ack timeout) is reported to
     * {@code ah} rather than awaited here.
     *
     * @param subject subject to publish to
     * @param data    message payload
     * @param ah      handler invoked when the ack arrives or times out
     * @return the guid assigned to the published message
     */
    @Override
    public String publish(String subject, byte[] data, AckHandler ah) throws IOException, InterruptedException, TimeoutException {
        return this.publish(subject, data, ah, null);
    }

    /**
     * Common publish path for the synchronous and asynchronous variants.
     *
     * <p>Registers an {@link AckClosure} under a fresh guid, reserves an in-flight slot in
     * the bounded ack queue (blocking while the maximum number of unacknowledged publishes
     * is outstanding), sends the message, and schedules the ack timeout task.
     *
     * @param subject subject to publish to
     * @param data    message payload, may be {@code null}
     * @param ah      ack handler for asynchronous publishes, or {@code null}
     * @param ch      result queue for synchronous publishes, or {@code null}
     * @return the guid assigned to the message
     * @throws IllegalStateException if the connection is closed
     */
    private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue<String> ch) throws IOException, InterruptedException, TimeoutException {
        Duration ackTimeout = this.opts.getAckTimeout();
        AckClosure a = new AckClosure(ah, subject, ah != null && ah.includeDataWithAck() ? data : null, ch);
        Connection nc;
        BlockingQueue<PubAck> pac;
        String ackSubject;
        byte[] bytes;
        String guid;
        String subj;
        this.lock();
        try {
            // Capture the connection under the lock; close() may null the field at any time.
            nc = this.getNatsConnection();
            if (nc == null) {
                throw new IllegalStateException("stan: connection closed");
            }
            subj = this.pubPrefix + "." + subject;
            guid = NUID.nextGlobal();
            PubMsg.Builder pb = PubMsg.newBuilder().setClientID(this.clientId).setGuid(guid).setSubject(subject);
            if (data != null) {
                pb = pb.setData(ByteString.copyFrom(data));
            }
            bytes = pb.build().toByteArray();
            this.pubAckMap.put(guid, a);
            ackSubject = this.ackSubject;
            pac = this.pubAckChan;
        }
        finally {
            this.unlock();
        }
        boolean slotReserved = false;
        boolean interrupted = false;
        try {
            pac.put(PubAck.getDefaultInstance());
            slotReserved = true;
        }
        catch (InterruptedException e) {
            // Keep the historical behaviour of still sending the message, but remember the
            // interrupt so it can be re-asserted for the caller below.
            interrupted = true;
        }
        try {
            try {
                nc.publish(subj, ackSubject, bytes);
            }
            catch (RuntimeException e) {
                // No timeout task exists yet, so nothing else would ever remove this entry
                // or give the in-flight slot back.
                this.lock();
                try {
                    this.pubAckMap.remove(guid);
                }
                finally {
                    this.unlock();
                }
                if (slotReserved) {
                    pac.poll();
                }
                throw e;
            }
            this.lock();
            try {
                a.ackTask = this.createAckTimerTask(guid);
                this.ackTimer.schedule(a.ackTask, ackTimeout.toMillis());
            }
            finally {
                this.unlock();
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return guid;
    }

    /**
     * Returns the dispatcher that delivers subscription messages for the given name,
     * creating it on first use. All work happens under the connection write lock.
     *
     * @param name dispatcher name; {@code null} or empty selects the shared default dispatcher
     * @return the default dispatcher or the custom dispatcher registered under {@code name}
     */
    Dispatcher getDispatcherByName(String name) {
        boolean useDefault = name == null || name.isEmpty();
        this.lock();
        try {
            if (useDefault) {
                if (this.messageDispatcher == null) {
                    this.messageDispatcher = this.nc.createDispatcher(this::processMsg);
                    // Negative limits are passed for both pending-limit arguments.
                    this.messageDispatcher.setPendingLimits(-1L, -1L);
                }
                return this.messageDispatcher;
            }
            Dispatcher named = this.customDispatchers.get(name);
            if (named == null) {
                named = this.getNatsConnection().createDispatcher(this::processMsg);
                named.setPendingLimits(-1L, -1L);
                this.customDispatchers.put(name, named);
            }
            return named;
        }
        finally {
            this.unlock();
        }
    }

    /**
     * Subscribes to {@code subject} with no queue group and default subscription options.
     */
    @Override
    public Subscription subscribe(String subject, MessageHandler cb) throws IOException, InterruptedException, TimeoutException {
        return this.subscribe(subject, cb, null);
    }

    /**
     * Subscribes to {@code subject} with no queue group and the given options
     * ({@code null} selects default options).
     */
    @Override
    public Subscription subscribe(String subject, MessageHandler cb, SubscriptionOptions opts) throws IOException, InterruptedException, TimeoutException {
        return this.subscribe(subject, null, cb, opts);
    }

    /**
     * Subscribes to {@code subject} as a member of queue group {@code queue} with default
     * subscription options.
     */
    @Override
    public Subscription subscribe(String subject, String queue, MessageHandler cb) throws IOException, InterruptedException, TimeoutException {
        return this.subscribe(subject, queue, cb, null);
    }

    /**
     * Creates a subscription, registers its inbox with the selected dispatcher and sends
     * the subscription request to the server.
     *
     * <p>If the request fails for any reason (timeout, unparsable or error response,
     * interruption, runtime failure), the inbox is unsubscribed from the dispatcher and the
     * subscription is removed from the subscription map again, so a failed attempt leaves
     * no state behind.
     *
     * @param subject subject to subscribe to
     * @param queue   queue group name, or {@code null} for none
     * @param cb      handler invoked for each delivered message
     * @param opts    subscription options; {@code null} selects default options
     * @return the established subscription
     * @throws IllegalStateException if the connection is closed
     * @throws IOException if the request times out or the server reports an error
     * @throws InterruptedException if the thread is interrupted during the request
     */
    @Override
    public Subscription subscribe(String subject, String queue, MessageHandler cb, SubscriptionOptions opts) throws IOException, InterruptedException, TimeoutException {
        if (opts == null) {
            opts = new SubscriptionOptions.Builder().build();
        }
        Connection nc;
        SubscriptionImpl sub;
        this.lock();
        try {
            nc = this.getNatsConnection();
            if (nc == null) {
                throw new IllegalStateException("stan: connection closed");
            }
            sub = this.createSubscription(subject, queue, cb, this, opts);
            this.subMap.put(sub.inbox, sub);
        }
        finally {
            this.unlock();
        }
        boolean established = false;
        try {
            // Hold the subscription's write lock until the ack inbox is known.
            sub.wLock();
            try {
                Dispatcher d = this.getDispatcherByName(opts.getDispatcherName());
                d.subscribe(sub.inbox);
                try {
                    SubscriptionRequest sr = this.createSubscriptionRequest(sub);
                    io.nats.client.Message reply = nc.request(this.subRequests, sr.toByteArray(), opts.getSubscriptionTimeout());
                    if (reply == null) {
                        throw new IOException("stan: subscribe request timeout");
                    }
                    SubscriptionResponse response = SubscriptionResponse.parseFrom(reply.getData());
                    if (!response.getError().isEmpty()) {
                        throw new IOException(response.getError());
                    }
                    sub.setAckInbox(response.getAckInbox());
                    established = true;
                }
                finally {
                    if (!established) {
                        try {
                            d.unsubscribe(sub.inbox);
                        }
                        catch (RuntimeException ignored) {
                            // Best-effort cleanup; let the original failure propagate.
                        }
                    }
                }
            }
            finally {
                sub.wUnlock();
            }
        }
        finally {
            if (!established) {
                // The subscription is never handed to the caller, so drop it from the map.
                this.lock();
                try {
                    this.subMap.remove(sub.inbox);
                }
                finally {
                    this.unlock();
                }
            }
        }
        return sub;
    }

    /**
     * Builds the protobuf subscription request describing {@code sub}.
     *
     * <p>A start sequence is set only for {@code SequenceStart}, and a start time delta
     * (nanoseconds between the configured start time and now) only for
     * {@code TimeDeltaStart}. The start position itself is always set.
     *
     * @param sub the subscription to describe
     * @return the request to send to the server's subscribe subject
     */
    SubscriptionRequest createSubscriptionRequest(SubscriptionImpl sub) {
        SubscriptionOptions options = sub.getOptions();
        String qgroup = sub.getQueue();
        SubscriptionRequest.Builder builder = SubscriptionRequest.newBuilder();
        builder.setClientID(sub.getConnection().getClientId());
        builder.setSubject(sub.getSubject());
        // A missing queue group is sent as the empty string.
        builder.setQGroup(qgroup != null ? qgroup : "");
        builder.setInbox(sub.getInbox());
        builder.setMaxInFlight(options.getMaxInFlight());
        builder.setAckWaitInSecs((int)options.getAckWait().getSeconds());
        switch (options.getStartAt()) {
            case SequenceStart: {
                builder.setStartSequence(options.getStartSequence());
                break;
            }
            case TimeDeltaStart: {
                builder.setStartTimeDelta(ChronoUnit.NANOS.between(options.getStartTime(), Instant.now()));
                break;
            }
            default: {
                // The remaining start positions carry no extra field.
                break;
            }
        }
        builder.setStartPosition(options.getStartAt());
        String durable = options.getDurableName();
        if (durable != null) {
            builder.setDurableName(durable);
        }
        return builder.build();
    }

    /**
     * Handles a publish ack received on the ack subject.
     *
     * <p>Looks up (and removes) the pending publish by guid. Asynchronous publishes are
     * reported to their {@link AckHandler}, with an {@link IOException} when the ack
     * carries an error. Synchronous publishes receive the error text - empty on success -
     * on their result queue.
     *
     * @param msg the raw NATS message containing a serialized {@code PubAck}
     */
    void processAck(io.nats.client.Message msg) {
        PubAck pa;
        try {
            pa = PubAck.parseFrom(msg.getData());
        }
        catch (InvalidProtocolBufferException e) {
            // Concatenating e.getStackTrace() only printed the array's identity; report the
            // exception itself and then its stack trace.
            System.err.println("Protocol error: " + e);
            e.printStackTrace();
            return;
        }
        AckClosure ackClosure = this.removeAck(pa.getGuid());
        if (ackClosure == null) {
            // Unknown guid, or the publish already timed out.
            return;
        }
        String ackError = pa.getError();
        if (ackClosure.ah != null) {
            IOException ex = ackError.isEmpty() ? null : new IOException(ackError);
            ackClosure.ah.onAck(pa.getGuid(), ackClosure.subject, ackClosure.data, ex);
            // Drop references once the handler has been notified.
            ackClosure.subject = null;
            ackClosure.data = null;
        } else if (ackClosure.ch != null) {
            try {
                ackClosure.ch.put(ackError);
            }
            catch (InterruptedException ignored) {
                // Interrupted while handing over the result; the waiting publisher will
                // run into its own timeout.
            }
        }
    }

    /**
     * Creates the timer task that reports an ack timeout for the publish identified by
     * {@code guid}.
     *
     * @param guid guid of the published message
     * @return a task that calls {@link #processAckTimeout(String)} and cancels itself if
     *         that call throws
     */
    TimerTask createAckTimerTask(final String guid) {
        TimerTask timeoutTask = new TimerTask(){

            @Override
            public void run() {
                try {
                    StreamingConnectionImpl.this.processAckTimeout(guid);
                }
                catch (Exception failure) {
                    this.cancel();
                }
            }
        };
        return timeoutTask;
    }

    /**
     * Reports an ack timeout for the publish identified by {@code guid}, if it is still
     * pending: the ack handler receives a {@link TimeoutException}, or the synchronous
     * publisher's result queue receives the timeout text.
     *
     * @param guid guid of the published message
     */
    void processAckTimeout(String guid) {
        AckClosure pending = this.removeAck(guid);
        if (pending == null) {
            return;
        }
        if (pending.ah != null) {
            pending.ah.onAck(guid, pending.subject, pending.data, new TimeoutException("stan: publish ack timeout"));
            return;
        }
        if (pending.ch == null) {
            return;
        }
        try {
            pending.ch.put("stan: publish ack timeout");
        }
        catch (InterruptedException ignored) {
            // Interrupted while handing over the result; nothing further to do.
        }
    }

    /**
     * Removes the pending-publish record for {@code guid}, cancels its timeout task and
     * frees one slot in the bounded in-flight queue.
     *
     * @param guid guid of the published message
     * @return the removed record, or {@code null} if no publish is pending under that guid
     */
    AckClosure removeAck(String guid) {
        BlockingQueue<PubAck> pac;
        AckClosure ackClosure;
        TimerTask timerTask = null;
        this.lock();
        try {
            ackClosure = this.pubAckMap.remove(guid);
            if (ackClosure != null) {
                // Read under the lock: publish() assigns the task under the same lock.
                timerTask = ackClosure.ackTask;
            }
            pac = this.pubAckChan;
        }
        finally {
            this.unlock();
        }
        if (ackClosure == null) {
            return null;
        }
        if (timerTask != null) {
            timerTask.cancel();
        }
        // Non-blocking: checking size() and then calling take() could block forever when
        // another thread drained the queue in between.
        pac.poll();
        return ackClosure;
    }

    /**
     * Closes the connection silently after a ping failure and notifies the configured
     * connection-lost handler, if any.
     *
     * <p>Does nothing when the connection is already closed. An exception thrown by the
     * close is attached as cause of the exception passed to the handler.
     *
     * @param error description of the ping failure
     */
    void closeDueToPing(String error) {
        boolean alreadyClosed;
        ConnectionLostHandler lostHandler;
        this.lock();
        try {
            alreadyClosed = this.getNatsConnection() == null;
            lostHandler = this.opts.getConnectionLostHandler();
        }
        finally {
            this.unlock();
        }
        if (alreadyClosed) {
            return;
        }
        Exception closeFailure = null;
        try {
            this.close(true);
        }
        catch (Exception e) {
            closeFailure = e;
        }
        if (lostHandler == null) {
            return;
        }
        Exception reported = closeFailure == null ? new Exception(error) : new Exception(error, closeFailure);
        lostHandler.connectionLost(this, reported);
    }

    /**
     * Sends one ping to the server, or declares the connection lost when more pings are
     * outstanding than the negotiated maximum.
     *
     * <p>The outstanding-ping counter is updated under the connection lock, which is
     * released exactly once before any publishing or closing happens. If publishing fails
     * and the NATS connection reports {@code CLOSED}, the streaming connection is closed
     * as well.
     */
    void pingServer() {
        boolean tooManyOutstanding;
        Connection conn;
        this.lock();
        try {
            ++this.pingsOut;
            tooManyOutstanding = this.pingsOut > this.pingMaxOut;
            conn = this.nc;
        }
        finally {
            // Single release point: an exception after a manual unlock could otherwise
            // trigger a second unlock of a lock this thread no longer holds.
            this.unlock();
        }
        if (tooManyOutstanding) {
            this.closeDueToPing("stan: connection lost due to PING failure");
            return;
        }
        if (conn == null) {
            // Already closed; there is nothing to ping.
            return;
        }
        try {
            conn.publish(this.pingRequests, this.pingInbox, this.pingBytes);
        }
        catch (Exception exp) {
            if (conn.getStatus() == Connection.Status.CLOSED) {
                this.closeDueToPing(exp.getMessage());
            }
        }
    }

    /**
     * Handles a ping response.
     *
     * <p>A response carrying a non-empty error closes the connection. A response that
     * cannot be processed is ignored. Otherwise the outstanding-ping counter is reset.
     *
     * @param msg the message received on the ping inbox
     */
    void processPing(io.nats.client.Message msg) {
        byte[] payload = msg.getData();
        boolean hasPayload = payload != null && payload.length > 0;
        if (hasPayload) {
            try {
                String error = PingResponse.parseFrom(payload).getError();
                if (error != null && !error.isEmpty()) {
                    this.closeDueToPing(error);
                    return;
                }
            }
            catch (Exception e) {
                // Leave the counter untouched when the response cannot be processed.
                return;
            }
        }
        this.lock();
        this.pingsOut = 0;
        this.unlock();
    }

    /**
     * NATS message-handler entry point; forwards the raw message to
     * {@link #processMsg(io.nats.client.Message)}.
     */
    public void onMessage(io.nats.client.Message msg) {
        this.processMsg(msg);
    }

    /**
     * Wraps a decoded protobuf message in a streaming {@link Message}.
     */
    Message createStanMessage(MsgProto msgp) {
        return new Message(msgp);
    }

    /**
     * Delivers a raw NATS message to the streaming subscription registered for its subject
     * and, unless the subscription uses manual acks, acknowledges it to the server.
     *
     * <p>Messages that cannot be decoded, that arrive after the connection was closed, or
     * for which no subscription is registered are dropped. An exception thrown by the
     * user's message handler is passed to the NATS connection's error listener, if one is
     * configured, and does not prevent the automatic ack.
     *
     * @param raw the raw NATS message carrying a serialized {@code MsgProto}
     */
    void processMsg(io.nats.client.Message raw) {
        Message stanMsg;
        try {
            stanMsg = this.createStanMessage(MsgProto.parseFrom(raw.getData()));
        }
        catch (InvalidProtocolBufferException e) {
            // Undecodable payload: there is nothing to deliver or acknowledge. Continuing
            // would dereference a null message below.
            return;
        }
        Connection nc;
        SubscriptionImpl sub;
        this.lock();
        try {
            nc = this.getNatsConnection();
            sub = (SubscriptionImpl)this.subMap.get(raw.getSubject());
        }
        finally {
            this.unlock();
        }
        if (sub == null || nc == null) {
            return;
        }
        stanMsg.setSubscription(sub);
        MessageHandler cb;
        String ackSubject;
        boolean isManualAck;
        StreamingConnectionImpl subsc;
        sub.rLock();
        try {
            cb = sub.getMessageHandler();
            ackSubject = sub.getAckInbox();
            isManualAck = sub.getOptions().isManualAcks();
            subsc = sub.getConnection();
        }
        finally {
            sub.rUnlock();
        }
        if (cb != null && subsc != null) {
            try {
                cb.onMessage(stanMsg);
            }
            catch (Exception e) {
                ErrorListener handler = nc.getOptions().getErrorListener();
                if (handler != null) {
                    try {
                        // Use the connection captured under the lock; the field may have
                        // been cleared by a concurrent close.
                        handler.exceptionOccurred(nc, e);
                    }
                    catch (Exception ignored) {
                        // A failing error listener must not break message processing.
                    }
                }
            }
        }
        if (!isManualAck) {
            Ack ack = Ack.newBuilder().setSubject(stanMsg.getSubject()).setSequence(stanMsg.getSequence()).build();
            nc.publish(ackSubject, ack.toByteArray());
        }
    }

    /** Returns the client id this connection was created with. */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Returns the underlying NATS connection, or {@code null} when none is set
     * (close(boolean) clears it).
     */
    @Override
    public Connection getNatsConnection() {
        return this.nc;
    }

    /** Replaces the underlying NATS connection reference; {@code null} marks the connection closed. */
    private void setNatsConnection(Connection nc) {
        this.nc = nc;
    }

    /**
     * Generates a new inbox subject: {@link #INBOX_PREFIX} followed by the next id from
     * this connection's NUID.
     *
     * @return the new inbox subject
     */
    public String newInbox() {
        return INBOX_PREFIX + this.nuid.next();
    }

    /** Acquires the connection's write lock. */
    void lock() {
        this.mu.writeLock().lock();
    }

    /** Releases the connection's write lock. */
    void unlock() {
        this.mu.writeLock().unlock();
    }

    /** Acquires the connection's read lock. */
    private void rLock() {
        this.mu.readLock().lock();
    }

    /** Releases the connection's read lock. */
    private void rUnlock() {
        this.mu.readLock().unlock();
    }

    /**
     * Bookkeeping for one published message that is still waiting for its ack.
     * Instances are stored in {@code pubAckMap} keyed by the message guid.
     */
    class AckClosure {
        // Timeout task; assigned by publish() after the message has been sent, so it can be null.
        TimerTask ackTask;
        // Handler for asynchronous publishes; null for synchronous ones.
        AckHandler ah;
        String subject;
        // Payload kept for the handler; publish() passes it only when ah.includeDataWithAck() is true.
        byte[] data;
        // Result queue for synchronous publishes; null for asynchronous ones.
        BlockingQueue<String> ch;

        AckClosure(AckHandler ah, String subject, byte[] data, BlockingQueue<String> ch) {
            this.ah = ah;
            this.ch = ch;
            this.subject = subject;
            this.data = data;
        }
    }
}

