/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves the backlog quota that applies to a topic and enforces it once the quota is exceeded.
 *
 * <p>Lookup order implemented by {@link #getBacklogQuota(TopicName, BacklogQuota.BacklogQuotaType)}:
 * topic-level policies (only when enabled in the broker configuration), then the namespace
 * policies read through the policies cache, and finally the broker-wide default quota built in
 * the constructor.
 */
public class BacklogQuotaManager {
    private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManager.class);

    /** Bytes per gigabyte (2^30); converts the configured default size limit from GB to bytes. */
    private static final long BYTES_IN_GIGABYTE = 1024L * 1024L * 1024L;

    /** Milliseconds per second; converts the quota time limit before comparing with clock millis. */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Eviction aims at this fraction of the configured limit rather than at the limit itself. */
    private static final double REDUCTION_FACTOR = 0.9;

    private final BacklogQuotaImpl defaultQuota;
    private final ZooKeeperDataCache<Policies> zkCache;
    private final PulsarService pulsar;
    private final boolean isTopicLevelPoliciesEnable;

    /**
     * Builds the manager and the broker-wide default quota from the broker configuration.
     *
     * @param pulsar the broker service providing configuration, the policies cache and the
     *               topic policies service
     */
    public BacklogQuotaManager(PulsarService pulsar) {
        this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
        this.defaultQuota = BacklogQuotaImpl.builder()
                .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * BYTES_IN_GIGABYTE)
                .limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
                .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
                .build();
        this.zkCache = pulsar.getConfigurationCache().policiesCache();
        this.pulsar = pulsar;
    }

    /**
     * @return the broker-wide default quota built from the broker configuration
     */
    public BacklogQuotaImpl getDefaultQuota() {
        return defaultQuota;
    }

    /**
     * Looks up the namespace-level quota of the given type.
     *
     * @param namespace        namespace name; only used in the warning logged on failure
     * @param policyPath       path of the namespace policies in the policies cache
     * @param backlogQuotaType which quota to look up
     * @return the quota configured in the namespace policies, or the default quota when the
     *         policies are absent, carry no quota of that type, or cannot be read
     */
    public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath, BacklogQuota.BacklogQuotaType backlogQuotaType) {
        try {
            return zkCache.get(policyPath)
                    .map(p -> (BacklogQuotaImpl) p.backlog_quota_map.getOrDefault(backlogQuotaType, defaultQuota))
                    .orElse(defaultQuota);
        } catch (Exception e) {
            // Best effort: a failed read must not block quota enforcement, so use the default.
            log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e);
            return defaultQuota;
        }
    }

    /**
     * Looks up the quota of the given type for a topic.
     *
     * <p>When topic-level policies are enabled and the topic defines a quota of that type, that
     * quota wins. In every other case, including any failure to read the topic policies, the
     * namespace-level lookup is used.
     *
     * @param topicName        topic whose quota is wanted
     * @param backlogQuotaType which quota to look up
     * @return the effective quota for the topic
     */
    public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
        String policyPath = AdminResource.path("policies", topicName.getNamespace());
        if (!isTopicLevelPoliciesEnable) {
            return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
        }
        try {
            return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
                    .map(TopicPolicies::getBackLogQuotaMap)
                    .map(map -> (BacklogQuotaImpl) map.get(backlogQuotaType.name()))
                    .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType));
        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
            // Only logged at debug level; the namespace quota below is used instead.
            log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}", topicName);
        } catch (Exception e) {
            log.error("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e);
        }
        return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
    }

    /**
     * @param topicName topic whose quota is wanted
     * @return the size limit of the topic's {@code destination_storage} quota
     */
    public long getBacklogQuotaLimitInSize(TopicName topicName) {
        return getBacklogQuota(topicName, BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
    }

    /**
     * @param topicName topic whose quota is wanted
     * @return the time limit of the topic's {@code message_age} quota (seconds, judging by the
     *         default being built from {@code getBacklogQuotaDefaultLimitSecond()})
     */
    public int getBacklogQuotaLimitInTime(TopicName topicName) {
        return getBacklogQuota(topicName, BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
    }

    /**
     * Applies the retention policy of the topic's quota after that quota has been exceeded.
     *
     * <p>{@code consumer_backlog_eviction} drops backlog by size or by age depending on
     * {@code backlogQuotaType}; {@code producer_exception} and {@code producer_request_hold}
     * disconnect the topic's producers. Any other policy or quota type is a no-op.
     *
     * @param persistentTopic                   topic that exceeded its quota
     * @param backlogQuotaType                  which quota was exceeded
     * @param preciseTimeBasedBacklogQuotaCheck for age-based eviction, whether to expire messages
     *                                          per subscription instead of skipping whole ledgers
     */
    public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuota.BacklogQuotaType backlogQuotaType, boolean preciseTimeBasedBacklogQuotaCheck) {
        TopicName topicName = TopicName.get(persistentTopic.getName());
        BacklogQuotaImpl quota = getBacklogQuota(topicName, backlogQuotaType);
        log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType, persistentTopic.getName(), quota.getPolicy());
        switch (quota.getPolicy()) {
            case consumer_backlog_eviction:
                switch (backlogQuotaType) {
                    case destination_storage:
                        dropBacklogForSizeLimit(persistentTopic, quota);
                        break;
                    case message_age:
                        dropBacklogForTimeLimit(persistentTopic, quota, preciseTimeBasedBacklogQuotaCheck);
                        break;
                    default:
                        break;
                }
                break;
            case producer_exception:
            case producer_request_hold:
                disconnectProducers(persistentTopic);
                break;
            default:
                break;
        }
    }

    /**
     * Skips entries on the slowest cursor until the estimated backlog size falls to
     * {@link #REDUCTION_FACTOR} times the quota's size limit.
     *
     * <p>The loop stops early when there is no slowest cursor, when the same cursor is still the
     * slowest after a skip (no progress), or when the computed number of entries to skip is zero.
     *
     * @param persistentTopic topic whose backlog is trimmed
     * @param quota           quota providing the size limit
     */
    private void dropBacklogForSizeLimit(PersistentTopic persistentTopic, BacklogQuota quota) {
        double targetSize = REDUCTION_FACTOR * (double) quota.getLimitSize();
        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        long backlogSize = mLedger.getEstimatedBacklogSize();
        if (log.isDebugEnabled()) {
            log.debug("[{}] target size is [{}] for quota limit [{}], backlog size is [{}]", persistentTopic.getName(), targetSize, targetSize / REDUCTION_FACTOR, backlogSize);
        }

        ManagedCursor previousSlowestConsumer = null;
        while ((double) backlogSize > targetSize) {
            ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
            if (slowestConsumer == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] slowest consumer null.", persistentTopic.getName());
                }
                break;
            }

            // Fraction of the backlog that has to go to reach the target size.
            double messageSkipFactor = ((double) backlogSize - targetSize) / (double) backlogSize;

            // Same cursor as in the previous round: the last skip did not move the slowest
            // position forward, so further rounds would not help.
            if (slowestConsumer == previousSlowestConsumer) {
                log.info("[{}] Cursors not progressing, target size is [{}] for quota limit [{}], backlog size is [{}]", persistentTopic.getName(), targetSize, targetSize / REDUCTION_FACTOR, backlogSize);
                break;
            }

            long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog(false);
            int messagesToSkip = (int) (messageSkipFactor * (double) entriesInBacklog);
            if (messagesToSkip == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("no messages to skip for [{}]", slowestConsumer);
                }
                break;
            }

            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Skipping [{}] messages on slowest consumer [{}] having backlog entries : [{}]", persistentTopic.getName(), messagesToSkip, slowestConsumer.getName(), entriesInBacklog);
                }
                slowestConsumer.skipEntries(messagesToSkip, ManagedCursor.IndividualDeletedEntries.Include);
            } catch (Exception e) {
                // Deliberately not rethrown: the no-progress check above ends the loop on the
                // next round if this cursor is still the slowest.
                log.error("[{}] Error skipping [{}] messages from slowest consumer [{}]", persistentTopic.getName(), messagesToSkip, slowestConsumer.getName(), e);
            }

            backlogSize = mLedger.getEstimatedBacklogSize();
            previousSlowestConsumer = slowestConsumer;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Updated unconsumed size = [{}]. skipFactor: [{}]", persistentTopic.getName(), backlogSize, messageSkipFactor);
            }
        }
    }

    /**
     * Drops backlog that is older than the quota's time limit.
     *
     * <p>In precise mode every subscription's expiry monitor is asked to expire messages older
     * than {@link #REDUCTION_FACTOR} times the limit. Otherwise the slowest cursor is repeatedly
     * moved past the ledger holding its mark-delete position for as long as that ledger's
     * timestamp is older than the limit.
     *
     * @param persistentTopic                   topic whose backlog is trimmed
     * @param quota                             quota providing the time limit
     * @param preciseTimeBasedBacklogQuotaCheck whether to use per-subscription expiry
     */
    private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuota quota, boolean preciseTimeBasedBacklogQuotaCheck) {
        if (preciseTimeBasedBacklogQuotaCheck) {
            int target = (int) (REDUCTION_FACTOR * (double) quota.getLimitTime());
            if (log.isDebugEnabled()) {
                log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
            }
            persistentTopic.getSubscriptions().forEach((__, subscription) -> subscription.getExpiryMonitor().expireMessages(target));
            return;
        }

        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        long currentMillis = mLedger.getClock().millis();
        // The age below is computed from clock millis, so the limit has to be in millis too.
        // The default limit comes from getBacklogQuotaDefaultLimitSecond(), i.e. seconds.
        long limitMillis = quota.getLimitTime() * MILLIS_PER_SECOND;

        // Kept outside the try block so the error log can name the cursor being processed
        // without querying the ledger again.
        ManagedCursor slowestConsumer = null;
        try {
            while (true) {
                slowestConsumer = mLedger.getSlowestConsumer();
                if (slowestConsumer == null) {
                    // No cursor to move; the size-based path guards against this as well.
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] slowest consumer null.", persistentTopic.getName());
                    }
                    break;
                }

                Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
                MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
                if (ledgerInfo == null) {
                    // NOTE(review): defensive guard; presumably the lookup can complete with
                    // null when the ledger is no longer tracked - confirm against ManagedLedgerImpl.
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] no ledger info for ledger [{}], stopping time based eviction", persistentTopic.getName(), oldestPosition.getLedgerId());
                    }
                    break;
                }

                // Ledgers without a positive timestamp are never skipped (presumably still
                // open - confirm), and neither are ledgers younger than the limit.
                boolean expired = ledgerInfo.getTimestamp() > 0L
                        && currentMillis - ledgerInfo.getTimestamp() > limitMillis;
                if (!expired) {
                    break;
                }

                // Jump to the first valid position after the last entry of the expired ledger.
                PositionImpl nextPosition = mLedger.getNextValidPosition(PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1L));
                if (nextPosition.equals(oldestPosition)) {
                    // Nowhere further to go; stop instead of resetting to the same spot forever.
                    break;
                }
                slowestConsumer.resetCursor(nextPosition);
            }
        } catch (Exception e) {
            log.error("[{}] Error resetting cursor for slowest consumer [{}]", persistentTopic.getName(), slowestConsumer == null ? null : slowestConsumer.getName(), e);
        }
    }

    /**
     * Disconnects every producer attached to the topic and logs the combined outcome once all
     * disconnect futures have completed. Does not wait for the disconnects to finish.
     *
     * @param persistentTopic topic whose producers are disconnected
     */
    private void disconnectProducers(PersistentTopic persistentTopic) {
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
        Map<String, Producer> producers = persistentTopic.getProducers();
        producers.values().forEach(producer -> {
            log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer", producer.getProducerName(), persistentTopic.getName());
            futures.add(producer.disconnect());
        });
        FutureUtil.waitForAll(futures)
                .thenRun(() -> log.info("All producers on topic [{}] are disconnected", persistentTopic.getName()))
                .exceptionally(exception -> {
                    log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(), exception);
                    return null;
                });
    }
}

