/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.Map;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.shade.com.google.common.collect.ArrayListMultimap;
import org.apache.pulsar.shade.com.google.common.collect.Multimap;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Load shedding strategy that picks bundles to unload from every broker whose
 * maximum resource usage is at or above the configured overload threshold.
 *
 * <p>For each such broker, candidate bundles are ordered by short-term
 * throughput (inbound plus outbound), highest first, and selected until the
 * accumulated throughput reaches the amount that has to be moved away. At least
 * one bundle is always selected when any candidate exists.
 *
 * <p>The multimap returned by {@link #findBundlesForUnloading} is a single
 * instance owned by this object; it is cleared and refilled on every call.
 * NOTE(review): that shared state suggests calls are expected to be serialized
 * by the caller -- confirm before invoking this concurrently.
 */
public class OverloadShedder implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(OverloadShedder.class);

    /**
     * Extra fraction of resource usage to shed on top of the excess over the
     * threshold, so that the broker does not end up sitting right at the
     * threshold after unloading.
     */
    private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;

    /** Broker -> bundles selected for unloading; reused across invocations. */
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();

    /**
     * Selects the bundles that should be unloaded from overloaded brokers.
     *
     * @param loadData source of per-broker data, per-bundle data and the map of
     *                 recently unloaded bundles
     * @param conf     configuration supplying the overload threshold as a percentage
     * @return multimap from broker name to the bundles selected on that broker;
     *         this is the instance's internal cache, which is cleared on the next call
     */
    @Override
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
        selectedBundlesCache.clear();

        // The configuration value is a percentage; usage values are compared as fractions.
        final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
        final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();

        loadData.getBrokerData().forEach((broker, brokerData) -> {
            final LocalBrokerData localData = brokerData.getLocalData();
            final double currentUsage = localData.getMaxResourceUsage();
            if (currentUsage < overloadThreshold) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Broker is not overloaded, ignoring at this point ({})", broker, localData.printResourceUsage());
                }
                return;
            }

            // Shed the excess over the threshold plus a small safety margin, expressed
            // as a fraction of the broker's current total throughput.
            final double percentOfTrafficToOffload =
                    currentUsage - overloadThreshold + ADDITIONAL_THRESHOLD_PERCENT_MARGIN;
            final double brokerCurrentThroughput = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
            final double minimumThroughputToOffload = brokerCurrentThroughput * percentOfTrafficToOffload;

            log.info("Attempting to shed load on {}, which has resource usage {}% above threshold {}% -- Offloading at least {} MByte/s of traffic ({})",
                    broker, 100.0 * currentUsage, 100.0 * overloadThreshold,
                    minimumThroughputToOffload / 1024.0 / 1024.0, localData.printResourceUsage());

            selectBundlesToOffload(broker, localData, loadData, recentlyUnloadedBundles, minimumThroughputToOffload);
        });

        return selectedBundlesCache;
    }

    /**
     * Records in {@link #selectedBundlesCache} the bundles to unload from one overloaded broker.
     *
     * <p>Nothing is selected when the broker reports zero or one bundle; a warning
     * is logged instead.
     *
     * @param broker                     name of the overloaded broker
     * @param localData                  local data reported for that broker
     * @param loadData                   source of the per-bundle data used for shedding
     * @param recentlyUnloadedBundles    bundles whose keys are excluded from selection
     * @param minimumThroughputToOffload throughput that the selected bundles should add up to
     */
    private void selectBundlesToOffload(String broker, LocalBrokerData localData, LoadData loadData,
            Map<String, Long> recentlyUnloadedBundles, double minimumThroughputToOffload) {
        final int bundleCount = localData.getBundles().size();
        if (bundleCount == 0) {
            log.warn("Broker {} is overloaded despite having no bundles", broker);
            return;
        }
        if (bundleCount == 1) {
            log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. No Load Shedding will be done on this broker",
                    localData.getBundles().iterator().next(), broker);
            return;
        }

        // Mutable holders because the values are updated from inside the lambda below.
        final MutableDouble trafficMarkedToOffload = new MutableDouble(0.0);
        final MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

        loadData.getBundleDataForLoadShedding().entrySet().stream()
                // Only bundles listed in this broker's local bundle set are candidates.
                .filter(e -> localData.getBundles().contains(e.getKey()))
                .map(e -> {
                    String bundle = e.getKey();
                    BundleData bundleData = e.getValue();
                    TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                    double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    return Pair.of(bundle, throughput);
                })
                // Skip bundles that were unloaded recently.
                .filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()))
                // Highest throughput first, so the target is reached with as few bundles as possible.
                .sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight()))
                .forEach(e -> {
                    // Keep selecting until the target is met, but always take at least one bundle.
                    if (trafficMarkedToOffload.doubleValue() < minimumThroughputToOffload
                            || atLeastOneBundleSelected.isFalse()) {
                        selectedBundlesCache.put(broker, e.getLeft());
                        trafficMarkedToOffload.add(e.getRight());
                        atLeastOneBundleSelected.setTrue();
                    }
                });
    }
}

