/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.NamespaceUnloadStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransferShedder
implements NamespaceUnloadStrategy {
    private static final Logger log = LoggerFactory.getLogger(TransferShedder.class);
    private static final double KB = 1024.0;
    private static final String CANNOT_CONTINUE_UNLOAD_MSG = "Can't continue the unload cycle.";
    private static final String CANNOT_UNLOAD_BROKER_MSG = "Can't unload broker:%s.";
    private static final String CANNOT_UNLOAD_BUNDLE_MSG = "Can't unload bundle:%s.";
    private final LoadStats stats = new LoadStats();
    private PulsarService pulsar;
    private IsolationPoliciesHelper isolationPoliciesHelper;
    private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;
    private List<BrokerFilter> brokerFilterPipeline;
    private Set<UnloadDecision> decisionCache;
    private UnloadCounter counter;
    private ServiceUnitStateChannel channel;
    private int unloadConditionHitCount = 0;

    @VisibleForTesting
    public TransferShedder(UnloadCounter counter) {
        this.pulsar = null;
        this.decisionCache = new HashSet<UnloadDecision>();
        this.counter = counter;
        this.isolationPoliciesHelper = null;
        this.antiAffinityGroupPolicyHelper = null;
    }

    public TransferShedder(PulsarService pulsar, UnloadCounter counter, List<BrokerFilter> brokerFilterPipeline, IsolationPoliciesHelper isolationPoliciesHelper, AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper) {
        this.pulsar = pulsar;
        this.decisionCache = new HashSet<UnloadDecision>();
        this.counter = counter;
        this.isolationPoliciesHelper = isolationPoliciesHelper;
        this.antiAffinityGroupPolicyHelper = antiAffinityGroupPolicyHelper;
        this.channel = ServiceUnitStateChannelImpl.get(pulsar);
        this.brokerFilterPipeline = brokerFilterPipeline;
    }

    @Override
    public void initialize(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.decisionCache = new HashSet<UnloadDecision>();
        ExtensibleLoadManagerImpl manager = ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get());
        this.counter = manager.getUnloadCounter();
        this.isolationPoliciesHelper = manager.getIsolationPoliciesHelper();
        this.antiAffinityGroupPolicyHelper = manager.getAntiAffinityGroupPolicyHelper();
        this.channel = ServiceUnitStateChannelImpl.get(pulsar);
        this.brokerFilterPipeline = manager.getBrokerFilterPipeline();
    }

    @Override
    public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context, Map<String, Long> recentlyUnloadedBundles, Map<String, Long> recentlyUnloadedBrokers) {
        Map<String, BrokerLookupData> availableBrokers;
        ServiceConfiguration conf = context.brokerConfiguration();
        this.decisionCache.clear();
        this.stats.clear();
        try {
            availableBrokers = context.brokerRegistry().getAvailableBrokerLookupDataAsync().get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            this.counter.update(UnloadDecision.Label.Failure, UnloadDecision.Reason.Unknown);
            log.warn("Failed to fetch available brokers. Stop unloading.", (Throwable)e);
            return this.decisionCache;
        }
        try {
            UnloadDecision.Reason reason;
            LoadDataStore<BrokerLoadData> loadStore = context.brokerLoadDataStore();
            this.stats.setLoadDataStore(loadStore);
            boolean debugMode = ExtensibleLoadManagerImpl.debug(conf, log);
            Optional<UnloadDecision.Reason> skipReason = this.stats.update(context.brokerLoadDataStore(), availableBrokers, recentlyUnloadedBrokers, conf);
            if (skipReason.isPresent()) {
                if (debugMode) {
                    log.warn("Can't continue the unload cycle. Skipped the load stat update. Reason:{}.", (Object)skipReason.get());
                }
                this.counter.update(UnloadDecision.Label.Skip, skipReason.get());
                return this.decisionCache;
            }
            this.counter.updateLoadData(this.stats.avg, this.stats.std);
            if (debugMode) {
                log.info("brokers' load stats:{}", (Object)this.stats);
            }
            int numOfBrokersWithEmptyLoadData = 0;
            int numOfBrokersWithFewBundles = 0;
            double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
            boolean transfer = conf.isLoadBalancerTransferEnabled();
            this.unloadConditionHitCount = this.stats.std() > targetStd || this.isUnderLoaded(context, this.stats.peekMinBroker(), this.stats.avg) || this.isOverLoaded(context, this.stats.peekMaxBroker(), this.stats.avg) ? ++this.unloadConditionHitCount : 0;
            if (this.unloadConditionHitCount <= conf.getLoadBalancerSheddingConditionHitCountThreshold()) {
                if (debugMode) {
                    log.info("Can't continue the unload cycle. Shedding condition hit count:{} is less than or equal to the threshold:{}.", (Object)this.unloadConditionHitCount, (Object)conf.getLoadBalancerSheddingConditionHitCountThreshold());
                }
                this.counter.update(UnloadDecision.Label.Skip, UnloadDecision.Reason.HitCount);
                return this.decisionCache;
            }
            while (true) {
                Iterator<TopBundlesLoadData.BundleLoadData> minBrokerTopBundlesLoadDataIter;
                if (!this.stats.hasTransferableBrokers()) {
                    if (!debugMode) break;
                    log.info("Can't continue the unload cycle. Exhausted target transfer brokers.");
                    break;
                }
                if (this.stats.std() > targetStd) {
                    reason = UnloadDecision.Reason.Overloaded;
                } else if (this.isUnderLoaded(context, this.stats.peekMinBroker(), this.stats.avg)) {
                    reason = UnloadDecision.Reason.Underloaded;
                    if (debugMode) {
                        log.info(String.format("broker:%s is underloaded:%s although load std:%.2f <= targetStd:%.2f. Continuing unload for this underloaded broker.", this.stats.peekMinBroker(), context.brokerLoadDataStore().get(this.stats.peekMinBroker()).get(), this.stats.std(), targetStd));
                    }
                } else if (this.isOverLoaded(context, this.stats.peekMaxBroker(), this.stats.avg)) {
                    reason = UnloadDecision.Reason.Overloaded;
                    if (debugMode) {
                        log.info(String.format("broker:%s is overloaded:%s although load std:%.2f <= targetStd:%.2f. Continuing unload for this overloaded broker.", this.stats.peekMaxBroker(), context.brokerLoadDataStore().get(this.stats.peekMaxBroker()).get(), this.stats.std(), targetStd));
                    }
                } else {
                    if (!debugMode) break;
                    log.info("Can't continue the unload cycle.The overall cluster load meets the target, std:{} <= targetStd:{}.minBroker:{} is not underloaded. maxBroker:{} is not overloaded.", new Object[]{this.stats.std(), targetStd, this.stats.peekMinBroker(), this.stats.peekMaxBroker()});
                    break;
                }
                String maxBroker = this.stats.pollMaxBroker();
                String minBroker = this.stats.peekMinBroker();
                Optional<BrokerLoadData> maxBrokerLoadData = context.brokerLoadDataStore().get(maxBroker);
                Optional<BrokerLoadData> minBrokerLoadData = context.brokerLoadDataStore().get(minBroker);
                if (maxBrokerLoadData.isEmpty()) {
                    log.error(String.format("Can't unload broker:%s. MaxBrokerLoadData is empty.", maxBroker));
                    ++numOfBrokersWithEmptyLoadData;
                    continue;
                }
                if (minBrokerLoadData.isEmpty()) {
                    log.error("Can't transfer load to broker:{}. MinBrokerLoadData is empty.", (Object)minBroker);
                    ++numOfBrokersWithEmptyLoadData;
                    continue;
                }
                double maxLoad = maxBrokerLoadData.get().getWeightedMaxEMA();
                double minLoad = minBrokerLoadData.get().getWeightedMaxEMA();
                double offload = (maxLoad - minLoad) / 2.0;
                BrokerLoadData brokerLoadData = maxBrokerLoadData.get();
                double maxBrokerThroughput = brokerLoadData.getMsgThroughputIn() + brokerLoadData.getMsgThroughputOut();
                double minBrokerThroughput = minBrokerLoadData.get().getMsgThroughputIn() + minBrokerLoadData.get().getMsgThroughputOut();
                double offloadThroughput = maxBrokerThroughput * offload / maxLoad;
                if (debugMode) {
                    log.info(String.format("Attempting to shed load from broker:%s%s, which has the max resource usage:%.2f%%, targetStd:%.2f, -- Trying to offload %.2f%%, %.2f KByte/s of traffic.", maxBroker, transfer ? " to broker:" + minBroker : "", maxLoad * 100.0, targetStd, offload * 100.0, offloadThroughput / 1024.0));
                }
                double trafficMarkedToOffload = 0.0;
                double trafficMarkedToGain = 0.0;
                Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
                if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
                    log.error(String.format("Can't unload broker:%s. TopBundlesLoadData is empty.", maxBroker));
                    ++numOfBrokersWithEmptyLoadData;
                    continue;
                }
                List<TopBundlesLoadData.BundleLoadData> maxBrokerTopBundlesLoadData = bundlesLoadData.get().getTopBundlesLoadData();
                if (maxBrokerTopBundlesLoadData.size() == 1) {
                    ++numOfBrokersWithFewBundles;
                    log.warn(String.format("Can't unload broker:%s. Sole namespace bundle:%s is overloading the broker. ", maxBroker, maxBrokerTopBundlesLoadData.iterator().next()));
                    continue;
                }
                Optional<TopBundlesLoadData> minBundlesLoadData = context.topBundleLoadDataStore().get(minBroker);
                Iterator<TopBundlesLoadData.BundleLoadData> iterator = minBrokerTopBundlesLoadDataIter = minBundlesLoadData.isPresent() ? minBundlesLoadData.get().getTopBundlesLoadData().iterator() : null;
                if (maxBrokerTopBundlesLoadData.isEmpty()) {
                    ++numOfBrokersWithFewBundles;
                    log.warn(String.format("Can't unload broker:%s. Broker overloaded despite having no bundles", maxBroker));
                    continue;
                }
                int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
                for (TopBundlesLoadData.BundleLoadData e : maxBrokerTopBundlesLoadData) {
                    Unload unload;
                    String bundle = e.bundleName();
                    if (this.channel != null && !this.channel.isOwner(bundle, maxBroker)) {
                        if (!debugMode) continue;
                        log.warn(String.format("Can't unload bundle:%s. MaxBroker:%s is not the owner.", bundle, maxBroker));
                        continue;
                    }
                    if (recentlyUnloadedBundles.containsKey(bundle)) {
                        if (!debugMode) continue;
                        log.info(String.format("Can't unload bundle:%s. Bundle has been recently unloaded at ts:%d.", bundle, recentlyUnloadedBundles.get(bundle)));
                        continue;
                    }
                    if (!this.isTransferable(context, availableBrokers, bundle, maxBroker, Optional.of(minBroker))) {
                        if (!debugMode) continue;
                        log.info(String.format("Can't unload bundle:%s. This unload can't meet affinity(isolation) or anti-affinity group policies.", bundle));
                        continue;
                    }
                    if (remainingTopBundles <= 1) {
                        if (!debugMode) break;
                        log.info(String.format("Can't unload bundle:%s. The remaining bundles in TopBundlesLoadData from the maxBroker:%s is less than or equal to 1.", bundle, maxBroker));
                        break;
                    }
                    NamespaceBundleStats bundleData = e.stats();
                    double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
                    boolean swap = false;
                    ArrayList<Unload> minToMaxUnloads = new ArrayList<Unload>();
                    double minBrokerBundleSwapThroughput = 0.0;
                    if (trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput > offloadThroughput) {
                        if (transfer && minBrokerTopBundlesLoadDataIter != null) {
                            double maxBrokerNewThroughput = maxBrokerThroughput - trafficMarkedToOffload + trafficMarkedToGain - maxBrokerBundleThroughput;
                            double minBrokerNewThroughput = minBrokerThroughput + trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput;
                            while (minBrokerTopBundlesLoadDataIter.hasNext()) {
                                TopBundlesLoadData.BundleLoadData minBrokerBundleData = minBrokerTopBundlesLoadDataIter.next();
                                if (!this.isTransferable(context, availableBrokers, minBrokerBundleData.bundleName(), minBroker, Optional.of(maxBroker))) continue;
                                double minBrokerBundleThroughput = minBrokerBundleData.stats().msgThroughputIn + minBrokerBundleData.stats().msgThroughputOut;
                                double maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
                                double minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
                                if (!(maxBrokerNewThroughputTmp < maxBrokerThroughput) || !(minBrokerNewThroughputTmp < maxBrokerThroughput)) continue;
                                minToMaxUnloads.add(new Unload(minBroker, minBrokerBundleData.bundleName(), Optional.of(maxBroker)));
                                maxBrokerNewThroughput = maxBrokerNewThroughputTmp;
                                minBrokerNewThroughput = minBrokerNewThroughputTmp;
                                minBrokerBundleSwapThroughput += minBrokerBundleThroughput;
                                if (!(minBrokerNewThroughput <= maxBrokerNewThroughput) || !(maxBrokerNewThroughput < maxBrokerThroughput * 0.75)) continue;
                                swap = true;
                                break;
                            }
                        }
                        if (!swap) {
                            if (!debugMode) break;
                            log.info(String.format("Can't unload bundle:%s. The traffic to unload:%.2f - gain:%.2f = %.2f KByte/s is greater than the target :%.2f KByte/s.", bundle, (trafficMarkedToOffload + maxBrokerBundleThroughput) / 1024.0, trafficMarkedToGain / 1024.0, (trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput) / 1024.0, offloadThroughput / 1024.0));
                            break;
                        }
                    }
                    if (transfer) {
                        if (swap) {
                            minToMaxUnloads.forEach(minToMaxUnload -> {
                                if (debugMode) {
                                    log.info("Decided to gain bundle:{} from min broker:{}", (Object)minToMaxUnload.serviceUnit(), (Object)minToMaxUnload.sourceBroker());
                                }
                                UnloadDecision decision = new UnloadDecision();
                                decision.setUnload((Unload)minToMaxUnload);
                                decision.succeed(reason);
                                this.decisionCache.add(decision);
                            });
                            if (debugMode) {
                                log.info(String.format("Total traffic %.2f KByte/s to transfer from min broker:%s to max broker:%s.", minBrokerBundleSwapThroughput / 1024.0, minBroker, maxBroker));
                                trafficMarkedToGain += minBrokerBundleSwapThroughput;
                            }
                        }
                        unload = new Unload(maxBroker, bundle, Optional.of(minBroker));
                    } else {
                        unload = new Unload(maxBroker, bundle);
                    }
                    UnloadDecision decision = new UnloadDecision();
                    decision.setUnload(unload);
                    decision.succeed(reason);
                    this.decisionCache.add(decision);
                    trafficMarkedToOffload += maxBrokerBundleThroughput;
                    --remainingTopBundles;
                    if (!debugMode) continue;
                    log.info(String.format("Decided to unload bundle:%s, throughput:%.2f KByte/s. The traffic marked to unload:%.2f - gain:%.2f = %.2f KByte/s. Target:%.2f KByte/s.", bundle, maxBrokerBundleThroughput / 1024.0, trafficMarkedToOffload / 1024.0, trafficMarkedToGain / 1024.0, (trafficMarkedToOffload - trafficMarkedToGain) / 1024.0, offloadThroughput / 1024.0));
                }
                if (trafficMarkedToOffload > 0.0) {
                    double adjustedOffload = (trafficMarkedToOffload - trafficMarkedToGain) * maxLoad / maxBrokerThroughput;
                    this.stats.offload(maxLoad, minLoad, adjustedOffload);
                    if (!debugMode) continue;
                    log.info(String.format("brokers' load stats:%s, after offload{max:%.2f, min:%.2f, offload:%.2f}", this.stats, maxLoad, minLoad, adjustedOffload));
                    continue;
                }
                ++numOfBrokersWithFewBundles;
                log.warn(String.format("Can't unload broker:%s. There is no bundle that can be unloaded in top bundles load data. Consider splitting bundles owned by the broker to make each bundle serve less traffic or increasing loadBalancerMaxNumberOfBundlesInBundleLoadReport to report more bundles in the top bundles load data.", maxBroker));
            }
            if (debugMode) {
                log.info("decisionCache:{}", this.decisionCache);
            }
            if (this.decisionCache.isEmpty()) {
                reason = numOfBrokersWithEmptyLoadData > 0 ? UnloadDecision.Reason.NoLoadData : (numOfBrokersWithFewBundles > 0 ? UnloadDecision.Reason.NoBundles : UnloadDecision.Reason.HitCount);
                this.counter.update(UnloadDecision.Label.Skip, reason);
            } else {
                this.unloadConditionHitCount = 0;
            }
        }
        catch (Throwable e) {
            log.error("Failed to process unloading. ", e);
            this.counter.update(UnloadDecision.Label.Failure, UnloadDecision.Reason.Unknown);
        }
        return this.decisionCache;
    }

    private boolean isUnderLoaded(LoadManagerContext context, String broker, double avgLoad) {
        Optional<BrokerLoadData> brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
        if (brokerLoadDataOptional.isEmpty()) {
            return false;
        }
        BrokerLoadData brokerLoadData = brokerLoadDataOptional.get();
        if (brokerLoadData.getMsgThroughputEMA() < 1.0) {
            return true;
        }
        return brokerLoadData.getWeightedMaxEMA() < avgLoad * Math.min(0.5, Math.max(0.0, context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2.0));
    }

    private boolean isOverLoaded(LoadManagerContext context, String broker, double avgLoad) {
        Optional<BrokerLoadData> brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
        if (brokerLoadDataOptional.isEmpty()) {
            return false;
        }
        ServiceConfiguration conf = context.brokerConfiguration();
        double overloadThreshold = (double)conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
        double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
        BrokerLoadData brokerLoadData = brokerLoadDataOptional.get();
        double load = brokerLoadData.getWeightedMaxEMA();
        return load > overloadThreshold && load > avgLoad + targetStd;
    }

    private boolean isTransferable(LoadManagerContext context, Map<String, BrokerLookupData> availableBrokers, String bundle, String srcBroker, Optional<String> dstBroker) {
        if (this.pulsar == null) {
            return true;
        }
        String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
        String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
        NamespaceBundle namespaceBundle = this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundleRange);
        if (!this.isLoadBalancerSheddingBundlesWithPoliciesEnabled(context, namespaceBundle)) {
            return false;
        }
        HashMap<String, BrokerLookupData> candidates = new HashMap<String, BrokerLookupData>(availableBrokers);
        for (BrokerFilter filter : this.brokerFilterPipeline) {
            try {
                filter.filterAsync(candidates, namespaceBundle, context).get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("Failed to filter brokers with filter: {}", (Object)filter.getClass().getName(), (Object)e);
                return false;
            }
        }
        if (dstBroker.isPresent() && !candidates.containsKey(dstBroker.get())) {
            return false;
        }
        candidates.remove(srcBroker);
        boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();
        if (dstBroker.isEmpty() || !transfer) {
            return !candidates.isEmpty();
        }
        return candidates.containsKey(dstBroker.get());
    }

    protected boolean isLoadBalancerSheddingBundlesWithPoliciesEnabled(LoadManagerContext context, NamespaceBundle namespaceBundle) {
        if (this.isolationPoliciesHelper != null && this.isolationPoliciesHelper.hasIsolationPolicy(namespaceBundle.getNamespaceObject())) {
            return context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
        }
        if (this.antiAffinityGroupPolicyHelper != null && this.antiAffinityGroupPolicyHelper.hasAntiAffinityGroupPolicy(namespaceBundle.toString())) {
            return context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
        }
        return true;
    }

    public TransferShedder() {
    }

    public UnloadCounter getCounter() {
        return this.counter;
    }

    static class LoadStats {
        private double sum;
        private double sqSum;
        private int totalBrokers;
        private double avg;
        private double std;
        private LoadDataStore<BrokerLoadData> loadDataStore;
        private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad = new ArrayList<Map.Entry<String, BrokerLoadData>>();
        int maxBrokerIndex;
        int minBrokerIndex;
        int numberOfBrokerSheddingPerCycle;
        int maxNumberOfBrokerSheddingPerCycle;

        LoadStats() {
        }

        private void update(double sum, double sqSum, int totalBrokers) {
            this.sum = sum;
            this.sqSum = sqSum;
            this.totalBrokers = totalBrokers;
            if (totalBrokers == 0) {
                this.avg = 0.0;
                this.std = 0.0;
            } else {
                this.avg = sum / (double)totalBrokers;
                this.std = Math.sqrt(sqSum / (double)totalBrokers - this.avg * this.avg);
            }
        }

        void offload(double max, double min, double offload) {
            this.sqSum -= max * max + min * min;
            double maxd = Math.max(0.0, max - offload);
            double mind = min + offload;
            this.sqSum += maxd * maxd + mind * mind;
            this.std = Math.sqrt(Math.abs(this.sqSum / (double)this.totalBrokers - this.avg * this.avg));
            ++this.numberOfBrokerSheddingPerCycle;
            ++this.minBrokerIndex;
        }

        void clear() {
            this.sum = 0.0;
            this.sqSum = 0.0;
            this.totalBrokers = 0;
            this.avg = 0.0;
            this.std = 0.0;
            this.maxBrokerIndex = 0;
            this.minBrokerIndex = 0;
            this.numberOfBrokerSheddingPerCycle = 0;
            this.maxNumberOfBrokerSheddingPerCycle = 0;
            this.brokersSortedByLoad.clear();
            this.loadDataStore = null;
        }

        Optional<UnloadDecision.Reason> update(LoadDataStore<BrokerLoadData> loadStore, Map<String, BrokerLookupData> availableBrokers, Map<String, Long> recentlyUnloadedBrokers, ServiceConfiguration conf) {
            this.maxNumberOfBrokerSheddingPerCycle = conf.getLoadBalancerMaxNumberOfBrokerSheddingPerCycle();
            boolean debug = ExtensibleLoadManagerImpl.debug(conf, log);
            UnloadDecision.Reason decisionReason = null;
            double sum = 0.0;
            double sqSum = 0.0;
            int totalBrokers = 0;
            long now = System.currentTimeMillis();
            HashSet<String> missingLoadDataBrokers = new HashSet<String>(availableBrokers.keySet());
            for (Map.Entry<String, BrokerLoadData> entry : loadStore.entrySet()) {
                BrokerLoadData localBrokerData = entry.getValue();
                String broker = entry.getKey();
                missingLoadDataBrokers.remove(broker);
                if (now - localBrokerData.getUpdatedAt() > conf.getLoadBalancerBrokerLoadDataTTLInSeconds() * 1000L) {
                    log.warn("Ignoring broker:{} load update because the load data timestamp:{} is too old.", (Object)broker, (Object)localBrokerData.getUpdatedAt());
                    decisionReason = UnloadDecision.Reason.OutDatedData;
                    continue;
                }
                if (recentlyUnloadedBrokers.containsKey(broker)) {
                    long elapsed = localBrokerData.getUpdatedAt() - recentlyUnloadedBrokers.get(broker);
                    if (elapsed < conf.getLoadBalanceSheddingDelayInSeconds() * 1000L) {
                        if (debug) {
                            log.warn("Broker:{} load data is too early since the last transfer. elapsed {} secs < threshold {} secs", new Object[]{broker, TimeUnit.MILLISECONDS.toSeconds(elapsed), conf.getLoadBalanceSheddingDelayInSeconds()});
                        }
                        this.update(0.0, 0.0, 0);
                        return Optional.of(UnloadDecision.Reason.CoolDown);
                    }
                    recentlyUnloadedBrokers.remove(broker);
                }
                double load = localBrokerData.getWeightedMaxEMA();
                sum += load;
                sqSum += load * load;
                ++totalBrokers;
            }
            if (totalBrokers == 0) {
                if (decisionReason == null) {
                    decisionReason = UnloadDecision.Reason.NoBrokers;
                }
                this.update(0.0, 0.0, 0);
                if (debug) {
                    log.info("There is no broker load data.");
                }
                return Optional.of(decisionReason);
            }
            if (!missingLoadDataBrokers.isEmpty()) {
                decisionReason = UnloadDecision.Reason.NoLoadData;
                this.update(0.0, 0.0, 0);
                if (debug) {
                    log.info("There is missing load data from brokers:{}", missingLoadDataBrokers);
                }
                return Optional.of(decisionReason);
            }
            this.update(sum, sqSum, totalBrokers);
            return Optional.empty();
        }

        void setLoadDataStore(LoadDataStore<BrokerLoadData> loadDataStore) {
            this.loadDataStore = loadDataStore;
            this.brokersSortedByLoad.addAll(loadDataStore.entrySet());
            this.brokersSortedByLoad.sort(Comparator.comparingDouble(a -> ((BrokerLoadData)a.getValue()).getWeightedMaxEMA()));
            this.maxBrokerIndex = this.brokersSortedByLoad.size() - 1;
            this.minBrokerIndex = 0;
        }

        String peekMinBroker() {
            return this.brokersSortedByLoad.get(this.minBrokerIndex).getKey();
        }

        String peekMaxBroker() {
            return this.brokersSortedByLoad.get(this.maxBrokerIndex).getKey();
        }

        String pollMaxBroker() {
            return this.brokersSortedByLoad.get(this.maxBrokerIndex--).getKey();
        }

        public String toString() {
            return String.format("sum:%.2f, sqSum:%.2f, avg:%.2f, std:%.2f, totalBrokers:%d, brokersSortedByLoad:%s", this.sum, this.sqSum, this.avg, this.std, this.totalBrokers, this.brokersSortedByLoad.stream().map(v -> (String)v.getKey()).collect(Collectors.toList()));
        }

        boolean hasTransferableBrokers() {
            return this.numberOfBrokerSheddingPerCycle < this.maxNumberOfBrokerSheddingPerCycle && this.minBrokerIndex < this.maxBrokerIndex;
        }

        public double sum() {
            return this.sum;
        }

        public double sqSum() {
            return this.sqSum;
        }

        public int totalBrokers() {
            return this.totalBrokers;
        }

        public double avg() {
            return this.avg;
        }

        public double std() {
            return this.std;
        }

        public LoadDataStore<BrokerLoadData> loadDataStore() {
            return this.loadDataStore;
        }

        public List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad() {
            return this.brokersSortedByLoad;
        }

        public int maxBrokerIndex() {
            return this.maxBrokerIndex;
        }

        public int minBrokerIndex() {
            return this.minBrokerIndex;
        }

        public int numberOfBrokerSheddingPerCycle() {
            return this.numberOfBrokerSheddingPerCycle;
        }

        public int maxNumberOfBrokerSheddingPerCycle() {
            return this.maxNumberOfBrokerSheddingPerCycle;
        }
    }
}

