/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mqtt3.internal.routing;

import com.mulesoft.connectors.mqtt3.api.Topic;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3Message;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3MessageHandler;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3TopicMatcher;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.mule.runtime.api.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of which message handlers are interested in which topic filters and
 * dispatches arrived messages to every handler whose filter matches the message topic.
 *
 * <p>Registration and deregistration are serialized through {@code synchronized} methods.
 * Dispatching ({@link #handleMessageArrived(MQTT3Message)}) takes no lock: the registry is a
 * {@link ConcurrentHashMap} and every handler list is a {@link CopyOnWriteArrayList}, so a
 * dispatch iterates over a stable snapshot even while handlers are being added or removed.
 */
public class MQTT3TopicRouter {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQTT3TopicRouter.class);
    private final MQTT3TopicMatcher topicMatcher;
    /** Topic filter string -> (topic as first registered, handlers registered for that filter). */
    private final Map<String, Pair<Topic, List<MQTT3MessageHandler>>> topicCallbacksRegistry;

    /**
     * Creates a router with an empty registry.
     *
     * @param topicMatcher matcher used to decide whether a registered topic filter matches the
     *                     topic of an arrived message
     */
    public MQTT3TopicRouter(MQTT3TopicMatcher topicMatcher) {
        this.topicMatcher = topicMatcher;
        this.topicCallbacksRegistry = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code messageHandler} for every topic in {@code topics}.
     *
     * <p>If a topic filter is already present in the registry the handler is appended to the
     * existing handler list; the {@link Topic} stored for that filter is left as it was when the
     * filter was first registered.
     *
     * @param topics         topics whose filters the handler should be registered for
     * @param messageHandler handler to invoke for messages matching those filters
     * @return the topics whose filter was not in the registry before this call
     */
    public synchronized List<Topic> registerCallbackForTopics(List<Topic> topics, MQTT3MessageHandler messageHandler) {
        List<Topic> newTopics = new ArrayList<>();
        for (Topic topic : topics) {
            String topicFilter = topic.getTopicFilter();
            Pair<Topic, List<MQTT3MessageHandler>> registration = this.topicCallbacksRegistry.get(topicFilter);
            if (registration != null) {
                LOGGER.debug("Topic {} already subscribed to with qos {}", topicFilter, topic.getQos().getValue());
                registration.getSecond().add(messageHandler);
                continue;
            }
            newTopics.add(topic);
            // Copy-on-write so that the unsynchronized dispatch path can iterate this list safely
            // while handlers are concurrently registered or deregistered.
            List<MQTT3MessageHandler> callbackList = new CopyOnWriteArrayList<>();
            callbackList.add(messageHandler);
            this.topicCallbacksRegistry.put(topicFilter, new Pair<>(topic, callbackList));
        }
        return newTopics;
    }

    /**
     * Removes one registration of {@code messageHandler} from every topic in {@code topics}.
     *
     * <p>Topics whose filter is not in the registry are ignored. When the last handler of a
     * filter is removed, the filter itself is dropped from the registry.
     *
     * @param topics         topics whose filters the handler should be removed from
     * @param messageHandler handler to remove
     * @return the topics whose filter was dropped from the registry because no handler remained
     */
    public synchronized List<Topic> deregisterCallbackForTopics(List<Topic> topics, MQTT3MessageHandler messageHandler) {
        List<Topic> deletedTopics = new ArrayList<>();
        for (Topic topic : topics) {
            String topicFilter = topic.getTopicFilter();
            Pair<Topic, List<MQTT3MessageHandler>> registration = this.topicCallbacksRegistry.get(topicFilter);
            if (registration == null) {
                continue;
            }
            List<MQTT3MessageHandler> callbackList = registration.getSecond();
            callbackList.remove(messageHandler);
            if (!callbackList.isEmpty()) {
                continue;
            }
            this.topicCallbacksRegistry.remove(topicFilter);
            deletedTopics.add(topic);
        }
        return deletedTopics;
    }

    /**
     * Returns the topics currently held in the registry, one per registered topic filter.
     *
     * @return a new list with the {@link Topic} stored for each registered filter
     */
    public List<Topic> getDistinctTopicFilters() {
        return this.topicCallbacksRegistry.values().stream()
                .map(Pair::getFirst)
                .collect(Collectors.toList());
    }

    /**
     * Invokes every handler registered under a topic filter that matches the topic of
     * {@code mqttMessage}.
     *
     * <p>An exception thrown by a handler propagates to the caller and stops the dispatch.
     *
     * @param mqttMessage the message that arrived
     */
    public void handleMessageArrived(MQTT3Message mqttMessage) {
        for (Map.Entry<String, Pair<Topic, List<MQTT3MessageHandler>>> entry : this.topicCallbacksRegistry.entrySet()) {
            if (!this.topicMatcher.topicMatches(entry.getKey(), mqttMessage.getTopic())) {
                continue;
            }
            for (MQTT3MessageHandler callback : entry.getValue().getSecond()) {
                callback.onMessageArrived(mqttMessage);
            }
        }
    }
}

