/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt;

import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.joynr.exceptions.JoynrSerializationException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.IMessagingMulticastSubscriber;
import io.joynr.messaging.IMessagingSkeleton;
import io.joynr.messaging.IRawMessaging;
import io.joynr.messaging.JoynrMessageProcessor;
import io.joynr.messaging.JoynrMessageSerializer;
import io.joynr.messaging.RawMessagingPreprocessor;
import io.joynr.messaging.mqtt.JoynrMqttClient;
import io.joynr.messaging.mqtt.MqttClientFactory;
import io.joynr.messaging.mqtt.MqttMessageSerializerFactory;
import io.joynr.messaging.mqtt.MqttTopicPrefixProvider;
import io.joynr.messaging.routing.MessageRouter;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import joynr.JoynrMessage;
import joynr.system.RoutingTypes.Address;
import joynr.system.RoutingTypes.MqttAddress;
import joynr.system.RoutingTypes.RoutingTypesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Messaging skeleton that receives joynr messages through a {@link JoynrMqttClient}
 * and hands them to the {@link MessageRouter}.
 *
 * <p>On {@link #init()} the skeleton creates the MQTT client, registers itself as the
 * client's message listener, starts the client and subscribes to every topic below its
 * own address topic. Multicast subscriptions are reference counted per multicast id:
 * the MQTT topic is subscribed when the count reaches one and unsubscribed when it
 * drops back to zero.
 */
public class MqttMessagingSkeleton implements IMessagingSkeleton, IMessagingMulticastSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(MqttMessagingSkeleton.class);

    private final MessageRouter messageRouter;
    private final JoynrMessageSerializer messageSerializer;
    private final MqttClientFactory mqttClientFactory;
    private final MqttAddress ownAddress;
    /** Number of registered subscribers per multicast id. Entries are never removed. */
    private final ConcurrentMap<String, AtomicInteger> multicastSubscriptionCount = Maps.newConcurrentMap();
    private final MqttTopicPrefixProvider mqttTopicPrefixProvider;
    private final RawMessagingPreprocessor rawMessagingPreprocessor;
    private final Set<JoynrMessageProcessor> messageProcessors;
    /** Created in {@link #init()}; {@code null} until then. */
    private JoynrMqttClient mqttClient;

    /**
     * Creates the skeleton. No MQTT connection is opened here; call {@link #init()} for that.
     *
     * @param ownAddress the MQTT address whose topic this skeleton listens on
     * @param messageRouter router that receives every successfully processed incoming message
     * @param mqttClientFactory factory used by {@link #init()} to create the MQTT client
     * @param messageSerializerFactory factory used to create the serializer for {@code ownAddress}
     * @param mqttTopicPrefixProvider supplies the prefix prepended to multicast topics
     * @param rawMessagingPreprocessor applied to the raw payload before deserialization
     * @param messageProcessors processors applied, in iteration order, to each deserialized
     *        message; may be {@code null}, in which case no processing takes place
     */
    @Inject
    public MqttMessagingSkeleton(@Named(value="property_mqtt_global_address") MqttAddress ownAddress, MessageRouter messageRouter, MqttClientFactory mqttClientFactory, MqttMessageSerializerFactory messageSerializerFactory, MqttTopicPrefixProvider mqttTopicPrefixProvider, RawMessagingPreprocessor rawMessagingPreprocessor, Set<JoynrMessageProcessor> messageProcessors) {
        this.ownAddress = ownAddress;
        this.messageRouter = messageRouter;
        this.mqttClientFactory = mqttClientFactory;
        this.mqttTopicPrefixProvider = mqttTopicPrefixProvider;
        this.rawMessagingPreprocessor = rawMessagingPreprocessor;
        this.messageProcessors = messageProcessors;
        this.messageSerializer = messageSerializerFactory.create(ownAddress);
    }

    /**
     * Creates the MQTT client, registers this skeleton as its message listener,
     * starts the client and performs the initial {@link #subscribe()}.
     */
    public void init() {
        this.mqttClient = this.mqttClientFactory.create();
        this.mqttClient.setMessageListener(this);
        this.mqttClient.start();
        this.subscribe();
    }

    /**
     * Subscribes to all topics below this skeleton's own address topic
     * ({@code <ownTopic>/#}). Subclasses may override to change the subscription.
     */
    protected void subscribe() {
        this.mqttClient.subscribe(this.ownAddress.getTopic() + "/#");
    }

    /**
     * Shuts down the MQTT client. Does nothing if {@link #init()} has not been called.
     */
    public void shutdown() {
        // Guard against shutdown being invoked on a skeleton that was never initialised.
        if (this.mqttClient != null) {
            this.mqttClient.shutdown();
        }
    }

    /**
     * Registers one more subscriber for the given multicast id. The corresponding MQTT
     * topic is subscribed only when the subscriber count becomes one.
     *
     * @param multicastId the multicast id; a trailing {@code /*} is translated to the
     *        MQTT multi-level wildcard {@code /#}
     */
    public void registerMulticastSubscription(String multicastId) {
        AtomicInteger fresh = new AtomicInteger();
        // putIfAbsent returns the previously mapped counter, or null if ours was installed.
        AtomicInteger existing = this.multicastSubscriptionCount.putIfAbsent(multicastId, fresh);
        AtomicInteger counter = existing != null ? existing : fresh;
        if (counter.incrementAndGet() == 1) {
            this.mqttClient.subscribe(this.getSubscriptionTopic(multicastId));
        }
    }

    /**
     * Removes one subscriber for the given multicast id. The corresponding MQTT topic is
     * unsubscribed when the last subscriber is removed. Calls for ids without any
     * registered subscriber are ignored, so the count never becomes negative.
     *
     * @param multicastId the multicast id previously passed to
     *        {@link #registerMulticastSubscription(String)}
     */
    public void unregisterMulticastSubscription(String multicastId) {
        AtomicInteger counter = this.multicastSubscriptionCount.get(multicastId);
        if (counter == null) {
            return;
        }
        int current;
        do {
            current = counter.get();
            if (current <= 0) {
                // Unbalanced unregister: a negative count would prevent the next
                // registration from reaching 1 and therefore from subscribing.
                return;
            }
        } while (!counter.compareAndSet(current, current - 1));
        if (current == 1) {
            this.mqttClient.unsubscribe(this.getSubscriptionTopic(multicastId));
        }
    }

    /**
     * Replaces a trailing {@code /*} in the multicast id with {@code /#}; any other id
     * is returned unchanged.
     */
    private String translateWildcard(String multicastId) {
        if (multicastId.endsWith("/*")) {
            // Swap only the final '*' for '#', keeping the preceding '/'.
            return multicastId.substring(0, multicastId.length() - 1) + "#";
        }
        return multicastId;
    }

    /**
     * Processes an incoming, already deserialized message and routes it.
     *
     * <p>Messages of type {@code "multicast"} are flagged as received from global. If the
     * {@code replyChannelId} header is present and non-empty, it is parsed as an address
     * and registered with the router as next hop for the sender before routing.
     *
     * @param message the incoming message
     * @param failureAction executed with the causing exception if processing fails;
     *        the message is dropped in that case
     */
    public void transmit(JoynrMessage message, FailureAction failureAction) {
        LOG.debug("<<< INCOMING <<< {}", message.toLogMessage());
        try {
            if ("multicast".equals(message.getType())) {
                message.setReceivedFromGlobal(true);
            }
            String replyToMqttAddress = message.getHeaderValue("replyChannelId");
            if (replyToMqttAddress != null && !replyToMqttAddress.isEmpty()) {
                this.messageRouter.addNextHop(message.getFrom(), RoutingTypesUtil.fromAddressString(replyToMqttAddress));
            }
            this.messageRouter.route(message);
        }
        catch (Exception e) {
            LOG.error("Error processing incoming message. Message will be dropped: {} ", message.getHeader(), e);
            failureAction.execute(e);
        }
    }

    /**
     * Preprocesses and deserializes a raw incoming payload, applies the configured
     * message processors and passes the result to
     * {@link #transmit(JoynrMessage, FailureAction)}.
     *
     * @param serializedMessage the raw message payload
     * @param failureAction executed with the causing exception if the payload cannot be
     *        deserialized or if the subsequent processing fails
     */
    public void transmit(String serializedMessage, FailureAction failureAction) {
        try {
            HashMap context = new HashMap();
            JoynrMessage message = this.messageSerializer.deserialize(this.rawMessagingPreprocessor.process(serializedMessage, context));
            message.setContext(context);
            if (this.messageProcessors != null) {
                for (JoynrMessageProcessor processor : this.messageProcessors) {
                    message = processor.processIncoming(message);
                }
            }
            this.transmit(message, failureAction);
        }
        catch (JoynrSerializationException e) {
            LOG.error("Message: \"{}\", could not be deserialized, exception: {}", serializedMessage, e.getMessage());
            // Report the failure to the caller instead of dropping the message silently.
            failureAction.execute(e);
        }
    }

    /**
     * @return the MQTT client created by {@link #init()}, or {@code null} before that
     */
    protected JoynrMqttClient getClient() {
        return this.mqttClient;
    }

    /**
     * @return the MQTT address this skeleton was constructed with
     */
    protected MqttAddress getOwnAddress() {
        return this.ownAddress;
    }

    /**
     * Builds the MQTT topic for a multicast id: the multicast topic prefix followed by
     * the id with its wildcard translated.
     */
    private String getSubscriptionTopic(String multicastId) {
        return this.mqttTopicPrefixProvider.getMulticastTopicPrefix() + this.translateWildcard(multicastId);
    }
}

