/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BundleSplitterTask
implements BundleSplitStrategy {
    private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
    private final Set<String> bundleCache = new HashSet<String>();
    private final Map<String, Integer> namespaceBundleCount = new HashMap<String, Integer>();

    @Override
    public Set<String> findBundlesToSplit(LoadData loadData, PulsarService pulsar) {
        this.bundleCache.clear();
        this.namespaceBundleCount.clear();
        ServiceConfiguration conf = pulsar.getConfiguration();
        int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
        long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
        long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
        long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
        long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 0x100000;
        loadData.getBrokerData().forEach((broker, brokerData) -> {
            LocalBrokerData localData = brokerData.getLocalData();
            for (Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
                String bundle = entry.getKey();
                NamespaceBundleStats stats = entry.getValue();
                if (stats.topics < 2L) {
                    log.info("The count of topics on the bundle {} is less than 2\uff0cskip split!", (Object)bundle);
                    continue;
                }
                double totalMessageRate = 0.0;
                double totalMessageThroughput = 0.0;
                if (loadData.getBundleData().containsKey(bundle)) {
                    TimeAverageMessageData longTermData = loadData.getBundleData().get(bundle).getLongTermData();
                    totalMessageRate = longTermData.totalMsgRate();
                    totalMessageThroughput = longTermData.totalMsgThroughput();
                }
                if (stats.topics <= maxBundleTopics && (long)(stats.consumerCount + stats.producerCount) <= maxBundleSessions && !(totalMessageRate > (double)maxBundleMsgRate) && !(totalMessageThroughput > (double)maxBundleBandwidth)) continue;
                String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                try {
                    int bundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get(namespace));
                    if (bundleCount + this.namespaceBundleCount.getOrDefault(namespace, 0) < maxBundleCount) {
                        log.info("The bundle {} is considered to be unload. Topics: {}/{}, Sessions: ({}+{})/{}, Message Rate: {}/{} (msgs/s), Message Throughput: {}/{} (MB/s)", new Object[]{bundle, stats.topics, maxBundleTopics, stats.producerCount, stats.consumerCount, maxBundleSessions, totalMessageRate, maxBundleMsgRate, totalMessageThroughput / 1048576.0, maxBundleBandwidth / 0x100000L});
                        this.bundleCache.add(bundle);
                        int bundleNum = this.namespaceBundleCount.getOrDefault(namespace, 0);
                        this.namespaceBundleCount.put(namespace, bundleNum + 1);
                        continue;
                    }
                    log.warn("Could not split namespace bundle {} because namespace {} has too many bundles: {}", new Object[]{bundle, namespace, bundleCount});
                }
                catch (Exception e) {
                    log.warn("Error while getting bundle count for namespace {}", (Object)namespace, (Object)e);
                }
            }
        });
        return this.bundleCache;
    }
}

