/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client;

import io.nats.client.AsyncSubscription;
import io.nats.client.Connection;
import io.nats.client.ConnectionImpl;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.NATSThreadFactory;
import io.nats.client.SubscriptionImpl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncSubscriptionImpl
extends SubscriptionImpl
implements AsyncSubscription {
    private ExecutorService executor = null;
    private MessageHandler msgHandler;

    protected AsyncSubscriptionImpl(ConnectionImpl nc, String subj, String queue, MessageHandler cb, int max) {
        super(nc, subj, queue, max);
        this.msgHandler = cb;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected boolean processMsg(Message m) {
        long localMax;
        MessageHandler localHandler;
        Connection localConn;
        this.mu.lock();
        try {
            localConn = this.getConnection();
            localHandler = this.msgHandler;
            localMax = this.max;
        }
        finally {
            this.mu.unlock();
        }
        if (localHandler == null) {
            return true;
        }
        if (localConn == null) {
            return false;
        }
        long d = this.delivered.incrementAndGet();
        if (localMax <= 0L || d <= localMax) {
            try {
                localHandler.onMessage(m);
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (d == localMax) {
                try {
                    this.unsubscribe();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                this.conn = null;
            }
        }
        return true;
    }

    boolean isStarted() {
        return this.executor != null;
    }

    void enable() {
        Runnable msgFeeder = new Runnable(){

            @Override
            public void run() {
                AsyncSubscriptionImpl.this.logger.trace("msgFeeder has started for subj: {} sid: {}", (Object)AsyncSubscriptionImpl.this.subject, (Object)AsyncSubscriptionImpl.this.sid);
                try {
                    AsyncSubscriptionImpl.this.conn.deliverMsgs(AsyncSubscriptionImpl.this.mch);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        if (!this.isStarted()) {
            this.executor = Executors.newSingleThreadExecutor(new NATSThreadFactory("msgfeeder"));
            this.executor.execute(msgFeeder);
            this.logger.trace("Started msgFeeder for subject: " + this.getSubject() + " sid: " + this.getSid());
        }
    }

    void disable() {
        if (this.isStarted()) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    @Override
    public void setMessageHandler(MessageHandler cb) {
        this.msgHandler = cb;
    }

    @Override
    public void start() {
        if (this.isStarted()) {
            return;
        }
        if (!this.isValid()) {
            throw new IllegalStateException("nats: Invalid Subscription");
        }
        this.enable();
        this.conn.sendSubscriptionMessage(this);
    }

    @Override
    public void close() {
        super.close();
        this.disable();
    }
}

