/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.MessageHandler;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsConsumer;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsSubscription;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Dispatcher that pops messages from its own {@link MessageQueue} on a dedicated thread and
 * passes each one to a single {@link MessageHandler}.
 *
 * <p>Subscriptions created through this dispatcher are tracked in a concurrent map keyed by
 * subject, so at most one subscription per subject is kept. The {@code running} flag gates the
 * delivery loop and the public subscribe/unsubscribe calls.
 */
class NatsDispatcher extends NatsConsumer implements Dispatcher, Runnable {
    /** Maximum time a single {@code pop} on the incoming queue waits before the loop re-checks {@code running}. */
    private static final Duration WAIT_FOR_MESSAGE = Duration.ofMinutes(5L);

    private final NatsConnection connection;
    private final MessageQueue incoming;
    private final MessageHandler handler;
    // Written by start() and by run()'s finally block on the dispatcher thread, read by stop()
    // on the caller's thread, hence volatile.
    private volatile Thread thread;
    private final AtomicBoolean running;
    private String id;
    private final Map<String, NatsSubscription> subscriptions;

    /**
     * Creates a dispatcher that is not yet running; {@link #start(String)} launches the thread.
     *
     * @param conn    connection used to create/unsubscribe subscriptions and to report exceptions
     * @param handler callback invoked for every delivered message
     */
    NatsDispatcher(NatsConnection conn, MessageHandler handler) {
        this.connection = conn;
        this.handler = handler;
        this.incoming = new MessageQueue(true);
        this.subscriptions = new ConcurrentHashMap<>();
        this.running = new AtomicBoolean(false);
    }

    /**
     * Records the dispatcher id, marks the dispatcher as running and starts the delivery thread.
     * The thread is named after the connection name from the options, or "Nats Connection" when
     * no name is configured, followed by " Dispatcher".
     *
     * @param id identifier later returned by {@link #getId()}
     */
    void start(String id) {
        this.id = id;
        this.running.set(true);
        String connectionName = this.connection.getOptions().getConnectionName();
        String name = connectionName != null ? connectionName : "Nats Connection";
        Thread worker = new Thread(this, name + " Dispatcher");
        this.thread = worker;
        worker.start();
    }

    /**
     * Delivery loop. While the dispatcher is running, pops the next message, skips it when it is
     * {@code null} or when its subscription is missing or inactive, otherwise bumps the delivered
     * counters and invokes the handler. Exceptions thrown by the handler are passed to the
     * connection and do not end the loop. After delivery, a subscription that has reached its
     * unsubscribe limit is invalidated through the connection.
     *
     * <p>An interrupt ends the loop; it is reported to the connection only if the dispatcher was
     * still marked running (i.e. the interrupt did not come from {@link #stop(boolean)}).
     */
    @Override
    public void run() {
        try {
            while (this.running.get()) {
                NatsMessage msg = this.incoming.pop(WAIT_FOR_MESSAGE);
                if (msg == null) {
                    continue;
                }
                NatsSubscription sub = msg.getNatsSubscription();
                if (sub == null || !sub.isActive()) {
                    continue;
                }
                sub.incrementDeliveredCount();
                this.incrementDeliveredCount();
                try {
                    this.handler.onMessage(msg);
                }
                catch (Exception exp) {
                    // A failing handler must not kill the dispatcher thread.
                    this.connection.processException(exp);
                }
                if (sub.reachedUnsubLimit()) {
                    this.connection.invalidate(sub);
                }
            }
        }
        catch (InterruptedException exp) {
            if (this.running.get()) {
                this.connection.processException(exp);
            }
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
        }
        finally {
            this.running.set(false);
            this.thread = null;
        }
    }

    /**
     * Stops the dispatcher: clears the running flag, pauses the incoming queue and interrupts the
     * delivery thread if one is still registered.
     *
     * @param unsubscribeAll when {@code true}, every tracked subscription is unsubscribed through
     *                       the connection; when {@code false}, the tracking map is simply cleared
     */
    void stop(boolean unsubscribeAll) {
        this.running.set(false);
        this.incoming.pause();
        // Read the field once: run() may null it concurrently between a check and the call.
        Thread worker = this.thread;
        if (worker != null) {
            try {
                worker.interrupt();
            }
            catch (Exception ignored) {
                // Best effort: failing to interrupt must not prevent the rest of the shutdown.
            }
        }
        if (unsubscribeAll) {
            this.subscriptions.forEach((subj, sub) -> this.connection.unsubscribe(sub, -1));
        } else {
            this.subscriptions.clear();
        }
    }

