/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt;

import io.joynr.messaging.FailureAction;
import io.joynr.messaging.JoynrMessageProcessor;
import io.joynr.messaging.RawMessagingPreprocessor;
import io.joynr.messaging.mqtt.IMqttMessagingSkeleton;
import io.joynr.messaging.mqtt.JoynrMqttClient;
import io.joynr.messaging.mqtt.MqttClientFactory;
import io.joynr.messaging.mqtt.MqttTopicPrefixProvider;
import io.joynr.messaging.mqtt.statusmetrics.MqttStatusReceiver;
import io.joynr.messaging.routing.MessageProcessedListener;
import io.joynr.messaging.routing.MessageRouter;
import io.joynr.messaging.routing.ReplyToAddressRegistrar;
import io.joynr.smrf.EncodingException;
import io.joynr.smrf.UnsuppportedVersionException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import joynr.ImmutableMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttMessagingSkeleton
implements IMqttMessagingSkeleton,
MessageProcessedListener {
    private static final Logger LOG = LoggerFactory.getLogger(MqttMessagingSkeleton.class);
    protected final int maxIncomingMqttRequests;
    private final MessageRouter messageRouter;
    private final ReplyToAddressRegistrar replyToAddressRegistrar;
    private JoynrMqttClient mqttClient;
    private final MqttClientFactory mqttClientFactory;
    private final String ownTopic;
    private final ConcurrentMap<String, AtomicInteger> multicastSubscriptionCount;
    private final MqttTopicPrefixProvider mqttTopicPrefixProvider;
    private final RawMessagingPreprocessor rawMessagingPreprocessor;
    private final Set<JoynrMessageProcessor> messageProcessors;
    private final Set<String> incomingMqttRequests;
    private final AtomicLong droppedMessagesCount;
    private final MqttStatusReceiver mqttStatusReceiver;
    private final String ownGbid;

    public MqttMessagingSkeleton(String ownTopic, int maxIncomingMqttRequests, MessageRouter messageRouter, ReplyToAddressRegistrar replyToAddressRegistrar, MqttClientFactory mqttClientFactory, MqttTopicPrefixProvider mqttTopicPrefixProvider, RawMessagingPreprocessor rawMessagingPreprocessor, Set<JoynrMessageProcessor> messageProcessors, MqttStatusReceiver mqttStatusReceiver, String ownGbid) {
        this.ownTopic = ownTopic;
        this.maxIncomingMqttRequests = maxIncomingMqttRequests;
        this.messageRouter = messageRouter;
        this.replyToAddressRegistrar = replyToAddressRegistrar;
        this.mqttClientFactory = mqttClientFactory;
        this.mqttTopicPrefixProvider = mqttTopicPrefixProvider;
        this.rawMessagingPreprocessor = rawMessagingPreprocessor;
        this.messageProcessors = messageProcessors;
        this.incomingMqttRequests = Collections.synchronizedSet(new HashSet());
        this.droppedMessagesCount = new AtomicLong();
        this.multicastSubscriptionCount = new ConcurrentHashMap<String, AtomicInteger>();
        this.mqttStatusReceiver = mqttStatusReceiver;
        this.ownGbid = ownGbid;
    }

    public void init() {
        LOG.debug("Initializing MQTT skeleton (ownGbid={}) ...", (Object)this.ownGbid);
        this.messageRouter.registerMessageProcessedListener((MessageProcessedListener)this);
        this.mqttClient = this.mqttClientFactory.createReceiver(this.ownGbid);
        this.mqttClient.setMessageListener(this);
        this.mqttClient.start();
        this.mqttClientFactory.createSender(this.ownGbid).start();
        this.subscribe();
    }

    protected void subscribe() {
        this.mqttClient.subscribe(this.ownTopic + "/#");
    }

    public void shutdown() {
    }

    public void registerMulticastSubscription(String multicastId) {
        this.multicastSubscriptionCount.putIfAbsent(multicastId, new AtomicInteger());
        int numberOfSubscriptions = ((AtomicInteger)this.multicastSubscriptionCount.get(multicastId)).incrementAndGet();
        if (numberOfSubscriptions == 1) {
            this.mqttClient.subscribe(this.getSubscriptionTopic(multicastId));
        }
    }

    public void unregisterMulticastSubscription(String multicastId) {
        int remainingCount;
        AtomicInteger subscribersCount = (AtomicInteger)this.multicastSubscriptionCount.get(multicastId);
        if (subscribersCount != null && (remainingCount = subscribersCount.decrementAndGet()) == 0) {
            this.mqttClient.unsubscribe(this.getSubscriptionTopic(multicastId));
        }
    }

    private String translateWildcard(String multicastId) {
        String topic = multicastId;
        if (topic.endsWith("/*")) {
            topic = topic.replaceFirst("/\\*$", "/#");
        }
        return topic;
    }

    @Override
    public void transmit(byte[] serializedMessage, FailureAction failureAction) {
        try {
            HashMap context = new HashMap();
            byte[] processedMessage = this.rawMessagingPreprocessor.process(serializedMessage, context);
            ImmutableMessage message = new ImmutableMessage(processedMessage);
            message.setContext(context);
            LOG.debug("<<< INCOMING FROM {} <<< {}", (Object)this.ownGbid, (Object)message);
            if (this.messageProcessors != null) {
                for (JoynrMessageProcessor processor : this.messageProcessors) {
                    message = processor.processIncoming(message);
                }
            }
            if (this.dropMessage(message)) {
                this.droppedMessagesCount.incrementAndGet();
                this.mqttStatusReceiver.notifyMessageDropped();
                return;
            }
            message.setReceivedFromGlobal(true);
            if (this.isRequestMessageTypeThatCanBeDropped(message.getType())) {
                this.requestAccepted(message.getId());
            }
            try {
                this.replyToAddressRegistrar.registerGlobalRoutingEntry(message, this.ownGbid);
                this.messageRouter.route(message);
            }
            catch (Exception e) {
                LOG.error("Error processing incoming message. Message will be dropped: {} ", (Object)e.getMessage());
                this.messageProcessed(message.getId());
                failureAction.execute((Throwable)e);
            }
        }
        catch (EncodingException | UnsuppportedVersionException | NullPointerException e) {
            LOG.error("Message: \"{}\", could not be deserialized, exception: {}", (Object)serializedMessage, (Object)e.getMessage());
            failureAction.execute(e);
        }
    }

    private boolean dropMessage(ImmutableMessage message) {
        if (this.maxIncomingMqttRequests > 0 && this.incomingMqttRequests.size() >= this.maxIncomingMqttRequests && this.isRequestMessageTypeThatCanBeDropped(message.getType())) {
            LOG.warn("Incoming MQTT message with id {} will be dropped as limit of {} requests is reached", (Object)message.getId(), (Object)this.maxIncomingMqttRequests);
            return true;
        }
        return false;
    }

    private boolean isRequestMessageTypeThatCanBeDropped(String messageType) {
        return messageType.equals("rq") || messageType.equals("o");
    }

    protected JoynrMqttClient getClient() {
        return this.mqttClient;
    }

    protected String getOwnTopic() {
        return this.ownTopic;
    }

    private String getSubscriptionTopic(String multicastId) {
        return this.mqttTopicPrefixProvider.getMulticastTopicPrefix() + this.translateWildcard(multicastId);
    }

    public long getDroppedMessagesCount() {
        return this.droppedMessagesCount.get();
    }

    protected int getCurrentCountOfUnprocessedMqttRequests() {
        return this.incomingMqttRequests.size();
    }

    public void messageProcessed(String messageId) {
        if (this.incomingMqttRequests.remove(messageId)) {
            this.requestProcessed(messageId);
        }
    }

    protected void requestAccepted(String messageId) {
        this.incomingMqttRequests.add(messageId);
    }

    protected void requestProcessed(String messageId) {
        LOG.debug("Request {} was processed and is removed from the MQTT skeleton tracking list", (Object)messageId);
    }
}

