/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupConfigInfo;
import org.apache.pulsar.broker.resourcegroup.ResourceQuotaCalculator;
import org.apache.pulsar.broker.resourcegroup.ResourceQuotaCalculatorImpl;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageConsumer;
import org.apache.pulsar.broker.resourcegroup.ResourceUsagePublisher;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceGroupService {
    private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);
    private final PulsarService pulsar;
    protected final ResourceQuotaCalculator quotaCalculator;
    private ResourceUsageTransportManager resourceUsageTransportMgr;
    private ConcurrentHashMap<String, ResourceGroup> resourceGroupsMap = new ConcurrentHashMap();
    private ConcurrentHashMap<String, ResourceGroup> tenantToRGsMap = new ConcurrentHashMap();
    private ConcurrentHashMap<String, ResourceGroup> namespaceToRGsMap = new ConcurrentHashMap();
    private ConcurrentHashMap<String, ResourceGroup.BytesAndMessagesCount> topicProduceStats = new ConcurrentHashMap();
    private ConcurrentHashMap<String, ResourceGroup.BytesAndMessagesCount> topicConsumeStats = new ConcurrentHashMap();
    private ScheduledFuture<?> aggreagteLocalUsagePeriodicTask;
    private long aggregateLocalUsagePeriodInSeconds;
    private ScheduledFuture<?> calculateQuotaPeriodicTask;
    private long resourceUsagePublishPeriodInSeconds;
    private TimeUnit timeUnitScale;
    protected static final int MaxUsageReportSuppressRounds = 5;
    protected static long maxIntervalForSuppressingReportsMSecs;
    protected static final float UsageReportSuppressionTolerancePercentage = 5.0f;

    public ResourceGroupService(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.timeUnitScale = TimeUnit.SECONDS;
        this.quotaCalculator = new ResourceQuotaCalculatorImpl();
        this.resourceUsageTransportMgr = pulsar.getResourceUsageTransportManager();
        this.initialize();
    }

    public ResourceGroupService(PulsarService pulsar, TimeUnit timescale, ResourceUsageTransportManager transportMgr, ResourceQuotaCalculator quotaCalc) {
        this.pulsar = pulsar;
        this.timeUnitScale = timescale;
        this.resourceUsageTransportMgr = transportMgr;
        this.quotaCalculator = quotaCalc;
        this.initialize();
    }

    public void resourceGroupCreate(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
        this.checkRGCreateParams(rgConfig);
        ResourceGroup rg = new ResourceGroup(this, rgConfig);
        this.resourceGroupsMap.put(rgConfig.getName(), rg);
    }

    public void resourceGroupCreate(ResourceGroupConfigInfo rgConfig, ResourceUsagePublisher rgPublisher, ResourceUsageConsumer rgConsumer) throws PulsarAdminException {
        this.checkRGCreateParams(rgConfig);
        ResourceGroup rg = new ResourceGroup(this, rgConfig, rgPublisher, rgConsumer);
        this.resourceGroupsMap.put(rgConfig.getName(), rg);
    }

    public ResourceGroup resourceGroupGet(String resourceGroupName) {
        ResourceGroup retrievedRG = this.getResourceGroupInternal(resourceGroupName);
        if (retrievedRG == null) {
            return null;
        }
        return new ResourceGroup(retrievedRG);
    }

    public void resourceGroupUpdate(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
        if (rgConfig == null) {
            throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroup config");
        }
        ResourceGroup rg = this.getResourceGroupInternal(rgConfig.getName());
        if (rg == null) {
            throw new PulsarAdminException("Resource group does not exist: " + rgConfig.getName());
        }
        rg.updateResourceGroup(rgConfig);
    }

    public void resourceGroupDelete(String name) throws PulsarAdminException {
        long nsRefCount;
        ResourceGroup rg = this.getResourceGroupInternal(name);
        if (rg == null) {
            throw new PulsarAdminException("Resource group does not exist: " + name);
        }
        long tenantRefCount = rg.getResourceGroupNumOfTenantRefs();
        if (tenantRefCount + (nsRefCount = rg.getResourceGroupNumOfNSRefs()) > 0L) {
            String errMesg = "Resource group " + name + " still has " + tenantRefCount + " tenant refs";
            errMesg = errMesg + " and " + nsRefCount + " namespace refs on it";
            throw new PulsarAdminException(errMesg);
        }
        this.resourceGroupsMap.remove(name);
    }

    protected long getNumResourceGroups() {
        return this.resourceGroupsMap.mappingCount();
    }

    public void registerTenant(String resourceGroupName, String tenantName) throws PulsarAdminException {
        ResourceGroup rg = this.checkResourceGroupExists(resourceGroupName);
        ResourceGroup oldRG = this.tenantToRGsMap.get(tenantName);
        if (oldRG != null) {
            String errMesg = "Tenant " + tenantName + " already references a resource group: " + oldRG.getID();
            throw new PulsarAdminException(errMesg);
        }
        ResourceGroupOpStatus status = rg.registerUsage(tenantName, ResourceGroup.ResourceGroupRefTypes.Tenants, true, this.resourceUsageTransportMgr);
        if (status == ResourceGroupOpStatus.Exists) {
            String errMesg = "Tenant " + tenantName + " already references the resource group " + resourceGroupName;
            errMesg = errMesg + "; this is unexpected";
            throw new PulsarAdminException(errMesg);
        }
        this.tenantToRGsMap.put(tenantName, rg);
    }

    public void unRegisterTenant(String resourceGroupName, String tenantName) throws PulsarAdminException {
        ResourceGroup rg = this.checkResourceGroupExists(resourceGroupName);
        ResourceGroupOpStatus status = rg.registerUsage(tenantName, ResourceGroup.ResourceGroupRefTypes.Tenants, false, this.resourceUsageTransportMgr);
        if (status == ResourceGroupOpStatus.DoesNotExist) {
            String errMesg = "Tenant " + tenantName + " does not yet reference resource group " + resourceGroupName;
            throw new PulsarAdminException(errMesg);
        }
        this.tenantToRGsMap.remove(tenantName, rg);
    }

    public void registerNameSpace(String resourceGroupName, String namespaceName) throws PulsarAdminException {
        ResourceGroup rg = this.checkResourceGroupExists(resourceGroupName);
        ResourceGroup oldRG = this.namespaceToRGsMap.get(namespaceName);
        if (oldRG != null) {
            String errMesg = "Namespace " + namespaceName + " already references a resource group: " + oldRG.getID();
            throw new PulsarAdminException(errMesg);
        }
        ResourceGroupOpStatus status = rg.registerUsage(namespaceName, ResourceGroup.ResourceGroupRefTypes.Namespaces, true, this.resourceUsageTransportMgr);
        if (status == ResourceGroupOpStatus.Exists) {
            String errMesg = String.format("Namespace {} already references the target resource group {}", namespaceName, resourceGroupName);
            throw new PulsarAdminException(errMesg);
        }
        this.namespaceToRGsMap.put(namespaceName, rg);
    }

    public void unRegisterNameSpace(String resourceGroupName, String namespaceName) throws PulsarAdminException {
        ResourceGroup rg = this.checkResourceGroupExists(resourceGroupName);
        ResourceGroupOpStatus status = rg.registerUsage(namespaceName, ResourceGroup.ResourceGroupRefTypes.Namespaces, false, this.resourceUsageTransportMgr);
        if (status == ResourceGroupOpStatus.DoesNotExist) {
            String errMesg = String.format("Namespace {} does not yet reference resource group {}", namespaceName, resourceGroupName);
            throw new PulsarAdminException(errMesg);
        }
        this.namespaceToRGsMap.remove(namespaceName, rg);
    }

    public boolean incrementUsage(String tenantName, String nsName, ResourceGroup.ResourceGroupMonitoringClass monClass, ResourceGroup.BytesAndMessagesCount incStats) throws PulsarAdminException {
        ResourceGroup nsRG = this.namespaceToRGsMap.get(nsName);
        ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName);
        if (tenantRG == null && nsRG == null) {
            return false;
        }
        if (incStats.bytes < 0L || incStats.messages < 0L) {
            String errMesg = String.format("incrementUsage on tenant=%s, NS=%s: bytes (%s) or mesgs (%s) is negative", tenantName, nsName, incStats.bytes, incStats.messages);
            throw new PulsarAdminException(errMesg);
        }
        if (nsRG == tenantRG) {
            nsRG.incrementLocalUsageStats(monClass, incStats);
            return true;
        }
        if (tenantRG != null) {
            tenantRG.incrementLocalUsageStats(monClass, incStats);
        }
        if (nsRG != null) {
            nsRG.incrementLocalUsageStats(monClass, incStats);
        }
        return true;
    }

    protected ResourceGroup.BytesAndMessagesCount getRGUsage(String rgName, ResourceGroup.ResourceGroupMonitoringClass monClass) throws PulsarAdminException {
        ResourceGroup rg = this.getResourceGroupInternal(rgName);
        if (rg != null) {
            return rg.getLocalUsageStats(monClass);
        }
        ResourceGroup.BytesAndMessagesCount retCount = new ResourceGroup.BytesAndMessagesCount();
        retCount.bytes = -1L;
        retCount.messages = -1L;
        return retCount;
    }

    private ResourceGroup getResourceGroupInternal(String resourceGroupName) {
        if (resourceGroupName == null) {
            throw new IllegalArgumentException("Invalid null resource group name: " + resourceGroupName);
        }
        return this.resourceGroupsMap.get(resourceGroupName);
    }

    private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdminException {
        ResourceGroup rg = this.getResourceGroupInternal(rgName);
        if (rg == null) {
            throw new PulsarAdminException("Resource group does not exist: " + rgName);
        }
        return rg;
    }

    private void updateStatsWithDiff(String topicName, String tenantString, String nsString, long accByteCount, long accMesgCount, ResourceGroup.ResourceGroupMonitoringClass monClass) {
        ConcurrentHashMap<String, ResourceGroup.BytesAndMessagesCount> hm;
        switch (monClass) {
            default: {
                log.error("updateStatsWithDiff: Unknown monitoring class={}; ignoring", (Object)monClass);
                return;
            }
            case Publish: {
                hm = this.topicProduceStats;
                break;
            }
            case Dispatch: {
                hm = this.topicConsumeStats;
            }
        }
        ResourceGroup.BytesAndMessagesCount bmDiff = new ResourceGroup.BytesAndMessagesCount();
        ResourceGroup.BytesAndMessagesCount bmNewCount = new ResourceGroup.BytesAndMessagesCount();
        bmNewCount.bytes = accByteCount;
        bmNewCount.messages = accMesgCount;
        ResourceGroup.BytesAndMessagesCount bmOldCount = hm.get(topicName);
        if (bmOldCount == null) {
            bmDiff.bytes = bmNewCount.bytes;
            bmDiff.messages = bmNewCount.messages;
        } else {
            bmDiff.bytes = bmNewCount.bytes - bmOldCount.bytes;
            bmDiff.messages = bmNewCount.messages - bmOldCount.messages;
        }
        if (bmDiff.bytes <= 0L || bmDiff.messages <= 0L) {
            return;
        }
        try {
            boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff);
            log.info("aggregateResourceGroupLocalUsages monclass={} statsUpdated={} for tenant={}, namespace={}; by {} bytes, {} mesgs", new Object[]{monClass, statsUpdated, tenantString, nsString, bmDiff.bytes, bmDiff.messages});
            hm.put(topicName, bmNewCount);
        }
        catch (Throwable t) {
            log.error("aggregateResourceGroupLocalUsages got ex={} while aggregating for {}} side", (Object)t.getMessage(), (Object)monClass);
        }
    }

    protected void aggregateResourceGroupLocalUsages() {
        long mSecsSinceEpochStart = System.currentTimeMillis();
        BrokerService bs = this.pulsar.getBrokerService();
        Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
        for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
            String topicName = entry.getKey();
            TopicStatsImpl topicStats = entry.getValue();
            TopicName topic = TopicName.get((String)topicName);
            String tenantString = topic.getTenant();
            String nsString = topic.getNamespacePortion();
            if (!this.tenantToRGsMap.containsKey(tenantString) && !this.namespaceToRGsMap.containsKey(nsString)) continue;
            this.updateStatsWithDiff(topicName, tenantString, nsString, topicStats.bytesInCounter, topicStats.msgInCounter, ResourceGroup.ResourceGroupMonitoringClass.Publish);
            this.updateStatsWithDiff(topicName, tenantString, nsString, topicStats.bytesOutCounter, topicStats.msgOutCounter, ResourceGroup.ResourceGroupMonitoringClass.Dispatch);
        }
        long mSecsSinceEpochEnd = System.currentTimeMillis();
        long diffMSecs = mSecsSinceEpochEnd - mSecsSinceEpochStart;
        log.debug("aggregateResourceGroupLocalUsages took {} millisecs", (Object)diffMSecs);
        ServiceConfiguration config = this.pulsar.getConfiguration();
        long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
        if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
            if (this.aggreagteLocalUsagePeriodicTask == null) {
                log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when publish period changed from {} to {} {}", new Object[]{this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, this.timeUnitScale});
            } else {
                boolean cancelStatus = this.aggreagteLocalUsagePeriodicTask.cancel(true);
                log.info("aggregateResourceGroupLocalUsages: Got status={} in cancel of periodic when publish period changed from {} to {} {}", new Object[]{cancelStatus, this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, this.timeUnitScale});
            }
            this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(this::aggregateResourceGroupLocalUsages, newPeriodInSeconds, newPeriodInSeconds, this.timeUnitScale);
            this.aggregateLocalUsagePeriodInSeconds = newPeriodInSeconds;
        }
    }

    private void calculateQuotaForAllResourceGroups() {
        long mSecsSinceEpochStart = System.currentTimeMillis();
        ResourceGroup.BytesAndMessagesCount updatedQuota = new ResourceGroup.BytesAndMessagesCount();
        this.resourceGroupsMap.forEach((rgName, resourceGroup) -> {
            for (ResourceGroup.ResourceGroupMonitoringClass monClass : ResourceGroup.ResourceGroupMonitoringClass.values()) {
                try {
                    ResourceGroup.BytesAndMessagesCount globalUsageStats = resourceGroup.getGlobalUsageStats(monClass);
                    ResourceGroup.BytesAndMessagesCount localUsageStats = resourceGroup.getLocalUsageStats(monClass);
                    ResourceGroup.BytesAndMessagesCount confCounts = resourceGroup.getConfLimits(monClass);
                    long[] globUsageBytesArray = new long[String.valueOf(globalUsageStats.bytes).length()];
                    updatedQuota.bytes = this.quotaCalculator.computeLocalQuota(confCounts.bytes, localUsageStats.bytes, globUsageBytesArray);
                    long[] globUsageMessagesArray = new long[String.valueOf(globalUsageStats.messages).length()];
                    updatedQuota.messages = this.quotaCalculator.computeLocalQuota(confCounts.messages, localUsageStats.messages, globUsageMessagesArray);
                    resourceGroup.updateLocalQuota(monClass, updatedQuota);
                }
                catch (Throwable t) {
                    log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}", new Object[]{t.getMessage(), monClass, rgName});
                }
            }
        });
        long mSecsSinceEpochEnd = System.currentTimeMillis();
        long diffMSecs = mSecsSinceEpochEnd - mSecsSinceEpochStart;
        log.debug("calculateQuotaForAllResourceGroups took {} millisecs", (Object)diffMSecs);
        ServiceConfiguration config = this.pulsar.getConfiguration();
        long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
        if (newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) {
            if (this.calculateQuotaPeriodicTask == null) {
                log.error("calculateQuotaForAllResourceGroups: Unable to find running task to cancel when publish period changed from {} to {} {}", new Object[]{this.resourceUsagePublishPeriodInSeconds, newPeriodInSeconds, this.timeUnitScale});
            } else {
                boolean cancelStatus = this.calculateQuotaPeriodicTask.cancel(true);
                log.info("Got status={} in cancel of periodic when publish period changed from {} to {} {}", new Object[]{cancelStatus, this.resourceUsagePublishPeriodInSeconds, newPeriodInSeconds, this.timeUnitScale});
            }
            this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(this::calculateQuotaForAllResourceGroups, newPeriodInSeconds, newPeriodInSeconds, this.timeUnitScale);
            this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
            maxIntervalForSuppressingReportsMSecs = this.resourceUsagePublishPeriodInSeconds * 5L;
        }
    }

    private void initialize() {
        long periodInSecs;
        ServiceConfiguration config = this.pulsar.getConfiguration();
        this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = (periodInSecs = (long)config.getResourceUsageTransportPublishIntervalInSecs());
        this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(this::aggregateResourceGroupLocalUsages, periodInSecs, periodInSecs, this.timeUnitScale);
        this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(this::calculateQuotaForAllResourceGroups, periodInSecs, periodInSecs, this.timeUnitScale);
        maxIntervalForSuppressingReportsMSecs = this.resourceUsagePublishPeriodInSeconds * 5L;
    }

    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
        if (rgConfig == null) {
            throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroup config");
        }
        if (rgConfig.getName().isEmpty()) {
            throw new IllegalArgumentException("ResourceGroupCreate: can't create resource group with an empty name");
        }
        ResourceGroup rg = this.getResourceGroupInternal(rgConfig.getName());
        if (rg != null) {
            throw new PulsarAdminException("Resource group already exists:" + rgConfig.getName());
        }
    }

    protected static enum ResourceGroupOpStatus {
        OK,
        Exists,
        DoesNotExist,
        NotSupported;

    }
}

