/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt;

import io.joynr.messaging.JoynrMessageProcessor;
import io.joynr.messaging.RawMessagingPreprocessor;
import io.joynr.messaging.mqtt.MqttClientFactory;
import io.joynr.messaging.mqtt.MqttMessagingSkeleton;
import io.joynr.messaging.mqtt.MqttTopicPrefixProvider;
import io.joynr.messaging.routing.MessageRouter;
import io.joynr.messaging.routing.RoutingTable;
import io.joynr.statusmetrics.JoynrStatusMetricsReceiver;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SharedSubscriptionsMqttMessagingSkeleton
extends MqttMessagingSkeleton {
    private static final Logger logger = LoggerFactory.getLogger(SharedSubscriptionsMqttMessagingSkeleton.class);
    private static final String NON_ALPHA_REGEX_PATTERN = "[^a-zA-Z]";
    private final String channelId;
    private final String sharedSubscriptionsTopic;
    private final AtomicBoolean subscribedToSharedSubscriptionsTopic;
    private final String replyToTopic;
    private boolean backpressureEnabled;
    private final int backpressureIncomingMqttRequestsUpperThreshold;
    private final int backpressureIncomingMqttRequestsLowerThreshold;
    private final int unsubscribeThreshold;
    private final int resubscribeThreshold;

    public SharedSubscriptionsMqttMessagingSkeleton(String ownTopic, int maxIncomingMqttRequests, boolean backpressureEnabled, int backpressureIncomingMqttRequestsUpperThreshold, int backpressureIncomingMqttRequestsLowerThreshold, String replyToTopic, MessageRouter messageRouter, MqttClientFactory mqttClientFactory, String channelId, MqttTopicPrefixProvider mqttTopicPrefixProvider, RawMessagingPreprocessor rawMessagingPreprocessor, Set<JoynrMessageProcessor> messageProcessors, JoynrStatusMetricsReceiver joynrStatusMetricsReceiver, String ownGbid, RoutingTable routingTable) {
        super(ownTopic, maxIncomingMqttRequests, messageRouter, mqttClientFactory, mqttTopicPrefixProvider, rawMessagingPreprocessor, messageProcessors, joynrStatusMetricsReceiver, ownGbid, routingTable);
        this.replyToTopic = replyToTopic;
        this.channelId = channelId;
        this.sharedSubscriptionsTopic = this.createSharedSubscriptionsTopic();
        this.subscribedToSharedSubscriptionsTopic = new AtomicBoolean(false);
        this.backpressureEnabled = backpressureEnabled;
        this.backpressureIncomingMqttRequestsUpperThreshold = backpressureIncomingMqttRequestsUpperThreshold;
        this.backpressureIncomingMqttRequestsLowerThreshold = backpressureIncomingMqttRequestsLowerThreshold;
        this.validateBackpressureValues();
        this.unsubscribeThreshold = maxIncomingMqttRequests * backpressureIncomingMqttRequestsUpperThreshold / 100;
        this.resubscribeThreshold = maxIncomingMqttRequests * backpressureIncomingMqttRequestsLowerThreshold / 100;
    }

    private void validateBackpressureValues() {
        if (this.backpressureEnabled) {
            boolean invalidPropertyValueDetected = false;
            if (this.maxIncomingMqttRequests <= 0) {
                invalidPropertyValueDetected = true;
                logger.error("Invalid value {} for {}, expecting a limit greater than 0 when backpressure is activated", (Object)this.maxIncomingMqttRequests, (Object)"joynr.messaging.maxincomingmqttrequests");
            }
            if (this.backpressureIncomingMqttRequestsUpperThreshold <= 0 || this.backpressureIncomingMqttRequestsUpperThreshold > 100) {
                invalidPropertyValueDetected = true;
                logger.error("Invalid value {} for {}, expecting percentage value in range (0,100]", (Object)this.backpressureIncomingMqttRequestsUpperThreshold, (Object)"joynr.messaging.backpressure.incomingmqttrequests.upperthreshold");
            }
            if (this.backpressureIncomingMqttRequestsLowerThreshold < 0 || this.backpressureIncomingMqttRequestsLowerThreshold >= 100) {
                invalidPropertyValueDetected = true;
                logger.error("Invalid value {} for {}, expecting percentage value in range [0,100)", (Object)this.backpressureIncomingMqttRequestsLowerThreshold, (Object)"joynr.messaging.backpressure.incomingmqttrequests.lowerthreshold");
            }
            if (this.backpressureIncomingMqttRequestsLowerThreshold >= this.backpressureIncomingMqttRequestsUpperThreshold) {
                invalidPropertyValueDetected = true;
                logger.error("Lower threshold percentage {} must be stricly below the upper threshold percentage {}. Change the value of {} or {}", new Object[]{this.backpressureIncomingMqttRequestsLowerThreshold, this.backpressureIncomingMqttRequestsUpperThreshold, "joynr.messaging.backpressure.incomingmqttrequests.lowerthreshold", "joynr.messaging.backpressure.incomingmqttrequests.upperthreshold"});
            }
            if (invalidPropertyValueDetected) {
                this.backpressureEnabled = false;
                String disablingBackpressureMessage = "Disabling backpressure mechanism because of invalid property settings";
                logger.error(disablingBackpressureMessage);
                throw new IllegalArgumentException(disablingBackpressureMessage);
            }
        }
    }

    @Override
    protected void subscribe() {
        this.subscribeToReplyTopic();
        this.subscribeToSharedTopic();
    }

    protected void subscribeToReplyTopic() {
        String topic = this.replyToTopic + "/#";
        logger.info("Subscribing to reply-to topic: {}", (Object)topic);
        this.getClient().subscribe(topic);
    }

    protected void subscribeToSharedTopic() {
        logger.info("Subscribing to shared topic: {}", (Object)this.sharedSubscriptionsTopic);
        this.getClient().subscribe(this.sharedSubscriptionsTopic);
        this.subscribedToSharedSubscriptionsTopic.set(true);
    }

    @Override
    protected void requestAccepted(String messageId) {
        super.requestAccepted(messageId);
        if (this.backpressureEnabled && this.getCurrentCountOfUnprocessedMqttRequests() >= this.unsubscribeThreshold && this.subscribedToSharedSubscriptionsTopic.compareAndSet(true, false)) {
            this.getClient().unsubscribe(this.sharedSubscriptionsTopic);
            logger.info("Unsubscribed from topic {} due to enabled backpressure mechanism and passed upper threshold of unprocessed MQTT requests", (Object)this.sharedSubscriptionsTopic);
        }
    }

    @Override
    protected void requestProcessed(String messageId) {
        super.requestProcessed(messageId);
        if (this.backpressureEnabled && this.getCurrentCountOfUnprocessedMqttRequests() < this.resubscribeThreshold && this.subscribedToSharedSubscriptionsTopic.compareAndSet(false, true)) {
            this.getClient().subscribe(this.sharedSubscriptionsTopic);
            logger.info("Subscribed again to topic {} due to enabled backpressure mechanism and passed lower threshold of unprocessed MQTT requests", (Object)this.sharedSubscriptionsTopic);
        }
    }

    private String createSharedSubscriptionsTopic() {
        StringBuilder sb = new StringBuilder("$share/");
        sb.append(this.sanitiseChannelIdForUseAsTopic());
        sb.append("/");
        sb.append(this.getOwnTopic());
        sb.append("/#");
        return sb.toString();
    }

    private String sanitiseChannelIdForUseAsTopic() {
        String result = this.channelId.replaceAll(NON_ALPHA_REGEX_PATTERN, "");
        if (result.isEmpty()) {
            throw new IllegalArgumentException(String.format("The channel ID %s cannot be converted to a valid MQTT topic fragment because it does not contain any alpha characters.", this.channelId));
        }
        return result;
    }
}

