/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UniformLoadShedder
implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(UniformLoadShedder.class);
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
    private static final double EPS = 1.0E-6;

    @Override
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
        boolean isMsgThroughputThresholdExceeded;
        this.selectedBundlesCache.clear();
        Map<String, BrokerData> brokersData = loadData.getBrokerData();
        Map<String, BundleData> loadBundleData = loadData.getBundleDataForLoadShedding();
        Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
        MutableObject msgRateOverloadedBroker = new MutableObject();
        MutableObject msgThroughputOverloadedBroker = new MutableObject();
        MutableObject msgRateUnderloadedBroker = new MutableObject();
        MutableObject msgThroughputUnderloadedBroker = new MutableObject();
        MutableDouble maxMsgRate = new MutableDouble(-1.0);
        MutableDouble maxThroughput = new MutableDouble(-1.0);
        MutableDouble minMsgRate = new MutableDouble(2.147483647E9);
        MutableDouble minThroughput = new MutableDouble(2.147483647E9);
        brokersData.forEach((broker, data) -> {
            double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut();
            double throughputRate = data.getLocalData().getMsgThroughputIn() + data.getLocalData().getMsgThroughputOut();
            if (msgRate > maxMsgRate.getValue()) {
                msgRateOverloadedBroker.setValue(broker);
                maxMsgRate.setValue(msgRate);
            }
            if (throughputRate > maxThroughput.getValue()) {
                msgThroughputOverloadedBroker.setValue(broker);
                maxThroughput.setValue(throughputRate);
            }
            if (msgRate < minMsgRate.getValue()) {
                msgRateUnderloadedBroker.setValue(broker);
                minMsgRate.setValue(msgRate);
            }
            if (throughputRate < minThroughput.getValue()) {
                msgThroughputUnderloadedBroker.setValue(broker);
                minThroughput.setValue(throughputRate);
            }
        });
        if (minMsgRate.getValue() <= 1.0E-6 && minMsgRate.getValue() >= -1.0E-6) {
            minMsgRate.setValue(1.0);
        }
        if (minThroughput.getValue() <= 1.0E-6 && minThroughput.getValue() >= -1.0E-6) {
            minThroughput.setValue(1.0);
        }
        double msgRateDifferencePercentage = (maxMsgRate.getValue() - minMsgRate.getValue()) * 100.0 / minMsgRate.getValue();
        double msgThroughputDifferenceRate = maxThroughput.getValue() / minThroughput.getValue();
        boolean isMsgRateThresholdExceeded = conf.getLoadBalancerMsgRateDifferenceShedderThreshold() > 0.0 && msgRateDifferencePercentage > conf.getLoadBalancerMsgRateDifferenceShedderThreshold();
        boolean bl = isMsgThroughputThresholdExceeded = conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0.0 && msgThroughputDifferenceRate > conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
        if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
            MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt((int)((maxMsgRate.getValue() - minMsgRate.getValue()) * conf.getMaxUnloadPercentage()));
            MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt((int)((maxThroughput.getValue() - minThroughput.getValue()) * conf.getMaxUnloadPercentage()));
            if (isMsgRateThresholdExceeded) {
                LocalBrokerData overloadedBrokerData;
                if (log.isDebugEnabled()) {
                    log.debug("Found bundles for uniform load balancing. msgRate overloaded broker: {} with msgRate: {}, msgRate underloaded broker: {} with msgRate: {}", new Object[]{msgRateOverloadedBroker.getValue(), maxMsgRate.getValue(), msgRateUnderloadedBroker.getValue(), minMsgRate.getValue()});
                }
                if ((overloadedBrokerData = brokersData.get(msgRateOverloadedBroker.getValue()).getLocalData()).getBundles().size() > 1 && msgRateRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessage()) {
                    loadBundleData.entrySet().stream().filter(e -> overloadedBrokerData.getBundles().contains(e.getKey())).map(e -> {
                        String bundle = (String)e.getKey();
                        TimeAverageMessageData shortTermData = ((BundleData)e.getValue()).getShortTermData();
                        double msgRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
                        return Pair.of((Object)bundle, (Object)msgRate);
                    }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft())).sorted((e1, e2) -> Double.compare((Double)e2.getRight(), (Double)e1.getRight())).forEach(e -> {
                        if (conf.getMaxUnloadBundleNumPerShedding() != -1 && this.selectedBundlesCache.size() >= conf.getMaxUnloadBundleNumPerShedding()) {
                            return;
                        }
                        String bundle = (String)e.getLeft();
                        double bundleMsgRate = (Double)e.getRight();
                        if (bundleMsgRate <= (double)(msgRateRequiredFromUnloadedBundles.getValue() + 1000)) {
                            log.info("Found bundle to unload with msgRate {}", (Object)bundleMsgRate);
                            msgRateRequiredFromUnloadedBundles.add((Number)(-bundleMsgRate));
                            this.selectedBundlesCache.put((Object)((String)msgRateOverloadedBroker.getValue()), (Object)bundle);
                        }
                    });
                }
            } else {
                LocalBrokerData overloadedBrokerData;
                if (log.isDebugEnabled()) {
                    log.debug("Found bundles for uniform load balancing. msgThroughput overloaded broker: {} with msgThroughput {}, msgThroughput underloaded broker: {} with msgThroughput: {}", new Object[]{msgThroughputOverloadedBroker.getValue(), maxThroughput.getValue(), msgThroughputUnderloadedBroker.getValue(), minThroughput.getValue()});
                }
                if ((overloadedBrokerData = brokersData.get(msgThroughputOverloadedBroker.getValue()).getLocalData()).getBundles().size() > 1 && msgThroughputRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessageThroughput()) {
                    loadBundleData.entrySet().stream().filter(e -> overloadedBrokerData.getBundles().contains(e.getKey())).map(e -> {
                        String bundle = (String)e.getKey();
                        TimeAverageMessageData shortTermData = ((BundleData)e.getValue()).getShortTermData();
                        double msgThroughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                        return Pair.of((Object)bundle, (Object)msgThroughput);
                    }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft())).sorted((e1, e2) -> Double.compare((Double)e2.getRight(), (Double)e1.getRight())).forEach(e -> {
                        if (conf.getMaxUnloadBundleNumPerShedding() != -1 && this.selectedBundlesCache.size() >= conf.getMaxUnloadBundleNumPerShedding()) {
                            return;
                        }
                        String bundle = (String)e.getLeft();
                        double msgThroughput = (Double)e.getRight();
                        if (msgThroughput <= (double)(msgThroughputRequiredFromUnloadedBundles.getValue() + 1000)) {
                            log.info("Found bundle to unload with msgThroughput {}", (Object)msgThroughput);
                            msgThroughputRequiredFromUnloadedBundles.add((Number)(-msgThroughput));
                            this.selectedBundlesCache.put((Object)((String)msgThroughputOverloadedBroker.getValue()), (Object)bundle);
                        }
                    });
                }
            }
        }
        return this.selectedBundlesCache;
    }
}

