/*
 * Decompiled with CFR 0.152.
 */
package io.nats.stan;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.nats.client.AsyncSubscription;
import io.nats.client.ConnectionFactory;
import io.nats.client.NUID;
import io.nats.stan.AckHandler;
import io.nats.stan.Connection;
import io.nats.stan.Message;
import io.nats.stan.MessageHandler;
import io.nats.stan.Options;
import io.nats.stan.Subscription;
import io.nats.stan.SubscriptionImpl;
import io.nats.stan.SubscriptionOptions;
import io.nats.stan.protobuf.Ack;
import io.nats.stan.protobuf.CloseRequest;
import io.nats.stan.protobuf.CloseResponse;
import io.nats.stan.protobuf.ConnectRequest;
import io.nats.stan.protobuf.ConnectResponse;
import io.nats.stan.protobuf.MsgProto;
import io.nats.stan.protobuf.PubAck;
import io.nats.stan.protobuf.PubMsg;
import io.nats.stan.protobuf.SubscriptionRequest;
import io.nats.stan.protobuf.SubscriptionResponse;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectionImpl
implements Connection,
io.nats.client.MessageHandler {
    // ---- Defaults (not referenced elsewhere in this file; presumably consumed by Options — verify) ----
    static final String DEFAULT_NATS_URL = "nats://localhost:4222";
    static final int DEFAULT_CONNECT_WAIT = 2;
    static final String DEFAULT_DISCOVER_PREFIX = "_STAN.discover";
    // Prefix of the per-connection ack subject built in connect().
    static final String DEFAULT_ACK_PREFIX = "_STAN.acks";
    static final int DEFAULT_MAX_PUB_ACKS_IN_FLIGHT = 12;
    // ---- Client-side error message strings; all share the "stan: " prefix ----
    static final String PFX = "stan: ";
    static final String ERR_BAD_ACK = "stan: malformed ack";
    static final String ERR_BAD_CONNECTION = "stan: invalid connection";
    static final String ERR_BAD_SUBSCRIPTION = "stan: invalid subscription";
    static final String ERR_CLOSE_REQ_TIMEOUT = "stan: close request timeout";
    static final String ERR_CONNECTION_CLOSED = "stan: connection closed";
    static final String ERR_CONNECTION_REQ_TIMEOUT = "stan: connect request timeout";
    static final String ERR_MANUAL_ACK = "stan: cannot manually ack in auto-ack mode";
    static final String ERR_NULL_MSG = "stan: null message";
    static final String ERR_TIMEOUT = "stan: publish ack timeout";
    // ---- SERVER_ERR_* strings: presumably mirror error texts returned by the streaming server — TODO confirm ----
    static final String SERVER_ERR_BAD_PUB_MSG = "stan: malformed publish message envelope";
    static final String SERVER_ERR_BAD_SUB_REQUEST = "stan: malformed subscription request";
    static final String SERVER_ERR_INVALID_SUBJECT = "stan: invalid subject";
    static final String SERVER_ERR_INVALID_SEQUENCE = "stan: invalid start sequence";
    static final String SERVER_ERR_INVALID_TIME = "stan: invalid start time";
    static final String SERVER_ERR_INVALID_SUB = "stan: invalid subscription";
    static final String SERVER_ERR_INVALID_CONN_REQ = "stan: invalid connection request";
    static final String SERVER_ERR_INVALID_CLIENT = "stan: clientID already registered";
    static final String SERVER_ERR_INVALID_CLOSE_REQ = "stan: invalid close request";
    static final String SERVER_ERR_INVALID_ACK_WAIT = "stan: invalid ack wait time, should be >= 1s";
    static final String SERVER_ERR_DUP_DURABLE = "stan: duplicate durable registration";
    static final String SERVER_ERR_INVALID_DURABLE_NAME = "stan: durable name of a durable queue subscriber can't contain the character ':'";
    static final String SERVER_ERR_DURABLE_QUEUE = "stan: queue subscribers can't be durable";
    static final String SERVER_ERR_TIMEOUT = "stan: publish ack timeout";
    static final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class);
    // Guards the connection state below. lock()/unlock() and rLock()/rUnlock() all use its write lock.
    final ReadWriteLock mu = new ReentrantReadWriteLock();
    String clientId;
    String clusterId;
    // The next four subjects are copied from the server's ConnectResponse in connect().
    String pubPrefix;
    String subRequests;
    String unsubRequests;
    String closeRequests;
    // Subject (DEFAULT_ACK_PREFIX + "." + NUID) on which publish acks are received, and its NATS subscription.
    String ackSubject;
    io.nats.client.Subscription ackSubscription;
    // Inbox sent to the server as the heartbeat inbox, its NATS subscription, and the handler
    // that forwards heartbeat messages to processHeartBeat().
    String hbInbox;
    io.nats.client.Subscription hbSubscription;
    io.nats.client.MessageHandler hbCallback;
    // Active subscriptions, keyed by the subscription's inbox subject.
    Map<String, Subscription> subMap;
    // Publishes still waiting for an ack, keyed by the publish GUID.
    Map<String, AckClosure> pubAckMap;
    // Bounded queue (capacity = opts.getMaxPubAcksInFlight()): publish() puts one placeholder per
    // outstanding publish and removeAck() takes one back, which throttles in-flight publishes.
    BlockingQueue<PubAck> pubAckChan;
    Options opts;
    // Underlying NATS connection; set to null by close().
    io.nats.client.Connection nc;
    // Timer (created with isDaemon = true) that runs the publish-ack timeout tasks.
    Timer ackTimer = new Timer(true);
    // True when this object created the NATS connection itself; close() then also closes it.
    boolean ncOwned = false;
    // Not referenced anywhere in this file; presumably used by collaborators in the package — verify.
    ExecutorService exec = Executors.newCachedThreadPool();

    /**
     * Creates an instance with no cluster id, client id or options set.
     * Only the field initializers run; {@code opts} stays {@code null}.
     */
    protected ConnectionImpl() {
    }

    /**
     * Creates a connection object for the given cluster and client id using default options.
     * Delegates to the three-argument constructor with {@code null} options.
     *
     * @param stanClusterId the streaming cluster id
     * @param clientId the client id to register with the cluster
     */
    ConnectionImpl(String stanClusterId, String clientId) {
        this(stanClusterId, clientId, null);
    }

    ConnectionImpl(String stanClusterId, String clientId, Options opts) {
        this.clusterId = stanClusterId;
        this.clientId = clientId;
        if (opts == null) {
            this.opts = new Options.Builder().create();
        } else {
            this.opts = opts;
            if (this.opts.getNatsConn() != null) {
                this.setNatsConnection(this.opts.getNatsConn());
            }
        }
    }

    /**
     * Performs the streaming handshake over the underlying NATS connection.
     *
     * <p>If no NATS connection has been set, one is created and marked as owned by this
     * object. The method then subscribes to a fresh heartbeat inbox, sends a
     * {@code ConnectRequest} to {@code <discoverPrefix>.<clusterId>}, stores the subjects
     * returned in the {@code ConnectResponse}, subscribes to a unique ack subject and
     * initialises the publish-ack and subscription bookkeeping.
     *
     * <p>If anything fails after the NATS connection has been obtained, {@link #close()} is
     * invoked on a best-effort basis before the failure is propagated.
     *
     * @throws IOException if a pre-existing NATS connection is not connected, if the server
     *         answers with an error, or on any I/O failure
     * @throws TimeoutException if the connect request times out; a {@code "nats: timeout"}
     *         failure is rethrown with {@link #ERR_CONNECTION_REQ_TIMEOUT} as message and the
     *         original exception as cause, any other timeout is rethrown unchanged
     */
    void connect() throws IOException, TimeoutException {
        io.nats.client.Connection nc = this.getNatsConnection();
        if (nc == null) {
            nc = this.createNatsConnection();
            this.setNatsConnection(nc);
            this.ncOwned = true;
        } else if (!nc.isConnected()) {
            throw new IOException(ERR_BAD_CONNECTION);
        }

        // Flipped to true only when the whole handshake succeeded; any other exit path
        // (checked or unchecked exception) triggers the cleanup in the finally block.
        boolean connected = false;
        try {
            this.hbInbox = nc.newInbox();
            this.hbCallback = new io.nats.client.MessageHandler() {

                public void onMessage(io.nats.client.Message msg) {
                    ConnectionImpl.this.processHeartBeat(msg);
                }
            };
            this.hbSubscription = nc.subscribe(this.hbInbox, this.hbCallback);

            String discoverSubject = String.format("%s.%s", this.opts.getDiscoverPrefix(), this.clusterId);
            ConnectRequest req = ConnectRequest.newBuilder().setClientID(this.clientId).setHeartbeatInbox(this.hbInbox).build();
            io.nats.client.Message reply = nc.request(discoverSubject, req.toByteArray(), this.opts.getConnectTimeout().toMillis());
            ConnectResponse cr = ConnectResponse.parseFrom(reply.getData());
            if (!cr.getError().isEmpty()) {
                throw new IOException(cr.getError());
            }
            logger.trace("Received ConnectResponse:\n{}", cr);

            this.pubPrefix = cr.getPubPrefix();
            this.subRequests = cr.getSubRequests();
            this.unsubRequests = cr.getUnsubRequests();
            this.closeRequests = cr.getCloseRequests();

            this.ackSubject = String.format("%s.%s", DEFAULT_ACK_PREFIX, NUID.nextGlobal());
            this.ackSubscription = nc.subscribe(this.ackSubject, new io.nats.client.MessageHandler() {

                public void onMessage(io.nats.client.Message msg) {
                    ConnectionImpl.this.processAck(msg);
                }
            });
            this.ackSubscription.setPendingLimits(1026, 32770);

            this.pubAckMap = new HashMap<String, AckClosure>();
            this.subMap = new HashMap<String, Subscription>();
            this.pubAckChan = new LinkedBlockingQueue<PubAck>(this.opts.getMaxPubAcksInFlight());
            connected = true;
        } catch (TimeoutException e) {
            if ("nats: timeout".equals(e.getMessage())) {
                TimeoutException te = new TimeoutException(ERR_CONNECTION_REQ_TIMEOUT);
                te.initCause(e);
                throw te;
            }
            // Previously a timeout with any other message was silently dropped, making
            // connect() return normally on a connection that was never established.
            throw e;
        } finally {
            if (!connected) {
                try {
                    this.close();
                } catch (Exception closeError) {
                    // Best effort: the original failure is the one the caller needs to see.
                    logger.debug("stan: error during cleanup after failed connect", closeError);
                }
            }
        }
    }

    /**
     * Builds the factory used to create the underlying NATS connection.
     *
     * @return a new {@code ConnectionFactory}; its URL is set from the options when the
     *         options carry a non-null NATS URL
     */
    ConnectionFactory createNatsConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        String url = this.opts.getNatsUrl();
        if (url != null) {
            factory.setUrl(url);
        }
        return factory;
    }

    /**
     * Creates a new NATS connection unless one is already set on this object.
     *
     * @return the newly created connection (in which case {@code ncOwned} is set to
     *         {@code true}), or {@code null} when a NATS connection already exists
     * @throws IOException if the factory fails to create the connection
     * @throws TimeoutException if creating the connection times out
     */
    io.nats.client.Connection createNatsConnection() throws IOException, TimeoutException {
        if (this.getNatsConnection() != null) {
            return null;
        }
        io.nats.client.Connection created = this.createNatsConnectionFactory().createConnection();
        this.ncOwned = true;
        return created;
    }

    /**
     * Closes the streaming session.
     *
     * <p>Under the connection lock this clears the NATS connection reference, unsubscribes
     * from the ack and heartbeat subjects (failures are logged and ignored), and sends a
     * {@code CloseRequest} to the server. If this object created the NATS connection, that
     * connection is closed as well, whether or not the close request succeeded. Calling
     * this method when the NATS connection is already {@code null} is a no-op.
     *
     * @throws IOException if the close request fails or the server answers with an error
     * @throws TimeoutException if the close request times out; the message is
     *         {@link #ERR_CLOSE_REQ_TIMEOUT} and the original exception is set as cause
     */
    @Override
    public void close() throws IOException, TimeoutException {
        logger.trace("In STAN close()");
        this.lock();
        try {
            io.nats.client.Connection nc = this.getNatsConnection();
            if (nc == null) {
                logger.debug("stan: NATS connection already closed");
                return;
            }
            try {
                // Mark the streaming connection closed before talking to the server so that
                // concurrent callers observe the closed state.
                this.setNatsConnection(null);

                if (this.getAckSubscription() != null) {
                    try {
                        this.getAckSubscription().unsubscribe();
                    } catch (Exception e) {
                        logger.warn("stan: error unsubscribing from acks during connection close");
                        logger.debug("Full stack trace: ", e);
                    }
                }
                if (this.getHbSubscription() != null) {
                    try {
                        this.getHbSubscription().unsubscribe();
                    } catch (Exception e) {
                        logger.warn("stan: error unsubscribing from heartbeats during connection close");
                        logger.debug("Full stack trace: ", e);
                    }
                }

                CloseRequest req = CloseRequest.newBuilder().setClientID(this.clientId).build();
                logger.trace("CLOSE request: [{}]", req);
                io.nats.client.Message reply;
                try {
                    reply = nc.request(this.closeRequests, req.toByteArray(), this.opts.getConnectTimeout().toMillis());
                } catch (Exception e) {
                    if ("nats: timeout".equals(e.getMessage())) {
                        TimeoutException te = new TimeoutException(ERR_CLOSE_REQ_TIMEOUT);
                        te.initCause(e);
                        throw te;
                    }
                    throw e;
                }
                logger.trace("CLOSE response: [{}]", reply);

                if (reply.getData() != null) {
                    CloseResponse cr = CloseResponse.parseFrom(reply.getData());
                    if (!cr.getError().isEmpty()) {
                        throw new IOException(cr.getError());
                    }
                }
            } finally {
                if (this.ncOwned) {
                    try {
                        nc.close();
                    } catch (Exception e) {
                        // Best effort: a failure to close must not mask the outcome of the
                        // close request above, but it should be visible in the logs.
                        logger.warn("stan: error closing owned NATS connection");
                        logger.debug("Full stack trace: ", e);
                    }
                }
            }
        } finally {
            this.unlock();
        }
    }

    /**
     * Creates the bookkeeping record for one pending publish.
     *
     * @param ah handler to notify when the ack (or a timeout) arrives; may be {@code null}
     * @param ch queue on which the ack result string is delivered; may be {@code null}
     * @return a new {@link AckClosure} holding both references
     */
    protected AckClosure createAckClosure(AckHandler ah, BlockingQueue<String> ch) {
        return new AckClosure(ah, ch);
    }

    /**
     * Factory for subscription objects; simply forwards all arguments to the
     * {@code SubscriptionImpl} constructor.
     *
     * @param subject the subject to subscribe to
     * @param qgroup the queue group, or {@code null}
     * @param cb the handler for delivered messages
     * @param conn the owning connection
     * @param opts the subscription options
     * @return the new subscription object
     */
    protected SubscriptionImpl createSubscription(String subject, String qgroup, MessageHandler cb, ConnectionImpl conn, SubscriptionOptions opts) {
        return new SubscriptionImpl(subject, qgroup, cb, conn, opts);
    }

    /**
     * Answers a heartbeat by publishing an empty message to the heartbeat's reply subject.
     * Does nothing when the NATS connection has already been cleared; publish failures are
     * logged and not propagated.
     *
     * @param msg the heartbeat message received on the heartbeat inbox
     */
    protected void processHeartBeat(io.nats.client.Message msg) {
        io.nats.client.Connection conn;
        this.rLock();
        try {
            conn = this.nc;
        } finally {
            this.rUnlock();
        }

        if (conn == null) {
            return;
        }

        try {
            conn.publish(msg.getReplyTo(), null);
            logger.debug("Sent heartbeat response");
        } catch (IOException e) {
            logger.warn("stan: error publishing heartbeat response: {}", e.getMessage());
            logger.debug("Full stack trace:", e);
        }
    }

    /**
     * Creates the queue on which the blocking {@code publish(subject, data)} waits for the
     * result of its publish.
     *
     * @return a new, unbounded {@link LinkedBlockingQueue}
     */
    BlockingQueue<String> createErrorChannel() {
        return new LinkedBlockingQueue<String>();
    }

    /**
     * Publishes a message and blocks until its ack, an ack error or the ack timeout arrives.
     *
     * <p>If the calling thread is interrupted while waiting, the method returns without
     * knowing the outcome of the publish; the thread's interrupt status is restored so the
     * caller can detect this.
     *
     * @param subject the subject to publish to
     * @param data the payload; may be {@code null}
     * @throws IOException if the publish fails or the ack result is a non-empty error string
     */
    @Override
    public void publish(String subject, byte[] data) throws IOException {
        BlockingQueue<String> ch = this.createErrorChannel();
        this.publish(subject, data, null, ch);
        try {
            // An empty string means the publish was acknowledged without error.
            String err = ch.take();
            if (!err.isEmpty()) {
                throw new IOException(err);
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag for the caller.
            Thread.currentThread().interrupt();
            logger.debug("stan: publish interrupted");
            logger.debug("Full stack trace:", e);
        }
    }

    /**
     * Publishes a message asynchronously; delegates to the four-argument overload with no
     * result queue.
     *
     * @param subject the subject to publish to
     * @param data the payload; may be {@code null}
     * @param ah handler notified with the ack or a timeout; may be {@code null}
     * @return the GUID assigned to the published message
     * @throws IOException if publishing on the NATS connection fails
     */
    @Override
    public String publish(String subject, byte[] data, AckHandler ah) throws IOException {
        return this.publish(subject, data, ah, null);
    }

    /**
     * Core publish routine shared by the public {@code publish} overloads.
     *
     * <p>Under the connection lock it builds the {@code PubMsg}, assigns a GUID and registers
     * an {@link AckClosure} for it. It then reserves a slot in the publish-ack queue (which
     * blocks while the maximum number of publishes is in flight), publishes on
     * {@code <pubPrefix>.<subject>} with the ack subject as reply subject, and finally
     * schedules the ack-timeout task.
     *
     * @param subject the subject to publish to
     * @param data the payload; may be {@code null}
     * @param ah handler notified with the ack or a timeout; may be {@code null}
     * @param ch queue on which the ack result string is delivered; may be {@code null}
     * @return the GUID assigned to the published message
     * @throws IOException if publishing on the NATS connection fails; the pending ack entry
     *         is removed before the exception is propagated
     * @throws IllegalStateException if the connection has been closed
     */
    String publish(String subject, byte[] data, AckHandler ah, BlockingQueue<String> ch) throws IOException {
        io.nats.client.Connection nc;
        BlockingQueue<PubAck> pac;
        Duration ackTimeout;
        byte[] bytes;
        String guid;
        String subj;
        String ackSubject;
        AckClosure a = this.createAckClosure(ah, ch);

        this.lock();
        try {
            // Capture the connection while holding the lock: close() may null the field at
            // any time afterwards, and dereferencing the field later would then NPE.
            nc = this.getNatsConnection();
            if (nc == null) {
                throw new IllegalStateException(ERR_CONNECTION_CLOSED);
            }
            subj = this.pubPrefix + "." + subject;
            guid = NUID.nextGlobal();
            PubMsg.Builder pb = PubMsg.newBuilder().setClientID(this.clientId).setGuid(guid).setSubject(subject);
            if (data != null) {
                pb = pb.setData(ByteString.copyFrom(data));
            }
            bytes = pb.build().toByteArray();
            this.pubAckMap.put(guid, a);
            ackSubject = this.ackSubject;
            ackTimeout = this.opts.getAckTimeout();
            pac = this.pubAckChan;
        } finally {
            this.unlock();
        }

        try {
            // Blocks while the maximum number of un-acked publishes is outstanding.
            pac.put(PubAck.getDefaultInstance());
        } catch (InterruptedException e) {
            // Keep publishing as before, but do not lose the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("Publish operation interrupted", e);
        }

        try {
            nc.publish(subj, ackSubject, bytes, true);
        } catch (IOException e) {
            this.removeAck(guid);
            throw e;
        }

        this.lock();
        try {
            a.ackTask = this.createAckTimerTask(guid);
            this.ackTimer.schedule(a.ackTask, ackTimeout.toMillis());
        } finally {
            this.unlock();
        }
        return guid;
    }

    /**
     * Subscribes to a subject without a queue group and with {@code null} options;
     * delegates to the {@code (subject, cb, opts)} overload.
     *
     * @param subject the subject to subscribe to
     * @param cb the handler for delivered messages
     * @return the new subscription
     * @throws IOException if the subscription request fails
     * @throws TimeoutException if the subscription request times out
     */
    @Override
    public Subscription subscribe(String subject, MessageHandler cb) throws IOException, TimeoutException {
        return this.subscribe(subject, cb, null);
    }

    /**
     * Subscribes to a subject without a queue group; delegates to the four-argument
     * overload with a {@code null} queue.
     *
     * @param subject the subject to subscribe to
     * @param cb the handler for delivered messages
     * @param opts the subscription options
     * @return the new subscription
     * @throws IOException if the subscription request fails
     * @throws TimeoutException if the subscription request times out
     */
    @Override
    public Subscription subscribe(String subject, MessageHandler cb, SubscriptionOptions opts) throws IOException, TimeoutException {
        return this.subscribe(subject, null, cb, opts);
    }

    /**
     * Subscribes to a subject as a member of a queue group with {@code null} options;
     * delegates to the four-argument overload.
     *
     * @param subject the subject to subscribe to
     * @param queue the queue group name
     * @param cb the handler for delivered messages
     * @return the new subscription
     * @throws IOException if the subscription request fails
     * @throws TimeoutException if the subscription request times out
     */
    @Override
    public Subscription subscribe(String subject, String queue, MessageHandler cb) throws IOException, TimeoutException {
        return this.subscribe(subject, queue, cb, null);
    }

    /**
     * Creates a subscription and registers it with the streaming server.
     *
     * <p>The subscription is first recorded in {@code subMap} under its inbox. Then, holding
     * the subscription's write lock, this method subscribes to the inbox on the NATS
     * connection, sends a {@code SubscriptionRequest} (2 second timeout) and stores the ack
     * inbox returned by the server.
     *
     * <p>If any of these steps fails, the inbox subscription is unsubscribed (best effort)
     * and the {@code subMap} entry is removed, so a failed attempt leaves nothing behind.
     *
     * @param subject the subject to subscribe to
     * @param queue the queue group, or {@code null} for none
     * @param cb the handler for delivered messages
     * @param opts the subscription options
     * @return the established subscription
     * @throws IOException if the request fails, the reply cannot be parsed, or the server
     *         answers with an error
     * @throws TimeoutException if the subscription request times out. NOTE(review): the
     *         message is {@link #ERR_TIMEOUT} ("publish ack timeout"), kept unchanged for
     *         callers that may match on it although it reads oddly for a subscribe.
     * @throws IllegalStateException if the connection has been closed
     */
    @Override
    public Subscription subscribe(String subject, String queue, MessageHandler cb, SubscriptionOptions opts) throws IOException, TimeoutException {
        SubscriptionImpl sub;
        io.nats.client.Connection nc;
        this.lock();
        try {
            nc = this.getNatsConnection();
            if (nc == null) {
                throw new IllegalStateException(ERR_CONNECTION_CLOSED);
            }
            sub = this.createSubscription(subject, queue, cb, this, opts);
            this.subMap.put(sub.inbox, sub);
        } finally {
            this.unlock();
        }

        boolean established = false;
        try {
            sub.wLock();
            try {
                // Listen on the inbox before asking the server to start delivering to it.
                AsyncSubscription nsub = nc.subscribe(sub.inbox, (io.nats.client.MessageHandler) this);
                sub.inboxSub = nsub;

                SubscriptionRequest sr = this.createSubscriptionRequest(sub);
                io.nats.client.Message reply;
                try {
                    reply = nc.request(this.subRequests, sr.toByteArray(), 2L, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    TimeoutException te = new TimeoutException(ERR_TIMEOUT);
                    te.initCause(e);
                    throw te;
                }

                SubscriptionResponse response = SubscriptionResponse.parseFrom(reply.getData());
                if (!response.getError().isEmpty()) {
                    throw new IOException(response.getError());
                }
                sub.setAckInbox(response.getAckInbox());
                established = true;
            } finally {
                if (!established && sub.inboxSub != null) {
                    try {
                        sub.inboxSub.unsubscribe();
                    } catch (Exception e) {
                        // Best effort: the original failure is the one to report.
                        logger.debug("stan: error unsubscribing inbox after failed subscribe", e);
                    }
                    sub.inboxSub = null;
                }
                sub.wUnlock();
            }
        } finally {
            if (!established) {
                // Done after releasing the subscription lock so the connection lock is never
                // acquired while the subscription lock is held.
                this.lock();
                try {
                    this.subMap.remove(sub.inbox);
                } finally {
                    this.unlock();
                }
            }
        }
        return sub;
    }

    /**
     * Builds the protobuf {@code SubscriptionRequest} describing the given subscription.
     *
     * <p>Client id, subject, queue group (empty string when the subscription has none),
     * inbox, max-in-flight and ack wait (whole seconds) are always set. A start sequence is
     * added for {@code SequenceStart}, and for {@code TimeDeltaStart} the delta in
     * nanoseconds between the configured start time and now. The durable name is set only
     * when the options carry one.
     *
     * @param sub the subscription to describe
     * @return the built request
     */
    protected SubscriptionRequest createSubscriptionRequest(SubscriptionImpl sub) {
        SubscriptionOptions options = sub.getOptions();
        String queueGroup = sub.getQueue();

        SubscriptionRequest.Builder builder = SubscriptionRequest.newBuilder();
        builder.setClientID(sub.getConnection().getClientId());
        builder.setSubject(sub.getSubject());
        builder.setQGroup(queueGroup == null ? "" : queueGroup);
        builder.setInbox(sub.getInbox());
        builder.setMaxInFlight(options.getMaxInFlight());
        builder.setAckWaitInSecs((int) options.getAckWait().getSeconds());

        // Only two start positions carry an extra parameter; the others need nothing more.
        switch (options.getStartAt()) {
            case SequenceStart: {
                builder.setStartSequence(options.getStartSequence());
                break;
            }
            case TimeDeltaStart: {
                builder.setStartTimeDelta(ChronoUnit.NANOS.between(options.getStartTime(), Instant.now()));
                break;
            }
            default: {
                break;
            }
        }
        builder.setStartPosition(options.getStartAt());

        String durableName = options.getDurableName();
        if (durableName != null) {
            builder.setDurableName(durableName);
        }
        return builder.build();
    }

    /**
     * Handles a publish ack received on the ack subject.
     *
     * <p>The pending entry for the ack's GUID is removed; if there is none (for example
     * because the publish already timed out) the ack is ignored. Otherwise the ack handler,
     * if present, is invoked with an {@link IOException} carrying the server error (or
     * {@code null} on success); if there is no handler but a result queue, the error string
     * (empty on success) is offered to that queue.
     *
     * @param msg the NATS message carrying a serialized {@code PubAck}
     */
    protected void processAck(io.nats.client.Message msg) {
        PubAck pa;
        try {
            pa = PubAck.parseFrom(msg.getData());
        } catch (InvalidProtocolBufferException e) {
            logger.error("stan: error unmarshaling PubAck");
            logger.debug("Full stack trace: ", e);
            return;
        }

        AckClosure ackClosure = this.removeAck(pa.getGuid());
        if (ackClosure == null) {
            return;
        }

        String ackError = pa.getError();
        if (ackClosure.ah != null) {
            IOException ex = ackError.isEmpty() ? null : new IOException(ackError);
            ackClosure.ah.onAck(pa.getGuid(), ex);
        } else if (ackClosure.ch != null && !ackClosure.ch.offer(ackError)) {
            // offer() instead of put(): put() is interruptible, and an interrupt here would
            // drop the result and leave the blocking publisher waiting forever.
            logger.warn("stan: processAck unable to write ack result to ack channel");
        }
    }

    /**
     * Creates the timer task that fires when no ack has arrived for a publish in time.
     *
     * @param guid the GUID of the publish the task belongs to
     * @return a task whose {@code run()} calls {@link #processAckTimeout(String)} with that GUID
     */
    TimerTask createAckTimerTask(final String guid) {
        return new TimerTask() {

            @Override
            public void run() {
                ConnectionImpl.this.processAckTimeout(guid);
            }
        };
    }

    /**
     * Reports a publish-ack timeout for the given GUID.
     *
     * <p>The pending entry is removed first. If it no longer exists (the ack was processed
     * before the timer fired) nothing is reported. Otherwise the ack handler, if present,
     * receives a {@link TimeoutException}; if there is no handler but a result queue, the
     * timeout error string is offered to that queue.
     *
     * @param guid the GUID of the publish whose ack timed out
     */
    protected void processAckTimeout(String guid) {
        AckClosure ackClosure = this.removeAck(guid);
        if (ackClosure == null) {
            // The ack won the race. Returning here matters: an uncaught exception thrown
            // from a TimerTask terminates the timer thread shared by all publishes.
            return;
        }
        if (ackClosure.ah != null) {
            ackClosure.ah.onAck(guid, new TimeoutException(ERR_TIMEOUT));
        } else if (ackClosure.ch != null && !ackClosure.ch.offer(ERR_TIMEOUT)) {
            logger.warn("stan: processAckTimeout unable to write timeout error to ack channel");
        }
    }

    /**
     * Removes the pending-publish entry for a GUID and releases its resources.
     *
     * <p>Under the connection lock the entry is removed from {@code pubAckMap}. If an entry
     * was found, its timeout task (if already scheduled) is cancelled and one placeholder is
     * removed from the publish-ack queue, freeing a slot for a blocked publisher.
     *
     * @param guid the GUID of the publish
     * @return the removed entry, or {@code null} if no publish with that GUID was pending
     */
    protected AckClosure removeAck(String guid) {
        AckClosure ackClosure;
        BlockingQueue<PubAck> pac;
        TimerTask timerTask = null;
        this.lock();
        try {
            ackClosure = this.pubAckMap.get(guid);
            if (ackClosure != null) {
                timerTask = ackClosure.ackTask;
                this.pubAckMap.remove(guid);
            }
            pac = this.pubAckChan;
        } finally {
            this.unlock();
        }

        if (timerTask != null) {
            timerTask.cancel();
        }
        if (ackClosure != null && pac != null) {
            // Non-blocking on purpose: checking size() and then calling take() could block
            // indefinitely if another thread drained the queue in between.
            pac.poll();
        }
        return ackClosure;
    }

    /**
     * NATS message callback; this object is registered as the handler for subscription
     * inboxes in {@code subscribe}. Forwards the message to {@link #processMsg}.
     *
     * @param msg the raw NATS message
     */
    public void onMessage(io.nats.client.Message msg) {
        this.processMsg(msg);
    }

    /**
     * Wraps a decoded protobuf message in a streaming {@code Message}.
     *
     * @param msgp the decoded protobuf message
     * @return a new {@code Message} constructed from {@code msgp}
     */
    protected Message createStanMessage(MsgProto msgp) {
        return new Message(msgp);
    }

    /**
     * Dispatches a message received on a subscription inbox.
     *
     * <p>The payload is decoded into a streaming message; an undecodable payload is logged
     * and dropped. The subscription is looked up by the subject the raw message arrived on;
     * the message is dropped if there is no such subscription or the connection is closed.
     * Otherwise the subscription's handler is invoked and, unless the subscription uses
     * manual acks, an {@code Ack} for the message is published to the subscription's ack
     * inbox. A failure to publish the ack is logged and not propagated.
     *
     * @param raw the raw NATS message
     */
    protected void processMsg(io.nats.client.Message raw) {
        Message stanMsg;
        try {
            stanMsg = this.createStanMessage(MsgProto.parseFrom(raw.getData()));
        } catch (InvalidProtocolBufferException e) {
            logger.error("stan: error unmarshaling msg");
            logger.debug("msg: {}", raw);
            logger.debug("full stack trace:", e);
            // Nothing can be delivered or acked without a decoded message; continuing
            // would dereference a null message below.
            return;
        }

        io.nats.client.Connection nc;
        SubscriptionImpl sub;
        this.lock();
        try {
            nc = this.getNatsConnection();
            sub = (SubscriptionImpl) this.subMap.get(raw.getSubject());
        } finally {
            this.unlock();
        }
        if (sub == null || nc == null) {
            return;
        }
        stanMsg.setSubscription(sub);

        MessageHandler cb;
        String ackSubject;
        boolean isManualAck;
        ConnectionImpl subsc;
        sub.rLock();
        try {
            cb = sub.getMessageHandler();
            ackSubject = sub.getAckInbox();
            isManualAck = sub.getOptions().isManualAcks();
            subsc = sub.getConnection();
        } finally {
            sub.rUnlock();
        }

        // The handler runs outside of all locks.
        if (cb != null && subsc != null) {
            cb.onMessage(stanMsg);
        }

        if (!isManualAck) {
            Ack ack = Ack.newBuilder().setSubject(stanMsg.getSubject()).setSequence(stanMsg.getSequence()).build();
            try {
                nc.publish(ackSubject, ack.toByteArray());
            } catch (IOException e) {
                logger.error("Exception while publishing auto-ack: {}", e.getMessage());
                logger.debug("Stack trace: ", e);
            }
        }
    }

    /**
     * @return the client id this connection was created with
     */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Returns the underlying NATS connection. The field is read without taking the lock.
     *
     * @return the NATS connection, or {@code null} if none is set (for example after
     *         {@link #close()})
     */
    @Override
    public io.nats.client.Connection getNatsConnection() {
        return this.nc;
    }

    /**
     * Replaces the underlying NATS connection reference. No locking is done here.
     *
     * @param nc the NATS connection to use; {@code null} marks the connection as closed
     */
    protected void setNatsConnection(io.nats.client.Connection nc) {
        this.nc = nc;
    }

    /**
     * Asks the underlying NATS connection for a new inbox subject.
     * NOTE(review): dereferences {@code nc} directly, so this throws a
     * {@code NullPointerException} when no NATS connection is set (e.g. after close()).
     *
     * @return the inbox subject returned by the NATS connection
     */
    public String newInbox() {
        return this.nc.newInbox();
    }

    /** Acquires the write lock of {@code mu}; must be paired with {@link #unlock()}. */
    protected void lock() {
        this.mu.writeLock().lock();
    }

    /** Releases the write lock of {@code mu} acquired by {@link #lock()}. */
    protected void unlock() {
        this.mu.writeLock().unlock();
    }

    /**
     * Acquires the lock for a read-only section; must be paired with {@link #rUnlock()}.
     * NOTE(review): despite the name this takes the WRITE lock of {@code mu}, so readers
     * exclude each other. Confirm whether the read lock was intended before changing it;
     * callers outside this file may rely on the current exclusive, reentrant behaviour.
     */
    protected void rLock() {
        this.mu.writeLock().lock();
    }

    /**
     * Releases the lock acquired by {@link #rLock()}.
     * NOTE(review): releases the WRITE lock of {@code mu}, matching what {@code rLock()}
     * acquires; the two must be changed together if the read lock is ever used instead.
     */
    protected void rUnlock() {
        this.mu.writeLock().unlock();
    }

    /**
     * @return the NATS subscription on the publish-ack subject, or {@code null} if
     *         {@code connect()} has not created it
     */
    protected io.nats.client.Subscription getAckSubscription() {
        return this.ackSubscription;
    }

    /**
     * @return the NATS subscription on the heartbeat inbox, or {@code null} if
     *         {@code connect()} has not created it
     */
    protected io.nats.client.Subscription getHbSubscription() {
        return this.hbSubscription;
    }

    /**
     * Replaces the queue used to limit in-flight publishes. No locking is done here.
     *
     * @param ch the queue to use
     */
    void setPubAckChan(BlockingQueue<PubAck> ch) {
        this.pubAckChan = ch;
    }

    /**
     * @return the queue used to limit in-flight publishes (the live instance, not a copy)
     */
    BlockingQueue<PubAck> getPubAckChan() {
        return this.pubAckChan;
    }

    /**
     * Replaces the map of pending publishes (GUID to ack closure). No locking is done here.
     *
     * @param map the map to use
     */
    void setPubAckMap(Map<String, AckClosure> map) {
        this.pubAckMap = map;
    }

    /**
     * @return the map of pending publishes keyed by GUID (the live instance, not a copy)
     */
    Map<String, AckClosure> getPubAckMap() {
        return this.pubAckMap;
    }

    /**
     * Replaces the map of subscriptions (inbox subject to subscription). No locking is
     * done here.
     *
     * @param map the map to use
     */
    void setSubMap(Map<String, Subscription> map) {
        this.subMap = map;
    }

    /**
     * @return the map of subscriptions keyed by inbox subject (the live instance, not a copy)
     */
    Map<String, Subscription> getSubMap() {
        return this.subMap;
    }

    /**
     * Bookkeeping record for one publish that is waiting for its ack. Holds the ways the
     * result can be reported (handler and/or result queue) and the timeout task.
     */
    class AckClosure {
        // Timeout task for this publish; assigned by publish() after the message was sent,
        // so it can still be null when the entry is looked up.
        protected TimerTask ackTask;
        // Handler notified with the ack result; may be null.
        AckHandler ah;
        // Queue receiving the ack result string; only consulted when ah is null. May be null.
        BlockingQueue<String> ch;

        /**
         * @param ah handler to notify with the ack result; may be {@code null}
         * @param ch queue to receive the ack result string; may be {@code null}
         */
        AckClosure(AckHandler ah, BlockingQueue<String> ch) {
            this.ah = ah;
            this.ch = ch;
        }
    }
}

