/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.mqtt;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.joynr.messaging.mqtt.MqttMessagingSkeleton;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MqttMessageInProgressObserver {
    private static final Logger logger = LoggerFactory.getLogger(MqttMessageInProgressObserver.class);
    private static final String disablingBackpressureMessage = "Disabling backpressure mechanism because of invalid property settings";
    private int currentMessagesInProgress = 0;
    private List<MqttMessagingSkeleton> mqttMessagingSkeletons = new ArrayList<MqttMessagingSkeleton>();
    private HashSet<String> messagesInProgress = new HashSet();
    private final int maxIncomingMqttRequests;
    private final int reEnableMessageAcknowledgementThreshold;
    private final int backpressureEnablingThreshold;
    private boolean backpressureActive = false;
    private final boolean backpressureEnabled;

    @Inject
    public MqttMessageInProgressObserver(@Named(value="joynr.messaging.backpressure.enabled") boolean backpressureEnabled, @Named(value="joynr.messaging.maxincomingmqttrequests") int maxIncomingMqttRequests, @Named(value="joynr.messaging.backpressure.incomingmqttrequests.lowerthreshold") int reEnableMessageAcknowledgementThreshold, @Named(value="joynr.messaging.mqtt.receivemaximum") int receiveMaximum, @Named(value="joynr.internal.messaging.gbidArray") String[] gbids, @Named(value="joynr.messaging.separatereplyreceiver") boolean separateReplyReceiver) {
        this.backpressureEnabled = backpressureEnabled;
        this.maxIncomingMqttRequests = maxIncomingMqttRequests;
        this.reEnableMessageAcknowledgementThreshold = reEnableMessageAcknowledgementThreshold;
        this.backpressureEnablingThreshold = maxIncomingMqttRequests - gbids.length * receiveMaximum;
        if (backpressureEnabled && !separateReplyReceiver) {
            logger.warn("Backpressure is enabled without a separate MQTT connection to receive reply messages. When backpressure is active on high load, reply messages might be held up as well.");
        }
        this.validateBackpressureValues();
    }

    private void validateBackpressureValues() {
        if (this.backpressureEnabled) {
            if (this.maxIncomingMqttRequests <= 0) {
                logger.error("Invalid value {} for {}, expecting a limit greater than 0 when backpressure is activated", (Object)this.maxIncomingMqttRequests, (Object)"joynr.messaging.maxincomingmqttrequests");
                throw new IllegalArgumentException(disablingBackpressureMessage);
            }
            if (this.backpressureEnablingThreshold <= 0) {
                logger.error("{} ({}) is less than {} ({} * number of configured backends/brokers)", new Object[]{this.maxIncomingMqttRequests, "joynr.messaging.maxincomingmqttrequests", this.maxIncomingMqttRequests - this.backpressureEnablingThreshold, "joynr.messaging.mqtt.receivemaximum"});
                throw new IllegalArgumentException(disablingBackpressureMessage);
            }
            if (this.reEnableMessageAcknowledgementThreshold < 0 || this.reEnableMessageAcknowledgementThreshold >= this.backpressureEnablingThreshold) {
                logger.error("Invalid value {} for {}, value has to be greater than 0 and less than {} ({} - ({} * number of configured backends/brokers)", new Object[]{this.reEnableMessageAcknowledgementThreshold, "joynr.messaging.backpressure.incomingmqttrequests.lowerthreshold", this.backpressureEnablingThreshold, "joynr.messaging.maxincomingmqttrequests", "joynr.messaging.mqtt.receivemaximum"});
                throw new IllegalArgumentException(disablingBackpressureMessage);
            }
        }
    }

    public void registerMessagingSkeleton(MqttMessagingSkeleton skeleton) {
        this.mqttMessagingSkeletons.add(skeleton);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean canMessageBeAcknowledged(String messageId) {
        if (!this.backpressureEnabled) {
            return true;
        }
        HashSet<String> hashSet = this.messagesInProgress;
        synchronized (hashSet) {
            if (!this.messagesInProgress.add(messageId)) {
                logger.error("Could not add message with {} to messages in progress.", (Object)messageId);
                return true;
            }
            ++this.currentMessagesInProgress;
            if (this.backpressureActive) {
                return false;
            }
            if (this.currentMessagesInProgress <= this.backpressureEnablingThreshold) {
                return true;
            }
            logger.warn("Backpressure mode entered. Incoming MQTT requests will no longer be acknowledged.");
            this.backpressureActive = true;
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void decrementMessagesInProgress(String messageId) {
        if (!this.backpressureEnabled) {
            return;
        }
        HashSet<String> hashSet = this.messagesInProgress;
        synchronized (hashSet) {
            if (this.messagesInProgress.remove(messageId)) {
                --this.currentMessagesInProgress;
                if (this.backpressureActive && this.currentMessagesInProgress <= this.reEnableMessageAcknowledgementThreshold) {
                    this.backpressureActive = false;
                    logger.warn("Backpressure mode exited. Acknowledging all outstanding MQTT requests.");
                    for (MqttMessagingSkeleton skeleton : this.mqttMessagingSkeletons) {
                        skeleton.acknowledgeOutstandingPublishes();
                    }
                }
            }
        }
    }
}

