/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.BundlesQuotas;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.DistributedIdGenerator;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.validator.BindAddressValidator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Queues;
import org.apache.pulsar.shade.io.netty.bootstrap.ServerBootstrap;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.AdaptiveRecvByteBufAllocator;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslContext;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFuture;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.Futures;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.BindAddress;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.shade.org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
import org.apache.pulsar.shade.org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.shade.org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerService
implements Closeable {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
    // Deadline (60 seconds) and the pre-built timeout exceptions that reference "futureWithDeadline(...)".
    private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60L);
    private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class, "futureWithDeadline(...)");
    private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class, "futureWithDeadline(...)");
    // Graceful-shutdown tuning constants; NOTE(review): their consumers are outside this view — confirm usage there.
    private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L;
    private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25;
    private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5;
    // Owning service; supplied to the constructor and used for configuration and shared resources.
    private final PulsarService pulsar;
    private final ManagedLedgerFactory managedLedgerFactory;
    // Futures resolving to an optional Topic; presumably keyed by topic name — TODO confirm against callers.
    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics;
    // Replication clients, looked up and removed by cluster name.
    private final ConcurrentOpenHashMap<String, PulsarClient> replicationClients;
    // Admin clients; the shutdown log message refers to the key as the cluster.
    private final ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins;
    // Three-level topic map; NOTE(review): the meaning of each key level is not visible here — confirm.
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> multiLayerTopicsMap;
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> owningTopics;
    private int numberOfNamespaceBundles = 0;
    // acceptorGroup is created by the constructor ("pulsar-acceptor" threads); workerGroup is passed in by the caller.
    private final EventLoopGroup acceptorGroup;
    private final EventLoopGroup workerGroup;
    // Ordered executor named "broker-topic-workers".
    private final OrderedExecutor topicOrderedExecutor;
    private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
    // Built once per class load by prepareDynamicConfigurationMap().
    private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = BrokerService.prepareDynamicConfigurationMap();
    private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;
    private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue;
    private AuthorizationService authorizationService = null;
    // Single-thread scheduled executors created in the constructor; their periodic tasks are scheduled by the start*() methods.
    private final ScheduledExecutorService statsUpdater;
    private final ScheduledExecutorService backlogQuotaChecker;
    // Semaphores sized from the max-concurrent lookup / topic-load settings; held in AtomicReferences.
    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
    protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
    // Gauges reporting (configured maximum - available permits) of the semaphores above.
    private final ObserverGauge pendingLookupRequests;
    private final ObserverGauge pendingTopicLoadRequests;
    private final ScheduledExecutorService inactivityMonitor;
    private final ScheduledExecutorService messageExpiryMonitor;
    private final ScheduledExecutorService compactionMonitor;
    private final ScheduledExecutorService consumedLedgersMonitor;
    // Created on demand by setupTopicPublishRateLimiterMonitor(), setupBrokerPublishRateLimiterMonitor()
    // and startDeduplicationSnapshotMonitor(); they stay null until those methods create them.
    private ScheduledExecutorService topicPublishRateLimiterMonitor;
    private ScheduledExecutorService brokerPublishRateLimiterMonitor;
    private ScheduledExecutorService deduplicationSnapshotMonitor;
    // Starts out as the disabled limiter.
    protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
    // Assigned in start().
    private DistributedIdGenerator producerNameGenerator;
    public static final String PRODUCER_NAME_GENERATOR_PATH = "/counters/producer-name";
    private final BacklogQuotaManager backlogQuotaManager;
    private final int keepAliveIntervalSeconds;
    private final PulsarStats pulsarStats;
    private final AuthenticationService authenticationService;
    public static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";
    // Static, so shared by every BrokerService instance in the JVM.
    private static final LongAdder totalUnackedMessages = new LongAdder();
    // Both limits are 0 when per-broker unacked-message blocking is disabled (see constructor).
    private final int maxUnackedMessages;
    public final int maxUnackedMsgsPerDispatcher;
    private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
    private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
    // Template bootstrap; start() and startProtocolHandler() clone it for each bind.
    private final ServerBootstrap defaultServerBootstrap;
    private final BundlesQuotas bundlesQuotas;
    private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
    // Every channel bound by start().
    private final List<Channel> listenChannels = new ArrayList<Channel>(2);
    // First non-TLS / TLS channel bound by start() whose listener name is blank or equals the internal listener name.
    private Channel listenChannel;
    private Channel listenChannelTls;
    private boolean preciseTopicPublishRateLimitingEnable;
    private final LongAdder pausedConnections = new LongAdder();
    private BrokerInterceptor interceptor;
    private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

    /**
     * Creates the broker service: allocates the topic/client maps, executors and semaphores,
     * registers metadata-store listeners, and prepares the template server bootstrap.
     * No sockets are bound here; binding happens in {@link #start()}.
     *
     * @param pulsar the owning service, used for configuration, metadata stores and shared executors
     * @param eventLoopGroup event loop group used as the worker group for accepted connections
     * @throws Exception declared by the signature; NOTE(review): which of the calls below can throw
     *         is not visible from this block
     */
    public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
        this.pulsar = pulsar;
        this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
        this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
        this.topics = ConcurrentOpenHashMap.newBuilder().build();
        this.replicationClients = ConcurrentOpenHashMap.newBuilder().build();
        this.clusterAdmins = ConcurrentOpenHashMap.newBuilder().build();
        this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
        this.configRegisteredListeners = ConcurrentOpenHashMap.newBuilder().build();
        this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
        this.multiLayerTopicsMap = ConcurrentOpenHashMap.newBuilder().build();
        this.owningTopics = ConcurrentOpenHashMap.newBuilder().build();
        this.pulsarStats = new PulsarStats(pulsar);
        this.offlineTopicStatCache = ConcurrentOpenHashMap.newBuilder().build();
        // Thread count comes from the "worker threads for non-persistent topic" setting.
        this.topicOrderedExecutor = OrderedExecutor.newBuilder().numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic()).name("broker-topic-workers").build();
        DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
        // The worker group is owned by the caller and shared with this service.
        this.workerGroup = eventLoopGroup;
        this.statsUpdater = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
        this.authorizationService = new AuthorizationService(pulsar.getConfiguration(), this.pulsar().getPulsarResources());
        // The same handler receives notifications from both the local and the configuration metadata store.
        pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
        pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
        this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-inactivity-monitor"));
        this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
        this.compactionMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor"));
        this.consumedLedgersMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
        this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
        this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
        this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
        this.blockedDispatchers = ConcurrentOpenHashSet.newBuilder().build();
        // NOTE(review): this runs before the semaphores below are assigned; keep the ordering unless
        // the listeners registered here are confirmed not to touch them during construction.
        this.updateConfigurationAndRegisterListeners();
        // Non-fair semaphores sized from the configured concurrency limits.
        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false));
        this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>(new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
        // Per-broker unacked-message blocking is enabled only when both the broker-wide limit
        // and the per-subscription percentage are positive.
        if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0 && pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
            this.maxUnackedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
            // Dispatcher limit = broker limit * percentage / 100, truncated to an int.
            this.maxUnackedMsgsPerDispatcher = (int)((double)this.maxUnackedMessages * pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() / 100.0);
            log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker", (Object)this.maxUnackedMessages, (Object)this.maxUnackedMsgsPerDispatcher);
            // First check after 600 seconds, then every 30 seconds, on the shared Pulsar executor.
            pulsar.getExecutor().scheduleAtFixedRate(SafeRun.safeRun(this::checkUnAckMessageDispatching), 600L, 30L, TimeUnit.SECONDS);
        } else {
            this.maxUnackedMessages = 0;
            this.maxUnackedMsgsPerDispatcher = 0;
            log.info("Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ", (Object)pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
        }
        this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader.loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration());
        this.defaultServerBootstrap = this.defaultServerBootstrap();
        // Gauge value = configured maximum minus currently available permits.
        this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-").supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest() - this.lookupRequestSemaphore.get().availablePermits()).register();
        this.pendingTopicLoadRequests = ObserverGauge.build("pulsar_broker_topic_load_pending_requests", "-").supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest() - this.topicLoadRequestSemaphore.get().availablePermits()).register();
        this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(), BrokerService.class.getClassLoader());
        this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore());
    }

    /**
     * Binds every supplied protocol-handler channel initializer to its address.
     *
     * <p>Each (protocol, address, initializer) triple is handed to {@code startProtocolHandler}.
     * A bind failure is logged and rethrown as a {@link RuntimeException} that carries the
     * failure's message and the failure's own cause.
     *
     * @param protocolHandlers channel initializers keyed by protocol name, then by bind address
     */
    public void startProtocolHandlers(Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) {
        protocolHandlers.forEach((protocolName, handlersByAddress) ->
                handlersByAddress.forEach((bindAddress, channelInitializer) -> {
                    try {
                        this.startProtocolHandler(protocolName, bindAddress, channelInitializer);
                    } catch (IOException bindFailure) {
                        // Surface the wrapped message together with the underlying cause.
                        final String failureMessage = bindFailure.getMessage();
                        final Throwable underlyingCause = bindFailure.getCause();
                        log.error("{}", failureMessage, underlyingCause);
                        throw new RuntimeException(failureMessage, underlyingCause);
                    }
                }));
    }

    /**
     * Binds one protocol handler on the given address using a clone of the default bootstrap.
     *
     * @param protocol protocol name, used only in log and error messages
     * @param address socket address to bind
     * @param initializer child-channel initializer installed on the cloned bootstrap
     * @throws IOException if the bind fails or the calling thread is interrupted while waiting for it;
     *         the original failure is attached as the cause
     */
    private void startProtocolHandler(String protocol, SocketAddress address, ChannelInitializer<SocketChannel> initializer) throws IOException {
        ServerBootstrap bootstrap = this.defaultServerBootstrap.clone();
        bootstrap.childHandler(initializer);
        try {
            bootstrap.bind(address).sync();
        } catch (Exception e) {
            // sync() may be interrupted; restore the flag so callers can still observe the
            // interruption after it has been wrapped in an IOException.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IOException("Failed to bind protocol `" + protocol + "` on " + address, e);
        }
        log.info("Successfully bind protocol `{}` on {}", (Object)protocol, (Object)address);
    }

    /**
     * Builds the template {@link ServerBootstrap}: the Pulsar byte-buf allocator, TCP_NODELAY and an
     * adaptive receive-buffer allocator on child channels, the acceptor/worker groups, and the
     * server channel class matching the worker group.
     *
     * @return a freshly configured bootstrap with no child handler set
     */
    private ServerBootstrap defaultServerBootstrap() {
        // Adaptive receive buffer bounds: minimum, initial and maximum sizes in bytes.
        final int recvBufMin = 1024;
        final int recvBufInitial = 16384;
        final int recvBufMax = 1024 * 1024;

        final ServerBootstrap template = new ServerBootstrap()
                .childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT)
                .group(this.acceptorGroup, this.workerGroup)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(recvBufMin, recvBufInitial, recvBufMax))
                .channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(template);
        return template;
    }

    /**
     * Starts the broker service: creates the producer-name generator, binds every configured
     * {@code pulsar} / {@code pulsar+ssl} address, then starts the periodic monitors.
     *
     * <p>Each bound channel is recorded in {@code listenChannels}. The first non-TLS and the first
     * TLS channel whose listener name is blank or equals the internal listener name are also kept
     * as {@code listenChannel} / {@code listenChannelTls}.
     *
     * @throws IllegalArgumentException if no bind address is configured
     * @throws IOException if binding an address fails or the thread is interrupted while binding
     * @throws Exception declared by the signature for the remaining start-up steps
     */
    public void start() throws Exception {
        this.producerNameGenerator = new DistributedIdGenerator(this.pulsar.getCoordinationService(), PRODUCER_NAME_GENERATOR_PATH, this.pulsar.getConfiguration().getClusterName());
        ServiceConfiguration serviceConfig = this.pulsar.getConfiguration();
        List<BindAddress> bindAddresses = BindAddressValidator.validateBindAddresses(serviceConfig, Arrays.asList("pulsar", "pulsar+ssl"));
        String internalListenerName = serviceConfig.getInternalListenerName();
        if (bindAddresses.isEmpty()) {
            throw new IllegalArgumentException("At least one broker bind address must be configured");
        }
        for (BindAddress a : bindAddresses) {
            InetSocketAddress addr = new InetSocketAddress(a.getAddress().getHost(), a.getAddress().getPort());
            boolean isTls = "pulsar+ssl".equals(a.getAddress().getScheme());
            PulsarChannelInitializer.PulsarChannelOptions opts = PulsarChannelInitializer.PulsarChannelOptions.builder().enableTLS(isTls).listenerName(a.getListenerName()).build();
            ServerBootstrap b = this.defaultServerBootstrap.clone();
            b.childHandler(this.pulsarChannelInitFactory.newPulsarChannelInitializer(this.pulsar, opts));
            try {
                Channel ch = b.bind(addr).sync().channel();
                this.listenChannels.add(ch);
                // Only unnamed listeners or the internal listener can become the primary channels.
                if (StringUtils.isBlank(a.getListenerName()) || StringUtils.equalsIgnoreCase(a.getListenerName(), internalListenerName)) {
                    if (this.listenChannel == null && !isTls) {
                        this.listenChannel = ch;
                    }
                    if (this.listenChannelTls == null && isTls) {
                        this.listenChannelTls = ch;
                    }
                }
                log.info("Started Pulsar Broker service on {}, TLS: {}, listener: {}", new Object[]{ch.localAddress(), isTls ? SslContext.defaultServerProvider().toString() : "(none)", StringUtils.defaultString(a.getListenerName(), "(none)")});
            } catch (Exception e) {
                // sync() may be interrupted; restore the flag so the interruption is not lost
                // once it has been wrapped in an IOException.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new IOException("Failed to bind Pulsar broker on " + addr, e);
            }
        }
        this.startStatsUpdater(serviceConfig.getStatsUpdateInitialDelayInSecs(), serviceConfig.getStatsUpdateFrequencyInSecs());
        this.startInactivityMonitor();
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updateBrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();
        this.startDeduplicationSnapshotMonitor();
    }

    /**
     * Schedules {@code updateRates} on the stats-updater executor at a fixed rate and also
     * invokes it once synchronously on the calling thread.
     *
     * @param statsUpdateInitialDelayInSecs delay before the first scheduled run, in seconds
     * @param statsUpdateFrequencyInSecs period between scheduled runs, in seconds
     */
    protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
        final Runnable guardedRateUpdate = SafeRun.safeRun(this::updateRates);
        this.statsUpdater.scheduleAtFixedRate(
                guardedRateUpdate,
                statsUpdateInitialDelayInSecs,
                statsUpdateFrequencyInSecs,
                TimeUnit.SECONDS);
        // Direct call, in addition to the scheduled runs above.
        this.updateRates();
    }

    /**
     * Starts the deduplication snapshot monitor when the configured snapshot frequency is positive
     * and broker deduplication is enabled; otherwise does nothing.
     *
     * <p>The monitor is a dedicated single-thread executor that calls
     * {@code Topic::checkDeduplicationSnapshot} on every topic at the configured interval.
     */
    protected void startDeduplicationSnapshotMonitor() {
        final int snapshotIntervalSecs = this.pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
        if (snapshotIntervalSecs <= 0 || !this.pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
            return;
        }
        this.deduplicationSnapshotMonitor = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("deduplication-snapshot-monitor"));
        final Runnable snapshotAllTopics = SafeRun.safeRun(() -> this.forEachTopic(Topic::checkDeduplicationSnapshot));
        this.deduplicationSnapshotMonitor.scheduleAtFixedRate(
                snapshotAllTopics, snapshotIntervalSecs, snapshotIntervalSecs, TimeUnit.SECONDS);
    }

    /**
     * Schedules the periodic tasks that run on the inactivity monitor executor:
     * <ul>
     *   <li>{@code checkGC}, only when deletion of inactive topics is enabled;</li>
     *   <li>{@code checkMessageDeduplicationInfo}, every third of the producer inactivity timeout;</li>
     *   <li>{@code checkInactiveSubscriptions}, only when the subscription expiry check interval
     *       is positive.</li>
     * </ul>
     */
    protected void startInactivityMonitor() {
        final ServiceConfiguration config = this.pulsar().getConfiguration();

        if (config.isBrokerDeleteInactiveTopicsEnabled()) {
            final int gcIntervalSecs = config.getBrokerDeleteInactiveTopicsFrequencySeconds();
            this.inactivityMonitor.scheduleAtFixedRate(
                    SafeRun.safeRun(this::checkGC), gcIntervalSecs, gcIntervalSecs, TimeUnit.SECONDS);
        }

        // The deduplication info check runs three times per producer-inactivity timeout window.
        final long dedupCheckIntervalSecs =
                TimeUnit.MINUTES.toSeconds(config.getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3L;
        this.inactivityMonitor.scheduleAtFixedRate(
                SafeRun.safeRun(this::checkMessageDeduplicationInfo),
                dedupCheckIntervalSecs, dedupCheckIntervalSecs, TimeUnit.SECONDS);

        if (config.getSubscriptionExpiryCheckIntervalInMinutes() > 0) {
            final long expiryCheckIntervalSecs =
                    TimeUnit.MINUTES.toSeconds(config.getSubscriptionExpiryCheckIntervalInMinutes());
            this.inactivityMonitor.scheduleAtFixedRate(
                    SafeRun.safeRun(this::checkInactiveSubscriptions),
                    expiryCheckIntervalSecs, expiryCheckIntervalSecs, TimeUnit.SECONDS);
        }
    }

    /**
     * Schedules {@code checkMessageExpiry} on the message-expiry executor, using the configured
     * check interval (in minutes) as both the initial delay and the period.
     */
    protected void startMessageExpiryMonitor() {
        final int expiryCheckMinutes = this.pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
        final Runnable expiryCheck = SafeRun.safeRun(this::checkMessageExpiry);
        this.messageExpiryMonitor.scheduleAtFixedRate(
                expiryCheck, expiryCheckMinutes, expiryCheckMinutes, TimeUnit.MINUTES);
    }

    /**
     * Schedules {@code checkReplicationPolicies} when the configured check duration is positive.
     * The task is placed on the message-expiry executor, not on an executor of its own.
     */
    protected void startCheckReplicationPolicies() {
        final int policyCheckSecs = this.pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
        if (policyCheckSecs <= 0) {
            return;
        }
        final Runnable policyCheck = SafeRun.safeRun(this::checkReplicationPolicies);
        this.messageExpiryMonitor.scheduleAtFixedRate(
                policyCheck, policyCheckSecs, policyCheckSecs, TimeUnit.SECONDS);
    }

    /**
     * Schedules {@code checkCompaction} on the compaction executor when the configured monitor
     * interval (in seconds) is positive; otherwise does nothing.
     */
    protected void startCompactionMonitor() {
        final int compactionCheckSecs = this.pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
        if (compactionCheckSecs <= 0) {
            return;
        }
        this.compactionMonitor.scheduleAtFixedRate(
                SafeRun.safeRun(this::checkCompaction),
                compactionCheckSecs, compactionCheckSecs, TimeUnit.SECONDS);
    }

    /**
     * Schedules {@code checkConsumedLedgers} on the consumed-ledgers executor when the configured
     * retention check interval (in seconds) is positive; otherwise does nothing.
     */
    protected void startConsumedLedgersMonitor() {
        final int retentionCheckSecs = this.pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
        if (retentionCheckSecs <= 0) {
            return;
        }
        final Runnable ledgerCheck = SafeRun.safeRun(this::checkConsumedLedgers);
        this.consumedLedgersMonitor.scheduleAtFixedRate(
                ledgerCheck, retentionCheckSecs, retentionCheckSecs, TimeUnit.SECONDS);
    }

    /**
     * Schedules {@code monitorBacklogQuota} on the backlog-quota executor when the backlog quota
     * check is enabled; otherwise only logs that monitoring is disabled.
     */
    protected void startBacklogQuotaChecker() {
        final ServiceConfiguration config = this.pulsar().getConfiguration();
        if (!config.isBacklogQuotaCheckEnabled()) {
            log.info("Backlog quota check monitoring is disabled");
            return;
        }
        final int quotaCheckSecs = config.getBacklogQuotaCheckIntervalInSeconds();
        log.info("Scheduling a thread to check backlog quota after [{}] seconds in background", (Object) quotaCheckSecs);
        this.backlogQuotaChecker.scheduleAtFixedRate(
                SafeRun.safeRun(this::monitorBacklogQuota),
                quotaCheckSecs, quotaCheckSecs, TimeUnit.SECONDS);
    }

    /**
     * Starts or stops the topic publish rate-limiter monitor according to the configured
     * topic publisher throttling tick time.
     *
     * <p>With a positive tick time, the monitor executor is created if it does not exist yet and
     * two periodic tasks are scheduled on it: {@code checkTopicPublishThrottlingRate} every tick,
     * and {@code refreshTopicPublishRate} every second. An existing monitor is left as is.
     *
     * <p>With a non-positive tick time, an existing monitor is shut down, up to 30 seconds are
     * allowed for it to terminate, {@code refreshTopicPublishRate} is invoked once, and the
     * monitor reference is cleared.
     */
    public synchronized void setupTopicPublishRateLimiterMonitor() {
        long topicTickTimeMs = this.pulsar().getConfiguration().getTopicPublisherThrottlingTickTimeMillis();
        if (topicTickTimeMs > 0L) {
            if (this.topicPublishRateLimiterMonitor == null) {
                this.topicPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-topic-publish-rate-limiter-monitor"));
                this.topicPublishRateLimiterMonitor.scheduleAtFixedRate(SafeRun.safeRun(() -> this.checkTopicPublishThrottlingRate()), topicTickTimeMs, topicTickTimeMs, TimeUnit.MILLISECONDS);
                this.topicPublishRateLimiterMonitor.scheduleAtFixedRate(SafeRun.safeRun(() -> this.refreshTopicPublishRate()), 1L, 1L, TimeUnit.SECONDS);
            }
            return;
        }
        if (this.topicPublishRateLimiterMonitor == null) {
            return;
        }
        // awaitTermination() only waits; without shutdown() the executor never terminates, the
        // wait always runs the full timeout, and the thread with its periodic tasks is leaked.
        this.topicPublishRateLimiterMonitor.shutdown();
        boolean interrupted = false;
        try {
            this.topicPublishRateLimiterMonitor.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
            log.warn("failed to shutdown topicPublishRateLimiterMonitor", (Throwable)e);
        }
        this.refreshTopicPublishRate();
        this.topicPublishRateLimiterMonitor = null;
        if (interrupted) {
            // Restore the flag only after the cleanup above has run.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts or stops the broker publish rate-limiter monitor according to the configured
     * broker publisher throttling tick time.
     *
     * <p>With a positive tick time, the monitor executor is created if it does not exist yet and
     * two periodic tasks are scheduled on it: {@code checkBrokerPublishThrottlingRate} every tick,
     * and {@code refreshBrokerPublishRate} every second. An existing monitor is left as is.
     *
     * <p>With a non-positive tick time, an existing monitor is shut down, up to 30 seconds are
     * allowed for it to terminate, {@code refreshBrokerPublishRate} is invoked once, and the
     * monitor reference is cleared.
     */
    public synchronized void setupBrokerPublishRateLimiterMonitor() {
        long brokerTickTimeMs = this.pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
        if (brokerTickTimeMs > 0L) {
            if (this.brokerPublishRateLimiterMonitor == null) {
                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
                this.brokerPublishRateLimiterMonitor.scheduleAtFixedRate(SafeRun.safeRun(() -> this.checkBrokerPublishThrottlingRate()), brokerTickTimeMs, brokerTickTimeMs, TimeUnit.MILLISECONDS);
                this.brokerPublishRateLimiterMonitor.scheduleAtFixedRate(SafeRun.safeRun(() -> this.refreshBrokerPublishRate()), 1L, 1L, TimeUnit.SECONDS);
            }
            return;
        }
        if (this.brokerPublishRateLimiterMonitor == null) {
            return;
        }
        // awaitTermination() only waits; without shutdown() the executor never terminates, the
        // wait always runs the full timeout, and the thread with its periodic tasks is leaked.
        this.brokerPublishRateLimiterMonitor.shutdown();
        boolean interrupted = false;
        try {
            this.brokerPublishRateLimiterMonitor.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
            log.warn("failed to shutdown brokerPublishRateLimiterMonitor", (Throwable)e);
        }
        this.refreshBrokerPublishRate();
        this.brokerPublishRateLimiterMonitor = null;
        if (interrupted) {
            // Restore the flag only after the cleanup above has run.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the service by waiting for {@code closeAsync()} to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and
     * the method returns without throwing.
     *
     * @throws IOException the asynchronous failure itself when it is an {@link IOException},
     *         otherwise a {@code PulsarServerException} wrapping that failure
     */
    @Override
    public void close() throws IOException {
        try {
            this.closeAsync().get();
        } catch (InterruptedException interruption) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException asyncFailure) {
            final Throwable rootCause = asyncFailure.getCause();
            if (rootCause instanceof IOException) {
                throw (IOException) rootCause;
            }
            throw new PulsarServerException(rootCause);
        }
    }

    /**
     * Disconnects the replicators that target {@code clusterName} on every known topic, then
     * removes and closes the replication client registered for that cluster.
     *
     * <p>For each entry in {@code topics}, once its future completes, a connected replicator for
     * the cluster is disconnected; the outcome of the disconnect is ignored. Topics whose future
     * failed, resolved to empty, or that have no connected replicator are skipped.
     *
     * @param clusterName name of the remote cluster whose replication client should be closed
     * @return a future that completes once all replicators were handled and the client, if one
     *         was registered, has been closed
     */
    public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterName) {
        List<CompletableFuture<Void>> futures = new ArrayList<>((int) this.topics.size());
        this.topics.forEach((__, future) -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            futures.add(f);
            future.whenComplete((ot, ex) -> {
                // ot is null when the topic future completed exceptionally. Without this guard the
                // callback throws, f is never completed and the returned future never finishes.
                if (ot != null && ot.isPresent()) {
                    Replicator r = ot.get().getReplicators().get(clusterName);
                    if (r != null && r.isConnected()) {
                        r.disconnect(false).whenComplete((v, e) -> f.complete(null));
                        return;
                    }
                }
                f.complete(null);
            });
        });
        return FutureUtil.waitForAll(futures).thenCompose(ignored -> {
            PulsarClient client = this.replicationClients.remove(clusterName);
            if (client == null) {
                return CompletableFuture.completedFuture(null);
            }
            return client.closeAsync();
        });
    }

    /**
     * Shuts the broker service down asynchronously.
     *
     * <p>Sequence, as implemented below:
     * <ol>
     *   <li>Unload owned namespace bundles (synchronous call), then shut down every replication client and
     *       close every cluster admin client; individual failures are only logged.</li>
     *   <li>Gracefully shut down the acceptor and worker event loop groups; a failure here is logged and
     *       does not stop the sequence.</li>
     *   <li>Second phase: close open listen channels, the interceptor, the authentication service, the
     *       stats holder and the delayed-delivery tracker factory, and initiate shutdown of the internal
     *       executors with half of {@code brokerShutdownTimeoutMs} as timeout.</li>
     * </ol>
     *
     * @return a future for the whole shutdown; any exception thrown synchronously while setting the
     *         sequence up is returned as a failed future instead of being thrown
     */
    public CompletableFuture<Void> closeAsync() {
        try {
            log.info("Shutting down Pulsar Broker service");
            this.unloadNamespaceBundlesGracefully();
            // Best effort: a client that fails to shut down must not prevent the remaining cleanup.
            this.replicationClients.forEach((cluster, client) -> {
                try {
                    client.shutdown();
                }
                catch (Exception e) {
                    log.warn("Error shutting down repl client for cluster {}", cluster, (Object)e);
                }
            });
            this.clusterAdmins.forEach((cluster, admin) -> {
                try {
                    admin.close();
                }
                catch (Exception e) {
                    log.warn("Error shutting down repl admin for cluster {}", cluster, (Object)e);
                }
            });
            // Holds the second-phase future once it exists, so that the cancellation hook registered at the
            // bottom of this method can reach it.
            CompletableFuture cancellableDownstreamFutureReference = new CompletableFuture();
            log.info("Event loops shutting down gracefully...");
            CompletionStage shutdownFuture = ((CompletableFuture)CompletableFuture.allOf(this.shutdownEventLoopGracefully(this.acceptorGroup), this.shutdownEventLoopGracefully(this.workerGroup)).handle((v, t) -> {
                // handle() swallows the failure (returns null), so the second phase below always runs.
                if (t != null) {
                    log.warn("Error shutting down event loops gracefully", t);
                } else {
                    log.info("Event loops shutdown completed.");
                }
                return null;
            })).thenCompose(__ -> {
                log.info("Continuing to second phase in shutdown.");
                ArrayList<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<CompletableFuture<Void>>();
                this.listenChannels.forEach(ch -> {
                    if (ch.isOpen()) {
                        asyncCloseFutures.add(this.closeChannel((Channel)ch));
                    }
                });
                if (this.interceptor != null) {
                    this.interceptor.close();
                    this.interceptor = null;
                }
                try {
                    this.authenticationService.close();
                }
                catch (IOException e) {
                    log.warn("Error in closing authenticationService", (Throwable)e);
                }
                this.pulsarStats.close();
                try {
                    this.delayedDeliveryTrackerFactory.close();
                }
                catch (IOException e) {
                    log.warn("Error in closing delayedDeliveryTrackerFactory", (Throwable)e);
                }
                // Executor shutdown is given half of the configured broker shutdown timeout.
                asyncCloseFutures.add(GracefulExecutorServicesShutdown.initiate().timeout(Duration.ofMillis((long)(0.5 * (double)this.pulsar.getConfiguration().getBrokerShutdownTimeoutMs()))).shutdown(this.statsUpdater, this.inactivityMonitor, this.messageExpiryMonitor, this.compactionMonitor, this.consumedLedgersMonitor, this.backlogQuotaChecker, this.topicOrderedExecutor, this.topicPublishRateLimiterMonitor, this.brokerPublishRateLimiterMonitor, this.deduplicationSnapshotMonitor).handle());
                CompletableFuture<Void> combined = FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
                // Publish the second-phase future so the hook registered below can cancel it.
                cancellableDownstreamFutureReference.complete(combined);
                // Logging only; the result of this handle() is discarded and 'combined' itself is returned.
                combined.handle((v, t) -> {
                    if (t == null) {
                        log.info("Broker service completely shut down");
                    } else if (t instanceof CancellationException) {
                        log.warn("Broker service didn't complete gracefully. Terminating Broker service.");
                    } else {
                        log.warn("Broker service shut down completed with exception", t);
                    }
                    return null;
                });
                return combined;
            });
            // NOTE(review): presumably runs the callback when shutdownFuture is cancelled or times out, which
            // then cancels the second-phase future once it has been published - confirm against FutureUtil.
            FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference.thenAccept(future -> future.cancel(false)));
            return shutdownFuture;
        }
        catch (Exception e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Gracefully shuts down the given event loop group, deriving both timings from the configured
     * {@code brokerShutdownTimeoutMs}: the shutdown timeout is half of it, and the quiet period is a quarter
     * of it capped at 5000 ms.
     *
     * @param eventLoopGroup the event loop group to shut down
     * @return a future tracking the graceful shutdown
     */
    CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) {
        long totalBudgetMs = this.pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
        long shutdownTimeoutMs = (long)(0.5 * (double)totalBudgetMs);
        long quietPeriodMs = Math.min((long)(0.25 * (double)totalBudgetMs), 5000L);
        return NettyFutureUtil.toCompletableFutureVoid(
                eventLoopGroup.shutdownGracefully(quietPeriodMs, shutdownTimeoutMs, TimeUnit.MILLISECONDS));
    }

    /**
     * Closes a channel and converts the outcome into a future that always completes normally.
     *
     * <p>Failures are logged as warnings, except {@link RejectedExecutionException}, which is ignored
     * silently.
     *
     * @param channel the channel to close
     * @return a future that completes with {@code null} once the close attempt has finished
     */
    private CompletableFuture<Void> closeChannel(Channel channel) {
        return ChannelFutures.toCompletableFuture(channel.close()).handle((ignored, failure) -> {
            if (failure == null || failure instanceof RejectedExecutionException) {
                return null;
            }
            log.warn("Cannot close channel {}", (Object)channel, failure);
            return null;
        });
    }

    /**
     * Disables this broker in the load manager (when one is available) and then unloads every namespace
     * bundle the broker currently owns, waiting for each unload to finish before starting the next.
     *
     * <p>The method never throws: a failure to unload a single bundle is logged and the remaining bundles are
     * still processed; any other failure is logged as an error.
     */
    public void unloadNamespaceBundlesGracefully() {
        try {
            log.info("Unloading namespace-bundles...");
            if (this.pulsar.getLoadManager() != null && this.pulsar.getLoadManager().get() != null) {
                try {
                    this.pulsar.getLoadManager().get().disableBroker();
                }
                catch (PulsarServerException.NotFoundException ne) {
                    log.warn("Broker load-manager znode doesn't exist ", (Throwable)ne);
                }
            }
            long closeTopicsStartTime = System.nanoTime();
            Set<NamespaceBundle> serviceUnits = this.pulsar.getNamespaceService().getOwnedServiceUnits();
            if (serviceUnits != null) {
                serviceUnits.forEach(su -> {
                    if (su instanceof NamespaceBundle) {
                        try {
                            this.pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle)su, 1L, TimeUnit.MINUTES).get();
                        }
                        catch (Exception e) {
                            log.warn("Failed to unload namespace bundle {}", su, (Object)e);
                        }
                    }
                });
            }
            double closeTopicsTimeSeconds = (double)TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - closeTopicsStartTime) / 1000.0;
            // serviceUnits may be null (see the guard above); report zero rather than dereferencing it, which
            // would surface as a misleading "Failed to disable broker" error from the outer handler.
            int bundleCount = serviceUnits == null ? 0 : serviceUnits.size();
            log.info("Unloading {} namespace-bundles completed in {} seconds", (Object)bundleCount, (Object)closeTopicsTimeSeconds);
        }
        catch (Exception e) {
            log.error("Failed to disable broker from loadbalancer list {}", (Object)e.getMessage(), (Object)e);
        }
    }

    /**
     * Looks a topic up without requesting its creation.
     *
     * @param topic fully qualified topic name
     * @return the result of {@link #getTopic(String, boolean)} invoked with {@code createIfMissing = false}
     */
    public CompletableFuture<Optional<Topic>> getTopicIfExists(String topic) {
        final boolean createIfMissing = false;
        return this.getTopic(topic, createIfMissing);
    }

    /**
     * Resolves a topic, requesting creation when {@code isAllowAutoTopicCreation(topic)} permits it.
     *
     * <p>The optional produced by {@link #getTopic(String, boolean)} is unwrapped with
     * {@link Optional#get()}, so the returned future fails when that optional is empty.
     *
     * @param topic fully qualified topic name
     * @return a future holding the resolved topic
     */
    public CompletableFuture<Topic> getOrCreateTopic(String topic) {
        boolean createIfMissing = this.isAllowAutoTopicCreation(topic);
        CompletableFuture<Optional<Topic>> lookup = this.getTopic(topic, createIfMissing);
        return lookup.thenApply(found -> found.get());
    }

    /**
     * Returns the future for the given topic, loading (and optionally creating) the topic when no usable
     * entry exists in the topics map.
     *
     * <p>An existing entry that completed exceptionally, or completed with an empty optional, is removed and
     * the topic is loaded again. Persistent topics are loaded through
     * {@code loadOrCreatePersistentTopic}; non-persistent topics are created directly, where a partition of
     * a partitioned topic is created only if its index is within the partition count from the metadata.
     *
     * @param topic fully qualified topic name
     * @param createIfMissing whether the topic may be created when it does not exist yet
     * @return a future holding the topic, or an empty optional when it does not exist and was not created;
     *         runtime exceptions raised while setting the load up are reported as a failed future
     */
    public CompletableFuture<Optional<Topic>> getTopic(String topic, boolean createIfMissing) {
        try {
            CompletableFuture<Optional<Topic>> topicFuture = this.topics.get(topic);
            if (topicFuture != null) {
                if (topicFuture.isCompletedExceptionally() || topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent()) {
                    // Stale entry (failed load or "not found" result): drop it and fall through to reload.
                    this.topics.remove(topic, topicFuture);
                } else {
                    if (createIfMissing) {
                        if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
                            return topicFuture;
                        }
                        // A load is still in flight; if it ends up empty, retry so that creation can happen.
                        return topicFuture.thenCompose(value -> {
                            if (!value.isPresent()) {
                                return this.getTopic(topic, createIfMissing);
                            }
                            return CompletableFuture.completedFuture(value);
                        });
                    }
                    return topicFuture;
                }
            }
            boolean isPersistentTopic = TopicName.get(topic).getDomain().equals((Object)TopicDomain.persistent);
            if (isPersistentTopic) {
                return this.topics.computeIfAbsent(topic, topicName -> this.loadOrCreatePersistentTopic((String)topicName, createIfMissing));
            }
            return this.topics.computeIfAbsent(topic, name -> {
                TopicName topicName = TopicName.get(name);
                if (topicName.isPartitioned()) {
                    TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
                    return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose(metadata -> {
                        if (topicName.getPartitionIndex() < metadata.partitions) {
                            return this.createNonPersistentTopic((String)name);
                        }
                        return CompletableFuture.completedFuture(Optional.empty());
                    });
                }
                if (createIfMissing) {
                    return this.createNonPersistentTopic((String)name);
                }
                return CompletableFuture.completedFuture(Optional.empty());
            });
        }
        catch (IllegalArgumentException e) {
            log.warn("[{}] Illegalargument exception when loading topic", (Object)topic, (Object)e);
            return FutureUtil.failedFuture(e);
        }
        catch (RuntimeException e) {
            Throwable cause = e.getCause();
            if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                log.warn("[{}] Service unit is not ready when loading the topic", (Object)topic);
            } else {
                log.warn("[{}] Unexpected exception when loading topic: {}", new Object[]{topic, e.getMessage(), e});
            }
            // A runtime exception without a cause would otherwise reach completeExceptionally(null), which
            // throws NullPointerException instead of handing the caller a failed future.
            return FutureUtil.failedFuture(cause != null ? cause : e);
        }
    }

    /**
     * Deletes the schema of a topic that is currently loaded on this broker.
     *
     * @param topic fully qualified topic name
     * @return the future returned by the topic's {@code deleteSchema()}, or a future already completed with
     *         {@code null} when {@code getTopicReference} yields no topic
     */
    public CompletableFuture<SchemaVersion> deleteSchemaStorage(String topic) {
        Optional<Topic> loadedTopic = this.getTopicReference(topic);
        if (!loadedTopic.isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return loadedTopic.get().deleteSchema();
    }

    /**
     * Deletes a topic while leaving its schema untouched.
     *
     * @param topic fully qualified topic name
     * @param forceDelete forwarded unchanged to the three-argument overload
     * @return the future returned by {@link #deleteTopic(String, boolean, boolean)} with
     *         {@code deleteSchema = false}
     */
    public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
        final boolean deleteSchema = false;
        return this.deleteTopic(topic, forceDelete, deleteSchema);
    }

    /**
     * Deletes a topic, optionally deleting its schema first.
     *
     * <p>For a topic loaded on this broker: a non-forced delete of a replicated topic is rejected with an
     * {@link IllegalStateException}; otherwise the schema is deleted first when requested, followed by
     * {@code deleteForcefully()} or {@code delete()} depending on {@code forceDelete}.
     *
     * <p>For a topic that is not loaded: non-persistent topics complete immediately; for persistent topics
     * the topic's authentication policies are removed (up to 5 attempts) and then the managed ledger is
     * deleted through the managed ledger factory.
     *
     * @param topic fully qualified topic name
     * @param forceDelete use the forceful delete and skip the replication check
     * @param deleteSchema delete the topic's schema before deleting a loaded topic
     * @return a future completed when the deletion has finished or failed
     */
    public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) {
        Optional<Topic> loaded = this.getTopicReference(topic);
        if (loaded.isPresent()) {
            Topic t = loaded.get();
            if (!forceDelete && t.isReplicated()) {
                List<String> clusters = t.getReplicators().keys();
                log.error("Delete forbidden topic {} is replicated on clusters {}", (Object)topic, clusters);
                return FutureUtil.failedFuture(new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
            }
            if (!deleteSchema) {
                return forceDelete ? t.deleteForcefully() : t.delete();
            }
            return t.deleteSchema().thenCompose(schemaVersion -> {
                log.info("Successfully delete topic {}'s schema of version {}", (Object)t.getName(), schemaVersion);
                return forceDelete ? t.deleteForcefully() : t.delete();
            });
        }

        // The topic is not loaded on this broker: clean it up directly from metadata.
        if (log.isDebugEnabled()) {
            log.debug("Topic {} is not loaded, try to delete from metadata", (Object)topic);
        }
        TopicName tn = TopicName.get(topic);
        if (!tn.isPersistent()) {
            return CompletableFuture.completedFuture(null);
        }
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        CompletableFuture<Void> authCleanup = new CompletableFuture<Void>();
        this.deleteTopicAuthenticationWithRetry(topic, authCleanup, 5);
        authCleanup.whenComplete((ignored, authFailure) -> {
            if (authFailure != null) {
                result.completeExceptionally((Throwable)authFailure);
                return;
            }
            this.managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new AsyncCallbacks.DeleteLedgerCallback(){

                @Override
                public void deleteLedgerComplete(Object ctx) {
                    result.complete(null);
                }

                @Override
                public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                    result.completeExceptionally(exception);
                }
            }, null);
        });
        return result;
    }

    /**
     * Removes the topic-level authentication entry of {@code topic} from its namespace policies, retrying
     * when the policy update fails with a bad-version error.
     *
     * <p>The outcome is reported through {@code future}: it completes with {@code null} when the entry was
     * removed or no such entry exists, and completes exceptionally when the policies cannot be read, the
     * update fails for another reason, or {@code count} has reached zero.
     *
     * @param topic fully qualified topic name
     * @param future future to complete with the outcome
     * @param count number of remaining attempts
     */
    public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<Void> future, int count) {
        if (count == 0) {
            log.error("The number of retries has exhausted for topic {}", (Object)topic);
            future.completeExceptionally(new MetadataStoreException("The number of retries has exhausted"));
            return;
        }
        NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject();
        CompletableFuture<Void> policyCheck = this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies -> {
            boolean hasTopicAuth = optPolicies.isPresent() && ((Policies)optPolicies.get()).auth_policies.getTopicAuthentication().containsKey(topic);
            if (!hasTopicAuth) {
                if (log.isDebugEnabled()) {
                    log.debug("Authentication policies not found for topic {}", (Object)topic);
                }
                future.complete(null);
                return;
            }
            CompletableFuture<Void> removal = this.pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(namespaceName, p -> {
                p.auth_policies.getTopicAuthentication().remove(topic);
                return p;
            }).thenAccept(v -> {
                log.info("Successfully delete authentication policies for topic {}", (Object)topic);
                future.complete(null);
            });
            removal.exceptionally(updateFailure -> {
                boolean badVersion = updateFailure.getCause() instanceof MetadataStoreException.BadVersionException;
                if (!badVersion) {
                    log.error("Failed to delete authentication policies for topic {}", (Object)topic, updateFailure);
                    future.completeExceptionally((Throwable)updateFailure);
                    return null;
                }
                log.warn("Failed to delete authentication policies because of bad version. Retry to delete authentication policies for topic {}", (Object)topic);
                this.deleteTopicAuthenticationWithRetry(topic, future, count - 1);
                return null;
            });
        });
        policyCheck.exceptionally(readFailure -> {
            log.error("Failed to get policies for topic {}", (Object)topic, readFailure);
            future.completeExceptionally((Throwable)readFailure);
            return null;
        });
    }

    /**
     * Creates a non-persistent topic and returns a future for it.
     *
     * <p>When non-persistent topics are disabled in the broker configuration, a future failed with
     * {@code NotAllowedException} is returned. Otherwise the namespace ownership check runs first; on
     * success the topic is initialized, its replication is checked, the load time is recorded and the topic
     * is registered in the stats maps before the future is completed.
     *
     * @param topic fully qualified topic name
     * @return a future completed with the created topic, or failed when initialization or the replication
     *         check fails
     */
    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
        CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<Optional<Topic>>();
        if (!this.pulsar.getConfiguration().isEnableNonPersistentTopics()) {
            if (log.isDebugEnabled()) {
                log.debug("Broker is unable to load non-persistent topic {}", (Object)topic);
            }
            // NOTE(review): the message reads "is not unable", which looks like a typo for "is unable" -
            // left as is because callers or tests may match on the text.
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Broker is not unable to load non-persistent topic"));
        }
        // Start of the interval reported as topic load latency below.
        long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
        CompletableFuture<Void> isOwner = this.checkTopicNsOwnership(topic);
        ((CompletableFuture)isOwner.thenRun(() -> ((CompletableFuture)((CompletableFuture)nonPersistentTopic.initialize().thenCompose(__ -> nonPersistentTopic.checkReplication())).thenRun(() -> {
            log.info("Created topic {}", (Object)nonPersistentTopic);
            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
            this.pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
            this.addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
            topicFuture.complete(Optional.of(nonPersistentTopic));
        })).exceptionally(ex -> {
            // Initialization or the replication check failed: stop the replication producers, then drop the
            // map entry (on the broker executor) and fail the future with the original error.
            log.warn("Replication check failed. Removing topic from topics list {}, {}", (Object)topic, (Object)ex.getCause());
            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
                this.pulsar.getExecutor().execute(() -> this.topics.remove(topic, topicFuture));
                topicFuture.completeExceptionally((Throwable)ex);
            });
            return null;
        }))).exceptionally(e -> {
            // Ownership check failed: the future is still completed SUCCESSFULLY with the uninitialized topic
            // object, and the map entry is removed afterwards on the broker executor.
            // NOTE(review): completing with a value here looks deliberate (the caller gets a result while the
            // topic is not kept on this broker) - confirm the intent before changing it.
            log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", (Object)topic, (Object)e.getCause());
            topicFuture.complete(Optional.of(nonPersistentTopic));
            this.pulsar.getExecutor().execute(() -> this.topics.remove(topic, topicFuture));
            return null;
        });
        return topicFuture;
    }

    /**
     * Creates a new future through {@code FutureUtil.createFutureWithTimeout}, using
     * {@code FUTURE_DEADLINE_TIMEOUT_DURATION} as the timeout, this service's executor, and
     * {@code FUTURE_DEADLINE_TIMEOUT_EXCEPTION} as the supplied exception.
     *
     * @param <T> result type of the future
     * @return the newly created future
     */
    private <T> CompletableFuture<T> futureWithDeadline() {
        return FutureUtil.createFutureWithTimeout(
                FUTURE_DEADLINE_TIMEOUT_DURATION,
                this.executor(),
                () -> {
                    return FUTURE_DEADLINE_TIMEOUT_EXCEPTION;
                });
    }

    /**
     * Returns the replication client for a remote cluster, creating and caching it on first use.
     *
     * <p>A new client is configured from the broker's {@code brokerClient_}-prefixed properties.
     * Authentication comes from the cluster data when it carries both plugin and parameters, otherwise from
     * the broker client settings if authentication is enabled. TLS settings come from the cluster data when
     * it enables broker-client TLS, otherwise from the broker configuration if that enables it; without TLS
     * the plain service URL is used. Proxy URL and listener name are applied when present.
     *
     * @param cluster name of the remote cluster
     * @param clusterDataOp cluster metadata; required only when the client has to be created
     * @return the cached or newly created client
     * @throws RuntimeException wrapping any failure during creation, including missing cluster data
     */
    public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> clusterDataOp) {
        PulsarClient cached = this.replicationClients.get(cluster);
        if (cached != null) {
            return cached;
        }
        return this.replicationClients.computeIfAbsent(cluster, key -> {
            try {
                ClusterData data = (ClusterData)clusterDataOp.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
                ServiceConfiguration brokerConf = this.pulsar.getConfiguration();
                ClientBuilder clientBuilder = PulsarClient.builder().enableTcpNoDelay(false).connectionsPerBroker(brokerConf.getReplicationConnectionsPerBroker()).statsInterval(0L, TimeUnit.SECONDS);
                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConf.getProperties(), "brokerClient_"));

                // Cluster-specific credentials win over the broker-wide client credentials.
                boolean clusterHasAuth = data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null;
                if (clusterHasAuth) {
                    clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
                } else if (brokerConf.isAuthenticationEnabled()) {
                    clientBuilder.authentication(brokerConf.getBrokerClientAuthenticationPlugin(), brokerConf.getBrokerClientAuthenticationParameters());
                }

                String serviceUrlTls = StringUtils.isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls() : data.getServiceUrlTls();
                if (data.isBrokerClientTlsEnabled()) {
                    this.configTlsSettings(clientBuilder, serviceUrlTls, data.isBrokerClientTlsEnabledWithKeyStore(), data.isTlsAllowInsecureConnection(), data.getBrokerClientTlsTrustStoreType(), data.getBrokerClientTlsTrustStore(), data.getBrokerClientTlsTrustStorePassword(), data.getBrokerClientTrustCertsFilePath());
                } else if (brokerConf.isBrokerClientTlsEnabled()) {
                    this.configTlsSettings(clientBuilder, serviceUrlTls, brokerConf.isBrokerClientTlsEnabledWithKeyStore(), brokerConf.isTlsAllowInsecureConnection(), brokerConf.getBrokerClientTlsTrustStoreType(), brokerConf.getBrokerClientTlsTrustStore(), brokerConf.getBrokerClientTlsTrustStorePassword(), brokerConf.getBrokerClientTrustCertsFilePath());
                } else {
                    clientBuilder.serviceUrl(StringUtils.isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
                }

                if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) {
                    clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol());
                    log.info("Configuring proxy-url {} with protocol {}", (Object)data.getProxyServiceUrl(), (Object)data.getProxyProtocol());
                }
                if (StringUtils.isNotBlank(data.getListenerName())) {
                    clientBuilder.listenerName(data.getListenerName());
                    log.info("Configuring listenerName {}", (Object)data.getListenerName());
                }
                // The client is built on the broker's worker event loop group.
                ClientConfigurationData clientConf = ((ClientBuilderImpl)clientBuilder).getClientConfigurationData();
                return new PulsarClientImpl(clientConf, this.workerGroup);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Applies TLS settings to a client builder: sets the service URL, enables TLS, applies the
     * insecure-connection flag, and then configures either a key-store based trust store or a trust
     * certificate file.
     *
     * @param clientBuilder builder to configure
     * @param serviceUrl service URL to set on the builder
     * @param brokerClientTlsEnabledWithKeyStore selects trust-store settings instead of a certificate file
     * @param isTlsAllowInsecureConnection value passed to {@code allowTlsInsecureConnection}
     * @param brokerClientTlsTrustStoreType trust store type (key-store mode only)
     * @param brokerClientTlsTrustStore trust store path (key-store mode only)
     * @param brokerClientTlsTrustStorePassword trust store password (key-store mode only)
     * @param brokerClientTrustCertsFilePath trust certificate file path (non key-store mode only)
     */
    private void configTlsSettings(ClientBuilder clientBuilder, String serviceUrl, boolean brokerClientTlsEnabledWithKeyStore, boolean isTlsAllowInsecureConnection, String brokerClientTlsTrustStoreType, String brokerClientTlsTrustStore, String brokerClientTlsTrustStorePassword, String brokerClientTrustCertsFilePath) {
        clientBuilder.serviceUrl(serviceUrl).enableTls(true).allowTlsInsecureConnection(isTlsAllowInsecureConnection);
        if (!brokerClientTlsEnabledWithKeyStore) {
            clientBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath);
            return;
        }
        clientBuilder.useKeyStoreTls(true)
                .tlsTrustStoreType(brokerClientTlsTrustStoreType)
                .tlsTrustStorePath(brokerClientTlsTrustStore)
                .tlsTrustStorePassword(brokerClientTlsTrustStorePassword);
    }

    /**
     * Returns the admin client for a remote cluster, creating and caching it on first use.
     *
     * <p>The TLS service URL is used when the broker enables broker-client TLS and the cluster data provides
     * a non-blank TLS URL; otherwise the plain service URL is used. The builder is loaded with the
     * {@code brokerClient_}-prefixed broker properties and the broker client authentication settings, and
     * its read timeout is taken from {@code getZooKeeperOperationTimeoutSeconds()}.
     *
     * @param cluster name of the remote cluster
     * @param clusterDataOp cluster metadata; required only when the admin client has to be created
     * @return the cached or newly created admin client
     * @throws RuntimeException wrapping any failure during creation, including missing cluster data
     */
    public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> clusterDataOp) {
        PulsarAdmin cached = this.clusterAdmins.get(cluster);
        if (cached != null) {
            return cached;
        }
        return this.clusterAdmins.computeIfAbsent(cluster, key -> {
            try {
                ClusterData data = (ClusterData)clusterDataOp.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
                ServiceConfiguration conf = this.pulsar.getConfig();
                boolean useTls = conf.isBrokerClientTlsEnabled() && StringUtils.isNotBlank(data.getServiceUrlTls());
                String adminApiUrl;
                if (useTls) {
                    adminApiUrl = data.getServiceUrlTls();
                } else {
                    adminApiUrl = data.getServiceUrl();
                }
                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
                builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));
                builder.authentication(conf.getBrokerClientAuthenticationPlugin(), conf.getBrokerClientAuthenticationParameters());
                if (useTls) {
                    builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                    if (!conf.isBrokerClientTlsEnabledWithKeyStore()) {
                        builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                    } else {
                        builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType()).tlsTrustStorePath(conf.getBrokerClientTlsTrustStore()).tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
                    }
                }
                builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
                PulsarAdmin created = builder.build();
                log.info("created admin with url {} ", (Object)adminApiUrl);
                return created;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Starts loading (or creating) a persistent topic and returns a future for the result. The future is
     * created with a timeout of {@code topicLoadTimeoutSeconds}.
     *
     * <p>When persistent topics are disabled, the future fails with {@code NotAllowedException}. Otherwise,
     * after the namespace ownership check succeeds, the load starts immediately if a permit can be taken
     * from the topic-load semaphore; if not, the request is put on the pending queue. When a started load
     * finishes, the permit is released and {@code createPendingLoadTopic()} is invoked.
     *
     * @param topic fully qualified topic name
     * @param createIfMissing whether the topic may be created when it does not exist
     * @return the future tracking the topic load
     */
    protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(String topic, boolean createIfMissing) throws RuntimeException {
        Duration loadTimeout = Duration.ofSeconds(this.pulsar.getConfiguration().getTopicLoadTimeoutSeconds());
        CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(loadTimeout, this.executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
        if (!this.pulsar.getConfiguration().isEnablePersistentTopics()) {
            if (log.isDebugEnabled()) {
                log.debug("Broker is unable to load persistent topic {}", (Object)topic);
            }
            topicFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Broker is not unable to load persistent topic"));
            return topicFuture;
        }
        CompletableFuture<Void> ownershipChecked = this.checkTopicNsOwnership(topic).thenRun(() -> {
            Semaphore loadPermits = this.topicLoadRequestSemaphore.get();
            if (!loadPermits.tryAcquire()) {
                // No permit available: park the request on the pending queue.
                this.pendingTopicLoadingQueue.add(new ImmutablePair<>(topic, topicFuture));
                if (log.isDebugEnabled()) {
                    log.debug("topic-loading for {} added into pending queue", (Object)topic);
                }
                return;
            }
            this.checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture);
            // Whatever the outcome, give the permit back and trigger processing of pending loads.
            topicFuture.handle((loadedTopic, loadFailure) -> {
                loadPermits.release();
                this.createPendingLoadTopic();
                return null;
            });
        });
        ownershipChecked.exceptionally(failure -> {
            topicFuture.completeExceptionally(failure.getCause());
            return null;
        });
        return topicFuture;
    }

    /**
     * Creates the persistent topic only if the namespace service reports the topic's service unit as active.
     *
     * <p>When the service unit is not active, the topic's entry is removed from the topics map (on the
     * broker executor) and {@code topicFuture} fails with {@code ServiceUnitNotReadyException}. A failure of
     * the activity check itself is propagated to {@code topicFuture}.
     *
     * @param topic fully qualified topic name
     * @param createIfMissing whether the topic may be created when it does not exist
     * @param topicFuture future to complete with the outcome
     */
    private void checkOwnershipAndCreatePersistentTopic(String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
        TopicName topicName = TopicName.get(topic);
        CompletableFuture<Void> activeCheck = this.pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName).thenAccept(isActive -> {
            if (!isActive.booleanValue()) {
                String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
                log.warn(msg);
                this.pulsar.getExecutor().execute(() -> this.topics.remove(topic, topicFuture));
                topicFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
                return;
            }
            this.createPersistentTopic(topic, createIfMissing, topicFuture);
        });
        activeCheck.exceptionally(failure -> {
            topicFuture.completeExceptionally((Throwable)failure);
            return null;
        });
    }

    /**
     * Opens the managed ledger for a persistent topic and, once it is open, builds and initializes the
     * topic object, completing {@code topicFuture} with the outcome.
     *
     * <p>Transaction system topics are rejected with {@code NotAllowedException}. On every failure path the
     * topic's entry is removed from the topics map (asynchronously on an executor) and the future is
     * completed exceptionally, except for a missing ledger with {@code createIfMissing == false}, which
     * completes the future with an empty optional.
     *
     * @param topic fully qualified topic name
     * @param createIfMissing whether the managed ledger may be created when it does not exist
     * @param topicFuture future to complete with the loaded topic, an empty optional, or the failure
     */
    private void createPersistentTopic(final String topic, final boolean createIfMissing, final CompletableFuture<Optional<Topic>> topicFuture) {
        // Start of the interval reported as topic load latency once the topic is ready.
        final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        final TopicName topicName = TopicName.get(topic);
        if (PulsarService.isTransactionSystemTopic(topicName)) {
            String msg = String.format("Can not create transaction system topic %s", topic);
            log.warn(msg);
            this.pulsar.getExecutor().execute(() -> this.topics.remove(topic, topicFuture));
            topicFuture.completeExceptionally(new BrokerServiceException.NotAllowedException(msg));
            return;
        }
        // The per-namespace topic limit is only checked when this call is allowed to create the topic.
        CompletableFuture<Object> maxTopicsCheck = createIfMissing ? this.checkMaxTopicsPerNamespace(topicName, 1) : CompletableFuture.completedFuture(null);
        ((CompletableFuture)((CompletableFuture)maxTopicsCheck.thenCompose(__ -> this.getManagedLedgerConfig(topicName))).thenAccept(managedLedgerConfig -> {
            if (this.isBrokerEntryMetadataEnabled()) {
                HashSet<BrokerEntryMetadataInterceptor> interceptors = new HashSet<BrokerEntryMetadataInterceptor>();
                for (BrokerEntryMetadataInterceptor interceptor : this.brokerEntryMetadataInterceptors) {
                    if (interceptor instanceof AppendIndexMetadataInterceptor) {
                        // A fresh instance is used for each topic instead of the shared one.
                        // NOTE(review): presumably because this interceptor keeps per-ledger state - confirm.
                        interceptors.add(new AppendIndexMetadataInterceptor());
                        continue;
                    }
                    interceptors.add(interceptor);
                }
                managedLedgerConfig.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(interceptors));
            }
            managedLedgerConfig.setCreateIfMissing(createIfMissing);
            this.managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), (ManagedLedgerConfig)managedLedgerConfig, new AsyncCallbacks.OpenLedgerCallback(){

                @Override
                public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                    try {
                        PersistentTopic persistentTopic = BrokerService.this.isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : new PersistentTopic(topic, ledger, BrokerService.this);
                        CompletableFuture<Void> preCreateSubForCompaction = persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
                        CompletionStage replicationFuture = persistentTopic.initialize().thenCompose(__ -> persistentTopic.checkReplication());
                        // The deduplication status is checked only after both the compaction subscription
                        // pre-creation and initialize -> checkReplication have completed.
                        ((CompletableFuture)((CompletableFuture)CompletableFuture.allOf(new CompletableFuture[]{preCreateSubForCompaction, replicationFuture}).thenCompose(v -> persistentTopic.checkDeduplicationStatus())).thenRun(() -> {
                            log.info("Created topic {} - dedup is {}", (Object)topic, (Object)(persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled"));
                            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
                            BrokerService.this.pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                            if (topicFuture.isCompletedExceptionally()) {
                                // The future was failed while the topic was still loading, so the freshly
                                // built topic is discarded: stop its replication producers, drop the entry.
                                // NOTE(review): presumably the load timeout set by the caller - confirm.
                                log.warn("{} future is already completed with failure {}, closing the topic", (Object)topic, FutureUtil.getException(topicFuture));
                                persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> BrokerService.this.topics.remove(topic, topicFuture), (Executor)BrokerService.this.executor());
                            } else {
                                BrokerService.this.addTopicToStatsMaps(topicName, persistentTopic);
                                topicFuture.complete(Optional.of(persistentTopic));
                            }
                        })).exceptionally(ex -> {
                            log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", (Object)topic, ex);
                            // The future is failed only after the replication producers were stopped and the
                            // map entry was removed.
                            persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
                                BrokerService.this.topics.remove(topic, topicFuture);
                                topicFuture.completeExceptionally((Throwable)ex);
                            }, (Executor)BrokerService.this.executor());
                            return null;
                        });
                    }
                    catch (PulsarServerException e) {
                        log.warn("Failed to create topic {}-{}", (Object)topic, (Object)e.getMessage());
                        BrokerService.this.pulsar.getExecutor().execute(() -> BrokerService.this.topics.remove(topic, topicFuture));
                        topicFuture.completeExceptionally(e);
                    }
                }

                @Override
                public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                    if (!createIfMissing && exception instanceof ManagedLedgerException.ManagedLedgerNotFoundException) {
                        // A lookup-only request for a ledger that does not exist is reported as "no topic",
                        // not as an error.
                        topicFuture.complete(Optional.empty());
                    } else {
                        log.warn("Failed to create topic {}", (Object)topic, (Object)exception);
                        BrokerService.this.pulsar.getExecutor().execute(() -> BrokerService.this.topics.remove(topic, topicFuture));
                        topicFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
                    }
                }
            // NOTE(review): the supplier below reports whether this broker owns the topic's namespace;
            // presumably the ledger factory consults it while opening - confirm against asyncOpen.
            }, () -> this.isTopicNsOwnedByBroker(topicName), null);
        })).exceptionally(exception -> {
            // Reached when the max-topics check, the ledger configuration lookup, or the setup above fails.
            log.warn("[{}] Failed to get topic configuration: {}", new Object[]{topic, exception.getMessage(), exception});
            this.pulsar.getExecutor().execute(() -> this.topics.remove(topic, topicFuture));
            topicFuture.completeExceptionally((Throwable)exception);
            return null;
        });
    }

    /**
     * Builds the {@link ManagedLedgerConfig} to use for the given topic.
     *
     * <p>The namespace policies and the local policies of the topic's namespace are fetched
     * asynchronously and combined. Persistence and retention settings come from the topic-level
     * policies when topic-level policies are enabled and present, otherwise from the namespace
     * policies, otherwise from the broker {@link ServiceConfiguration} defaults. All remaining
     * settings are copied from the broker configuration.
     *
     * @param topicName topic whose managed-ledger configuration is requested
     * @return a future that yields the populated configuration; a {@link PulsarServerException}
     *         raised while creating a topic-level offloader is rethrown from the combining
     *         function wrapped in a {@link RuntimeException}
     */
    public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) {
        NamespaceName namespace = topicName.getNamespaceObject();
        ServiceConfiguration serviceConfig = this.pulsar.getConfiguration();
        NamespaceResources nsr = this.pulsar.getPulsarResources().getNamespaceResources();
        LocalPoliciesResources lpr = this.pulsar.getPulsarResources().getLocalPolicies();
        return nsr.getPoliciesAsync(namespace).thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, localPolicies) -> {
            PersistencePolicies persistencePolicies = null;
            RetentionPolicies retentionPolicies = null;
            OffloadPoliciesImpl topicLevelOffloadPolicies = null;
            // Topic-level policies, when enabled and available, take precedence over namespace policies.
            if (this.pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
                try {
                    TopicPolicies topicPolicies = this.pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                    if (topicPolicies != null) {
                        persistencePolicies = topicPolicies.getPersistence();
                        retentionPolicies = topicPolicies.getRetentionPolicies();
                        topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
                    }
                }
                catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                    // Not an error here: the namespace/broker fallbacks below are used instead.
                    log.debug("Topic {} policies have not been initialized yet.", (Object)topicName);
                }
            }
            // Fallback chain for persistence: namespace policies, then broker defaults.
            if (persistencePolicies == null) {
                persistencePolicies = policies.map(p -> p.persistence).orElseGet(() -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), serviceConfig.getManagedLedgerDefaultWriteQuorum(), serviceConfig.getManagedLedgerDefaultAckQuorum(), serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
            }
            // Fallback chain for retention: namespace policies, then broker defaults.
            if (retentionPolicies == null) {
                retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), serviceConfig.getDefaultRetentionSizeInMB()));
            }
            ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
            managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
            managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
            managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
            // When the namespace has a bookie affinity group, switch to the isolated placement
            // policy and hand it the primary/secondary group names as properties.
            if (localPolicies.isPresent() && ((LocalPolicies)localPolicies.get()).bookieAffinityGroup != null) {
                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
                HashMap<String, Object> properties = Maps.newHashMap();
                properties.put("isolationBookieGroups", ((LocalPolicies)localPolicies.get()).bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
                properties.put("secondaryIsolationBookieGroups", ((LocalPolicies)localPolicies.get()).bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
            }
            managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
            // The settings below are copied straight from the broker configuration.
            managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
            managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
            managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
            managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
            managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
            managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
            managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
            managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
            // Metadata ensemble/quorum sizes use the broker defaults, not the per-topic persistence policies.
            managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
            managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
            managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
            managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
            managedLedgerConfig.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
            managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
            managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
            managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
            managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
            managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
            // Merge topic-level, namespace-level and broker-level offload settings into one policy object.
            OffloadPoliciesImpl nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
            OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), this.getPulsar().getConfig().getProperties());
            // Offloader selection: system-service namespaces get the null offloader, topics with
            // their own offload policies get a freshly created offloader, everything else uses
            // the offloader obtained from PulsarService for the namespace.
            if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
                managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
            } else if (topicLevelOffloadPolicies != null) {
                try {
                    LedgerOffloader topicLevelLedgerOffLoader = this.pulsar().createManagedLedgerOffloader(offloadPolicies);
                    managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
                }
                catch (PulsarServerException e) {
                    // Checked exceptions cannot escape the combining lambda; rethrow unchecked.
                    throw new RuntimeException(e);
                }
            } else {
                managedLedgerConfig.setLedgerOffloader(this.pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
            }
            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
            managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
            return managedLedgerConfig;
        });
    }

    /**
     * Asynchronously resolves the bundle of {@code topicName} and records {@code topic} in the
     * namespace / bundle / topic stats map, then drops any cached offline stats for the topic.
     *
     * <p>A failure to resolve the bundle is only logged.
     *
     * @param topicName name of the topic being registered
     * @param topic     topic instance to store under that name
     */
    private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
        this.pulsar.getNamespaceService().getBundleAsync(topicName)
                .thenAccept(bundle -> {
                    if (bundle != null) {
                        // Nested map updates are guarded by the outer map's monitor.
                        synchronized (this.multiLayerTopicsMap) {
                            String bundleKey = bundle.toString();
                            this.multiLayerTopicsMap
                                    .computeIfAbsent(topicName.getNamespace(), nsKey -> ConcurrentOpenHashMap.newBuilder().build())
                                    .computeIfAbsent(bundleKey, bKey -> ConcurrentOpenHashMap.newBuilder().build())
                                    .put(topicName.toString(), topic);
                        }
                    }
                    this.invalidateOfflineTopicStatCache(topicName);
                })
                .exceptionally(ex -> {
                    log.warn("Got exception when retrieving bundle name during create persistent topic", ex);
                    return null;
                });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-registers every topic currently recorded under {@code oldBundle} through
     * {@link #addTopicToStatsMaps}, then removes the old bundle's entry from the stats map and
     * invalidates its bundle stats. Does nothing when the bundle has no recorded topics.
     *
     * <p>Any exception raised along the way is logged and not propagated.
     *
     * @param oldBundle bundle whose topics must be re-registered; must not be {@code null}
     */
    public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
        Preconditions.checkNotNull(oldBundle);
        try {
            String namespaceKey = oldBundle.getNamespaceObject().toString();
            String bundleKey = oldBundle.toString();
            List<Topic> bundleTopics = this.getAllTopicsFromNamespaceBundle(namespaceKey, bundleKey);
            if (CollectionUtils.isEmpty(bundleTopics)) {
                return;
            }
            for (Topic current : bundleTopics) {
                this.addTopicToStatsMaps(TopicName.get(current.getName()), current);
            }
            synchronized (this.multiLayerTopicsMap) {
                this.multiLayerTopicsMap.get(namespaceKey).remove(bundleKey);
                this.pulsarStats.invalidBundleStats(bundleKey);
            }
        } catch (Exception e) {
            log.warn("Got exception while refreshing topicStats map", e);
        }
    }

    /**
     * Looks up the offline stats cached for a topic.
     *
     * @param topicName cache key
     * @return whatever the offline-stat cache holds for {@code topicName}
     */
    public PersistentOfflineTopicStats getOfflineTopicStat(TopicName topicName) {
        return offlineTopicStatCache.get(topicName);
    }

    /**
     * Stores offline stats for a topic in the offline-stat cache.
     *
     * @param topicName         cache key
     * @param offlineTopicStats stats to cache under that key
     */
    public void cacheOfflineTopicStats(TopicName topicName, PersistentOfflineTopicStats offlineTopicStats) {
        offlineTopicStatCache.put(topicName, offlineTopicStats);
    }

    /**
     * Removes the cached offline stats of a topic, logging when an entry was actually removed.
     *
     * @param topicName cache key to evict
     */
    public void invalidateOfflineTopicStatCache(TopicName topicName) {
        if (this.offlineTopicStatCache.remove(topicName) == null) {
            return;
        }
        log.info("Removed cached offline topic stat for {} ", topicName.getPersistenceNamingEncoding());
    }

    /**
     * Returns the topic registered under {@code topic} without waiting for it to load.
     *
     * @param topic key into the topics map
     * @return the result of the topic's future when that future exists and has already completed
     *         normally; an empty {@link Optional} otherwise
     */
    public Optional<Topic> getTopicReference(String topic) {
        CompletableFuture<Optional<Topic>> pending = this.topics.get(topic);
        if (pending == null || !pending.isDone() || pending.isCompletedExceptionally()) {
            return Optional.empty();
        }
        // Already completed normally, so join() returns immediately.
        return pending.join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Refreshes the broker stats from the current stats map and rotates the latency collection,
     * both while holding the monitor of the stats object.
     */
    public void updateRates() {
        synchronized (this.pulsarStats) {
            this.pulsarStats.updateStats(this.multiLayerTopicsMap);
            Summary.rotateLatencyCollection();
        }
    }

    /**
     * Forwards {@code consumer} to the broker stats object's dimension-metrics accessor.
     *
     * @param consumer callback handed to the stats object
     */
    public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
        pulsarStats.getDimensionMetrics(consumer);
    }

    /**
     * @return the topic metrics reported by the broker stats object
     */
    public List<Metrics> getTopicMetrics() {
        return pulsarStats.getTopicMetrics();
    }

    /**
     * @return the bundle stats reported by the broker stats object
     */
    public Map<String, NamespaceBundleStats> getBundleStats() {
        return pulsarStats.getBundleStats();
    }

    /**
     * @return the semaphore currently held by the lookup-request semaphore reference
     */
    public Semaphore getLookupRequestSemaphore() {
        return lookupRequestSemaphore.get();
    }

    /** Calls {@code checkGC()} on every topic visited by {@link #forEachTopic}. */
    public void checkGC() {
        this.forEachTopic(topic -> topic.checkGC());
    }

    /** Calls {@code checkMessageExpiry()} on every topic visited by {@link #forEachTopic}. */
    public void checkMessageExpiry() {
        this.forEachTopic(topic -> topic.checkMessageExpiry());
    }

    /** Calls {@code checkReplication()} on every topic visited by {@link #forEachTopic}. */
    public void checkReplicationPolicies() {
        this.forEachTopic(topic -> topic.checkReplication());
    }

    /**
     * Calls {@code checkCompaction()} on every visited topic that is a {@link PersistentTopic};
     * other topic types are skipped.
     */
    public void checkCompaction() {
        this.forEachTopic(topic -> {
            if (!(topic instanceof PersistentTopic)) {
                return;
            }
            PersistentTopic persistent = (PersistentTopic) topic;
            persistent.checkCompaction();
        });
    }

    /**
     * For every visited {@link PersistentTopic} that has a managed ledger, requests a background
     * trim of consumed ledgers, passing {@code Futures.NULL_PROMISE} as the promise.
     */
    private void checkConsumedLedgers() {
        this.forEachTopic(topic -> {
            if (!(topic instanceof PersistentTopic)) {
                return;
            }
            PersistentTopic persistent = (PersistentTopic) topic;
            Optional.ofNullable(persistent.getManagedLedger())
                    .ifPresent(ledger -> ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE));
        });
    }

    /** Calls {@code checkMessageDeduplicationInfo()} on every topic visited by {@link #forEachTopic}. */
    public void checkMessageDeduplicationInfo() {
        this.forEachTopic(topic -> topic.checkMessageDeduplicationInfo());
    }

    /** Calls {@code checkInactiveSubscriptions()} on every topic visited by {@link #forEachTopic}. */
    public void checkInactiveSubscriptions() {
        this.forEachTopic(topic -> topic.checkInactiveSubscriptions());
    }

    /** Calls {@code checkTopicPublishThrottlingRate()} on every topic visited by {@link #forEachTopic}. */
    public void checkTopicPublishThrottlingRate() {
        this.forEachTopic(topic -> topic.checkTopicPublishThrottlingRate());
    }

    /**
     * Calls {@code resetTopicPublishCountAndEnableReadIfRequired()} on every topic visited by
     * {@link #forEachTopic}.
     */
    private void refreshTopicPublishRate() {
        this.forEachTopic(topic -> topic.resetTopicPublishCountAndEnableReadIfRequired());
    }

    /**
     * Asks the broker-level publish rate limiter to check the publish rate and, when it reports
     * the rate as exceeded, disables producer reads on every visited topic.
     */
    public void checkBrokerPublishThrottlingRate() {
        this.brokerPublishRateLimiter.checkPublishRate();
        if (!this.brokerPublishRateLimiter.isPublishRateExceeded()) {
            return;
        }
        this.forEachTopic(topic -> {
            AbstractTopic throttled = (AbstractTopic) topic;
            throttled.disableProducerRead();
        });
    }

    /**
     * Resets the broker-level publish count and passes the limiter's reset result to
     * {@code resetBrokerPublishCountAndEnableReadIfRequired} on every visited topic.
     */
    private void refreshBrokerPublishRate() {
        final boolean resetPerformed = this.brokerPublishRateLimiter.resetPublishCount();
        this.forEachTopic(topic -> {
            topic.resetBrokerPublishCountAndEnableReadIfRequired(resetPerformed);
        });
    }

    /**
     * Applies {@code consumer} to every entry of the topics map for which
     * {@code extractTopic} yields a topic.
     *
     * @param consumer action to run for each extracted topic
     */
    public void forEachTopic(Consumer<Topic> consumer) {
        this.topics.forEach((topicKey, topicFuture) ->
                BrokerService.extractTopic(topicFuture).ifPresent(consumer::accept));
    }

    /**
     * @return the backlog quota manager held by this broker service
     */
    public BacklogQuotaManager getBacklogQuotaManager() {
        return backlogQuotaManager;
    }

    /**
     * Checks every visited {@link PersistentTopic} against its backlog quotas.
     *
     * <p>The size quota is checked first; the time quota is only checked when the size quota is
     * not exceeded. An exceeded quota is handed to the backlog quota manager; otherwise a debug
     * line is logged.
     */
    public void monitorBacklogQuota() {
        this.forEachTopic(topic -> {
            if (!(topic instanceof PersistentTopic)) {
                return;
            }
            PersistentTopic persistent = (PersistentTopic) topic;
            if (persistent.isSizeBacklogExceeded()) {
                this.getBacklogQuotaManager().handleExceededBacklogQuota(
                        persistent, BacklogQuota.BacklogQuotaType.destination_storage, false);
                return;
            }
            if (persistent.isTimeBacklogExceeded()) {
                this.getBacklogQuotaManager().handleExceededBacklogQuota(
                        persistent,
                        BacklogQuota.BacklogQuotaType.message_age,
                        this.pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("quota not exceeded for [{}]", topic.getName());
            }
        });
    }

    /**
     * Asks the namespace service whether the service unit of {@code topicName} is owned here.
     *
     * @param topicName topic to check
     * @return the namespace service's answer, or {@code false} when the check throws (the
     *         failure is logged)
     */
    public boolean isTopicNsOwnedByBroker(TopicName topicName) {
        boolean owned;
        try {
            owned = this.pulsar.getNamespaceService().isServiceUnitOwned(topicName);
        } catch (Exception e) {
            log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
            owned = false;
        }
        return owned;
    }

    /**
     * Verifies through the namespace service that this instance owns the given topic.
     *
     * @param topic topic name
     * @return a future that completes normally when the topic is owned here, and fails with
     *         {@link BrokerServiceException.ServiceUnitNotReadyException} (after logging a
     *         warning) when it is not
     */
    public CompletableFuture<Void> checkTopicNsOwnership(String topic) {
        TopicName topicName = TopicName.get(topic);
        return this.pulsar.getNamespaceService().checkTopicOwnership(topicName).thenCompose(owned -> {
            if (!owned) {
                String msg = String.format("Namespace bundle for topic (%s) not served by this instance. Please redo the lookup. Request is denied: namespace=%s", topic, topicName.getNamespace());
                log.warn(msg);
                return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException(msg));
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Unloads a service unit, giving up after the supplied timeout.
     *
     * <p>If the unload has not finished when the timeout fires, a warning is logged and the
     * returned future is completed with {@code 0}. Once the future completes, the scheduled
     * timeout task is cancelled.
     *
     * @param serviceUnit                         bundle to unload
     * @param closeWithoutWaitingClientDisconnect forwarded to the two-argument overload
     * @param timeout                             delay before the timeout task runs
     * @param unit                                unit of {@code timeout}
     * @return the future produced by the two-argument overload
     */
    public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) {
        CompletableFuture<Integer> unloadFuture = this.unloadServiceUnit(serviceUnit, closeWithoutWaitingClientDisconnect);
        ScheduledFuture<?> timeoutTask = this.executor().schedule(() -> {
            if (unloadFuture.isDone()) {
                return;
            }
            log.warn("Unloading of {} has timed out", serviceUnit);
            unloadFuture.complete(0);
        }, timeout, unit);
        unloadFuture.whenComplete((closedCount, failure) -> timeoutTask.cancel(true));
        return unloadFuture;
    }

    /**
     * Closes every topic in the topics map that belongs to {@code serviceUnit} and, for the
     * system namespace with the transaction coordinator enabled, removes the transaction
     * metadata stores whose coordinator-assign partition falls inside the bundle.
     *
     * @param serviceUnit                         bundle being unloaded
     * @param closeWithoutWaitingClientDisconnect forwarded to each topic's {@code close}
     * @return a future that, once all collected close/remove futures are done, yields how many
     *         of them were collected
     */
    private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) {
        ArrayList closeFutures = Lists.newArrayList();
        this.topics.forEach((name, topicFuture) -> {
            TopicName topicName = TopicName.get(name);
            if (serviceUnit.includes(topicName)) {
                log.info("[{}] Unloading topic", (Object)topicName);
                // Chain on the load future; an entry whose future yields an empty Optional has nothing to close.
                closeFutures.add(topicFuture.thenCompose(t -> t.isPresent() ? ((Topic)t.get()).close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null)));
            }
        });
        if (this.getPulsar().getConfig().isTransactionCoordinatorEnabled() && serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
            TransactionMetadataStoreService metadataStoreService = this.getPulsar().getTransactionMetadataStoreService();
            // Select stores by mapping each coordinator id to its partition of the coordinator-assign topic.
            this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store -> serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int)store.getTransactionCoordinatorID().getId()))).map(TransactionMetadataStore::getTransactionCoordinatorID).forEach(tcId -> closeFutures.add(metadataStoreService.removeTransactionMetadataStore((TransactionCoordinatorID)tcId)));
        }
        return FutureUtil.waitForAll(closeFutures).thenApply(v -> closeFutures.size());
    }

    /**
     * Removes from the caches every topic that belongs to {@code serviceUnit} and for which
     * {@link #getTopicReference} currently yields a topic.
     *
     * @param serviceUnit bundle whose topics should be evicted
     */
    public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
        for (String topic : this.topics.keys()) {
            TopicName topicName = TopicName.get(topic);
            boolean loadedInBundle = serviceUnit.includes(topicName) && this.getTopicReference(topic).isPresent();
            if (loadedInBundle) {
                log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic);
                this.pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit);
            }
        }
    }

    /**
     * @return the authorization service held by this broker service
     */
    public AuthorizationService getAuthorizationService() {
        return authorizationService;
    }

    /**
     * Resolves the bundle of {@code topic} asynchronously and then removes the topic from the
     * caches via the two-argument overload.
     *
     * @param topic topic name
     * @return a future that completes after the removal has run
     */
    public CompletableFuture<Void> removeTopicFromCache(String topic) {
        TopicName parsedName = TopicName.get(topic);
        return this.pulsar.getNamespaceService()
                .getBundleAsync(parsedName)
                .thenAccept(bundle -> this.removeTopicFromCache(topic, bundle));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a topic from the stats map, the topics map and the compactor stats.
     *
     * <p>Empty bundle and namespace levels of the stats map are pruned; when a namespace level
     * becomes empty, the replication metrics keyed by that namespace and each replication
     * cluster are removed as well. The compactor-stats cleanup is best-effort: a failure to
     * obtain the compactor is logged instead of being silently dropped, and does not propagate.
     *
     * @param topic           topic name used as key in the caches
     * @param namespaceBundle bundle the topic belongs to
     */
    public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) {
        String bundleName = namespaceBundle.toString();
        String namespaceName = TopicName.get(topic).getNamespaceObject().toString();
        synchronized (this.multiLayerTopicsMap) {
            ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = this.multiLayerTopicsMap.get(namespaceName);
            if (namespaceMap != null) {
                ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName);
                if (bundleMap != null) {
                    bundleMap.remove(topic);
                    if (bundleMap.isEmpty()) {
                        namespaceMap.remove(bundleName);
                    }
                }
                if (namespaceMap.isEmpty()) {
                    this.multiLayerTopicsMap.remove(namespaceName);
                    ClusterReplicationMetrics clusterReplicationMetrics = this.pulsarStats.getClusterReplicationMetrics();
                    this.replicationClients.forEach((cluster, client) -> clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName, (String)cluster)));
                }
            }
        }
        this.topics.remove(topic);
        try {
            // 'false' is passed through to getCompactor; a null result simply means nothing to clean up.
            Compactor compactor = this.pulsar.getCompactor(false);
            if (compactor != null) {
                compactor.getStats().removeTopic(topic);
            }
        } catch (PulsarServerException e) {
            // Best-effort cleanup: keep going, but leave a trace instead of swallowing the failure.
            log.warn("[{}] Failed to remove topic from compactor stats", topic, e);
        }
    }

    /**
     * Counts the bundles recorded in the stats map across all namespaces.
     *
     * <p>The count is accumulated locally and written to {@code numberOfNamespaceBundles} once at
     * the end, so overlapping callers can no longer reset or add to each other's partial sums
     * through the shared field.
     *
     * @return the sum of the bundle-map sizes of every namespace, narrowed to {@code int}
     */
    public int getNumberOfNamespaceBundles() {
        LongAdder bundleCount = new LongAdder();
        this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> bundleCount.add(bundles.size()));
        int total = (int) bundleCount.sum();
        // Keep the field updated for any other reader of it.
        this.numberOfNamespaceBundles = total;
        return total;
    }

    /**
     * @return the live map from topic name to the future of that topic (not a copy)
     */
    public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> getTopics() {
        return topics;
    }

    /**
     * Dispatches a metadata notification: a modification under a namespace path triggers a
     * policies update for that namespace; otherwise a notification on a dynamic-configuration
     * path triggers a dynamic-configuration update. Anything else is ignored.
     *
     * @param n notification to handle
     */
    private void handleMetadataChanges(Notification n) {
        boolean namespaceModified = n.getType() == NotificationType.Modified
                && NamespaceResources.pathIsFromNamespace(n.getPath());
        if (namespaceModified) {
            this.handlePoliciesUpdates(NamespaceResources.namespaceFromPath(n.getPath()));
            return;
        }
        if (this.pulsar().getPulsarResources().getDynamicConfigResources().isDynamicConfigurationPath(n.getPath())) {
            this.handleDynamicConfigurationUpdates();
        }
    }

    /**
     * Reloads the policies of {@code namespace} and, when they exist, notifies every topic of
     * that namespace via {@code onPoliciesUpdate} and then runs
     * {@link #unloadDeletedReplNamespace}.
     *
     * @param namespace namespace whose policies changed
     */
    private void handlePoliciesUpdates(NamespaceName namespace) {
        this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace).thenAccept(optPolicies -> {
            optPolicies.ifPresent(policies -> {
                log.info("[{}] updating with {}", namespace, policies);
                this.topics.forEach((topicKey, pendingTopic) -> {
                    if (!namespace.includes(TopicName.get(topicKey))) {
                        return;
                    }
                    // Notification is deferred until the topic's future completes.
                    pendingTopic.thenAccept(loaded -> {
                        if (log.isDebugEnabled()) {
                            log.debug("Notifying topic that policies have changed: {}", topicKey);
                        }
                        loaded.ifPresent(t -> t.onPoliciesUpdate(policies));
                    });
                });
                this.unloadDeletedReplNamespace(policies, namespace);
            });
        });
    }

    /**
     * Reads the stored dynamic configuration and applies each entry to the broker's
     * configuration object, notifying the registered listener of a key when its value changed.
     *
     * <p>Each key is handled independently:
     * <ul>
     *   <li>a key with no entry (or no field) in {@code dynamicConfigurationMap} is logged and
     *       skipped instead of failing the whole update with a {@link NullPointerException};</li>
     *   <li>parsing the value happens inside the per-key {@code try}, so one malformed value no
     *       longer prevents the remaining keys from being applied;</li>
     *   <li>old and new values are compared null-safely;</li>
     *   <li>the failure log carries the exception.</li>
     * </ul>
     */
    private void handleDynamicConfigurationUpdates() {
        this.pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync().thenAccept(optMap -> {
            if (!optMap.isPresent()) {
                return;
            }
            Map data = (Map)optMap.get();
            data.forEach((configKey, value) -> {
                // Resolve the reflective field first; nothing can be parsed or set without it.
                Field configField = Optional.ofNullable(BrokerService.dynamicConfigurationMap.get((String)configKey)).map(entry -> entry.field).orElse(null);
                if (configField == null) {
                    log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, value);
                    return;
                }
                try {
                    Object newValue = FieldParser.value((String)value, configField);
                    Object existingValue = configField.get(this.pulsar.getConfiguration());
                    configField.set(this.pulsar.getConfiguration(), newValue);
                    log.info("Successfully updated configuration {}/{}", configKey, value);
                    // Listeners are registered per key with an unknown value type; they are fed the parsed value.
                    @SuppressWarnings("unchecked")
                    Consumer<Object> listener = (Consumer<Object>)this.configRegisteredListeners.get((String)configKey);
                    if (listener != null && !java.util.Objects.equals(existingValue, newValue)) {
                        listener.accept(newValue);
                    }
                } catch (Exception e) {
                    log.error("Failed to update config {}/{}", configKey, value, e);
                }
            });
        });
    }

    /**
     * For a global namespace whose replication clusters no longer contain the local cluster,
     * asks the admin client to unload every bundle of the namespace that is reported as owned.
     * Unload failures are logged per bundle.
     *
     * @param data      current policies of the namespace
     * @param namespace namespace to inspect
     */
    private void unloadDeletedReplNamespace(Policies data, NamespaceName namespace) {
        if (!namespace.isGlobal()) {
            return;
        }
        String localCluster = this.pulsar.getConfiguration().getClusterName();
        if (data.replication_clusters.contains(localCluster)) {
            return;
        }
        this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(namespace).thenAccept(bundles ->
                bundles.getBundles().forEach(bundle ->
                        this.pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(owned -> {
                            if (!owned) {
                                return;
                            }
                            // The admin call is handed to the broker executor rather than run in this callback.
                            this.pulsar().getExecutor().submit(() -> {
                                try {
                                    this.pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(), bundle.getBundleRange());
                                } catch (Exception e) {
                                    log.error("Failed to unload namespace-bundle {}-{} that not owned by {}, {}", namespace.toString(), bundle.toString(), localCluster, e.getMessage());
                                }
                            });
                        })));
    }

    /**
     * @return the {@link PulsarService} this broker service belongs to
     */
    public PulsarService pulsar() {
        return pulsar;
    }

    /**
     * @return the worker event-loop group held by this broker service
     */
    public EventLoopGroup executor() {
        return workerGroup;
    }

    /**
     * @return the live map of replication clients keyed by a string (not a copy)
     */
    public ConcurrentOpenHashMap<String, PulsarClient> getReplicationClients() {
        return replicationClients;
    }

    /**
     * @return the {@code authenticationEnabled} flag of the current broker configuration
     */
    public boolean isAuthenticationEnabled() {
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        return configuration.isAuthenticationEnabled();
    }

    /**
     * @return the {@code authorizationEnabled} flag of the current broker configuration
     */
    public boolean isAuthorizationEnabled() {
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        return configuration.isAuthorizationEnabled();
    }

    /**
     * @return the keep-alive interval, in seconds, held by this broker service
     */
    public int getKeepAliveIntervalSeconds() {
        return keepAliveIntervalSeconds;
    }

    /**
     * @return the next id handed out by the producer-name generator
     */
    public String generateUniqueProducerName() {
        return producerNameGenerator.getNextId();
    }

    /**
     * Collects the stats of every visited topic, calling {@code getStats(false, false)} on each.
     *
     * @return a new map from topic name to that topic's stats
     */
    public Map<String, TopicStatsImpl> getTopicStats() {
        Map<String, TopicStatsImpl> statsByTopic = new HashMap<>();
        this.forEachTopic(topic -> {
            statsByTopic.put(topic.getName(), topic.getStats(false, false));
        });
        return statsByTopic;
    }

    /**
     * @return the authentication service held by this broker service
     */
    public AuthenticationService getAuthenticationService() {
        return authenticationService;
    }

    /**
     * Lists the topics recorded in the stats map for one bundle of a namespace.
     *
     * @param namespace first-level key of the stats map
     * @param bundle    second-level key of the stats map
     * @return the values of the matching bundle map, or an empty list when either level is missing
     */
    public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundlesOfNamespace = this.multiLayerTopicsMap.get(namespace);
        if (bundlesOfNamespace == null) {
            return Collections.emptyList();
        }
        ConcurrentOpenHashMap<String, Topic> topicsOfBundle = bundlesOfNamespace.get(bundle);
        return topicsOfBundle == null ? Collections.emptyList() : topicsOfBundle.values();
    }

    /**
     * Installs the validator for {@code loadManagerClassName}, applies the dynamic service
     * configuration, and registers the listeners that react to dynamic configuration changes.
     */
    private void updateConfigurationAndRegisterListeners() {
        // A load-manager class name is only accepted when the class can be loaded.
        this.addDynamicConfigValidator("loadManagerClassName", className -> {
            try {
                Class.forName(className);
            }
            catch (ClassNotFoundException | NoClassDefFoundError e) {
                log.warn("Configured load-manager class {} not found {}", className, (Object)e.getMessage());
                return false;
            }
            return true;
        });
        this.updateDynamicServiceConfiguration();
        // Concurrency limits: a changed value replaces the semaphore with a new one of that size.
        this.registerConfigurationListener("maxConcurrentLookupRequest", maxConcurrentLookupRequest -> this.lookupRequestSemaphore.set(new Semaphore((Integer)maxConcurrentLookupRequest, false)));
        this.registerConfigurationListener("maxConcurrentTopicLoadRequest", maxConcurrentTopicLoadRequest -> this.topicLoadRequestSemaphore.set(new Semaphore((Integer)maxConcurrentTopicLoadRequest, false)));
        // Load manager swap runs on the broker executor: create the new one, stop the current
        // one, start the new one, then publish it. Any failure is only logged.
        this.registerConfigurationListener("loadManagerClassName", className -> this.pulsar.getExecutor().execute(() -> {
            try {
                LoadManager newLoadManager = LoadManager.create(this.pulsar);
                log.info("Created load manager: {}", className);
                this.pulsar.getLoadManager().get().stop();
                newLoadManager.start();
                this.pulsar.getLoadManager().set(newLoadManager);
            }
            catch (Exception ex) {
                log.warn("Failed to change load manager", (Throwable)ex);
            }
        }));
        // Dispatch-rate settings: each pair (msg / byte) triggers the same refresh routine.
        this.registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", dispatchRatePerTopicInMsg -> this.updateTopicMessageDispatchRate());
        this.registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", dispatchRatePerTopicInByte -> this.updateTopicMessageDispatchRate());
        this.registerConfigurationListener("autoSkipNonRecoverableData", skipNonRecoverableLedger -> this.updateManagedLedgerConfig());
        this.registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", dispatchRatePerTopicInMsg -> this.updateSubscriptionMessageDispatchRate());
        this.registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", dispatchRatePerTopicInByte -> this.updateSubscriptionMessageDispatchRate());
        this.registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", dispatchRatePerTopicInMsg -> this.updateReplicatorMessageDispatchRate());
        this.registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", dispatchRatePerTopicInByte -> this.updateReplicatorMessageDispatchRate());
        // Broker-level publish throttling settings.
        this.registerConfigurationListener("brokerPublisherThrottlingTickTimeMillis", publisherThrottlingTickTimeMillis -> this.setupBrokerPublishRateLimiterMonitor());
        this.registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate", brokerPublisherThrottlingMaxMessageRate -> this.updateBrokerPublisherThrottlingMaxRate());
        this.registerConfigurationListener("brokerPublisherThrottlingMaxByteRate", brokerPublisherThrottlingMaxByteRate -> this.updateBrokerPublisherThrottlingMaxRate());
        // The topic-level tick listener is only registered when precise topic publish rate limiting is off.
        if (!this.preciseTopicPublishRateLimitingEnable) {
            this.registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", publisherThrottlingTickTimeMillis -> this.setupTopicPublishRateLimiterMonitor());
        }
    }

    /**
     * Re-applies the broker-level publish throttling settings from the current configuration.
     *
     * <p>Throttling is treated as disabled when the tick time is not positive, or when neither
     * the message rate nor the byte rate is positive. In that case an active limiter is
     * refreshed once and replaced by the disabled limiter. Otherwise the limiter monitor is set
     * up and the limiter is either created or updated with the new rate.
     */
    private void updateBrokerPublisherThrottlingMaxRate() {
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        int maxMessageRate = configuration.getBrokerPublisherThrottlingMaxMessageRate();
        long maxByteRate = configuration.getBrokerPublisherThrottlingMaxByteRate();
        int tickMillis = configuration.getBrokerPublisherThrottlingTickTimeMillis();

        boolean throttlingDisabled = tickMillis <= 0 || (maxByteRate <= 0L && maxMessageRate <= 0);
        if (throttlingDisabled) {
            if (this.brokerPublishRateLimiter != PublishRateLimiter.DISABLED_RATE_LIMITER) {
                this.refreshBrokerPublishRate();
                this.brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
            }
            return;
        }

        PublishRate newRate = new PublishRate(maxMessageRate, maxByteRate);
        log.info("Update broker publish rate limiting {}", newRate);
        this.setupBrokerPublishRateLimiterMonitor();
        boolean noActiveLimiter = this.brokerPublishRateLimiter == null
                || this.brokerPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER;
        if (noActiveLimiter) {
            this.brokerPublishRateLimiter = new PublishRateLimiterImpl(newRate);
        } else {
            this.brokerPublishRateLimiter.update(newRate);
        }
    }

    /**
     * Schedules, on the Pulsar executor, a pass over all topics that asks each topic's dispatch
     * rate limiter (when the topic has one) to reload its dispatch rate.
     */
    private void updateTopicMessageDispatchRate() {
        this.pulsar().getExecutor().execute(() -> this.forEachTopic(topic -> {
            // Look the limiter up once: calling the getter separately for isPresent() and get()
            // lets the second call observe a different (possibly empty) Optional.
            topic.getDispatchRateLimiter().ifPresent(limiter -> limiter.updateDispatchRate());
        }));
    }

    /**
     * Submits a task to the Pulsar executor that walks every subscription of every topic and,
     * for each subscription that has a dispatcher, asks the dispatcher's rate limiter (if any)
     * to reload its dispatch rate.
     */
    private void updateSubscriptionMessageDispatchRate() {
        this.pulsar().getExecutor().submit(() -> this.forEachTopic(topic ->
                topic.getSubscriptions().forEach((subscriptionName, subscription) -> {
                    Dispatcher subscriptionDispatcher = subscription.getDispatcher();
                    if (subscriptionDispatcher == null) {
                        // Nothing to refresh for a subscription without a dispatcher.
                        return;
                    }
                    subscriptionDispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
                })));
    }

    /**
     * Submits a task to the Pulsar executor that walks every replicator of every topic and asks
     * the replicator's rate limiter (when present) to reload its dispatch rate.
     */
    private void updateReplicatorMessageDispatchRate() {
        this.pulsar().getExecutor().submit(() -> this.forEachTopic(topic ->
                topic.getReplicators().forEach((name, persistentReplicator) -> {
                    // Single lookup instead of isPresent()/get() on two separate getter calls,
                    // so the check and the use always see the same Optional.
                    persistentReplicator.getRateLimiter().ifPresent(limiter -> limiter.updateDispatchRate());
                })));
    }

    /**
     * Schedules, on the Pulsar executor, a pass over all topics that copies the current
     * {@code autoSkipNonRecoverableData} setting from the service configuration into the
     * managed-ledger config of every {@link PersistentTopic}. A failure on one topic is logged
     * and does not stop the pass.
     */
    private void updateManagedLedgerConfig() {
        this.pulsar().getExecutor().execute(() -> this.forEachTopic(topic -> {
            if (!(topic instanceof PersistentTopic)) {
                // Only persistent topics carry a managed ledger.
                return;
            }
            try {
                boolean autoSkip = this.pulsar.getConfiguration().isAutoSkipNonRecoverableData();
                ((PersistentTopic)topic).getManagedLedger().getConfig().setAutoSkipNonRecoverableData(autoSkip);
            }
            catch (Exception e) {
                log.warn("[{}] failed to update managed-ledger config", (Object)topic.getName(), (Object)e);
            }
        }));
    }

    /**
     * Registers a listener under the given configuration key.
     *
     * @param configKey name of a field declared on {@link ServiceConfiguration}
     * @param listener  callback stored in {@code configRegisteredListeners} under {@code configKey}
     * @throws IllegalArgumentException if {@code configKey} is not a declared field of
     *                                  {@link ServiceConfiguration}
     */
    public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) {
        // Rejects unknown keys before anything is stored.
        this.validateConfigKey(configKey);
        this.configRegisteredListeners.put(configKey, listener);
    }

    /**
     * Attaches a validator to a dynamic configuration entry.
     *
     * <p>The key must name a declared {@link ServiceConfiguration} field; if it is not present
     * in {@code dynamicConfigurationMap} the call has no further effect.
     *
     * @param key       configuration field name
     * @param validator predicate applied to candidate string values for that key
     * @throws IllegalArgumentException if {@code key} is not a declared configuration field
     */
    private void addDynamicConfigValidator(String key, Predicate<String> validator) {
        this.validateConfigKey(key);
        ConfigField configField = dynamicConfigurationMap.get(key);
        if (configField == null) {
            return;
        }
        configField.validator = validator;
    }

    /**
     * Checks that {@code key} names a field declared directly on {@link ServiceConfiguration}.
     *
     * @param key configuration field name to check
     * @throws IllegalArgumentException if the reflective lookup fails for any reason; the
     *                                  original exception is kept as the cause
     */
    private void validateConfigKey(String key) {
        try {
            // The returned Field is discarded; only the lookup succeeding matters.
            ServiceConfiguration.class.getDeclaredField(key);
        }
        catch (Exception e) {
            log.error("ServiceConfiguration key {} not found {}", (Object)key, (Object)e.getMessage());
            throw new IllegalArgumentException("Invalid service config " + key, e);
        }
    }

    /**
     * Loads the stored dynamic configuration and applies each entry to the live
     * {@link ServiceConfiguration} instance via reflection.
     *
     * <p>If no dynamic configuration is stored yet, an empty one is created. A failure while
     * reading or creating it is logged and leaves nothing to apply.
     *
     * @throws IllegalArgumentException if an entry has a registered validator that rejects its
     *                                  value; this aborts processing of the remaining entries
     */
    private void updateDynamicServiceConfiguration() {
        Optional<Object> configCache = Optional.empty();
        try {
            configCache = this.pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration();
            if (!configCache.isPresent()) {
                // First start: create an empty dynamic-config entry.
                this.pulsar().getPulsarResources().getDynamicConfigResources().setDynamicConfigurationWithCreate(n -> Maps.newHashMap());
            }
        }
        catch (Exception e) {
            log.warn("Failed to read dynamic broker configuration", (Throwable)e);
        }
        configCache.ifPresent(stringStringMap -> stringStringMap.forEach((key, value) -> {
            // Validation only applies to keys that are registered as dynamic and have a validator.
            if (dynamicConfigurationMap.containsKey((String)key) && BrokerService.dynamicConfigurationMap.get((String)key).validator != null && !BrokerService.dynamicConfigurationMap.get((String)key).validator.test((String)value)) {
                log.error("Failed to validate dynamic config {} with value {}", key, value);
                throw new IllegalArgumentException(String.format("Failed to validate dynamic-config %s/%s", key, value));
            }
            try {
                // NOTE(review): the field is updated whenever it carries @FieldContext; this path
                // does not check that the key is in dynamicConfigurationMap -- confirm intended.
                Field field = ServiceConfiguration.class.getDeclaredField((String)key);
                if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                    field.setAccessible(true);
                    field.set(this.pulsar().getConfiguration(), FieldParser.value(value, field));
                    log.info("Successfully updated {}/{}", key, value);
                }
            }
            catch (Exception e) {
                // Best effort: a bad entry is logged and the remaining entries are still applied.
                log.warn("Failed to update service configuration {}/{}, {}", new Object[]{key, value, e.getMessage()});
            }
        }));
    }

    /** Returns the {@link DelayedDeliveryTrackerFactory} held by this broker service. */
    public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() {
        return this.delayedDeliveryTrackerFactory;
    }

    /** Returns the keys of {@code dynamicConfigurationMap}, i.e. the names of the dynamic configuration fields. */
    public static List<String> getDynamicConfiguration() {
        return dynamicConfigurationMap.keys();
    }

    /**
     * Builds a string view of the runtime configuration.
     *
     * @return a new map from configuration field name to {@link String#valueOf(Object)} of the
     *         value reported by {@code getRuntimeConfigurationMap()}
     */
    public Map<String, String> getRuntimeConfiguration() {
        Map<String, String> rendered = Maps.newHashMap();
        this.getRuntimeConfigurationMap().forEach((name, current) -> rendered.put(name, String.valueOf(current)));
        return rendered;
    }

    /**
     * Tells whether {@code key} is registered in {@code dynamicConfigurationMap}.
     *
     * @param key configuration field name
     * @return {@code true} if the map contains the key
     */
    public static boolean isDynamicConfiguration(String key) {
        return dynamicConfigurationMap.containsKey(key);
    }

    /**
     * Runs the validator registered for a dynamic configuration key, if there is one.
     *
     * @param key   configuration field name
     * @param value candidate value in string form
     * @return the validator's verdict, or {@code true} when the key is not a dynamic
     *         configuration or has no validator attached
     */
    public static boolean validateDynamicConfiguration(String key, String value) {
        ConfigField configField = dynamicConfigurationMap.get(key);
        if (configField == null || configField.validator == null) {
            // Nothing registered to object to the value.
            return true;
        }
        return configField.validator.test(value);
    }

    /**
     * Scans the fields declared on {@link ServiceConfiguration} and collects those whose
     * {@link FieldContext} annotation is marked {@code dynamic}.
     *
     * <p>Every {@link FieldContext}-annotated field is made accessible along the way, whether
     * or not it turns out to be dynamic.
     *
     * @return a new map from field name to a {@link ConfigField} wrapping that field
     */
    private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
        ConcurrentOpenHashMap<String, ConfigField> dynamicFields = ConcurrentOpenHashMap.newBuilder().build();
        for (Field candidate : ServiceConfiguration.class.getDeclaredFields()) {
            if (candidate != null && candidate.isAnnotationPresent(FieldContext.class)) {
                candidate.setAccessible(true);
                if (candidate.getAnnotation(FieldContext.class).dynamic()) {
                    dynamicFields.put(candidate.getName(), new ConfigField(candidate));
                }
            }
        }
        return dynamicFields;
    }

    /**
     * Reads, via reflection, the current value of every {@link FieldContext}-annotated field
     * declared on {@link ServiceConfiguration}.
     *
     * @return a new map from field name to its current value; a {@code null} value is stored
     *         as the empty string, and a field that cannot be read is logged and left out
     */
    private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
        ConcurrentOpenHashMap<String, Object> values = ConcurrentOpenHashMap.newBuilder().build();
        for (Field candidate : ServiceConfiguration.class.getDeclaredFields()) {
            boolean isConfigField = candidate != null && candidate.isAnnotationPresent(FieldContext.class);
            if (!isConfigField) {
                continue;
            }
            candidate.setAccessible(true);
            try {
                Object current = candidate.get(this.pulsar.getConfiguration());
                if (current == null) {
                    current = "";
                }
                values.put(candidate.getName(), current);
            }
            catch (Exception e) {
                log.error("Failed to get value of field {}, {}", (Object)candidate.getName(), (Object)e.getMessage());
            }
        }
        return values;
    }

    /**
     * Takes one entry off {@code pendingTopicLoadingQueue} and starts loading that topic; does
     * nothing when the queue is empty.
     *
     * <p>Each processed entry triggers processing of the next one: directly once its future
     * completes, or after a 100 ms delay when the namespace-ownership check (or the set-up
     * code run after it) fails.
     */
    private void createPendingLoadTopic() {
        Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic = this.pendingTopicLoadingQueue.poll();
        if (pendingTopic == null) {
            return;
        }
        String topic = pendingTopic.getLeft();
        ((CompletableFuture)this.checkTopicNsOwnership(topic).thenRun(() -> {
            CompletableFuture pendingFuture = (CompletableFuture)pendingTopic.getRight();
            Semaphore topicLoadSemaphore = this.topicLoadRequestSemaphore.get();
            // Non-blocking attempt; the topic is created below whether or not a permit was obtained.
            boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
            this.checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture);
            pendingFuture.handle((persistentTopic, ex) -> {
                // Runs on success and on failure: give back the permit (only if one was taken)
                // and move on to the next queued topic.
                if (acquiredPermit) {
                    topicLoadSemaphore.release();
                }
                this.createPendingLoadTopic();
                return null;
            });
        })).exceptionally(e -> {
            log.error("Failed to create pending topic {}", (Object)topic, e);
            // Fail the caller's future with the underlying cause when e is a RuntimeException wrapper.
            ((CompletableFuture)pendingTopic.getRight()).completeExceptionally(e instanceof RuntimeException && e.getCause() != null ? e.getCause() : e);
            // Keep draining the queue, after a short delay.
            this.inactivityMonitor.schedule(this::createPendingLoadTopic, 100L, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    /**
     * Fetches the partitioned-topic metadata for {@code topicName}, creating default partitioned
     * metadata first when auto-creation applies.
     *
     * <p>Default metadata is created only when all of the following hold: the fetched metadata
     * reports zero partitions, the topic does not exist, the name is not itself a partition
     * name, auto topic creation is allowed for it, and the default topic type for it is
     * partitioned. In every other case the fetched metadata is returned unchanged.
     *
     * @param topicName topic whose metadata is requested
     * @return future with the metadata; fails immediately with a
     *         {@code BrokerServiceException.NamingException} when the namespace service is not
     *         available yet
     */
    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(TopicName topicName) {
        if (this.pulsar.getNamespaceService() == null) {
            return FutureUtil.failedFuture(new BrokerServiceException.NamingException("namespace service is not ready"));
        }
        return this.pulsar.getNamespaceService().checkTopicExists(topicName).thenCompose(topicExists -> this.fetchPartitionedTopicMetadataAsync(topicName).thenCompose(metadata -> {
            CompletableFuture future = new CompletableFuture();
            // The decision and the creation run on the Pulsar executor; 'future' carries the result back.
            this.pulsar.getExecutor().execute(() -> {
                if (metadata.partitions == 0 && !topicExists.booleanValue() && !topicName.isPartitioned() && this.pulsar.getBrokerService().isAllowAutoTopicCreation(topicName) && this.pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
                    ((CompletableFuture)this.pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName).thenAccept(md -> future.complete(md))).exceptionally(ex -> {
                        if (ex.getCause() instanceof MetadataStoreException.AlreadyExistsException) {
                            // The metadata already exists: re-read it instead of reporting a failure.
                            this.fetchPartitionedTopicMetadataAsync(topicName).whenComplete((metadata2, ex2) -> {
                                if (ex2 == null) {
                                    future.complete(metadata2);
                                } else {
                                    future.completeExceptionally((Throwable)ex2);
                                }
                            });
                        } else {
                            future.completeExceptionally((Throwable)ex);
                        }
                        return null;
                    });
                } else {
                    future.complete(metadata);
                }
            });
            return future;
        }));
    }

    /**
     * Creates partitioned-topic metadata for {@code topicName} using the default number of
     * partitions that applies to it.
     *
     * <p>Invalid partition settings are reported through the returned future instead of being
     * thrown. The caller in {@code fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync}
     * invokes this method from an executor task and only observes the returned future, so a
     * synchronous exception here would leave that caller's result future uncompleted.
     *
     * @param topicName topic to create partitioned metadata for
     * @return future completed with the created metadata; failed with
     *         {@link IllegalArgumentException} when the default partition count is not positive
     *         or exceeds the configured per-topic maximum, or with the failure reported by the
     *         namespace topic-limit check or the metadata creation
     */
    private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
        int defaultNumPartitions = this.pulsar.getBrokerService().getDefaultNumPartitions(topicName);
        int maxPartitions = this.pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
        if (defaultNumPartitions <= 0) {
            return FutureUtil.failedFuture(new IllegalArgumentException("Default number of partitions should be more than 0"));
        }
        // A non-positive maximum means no per-topic limit is enforced.
        if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) {
            return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions should be less than or equal to " + maxPartitions));
        }
        PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
        return this.checkMaxTopicsPerNamespace(topicName, defaultNumPartitions).thenCompose(__ -> {
            NamespaceResources.PartitionedTopicResources partitionResources = this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
            return partitionResources.createPartitionedTopicAsync(topicName, configMetadata).thenApply(v -> {
                log.info("partitioned metadata successfully created for {}", (Object)topicName);
                return configMetadata;
            });
        });
    }

    /**
     * Looks up the partitioned-topic metadata for {@code topicName}.
     *
     * @param topicName topic to look up
     * @return future with the stored metadata, or a freshly constructed default
     *         {@code PartitionedTopicMetadata} when none is stored
     */
    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
        return this.pulsar.getPulsarResources()
                .getNamespaceResources()
                .getPartitionedTopicResources()
                .getPartitionedTopicMetadataAsync(topicName)
                .thenApply(stored -> stored.orElseGet(PartitionedTopicMetadata::new));
    }

    /** Returns the {@code topicOrderedExecutor} held by this broker service. */
    public OrderedExecutor getTopicOrderedExecutor() {
        return this.topicOrderedExecutor;
    }

    /**
     * Returns the {@code multiLayerTopicsMap} field (three nested maps ending in {@code Topic}).
     * Returns the same object as {@code getMultiLayerTopicsMap()}.
     */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> getMultiLayerTopicMap() {
        return this.multiLayerTopicsMap;
    }

    /**
     * Adds {@code numberOfMessages} to the broker-wide unacked-message counter and, while the
     * broker is in the blocked state, blocks the given dispatcher if it is not blocked yet and
     * its own unacked count exceeds the per-dispatcher limit.
     *
     * <p>Does nothing when {@code maxUnackedMessages} is not positive.
     *
     * @param dispatcher       dispatcher whose unacked count changed
     * @param numberOfMessages amount added to the broker-wide counter
     */
    public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
        if (this.maxUnackedMessages <= 0) {
            return;
        }
        totalUnackedMessages.add(numberOfMessages);

        boolean mustBlock = blockedDispatcherOnHighUnackedMsgs.get()
                && !dispatcher.isBlockedDispatcherOnUnackedMsgs()
                && dispatcher.getTotalUnackedMessages() > this.maxUnackedMsgsPerDispatcher;
        if (!mustBlock) {
            return;
        }

        // Blocking a dispatcher happens under the read lock; unblocking takes the write lock.
        this.lock.readLock().lock();
        try {
            log.info("[{}] dispatcher reached to max unack msg limit on blocked-broker {}", (Object)dispatcher.getName(), (Object)dispatcher.getTotalUnackedMessages());
            dispatcher.blockDispatcherOnUnackedMsgs();
            this.blockedDispatchers.add(dispatcher);
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Switches the broker in or out of the blocked state from the broker-wide unacked count.
     *
     * <p>When the count reaches {@code maxUnackedMessages} and the flag flips from unblocked to
     * blocked, blocking of over-limit dispatchers is scheduled on the executor. When the broker
     * is blocked and the count has dropped below half of the limit, the flag is flipped back
     * and all currently blocked dispatchers are unblocked. Does nothing when the limit is not
     * positive.
     */
    public void checkUnAckMessageDispatching() {
        if (this.maxUnackedMessages <= 0) {
            return;
        }
        long currentUnacked = totalUnackedMessages.sum();

        boolean limitReached = currentUnacked >= (long)this.maxUnackedMessages;
        if (limitReached && blockedDispatcherOnHighUnackedMsgs.compareAndSet(false, true)) {
            log.info("Starting blocking dispatchers with unacked msgs {} due to reached max broker limit {}", (Object)this.maxUnackedMessages, (Object)this.maxUnackedMsgsPerDispatcher);
            this.executor().execute(this::blockDispatchersWithLargeUnAckMessages);
            return;
        }

        boolean belowHalf = currentUnacked < (long)(this.maxUnackedMessages / 2);
        if (blockedDispatcherOnHighUnackedMsgs.get() && belowHalf && blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) {
            this.unblockDispatchersOnUnAckMessages(this.blockedDispatchers.values());
        }
    }

    /** Returns the current value of the {@code blockedDispatcherOnHighUnackedMsgs} flag. */
    public boolean isBrokerDispatchingBlocked() {
        return blockedDispatcherOnHighUnackedMsgs.get();
    }

    /**
     * Walks every subscription of every topic and blocks each
     * {@link PersistentDispatcherMultipleConsumers} whose unacked-message count exceeds
     * {@code maxUnackedMsgsPerDispatcher}, recording it in {@code blockedDispatchers}. The whole
     * walk runs under the read lock.
     */
    private void blockDispatchersWithLargeUnAckMessages() {
        this.lock.readLock().lock();
        try {
            this.forEachTopic(topic -> topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
                if (!(persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers)) {
                    return;
                }
                PersistentDispatcherMultipleConsumers multiConsumerDispatcher =
                        (PersistentDispatcherMultipleConsumers)persistentSubscription.getDispatcher();
                int unackedOnDispatcher = multiConsumerDispatcher.getTotalUnackedMessages();
                if (unackedOnDispatcher <= this.maxUnackedMsgsPerDispatcher) {
                    return;
                }
                // The log line re-reads the counter, so it may differ from the value compared above.
                log.info("[{}] Blocking dispatcher due to reached max broker limit {}", (Object)multiConsumerDispatcher.getName(), (Object)multiConsumerDispatcher.getTotalUnackedMessages());
                multiConsumerDispatcher.blockDispatcherOnUnackedMsgs();
                this.blockedDispatchers.add(multiConsumerDispatcher);
            }));
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Unblocks every dispatcher in {@code dispatcherList} while holding the write lock: clears
     * its blocked state, schedules {@code readMoreEntries()} for it on the executor, and
     * removes it from {@code blockedDispatchers}.
     *
     * @param dispatcherList dispatchers to unblock
     */
    public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
        this.lock.writeLock().lock();
        try {
            for (PersistentDispatcherMultipleConsumers blocked : dispatcherList) {
                blocked.unBlockDispatcherOnUnackedMsgs();
                this.executor().execute(() -> blocked.readMoreEntries());
                log.info("[{}] Dispatcher is unblocked", (Object)blocked.getName());
                this.blockedDispatchers.remove(blocked);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Reads the result of a topic future without blocking.
     *
     * @param topicFuture future to inspect
     * @return the future's value when it has already completed normally; otherwise
     *         {@link Optional#empty()} (still running, failed, or cancelled)
     */
    public static Optional<Topic> extractTopic(CompletableFuture<Optional<Topic>> topicFuture) {
        boolean completedNormally = topicFuture.isDone() && !topicFuture.isCompletedExceptionally();
        if (!completedNormally) {
            return Optional.empty();
        }
        return topicFuture.join();
    }

    /**
     * Returns the local port of {@code listenChannel}.
     *
     * @return the port, or {@link Optional#empty()} when {@code listenChannel} is {@code null}
     */
    public Optional<Integer> getListenPort() {
        if (this.listenChannel == null) {
            return Optional.empty();
        }
        InetSocketAddress boundAddress = (InetSocketAddress)this.listenChannel.localAddress();
        return Optional.of(boundAddress.getPort());
    }

    /**
     * Returns the local port of {@code listenChannelTls}.
     *
     * @return the port, or {@link Optional#empty()} when {@code listenChannelTls} is {@code null}
     */
    public Optional<Integer> getListenPortTls() {
        if (this.listenChannelTls == null) {
            return Optional.empty();
        }
        InetSocketAddress boundAddress = (InetSocketAddress)this.listenChannelTls.localAddress();
        return Optional.of(boundAddress.getPort());
    }

    /**
     * Applies {@code consumer} once to each distinct connection used by a producer on any
     * topic whose loading future has already completed normally.
     *
     * @param consumer action to run per connection
     */
    private void foreachCnx(Consumer<TransportCnx> consumer) {
        // A set so that a connection shared by several producers is visited only once.
        Set<TransportCnx> connections = new HashSet<>();
        this.topics.forEach((topicName, topicFuture) ->
                BrokerService.extractTopic(topicFuture).ifPresent(loaded ->
                        loaded.getProducers().values().forEach(producer -> connections.add(producer.getCnx()))));
        connections.forEach(consumer);
    }

    /**
     * String overload: parses {@code topic} with {@code TopicName.get} and delegates to
     * {@link #isAllowAutoTopicCreation(TopicName)}.
     *
     * @param topic topic name in string form
     * @return whether auto topic creation is allowed for that topic
     */
    public boolean isAllowAutoTopicCreation(String topic) {
        return this.isAllowAutoTopicCreation(TopicName.get(topic));
    }

    /**
     * Decides whether {@code topicName} may be auto-created.
     *
     * <p>Precedence: system topics are always allowed while system topics are enabled; next a
     * cached namespace-level override decides; finally the broker configuration applies.
     *
     * @param topicName topic to check
     * @return {@code true} if auto creation is allowed
     */
    public boolean isAllowAutoTopicCreation(TopicName topicName) {
        boolean enabledSystemTopic = this.pulsar.getConfiguration().isSystemTopicEnabled() && this.isSystemTopic(topicName);
        if (enabledSystemTopic) {
            return true;
        }
        AutoTopicCreationOverride namespaceOverride = this.getAutoTopicCreationOverride(topicName);
        return namespaceOverride != null
                ? namespaceOverride.isAllowAutoTopicCreation()
                : this.pulsar.getConfiguration().isAllowAutoTopicCreation();
    }

    /**
     * Tells whether auto-created topics for {@code topicName} default to the partitioned type.
     *
     * @param topicName topic to check
     * @return the cached namespace override's verdict (its topic type equals
     *         {@code TopicType.PARTITIONED}) when an override exists, otherwise the broker
     *         configuration's setting
     */
    public boolean isDefaultTopicTypePartitioned(TopicName topicName) {
        AutoTopicCreationOverride namespaceOverride = this.getAutoTopicCreationOverride(topicName);
        return namespaceOverride != null
                ? TopicType.PARTITIONED.toString().equals(namespaceOverride.getTopicType())
                : this.pulsar.getConfiguration().isDefaultTopicTypePartitioned();
    }

    /**
     * Returns the default partition count for auto-created partitioned topics.
     *
     * @param topicName topic the count is requested for
     * @return the cached namespace override's value when an override exists, otherwise the
     *         broker configuration's value
     */
    public int getDefaultNumPartitions(TopicName topicName) {
        AutoTopicCreationOverride namespaceOverride = this.getAutoTopicCreationOverride(topicName);
        return namespaceOverride != null
                ? namespaceOverride.getDefaultNumPartitions()
                : this.pulsar.getConfiguration().getDefaultNumPartitions();
    }

    /**
     * Looks up the auto-topic-creation override in the cached policies of the topic's namespace.
     *
     * @param topicName topic whose namespace policies are consulted
     * @return the override, or {@code null} when the policies are not cached or carry no
     *         override (a debug line is logged in that case)
     */
    private AutoTopicCreationOverride getAutoTopicCreationOverride(TopicName topicName) {
        AutoTopicCreationOverride cachedOverride = this.pulsar.getPulsarResources()
                .getNamespaceResources()
                .getPoliciesIfCached(topicName.getNamespaceObject())
                .map(cachedPolicies -> cachedPolicies.autoTopicCreationOverride)
                .orElse(null);
        if (cachedOverride == null) {
            log.debug("No autoTopicCreateOverride policy found for {}", (Object)topicName);
        }
        return cachedOverride;
    }

    /**
     * String overload: parses {@code topic} with {@code TopicName.get} and delegates to
     * {@link #isAllowAutoSubscriptionCreation(TopicName)}.
     *
     * @param topic topic name in string form
     * @return whether auto subscription creation is allowed for that topic
     */
    public boolean isAllowAutoSubscriptionCreation(String topic) {
        return this.isAllowAutoSubscriptionCreation(TopicName.get(topic));
    }

    /**
     * Decides whether subscriptions on {@code topicName} may be auto-created.
     *
     * @param topicName topic to check
     * @return the cached namespace override's verdict when an override exists, otherwise the
     *         broker configuration's setting
     */
    public boolean isAllowAutoSubscriptionCreation(TopicName topicName) {
        AutoSubscriptionCreationOverride namespaceOverride = this.getAutoSubscriptionCreationOverride(topicName);
        return namespaceOverride != null
                ? namespaceOverride.isAllowAutoSubscriptionCreation()
                : this.pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
    }

    /**
     * Looks up the auto-subscription-creation override in the cached policies of the topic's
     * namespace.
     *
     * @param topicName topic whose namespace policies are consulted
     * @return the override, or {@code null} when the policies are not cached or carry no
     *         override (a debug line is logged in that case)
     */
    private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(TopicName topicName) {
        AutoSubscriptionCreationOverride cachedOverride = this.pulsar.getPulsarResources()
                .getNamespaceResources()
                .getPoliciesIfCached(topicName.getNamespaceObject())
                .map(cachedPolicies -> cachedPolicies.autoSubscriptionCreationOverride)
                .orElse(null);
        if (cachedOverride == null) {
            log.debug("No autoSubscriptionCreateOverride policy found for {}", (Object)topicName);
        }
        return cachedOverride;
    }

    /**
     * String overload: parses {@code topic} with {@code TopicName.get} and delegates to
     * {@link #isSystemTopic(TopicName)}.
     *
     * @param topic topic name in string form
     * @return whether the topic is treated as a system topic
     */
    public boolean isSystemTopic(String topic) {
        return this.isSystemTopic(TopicName.get(topic));
    }

    /**
     * Tells whether {@code topicName} is treated as a system topic.
     *
     * <p>A topic qualifies when its namespace is the system namespace or one of the two
     * heartbeat namespaces, when its partitioned (base) name is recognised by
     * {@code EventsTopicNames.checkTopicIsEventsNames}, or when the base name's local part ends
     * with {@code __transaction_pending_ack}.
     *
     * @param topicName topic to classify
     * @return {@code true} if any of the conditions above holds
     */
    public boolean isSystemTopic(TopicName topicName) {
        NamespaceName namespace = topicName.getNamespaceObject();
        boolean inSystemNamespace = namespace.equals(NamespaceName.SYSTEM_NAMESPACE)
                || namespace.equals(this.pulsar.getHeartbeatNamespaceV1())
                || namespace.equals(this.pulsar.getHeartbeatNamespaceV2());
        if (inSystemNamespace) {
            return true;
        }
        // Strip any partition suffix before checking the name-based rules.
        TopicName baseTopicName = TopicName.get(topicName.getPartitionedTopicName());
        return EventsTopicNames.checkTopicIsEventsNames(baseTopicName)
                || StringUtils.endsWith(baseTopicName.getLocalName(), "__transaction_pending_ack");
    }

    /**
     * Returns the topic-level policies for {@code topicName}.
     *
     * @param topicName topic to look up
     * @return the policies, or {@link Optional#empty()} when topic-level policies are disabled,
     *         none are available, or the policies cache is not initialised yet
     */
    public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
        if (this.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
            try {
                return Optional.ofNullable(this.pulsar.getTopicPoliciesService().getTopicPolicies(topicName));
            }
            catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                // Not an error: fall through and report "no policies".
                log.debug("Topic {} policies have not been initialized yet.", (Object)topicName.getPartitionedTopicName());
            }
        }
        return Optional.empty();
    }

    /**
     * Deletes the topic-level policies stored for the partitioned (base) name of
     * {@code topicName}.
     *
     * @param topicName topic whose policies should be removed
     * @return the deletion future, or an already completed future when topic-level policies
     *         are disabled
     */
    public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
        boolean topicPoliciesEnabled = this.pulsar().getConfig().isTopicLevelPoliciesEnabled();
        if (topicPoliciesEnabled) {
            TopicName baseTopicName = TopicName.get(topicName.getPartitionedTopicName());
            return this.pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(baseTopicName);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Checks that adding {@code numPartitions} topics would not exceed the topic limit of the
     * namespace.
     *
     * <p>The limit comes from the namespace policies when they define one, otherwise from the
     * broker configuration. No check is made when the limit is not positive or when
     * {@code topicName} is a system topic. System topics are not counted among the existing
     * topics.
     *
     * @param topicName     topic about to be created
     * @param numPartitions number of topics the creation would add
     * @return future completed normally when creation is allowed, or failed with a
     *         {@code RestException} (precondition failed) when the limit would be exceeded
     */
    private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenCompose(optPolicies -> {
            int topicLimit = optPolicies.map(p -> p.max_topics_per_namespace).orElse(this.pulsar.getConfig().getMaxTopicsPerNamespace());
            if (topicLimit <= 0 || this.isSystemTopic(topicName)) {
                return CompletableFuture.completedFuture(null);
            }
            return this.pulsar().getPulsarResources().getTopicResources().getExistingPartitions(topicName).thenCompose(existingTopics -> {
                long userTopicCount = existingTopics.stream().filter(existing -> !this.isSystemTopic(TopicName.get(existing))).count();
                boolean withinLimit = userTopicCount + (long)numPartitions <= (long)topicLimit;
                if (withinLimit) {
                    return CompletableFuture.completedFuture(null);
                }
                log.error("Failed to create persistent topic {}, exceed maximum number of topics in namespace", (Object)topicName);
                return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Exceed maximum number of topics in namespace."));
            });
        });
    }

    /** Replaces the {@link BrokerInterceptor} held by this broker service. */
    public void setInterceptor(BrokerInterceptor interceptor) {
        this.interceptor = interceptor;
    }

    /** Returns the {@code brokerEntryMetadataInterceptors} set itself (not a copy). */
    public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
        return this.brokerEntryMetadataInterceptors;
    }

    /** Returns {@code true} when at least one broker entry metadata interceptor is configured. */
    public boolean isBrokerEntryMetadataEnabled() {
        return !this.brokerEntryMetadataInterceptors.isEmpty();
    }

    /**
     * Adds {@code numberOfConnections} to the paused-connections counter.
     *
     * @param numberOfConnections amount to add
     */
    public void pausedConnections(int numberOfConnections) {
        this.pausedConnections.add(numberOfConnections);
    }

    /**
     * Subtracts {@code numberOfConnections} from the paused-connections counter.
     *
     * @param numberOfConnections amount to subtract
     */
    public void resumedConnections(int numberOfConnections) {
        this.pausedConnections.add(-numberOfConnections);
    }

    /**
     * Returns the current value of the paused-connections counter, i.e. the amounts added by
     * {@code pausedConnections} minus those subtracted by {@code resumedConnections}.
     */
    public long getPausedConnections() {
        return this.pausedConnections.longValue();
    }

    /**
     * Replaces the factory stored in {@code pulsarChannelInitFactory}. Marked as visible for
     * testing; assigns the same field as {@code setPulsarChannelInitFactory}.
     */
    @VisibleForTesting
    public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
        this.pulsarChannelInitFactory = factory;
    }

    /** Returns the {@link PulsarService} this broker service is attached to. */
    public PulsarService getPulsar() {
        return this.pulsar;
    }

    /** Returns the {@code managedLedgerFactory} held by this broker service. */
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    /** Returns the {@code clusterAdmins} map itself (string key to {@code PulsarAdmin}), not a copy. */
    public ConcurrentOpenHashMap<String, PulsarAdmin> getClusterAdmins() {
        return this.clusterAdmins;
    }

    /**
     * Returns the {@code multiLayerTopicsMap} field (three nested maps ending in {@code Topic}).
     * Returns the same object as {@code getMultiLayerTopicMap()}.
     */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> getMultiLayerTopicsMap() {
        return this.multiLayerTopicsMap;
    }

    /** Returns the {@code acceptorGroup} event loop group. */
    public EventLoopGroup getAcceptorGroup() {
        return this.acceptorGroup;
    }

    /** Returns the {@code workerGroup} event loop group. */
    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }

    /** Returns the {@code offlineTopicStatCache} map itself (topic name to offline topic stats), not a copy. */
    public ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> getOfflineTopicStatCache() {
        return this.offlineTopicStatCache;
    }

    /**
     * Returns the map of configuration listeners, keyed by configuration field name, that
     * {@code registerConfigurationListener} populates. The map itself is returned, not a copy.
     */
    public ConcurrentOpenHashMap<String, Consumer<?>> getConfigRegisteredListeners() {
        return this.configRegisteredListeners;
    }

    /**
     * Returns the queue of pending topic loads (topic name paired with the future to complete)
     * that {@code createPendingLoadTopic} drains. The queue itself is returned, not a copy.
     */
    public ConcurrentLinkedQueue<Pair<String, CompletableFuture<Optional<Topic>>>> getPendingTopicLoadingQueue() {
        return this.pendingTopicLoadingQueue;
    }

    /** Returns the {@code statsUpdater} scheduled executor. */
    public ScheduledExecutorService getStatsUpdater() {
        return this.statsUpdater;
    }

    /**
     * Returns the reference holding the semaphore from which {@code createPendingLoadTopic}
     * tries to take a permit before loading a topic.
     */
    public AtomicReference<Semaphore> getTopicLoadRequestSemaphore() {
        return this.topicLoadRequestSemaphore;
    }

    /** Returns the {@code pendingLookupRequests} gauge. */
    public ObserverGauge getPendingLookupRequests() {
        return this.pendingLookupRequests;
    }

    /** Returns the {@code pendingTopicLoadRequests} gauge. */
    public ObserverGauge getPendingTopicLoadRequests() {
        return this.pendingTopicLoadRequests;
    }

    /**
     * Returns the {@code inactivityMonitor} scheduled executor (also used by
     * {@code createPendingLoadTopic} to schedule its delayed retry).
     */
    public ScheduledExecutorService getInactivityMonitor() {
        return this.inactivityMonitor;
    }

    /** Returns the {@code messageExpiryMonitor} scheduled executor. */
    public ScheduledExecutorService getMessageExpiryMonitor() {
        return this.messageExpiryMonitor;
    }

    /** Returns the {@code compactionMonitor} scheduled executor. */
    public ScheduledExecutorService getCompactionMonitor() {
        return this.compactionMonitor;
    }

    /** Returns the {@code consumedLedgersMonitor} scheduled executor. */
    public ScheduledExecutorService getConsumedLedgersMonitor() {
        return this.consumedLedgersMonitor;
    }

    /** Returns the {@code topicPublishRateLimiterMonitor} scheduled executor. */
    public ScheduledExecutorService getTopicPublishRateLimiterMonitor() {
        return this.topicPublishRateLimiterMonitor;
    }

    /** Returns the {@code brokerPublishRateLimiterMonitor} scheduled executor. */
    public ScheduledExecutorService getBrokerPublishRateLimiterMonitor() {
        return this.brokerPublishRateLimiterMonitor;
    }

    /** Returns the {@code deduplicationSnapshotMonitor} scheduled executor. */
    public ScheduledExecutorService getDeduplicationSnapshotMonitor() {
        return this.deduplicationSnapshotMonitor;
    }

    /**
     * Returns the broker-level publish rate limiter currently installed; this is
     * {@code PublishRateLimiter.DISABLED_RATE_LIMITER} after
     * {@code updateBrokerPublisherThrottlingMaxRate} has found throttling disabled.
     */
    public PublishRateLimiter getBrokerPublishRateLimiter() {
        return this.brokerPublishRateLimiter;
    }

    /** Returns the {@code producerNameGenerator} held by this broker service. */
    public DistributedIdGenerator getProducerNameGenerator() {
        return this.producerNameGenerator;
    }

    /** Returns the {@code pulsarStats} object held by this broker service. */
    public PulsarStats getPulsarStats() {
        return this.pulsarStats;
    }

    /**
     * Returns the broker-wide unacked-message limit; a non-positive value disables the checks
     * in {@code addUnAckedMessages} and {@code checkUnAckMessageDispatching}.
     */
    public int getMaxUnackedMessages() {
        return this.maxUnackedMessages;
    }

    /** Returns the unacked-message count above which a dispatcher gets blocked while the broker is in the blocked state. */
    public int getMaxUnackedMsgsPerDispatcher() {
        return this.maxUnackedMsgsPerDispatcher;
    }

    /** Returns the set of currently blocked dispatchers itself (not a copy). */
    public ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> getBlockedDispatchers() {
        return this.blockedDispatchers;
    }

    /**
     * Returns the read/write lock guarding dispatcher blocking: blocking code takes the read
     * lock, {@code unblockDispatchersOnUnAckMessages} takes the write lock.
     */
    public ReadWriteLock getLock() {
        return this.lock;
    }

    /** Returns the {@code defaultServerBootstrap} held by this broker service. */
    public ServerBootstrap getDefaultServerBootstrap() {
        return this.defaultServerBootstrap;
    }

    /** Returns the {@code pulsarChannelInitFactory} currently set. */
    public PulsarChannelInitializer.Factory getPulsarChannelInitFactory() {
        return this.pulsarChannelInitFactory;
    }

    /** Returns the {@code listenChannels} list itself (not a copy). */
    public List<Channel> getListenChannels() {
        return this.listenChannels;
    }

    /** Returns the {@code listenChannel}; may be {@code null} ({@code getListenPort} checks for that). */
    public Channel getListenChannel() {
        return this.listenChannel;
    }

    /** Returns the {@code listenChannelTls}; may be {@code null} ({@code getListenPortTls} checks for that). */
    public Channel getListenChannelTls() {
        return this.listenChannelTls;
    }

    /** Returns the {@code preciseTopicPublishRateLimitingEnable} flag. */
    public boolean isPreciseTopicPublishRateLimitingEnable() {
        return this.preciseTopicPublishRateLimitingEnable;
    }

    /** Returns the {@link BrokerInterceptor} currently set on this broker service. */
    public BrokerInterceptor getInterceptor() {
        return this.interceptor;
    }

    /** Sets the {@code numberOfNamespaceBundles} field. */
    protected void setNumberOfNamespaceBundles(int numberOfNamespaceBundles) {
        this.numberOfNamespaceBundles = numberOfNamespaceBundles;
    }

    /** Replaces the {@link AuthorizationService} held by this broker service. */
    protected void setAuthorizationService(AuthorizationService authorizationService) {
        this.authorizationService = authorizationService;
    }

    /** Replaces the {@code topicPublishRateLimiterMonitor} scheduled executor. */
    protected void setTopicPublishRateLimiterMonitor(ScheduledExecutorService topicPublishRateLimiterMonitor) {
        this.topicPublishRateLimiterMonitor = topicPublishRateLimiterMonitor;
    }

    /** Replaces the {@code brokerPublishRateLimiterMonitor} scheduled executor. */
    protected void setBrokerPublishRateLimiterMonitor(ScheduledExecutorService brokerPublishRateLimiterMonitor) {
        this.brokerPublishRateLimiterMonitor = brokerPublishRateLimiterMonitor;
    }

    /** Replaces the {@code deduplicationSnapshotMonitor} scheduled executor. */
    protected void setDeduplicationSnapshotMonitor(ScheduledExecutorService deduplicationSnapshotMonitor) {
        this.deduplicationSnapshotMonitor = deduplicationSnapshotMonitor;
    }

    /** Replaces the broker-level publish rate limiter. */
    protected void setBrokerPublishRateLimiter(PublishRateLimiter brokerPublishRateLimiter) {
        this.brokerPublishRateLimiter = brokerPublishRateLimiter;
    }

    /** Replaces the {@code producerNameGenerator}. */
    protected void setProducerNameGenerator(DistributedIdGenerator producerNameGenerator) {
        this.producerNameGenerator = producerNameGenerator;
    }

    /**
     * Replaces the {@code pulsarChannelInitFactory}. Assigns the same field as the public
     * {@code setPulsarChannelInitializerFactory}.
     */
    protected void setPulsarChannelInitFactory(PulsarChannelInitializer.Factory pulsarChannelInitFactory) {
        this.pulsarChannelInitFactory = pulsarChannelInitFactory;
    }

    /** Replaces the {@code listenChannel} that {@code getListenPort} reports on. */
    protected void setListenChannel(Channel listenChannel) {
        this.listenChannel = listenChannel;
    }

    /** Replaces the {@code listenChannelTls} that {@code getListenPortTls} reports on. */
    protected void setListenChannelTls(Channel listenChannelTls) {
        this.listenChannelTls = listenChannelTls;
    }

    /** Sets the {@code preciseTopicPublishRateLimitingEnable} flag. */
    protected void setPreciseTopicPublishRateLimitingEnable(boolean preciseTopicPublishRateLimitingEnable) {
        this.preciseTopicPublishRateLimitingEnable = preciseTopicPublishRateLimitingEnable;
    }

    /**
     * Replaces the set of broker entry metadata interceptors; the set is stored as given, not
     * copied. {@code isBrokerEntryMetadataEnabled} reports on whether this set is empty.
     */
    protected void setBrokerEntryMetadataInterceptors(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors) {
        this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
    }

    /** Returns the {@code owningTopics} map itself (string key to a set of integers), not a copy. */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> getOwningTopics() {
        return this.owningTopics;
    }

    /** Returns the {@code backlogQuotaChecker} scheduled executor. */
    public ScheduledExecutorService getBacklogQuotaChecker() {
        return this.backlogQuotaChecker;
    }

    /** Returns the {@link BundlesQuotas} held by this broker service. */
    public BundlesQuotas getBundlesQuotas() {
        return this.bundlesQuotas;
    }

    /**
     * Entry of {@code dynamicConfigurationMap}: a reflected {@link ServiceConfiguration} field
     * together with an optional validator for candidate values.
     */
    private static class ConfigField {
        // Reflected configuration field this entry describes.
        final Field field;
        // Optional check for new string values; null until addDynamicConfigValidator sets it.
        Predicate<String> validator;

        public ConfigField(Field field) {
            this.field = field;
        }
    }
}