    /**
     * @return {@code true} while the running flag is set
     */
    @Override
    public boolean isActive() {
        return this.running.get();
    }

    /**
     * @return the id passed to {@link #start(String)}, or {@code null} if never started
     */
    String getId() {
        return this.id;
    }

    /**
     * @return the queue this dispatcher pops messages from
     */
    @Override
    MessageQueue getMessageQueue() {
        return this.incoming;
    }

    /**
     * Sends a subscription message through the connection for every tracked subscription, using
     * its SID, subject and queue name.
     */
    void resendSubscriptions() {
        this.subscriptions.forEach((subject, sub) ->
                this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName()));
    }

    /**
     * Stops tracking the given subscription. The entry is removed only when the map still holds
     * this exact subscription for its subject, so a different subscription registered under the
     * same subject (e.g. the winner of a concurrent subscribe) is left in place.
     *
     * @param sub subscription to forget
     */
    void remove(NatsSubscription sub) {
        this.subscriptions.remove(sub.getSubject(), sub);
    }

    /**
     * Subscribes this dispatcher to a subject without a queue group.
     *
     * @param subject subject to subscribe to; must be non-empty
     * @return this dispatcher
     * @throws IllegalArgumentException if {@code subject} is {@code null} or empty
     * @throws IllegalStateException    if the dispatcher is not running
     */
    @Override
    public Dispatcher subscribe(String subject) {
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in subscribe");
        }
        return this.subscribeImpl(subject, null);
    }

    /**
     * Subscribes this dispatcher to a subject as part of a queue group.
     *
     * @param subject   subject to subscribe to; must be non-empty
     * @param queueName queue group name; must be non-empty
     * @return this dispatcher
     * @throws IllegalArgumentException if {@code subject} or {@code queueName} is {@code null} or empty
     * @throws IllegalStateException    if the dispatcher is not running
     */
    @Override
    public Dispatcher subscribe(String subject, String queueName) {
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in subscribe");
        }
        if (queueName == null || queueName.length() == 0) {
            throw new IllegalArgumentException("QueueName is required in subscribe");
        }
        return this.subscribeImpl(subject, queueName);
    }

    /**
     * Shared subscribe logic. Does nothing when the subject is already tracked (whatever queue
     * name the existing subscription has); otherwise creates a subscription through the
     * connection and registers it. If another thread registered the same subject in the
     * meantime, the subscription created here is unsubscribed again.
     *
     * @param subject   subject to subscribe to
     * @param queueName queue group name, or {@code null} for none
     * @return this dispatcher
     * @throws IllegalStateException if the dispatcher is not running
     */
    Dispatcher subscribeImpl(String subject, String queueName) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.subscriptions.get(subject) == null) {
            NatsSubscription created = this.connection.createSubscription(subject, queueName, this);
            NatsSubscription winner = this.subscriptions.putIfAbsent(subject, created);
            if (winner != null) {
                // Lost the race: drop the duplicate we just created.
                this.connection.unsubscribe(created, -1);
            }
        }
        return this;
    }

    /**
     * Unsubscribes from a subject; delegates to {@link #unsubscribe(String, int)} with {@code -1}.
     *
     * @param subject subject to unsubscribe from
     * @return this dispatcher
     */
    @Override
    public Dispatcher unsubscribe(String subject) {
        return this.unsubscribe(subject, -1);
    }

    /**
     * Asks the connection to unsubscribe the tracked subscription for a subject, passing
     * {@code after} through unchanged. Unknown subjects are ignored.
     *
     * @param subject subject to unsubscribe from; must be non-empty
     * @param after   value forwarded to the connection's unsubscribe call
     *                (presumably a message-count limit; confirm against NatsConnection)
     * @return this dispatcher
     * @throws IllegalStateException    if the dispatcher is not running
     * @throws IllegalArgumentException if {@code subject} is {@code null} or empty
     */
    @Override
    public Dispatcher unsubscribe(String subject, int after) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (subject == null || subject.length() == 0) {
            throw new IllegalArgumentException("Subject is required in unsubscribe");
        }
        NatsSubscription sub = this.subscriptions.get(subject);
        if (sub != null) {
            this.connection.unsubscribe(sub, after);
        }
        return this;
    }
}

