/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt;

import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.joynr.exceptions.JoynrMessageExpiredException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.JoynrMessageProcessor;
import io.joynr.messaging.RawMessagingPreprocessor;
import io.joynr.messaging.mqtt.IMqttMessagingSkeleton;
import io.joynr.messaging.mqtt.JoynrMqttClient;
import io.joynr.messaging.mqtt.MqttClientFactory;
import io.joynr.messaging.mqtt.MqttMessageInProgressObserver;
import io.joynr.messaging.mqtt.MqttTopicPrefixProvider;
import io.joynr.messaging.routing.AbstractGlobalMessagingSkeleton;
import io.joynr.messaging.routing.MessageProcessedHandler;
import io.joynr.messaging.routing.MessageProcessedListener;
import io.joynr.messaging.routing.MessageRouter;
import io.joynr.messaging.routing.MessageRouterUtil;
import io.joynr.messaging.routing.RoutingTable;
import io.joynr.smrf.EncodingException;
import io.joynr.smrf.UnsuppportedVersionException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import joynr.ImmutableMessage;
import joynr.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttMessagingSkeleton
extends AbstractGlobalMessagingSkeleton
implements IMqttMessagingSkeleton,
MessageProcessedListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttMessagingSkeleton.class);
    protected final String ownTopic;
    protected JoynrMqttClient client;
    protected final String ownGbid;
    private final MessageRouter messageRouter;
    private final MessageProcessedHandler messageProcessedHandler;
    protected final MqttClientFactory mqttClientFactory;
    private final ConcurrentMap<String, AtomicInteger> multicastSubscriptionCount;
    private final MqttTopicPrefixProvider mqttTopicPrefixProvider;
    private final RawMessagingPreprocessor rawMessagingPreprocessor;
    private final Set<JoynrMessageProcessor> messageProcessors;
    private final Set<String> incomingMqttRequests;
    private final String backendUid;
    private final List<Mqtt5Publish> publishesToAcknowledge;
    private final MqttMessageInProgressObserver mqttMessageInProgressObserver;

    public MqttMessagingSkeleton(String ownTopic, MessageRouter messageRouter, MessageProcessedHandler messageProcessedHandler, MqttClientFactory mqttClientFactory, MqttTopicPrefixProvider mqttTopicPrefixProvider, RawMessagingPreprocessor rawMessagingPreprocessor, Set<JoynrMessageProcessor> messageProcessors, String ownGbid, RoutingTable routingTable, String backendUid, MqttMessageInProgressObserver mqttMessageInProgressObserver) {
        super(routingTable);
        this.ownTopic = ownTopic;
        this.messageRouter = messageRouter;
        this.messageProcessedHandler = messageProcessedHandler;
        this.mqttClientFactory = mqttClientFactory;
        this.mqttTopicPrefixProvider = mqttTopicPrefixProvider;
        this.rawMessagingPreprocessor = rawMessagingPreprocessor;
        this.messageProcessors = messageProcessors != null ? new HashSet<JoynrMessageProcessor>(messageProcessors) : null;
        this.incomingMqttRequests = Collections.synchronizedSet(new HashSet());
        this.multicastSubscriptionCount = new ConcurrentHashMap<String, AtomicInteger>();
        this.ownGbid = ownGbid;
        this.backendUid = backendUid;
        this.publishesToAcknowledge = new ArrayList<Mqtt5Publish>();
        this.mqttMessageInProgressObserver = mqttMessageInProgressObserver;
        mqttMessageInProgressObserver.registerMessagingSkeleton(this);
        this.client = mqttClientFactory.createReceiver(ownGbid);
    }

    public void init() {
        logger.debug("Initializing MQTT skeleton (ownGbid={}) ...", (Object)this.ownGbid);
        this.messageProcessedHandler.registerMessageProcessedListener((MessageProcessedListener)this);
        this.client.setMessageListener(this);
        this.client.start();
        this.mqttClientFactory.connect(this.client);
        JoynrMqttClient sender = this.mqttClientFactory.createSender(this.ownGbid);
        sender.start();
        this.mqttClientFactory.connect(sender);
        this.subscribe();
    }

    protected void subscribe() {
        this.client.subscribe(this.ownTopic + "/#");
    }

    public void shutdown() {
    }

    public void registerMulticastSubscription(String multicastId) {
        this.multicastSubscriptionCount.putIfAbsent(multicastId, new AtomicInteger());
        int numberOfSubscriptions = ((AtomicInteger)this.multicastSubscriptionCount.get(multicastId)).incrementAndGet();
        if (numberOfSubscriptions == 1) {
            this.client.subscribe(this.getSubscriptionTopic(multicastId));
        }
    }

    public void unregisterMulticastSubscription(String multicastId) {
        int remainingCount;
        AtomicInteger subscribersCount = (AtomicInteger)this.multicastSubscriptionCount.get(multicastId);
        if (subscribersCount != null && (remainingCount = subscribersCount.decrementAndGet()) == 0) {
            this.client.unsubscribe(this.getSubscriptionTopic(multicastId));
        }
    }

    private String translateWildcard(String multicastId) {
        String topic = multicastId;
        if (topic.endsWith("/*")) {
            topic = topic.replaceFirst("/\\*$", "/#");
        }
        return topic;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void transmit(Mqtt5Publish mqtt5Publish, Map<String, String> prefixedCustomHeaders, FailureAction failureAction) {
        block16: {
            try {
                HashMap context = new HashMap();
                byte[] processedMessage = this.rawMessagingPreprocessor.process(mqtt5Publish.getPayloadAsBytes(), Optional.of(context));
                ImmutableMessage message = new ImmutableMessage(processedMessage);
                if (logger.isTraceEnabled()) {
                    logger.trace("<<< INCOMING FROM {} <<< {}", (Object)this.ownGbid, (Object)message);
                } else {
                    logger.debug("<<< INCOMING FROM {} <<< {} creatorUserId: {}", new Object[]{this.ownGbid, message.getTrackingInfo(), this.backendUid});
                }
                MessageRouterUtil.checkExpiry((ImmutableMessage)message);
                message.setContext(context);
                message.setPrefixedExtraCustomHeaders(prefixedCustomHeaders);
                message.setCreatorUserId(this.backendUid);
                if (this.messageProcessors != null) {
                    for (JoynrMessageProcessor processor : this.messageProcessors) {
                        message = processor.processIncoming(message);
                    }
                }
                message.setReceivedFromGlobal(true);
                boolean ack = true;
                if (this.isRequestMessageType(message.getType())) {
                    ack = this.mqttMessageInProgressObserver.canMessageBeAcknowledged(message.getId());
                    this.requestAccepted(message.getId());
                }
                boolean routingEntryRegistered = false;
                try {
                    routingEntryRegistered = this.registerGlobalRoutingEntry(message, this.ownGbid);
                    this.messageRouter.routeIn(message);
                    if (ack) {
                        mqtt5Publish.acknowledge();
                        break block16;
                    }
                    List<Mqtt5Publish> list = this.publishesToAcknowledge;
                    synchronized (list) {
                        this.publishesToAcknowledge.add(mqtt5Publish);
                    }
                }
                catch (Exception e) {
                    this.removeGlobalRoutingEntry(message, routingEntryRegistered);
                    this.messageProcessed(message.getId());
                    mqtt5Publish.acknowledge();
                    failureAction.execute((Throwable)e);
                }
            }
            catch (EncodingException | UnsuppportedVersionException | NullPointerException e) {
                logger.error("Message: \"{}\" could not be deserialized, exception: {}", (Object)mqtt5Publish.getPayloadAsBytes(), (Object)e.getMessage());
                mqtt5Publish.acknowledge();
                failureAction.execute(e);
            }
            catch (Exception e) {
                String message = String.format("Message \"%s\" could not be transmitted: %s", Arrays.toString(mqtt5Publish.getPayloadAsBytes()), e);
                if (e instanceof JoynrMessageExpiredException) {
                    logger.warn(message);
                } else {
                    logger.error(message);
                }
                mqtt5Publish.acknowledge();
                failureAction.execute((Throwable)e);
            }
        }
    }

    private boolean isRequestMessageType(Message.MessageType messageType) {
        return messageType.equals((Object)Message.MessageType.VALUE_MESSAGE_TYPE_REQUEST) || messageType.equals((Object)Message.MessageType.VALUE_MESSAGE_TYPE_ONE_WAY);
    }

    private String getSubscriptionTopic(String multicastId) {
        return this.mqttTopicPrefixProvider.getMulticastTopicPrefix() + this.translateWildcard(multicastId);
    }

    protected int getCurrentCountOfUnprocessedMqttRequests() {
        return this.incomingMqttRequests.size();
    }

    public void messageProcessed(String messageId) {
        if (this.incomingMqttRequests.remove(messageId)) {
            this.mqttMessageInProgressObserver.decrementMessagesInProgress(messageId);
            this.requestProcessed(messageId);
        }
    }

    private void requestAccepted(String messageId) {
        this.incomingMqttRequests.add(messageId);
    }

    private void requestProcessed(String messageId) {
        logger.debug("Request with messageId {} was processed and is removed from the MQTT skeleton tracking list", (Object)messageId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void acknowledgeOutstandingPublishes() {
        List<Mqtt5Publish> list = this.publishesToAcknowledge;
        synchronized (list) {
            for (Mqtt5Publish publish : this.publishesToAcknowledge) {
                publish.acknowledge();
            }
            this.publishesToAcknowledge.clear();
        }
    }
}

