/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;

public class MQTTSubscriptionManager {
    private MQTTSession session;
    private ConcurrentMap<Long, Integer> consumerQoSLevels;
    private ConcurrentMap<String, ServerConsumer> consumers;
    private SimpleString managementFilter;

    public MQTTSubscriptionManager(MQTTSession session) {
        this.session = session;
        this.consumers = new ConcurrentHashMap<String, ServerConsumer>();
        this.consumerQoSLevels = new ConcurrentHashMap<Long, Integer>();
        StringBuilder builder = new StringBuilder();
        builder.append("NOT ((");
        builder.append((CharSequence)FilterConstants.ACTIVEMQ_ADDRESS);
        builder.append(" = '");
        builder.append((CharSequence)session.getServer().getConfiguration().getManagementAddress());
        builder.append("') OR (");
        builder.append((CharSequence)FilterConstants.ACTIVEMQ_ADDRESS);
        builder.append(" = '");
        builder.append((CharSequence)session.getServer().getConfiguration().getManagementNotificationAddress());
        builder.append("'))");
        this.managementFilter = new SimpleString(builder.toString());
    }

    synchronized void start() throws Exception {
        for (MqttTopicSubscription subscription : this.session.getSessionState().getSubscriptions()) {
            Queue q = this.createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
            this.createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
        }
    }

    synchronized void stop(boolean clean) throws Exception {
        for (ServerConsumer consumer : this.consumers.values()) {
            consumer.setStarted(false);
            consumer.disconnect();
            consumer.getQueue().removeConsumer((Consumer)consumer);
            consumer.close(false);
        }
        if (clean) {
            for (ServerConsumer consumer : this.consumers.values()) {
                this.session.getServer().destroyQueue(consumer.getQueue().getName());
            }
        }
    }

    private Queue createQueueForSubscription(String topic, int qos) throws Exception {
        String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
        SimpleString queue = this.getQueueNameForTopic(address);
        Queue q = this.session.getServer().locateQueue(queue);
        if (q == null) {
            q = this.session.getServerSession().createQueue(new SimpleString(address), queue, this.managementFilter, false, qos >= 0);
        }
        return q;
    }

    private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
        long cid = this.session.getServer().getStorageManager().generateID();
        ServerConsumer consumer = this.session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, Integer.valueOf(-1));
        consumer.setStarted(true);
        this.consumers.put(topic, consumer);
        this.consumerQoSLevels.put(cid, qos);
    }

    private void addSubscription(MqttTopicSubscription subscription) throws Exception {
        MqttTopicSubscription s = this.session.getSessionState().getSubscription(subscription.topicName());
        int qos = subscription.qualityOfService().value();
        String topic = subscription.topicName();
        this.session.getSessionState().addSubscription(subscription);
        Queue q = this.createQueueForSubscription(topic, qos);
        if (s == null) {
            this.createConsumerForSubscriptionQueue(q, topic, qos);
        } else {
            this.consumerQoSLevels.put(((ServerConsumer)this.consumers.get(topic)).getID(), qos);
        }
        this.session.getRetainMessageManager().addRetainedMessagesToQueue(q, topic);
    }

    void removeSubscriptions(List<String> topics) throws Exception {
        for (String topic : topics) {
            this.removeSubscription(topic);
        }
    }

    private synchronized void removeSubscription(String address) throws Exception {
        ServerConsumer consumer = (ServerConsumer)this.consumers.get(address);
        String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
        SimpleString internalQueueName = this.getQueueNameForTopic(internalAddress);
        Queue queue = this.session.getServer().locateQueue(internalQueueName);
        queue.deleteQueue(true);
        this.session.getSessionState().removeSubscription(address);
        this.consumers.remove(address);
        this.consumerQoSLevels.remove(consumer.getID());
    }

    private SimpleString getQueueNameForTopic(String topic) {
        return new SimpleString(this.session.getSessionState().getClientId() + "." + topic);
    }

    int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception {
        int[] qos = new int[subscriptions.size()];
        for (int i = 0; i < subscriptions.size(); ++i) {
            this.addSubscription(subscriptions.get(i));
            qos[i] = subscriptions.get(i).qualityOfService().value();
        }
        return qos;
    }

    Map<Long, Integer> getConsumerQoSLevels() {
        return this.consumerQoSLevels;
    }
}

