/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.shade.com.carrotsearch.hppc.ObjectObjectHashMap;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonPersistentTopic
extends AbstractTopic
implements Topic {
    /** Subscriptions of this topic, keyed by subscription name (populated in {@code subscribe}). */
    private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
    /** Replicators of this topic, keyed by remote cluster name (populated in {@code addReplicationCluster}). */
    private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
    /** Atomic updater used to bump {@link #entriesAddedCounter} without a lock. */
    private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
    /** Incremented once by {@code publishMessage} for every message that passes the size check. */
    private volatile long entriesAddedCounter = 0L;
    // Used by getStats as the starting value of bytesOutCounter. Presumably accumulates the
    // bytes delivered by subscriptions that have since been removed -- the code that adds to
    // it is not in this part of the file (TODO confirm).
    private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
    // Used by getStats as the starting value of msgOutCounter; same caveat as above.
    private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
    /** Per-thread scratch {@code TopicStats} instance; {@code updateRates} resets it before each use. */
    private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>(){

        @Override
        protected TopicStats initialValue() {
            return new TopicStats();
        }
    };
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);

    public NonPersistentTopic(String topic, BrokerService brokerService) {
        super(topic, brokerService);
        this.subscriptions = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        this.replicators = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        this.isFenced = false;
    }

    /**
     * Loads the namespace policies of this topic and copies the relevant settings onto the topic.
     *
     * <p>When no policies exist, only {@code isEncryptionRequired} is touched (set to
     * {@code false}); every other setting keeps its current value.
     *
     * @return a future that completes once the policies have been applied
     */
    @Override
    public CompletableFuture<Void> initialize() {
        final NamespaceResources namespaceResources =
                brokerService.pulsar().getPulsarResources().getNamespaceResources();
        final NamespaceName namespace = TopicName.get(topic).getNamespaceObject();

        return namespaceResources.getPoliciesAsync(namespace).thenAccept(optPolicies -> {
            if (!optPolicies.isPresent()) {
                log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic);
                isEncryptionRequired = false;
                return;
            }

            final Policies policies = optPolicies.get();
            isEncryptionRequired = policies.encryption_required;
            isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
            // Keep the existing inactive-topic policies when the namespace defines none.
            if (policies.inactive_topic_policies != null) {
                inactiveTopicPolicies = policies.inactive_topic_policies;
            }
            setSchemaCompatibilityStrategy(policies);
            schemaValidationEnforced = policies.schema_validation_enforced;
        });
    }

    /**
     * Publishes one message: acknowledges it to the producer, then hands a copy to every
     * subscription dispatcher and every replicator.
     *
     * <p>The publish callback is completed before any delivery is attempted, so delivery
     * problems are never reported back through {@code callback}.
     *
     * @param data     message payload
     * @param callback publish context notified of acceptance or rejection
     */
    @Override
    public void publishMessage(ByteBuf data, Topic.PublishContext callback) {
        final boolean tooLarge = isExceedMaximumMessageSize(data.readableBytes());
        if (tooLarge) {
            callback.completed(new BrokerServiceException.NotAllowedException("Exceed maximum message size"), -1L, -1L);
            return;
        }

        callback.completed(null, 0L, 0L);
        ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);

        subscriptions.forEach((subscriptionName, sub) -> {
            // The retained duplicate is released right after the entry is built; presumably the
            // entry keeps its own reference to the buffer (confirm against EntryCacheManager).
            final ByteBuf payloadCopy = data.retainedDuplicate();
            final Entry entry = EntryCacheManager.create(0L, 0L, payloadCopy);
            payloadCopy.release();

            if (sub.getDispatcher() == null) {
                // Nobody to deliver to: drop the entry.
                entry.release();
                return;
            }
            sub.getDispatcher().sendMessages(Collections.singletonList(entry));
        });

        if (replicators.isEmpty()) {
            return;
        }
        replicators.forEach((clusterName, repl) -> {
            final ByteBuf payloadCopy = data.retainedDuplicate();
            final Entry entry = EntryCacheManager.create(0L, 0L, payloadCopy);
            payloadCopy.release();
            repl.sendMessage(entry);
        });
    }

    /**
     * Computes the next topic epoch without storing it anywhere.
     *
     * @param currentEpoch the current epoch, if any
     * @return an already-completed future holding {@code currentEpoch + 1}, or {@code 0} when
     *         no current epoch is present
     */
    @Override
    protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
        final long previous = currentEpoch.isPresent() ? currentEpoch.get() : -1L;
        final long next = previous + 1L;
        return CompletableFuture.completedFuture(next);
    }

    /**
     * Echoes the requested epoch back; nothing is stored by this implementation.
     *
     * @param newEpoch the epoch to set
     * @return an already-completed future holding {@code newEpoch}
     */
    @Override
    protected CompletableFuture<Long> setTopicEpoch(long newEpoch) {
        final Long epoch = Long.valueOf(newEpoch);
        return CompletableFuture.completedFuture(epoch);
    }

    /**
     * No-op: this implementation performs no deduplication bookkeeping.
     */
    @Override
    public void checkMessageDeduplicationInfo() {
        // Nothing to do for this topic type.
    }

    /**
     * Unregisters a producer from this topic.
     *
     * <p>The removal hook runs only if exactly this producer instance was still registered
     * under its producer name.
     *
     * @param producer the producer to remove; must belong to this topic
     * @throws IllegalArgumentException if the producer belongs to another topic
     */
    @Override
    public void removeProducer(Producer producer) {
        final boolean ownedByThisTopic = producer.getTopic() == this;
        Preconditions.checkArgument(ownedByThisTopic);

        final boolean wasRegistered = producers.remove(producer.getProducerName(), producer);
        if (!wasRegistered) {
            return;
        }
        handleProducerRemoved(producer);
    }

    /**
     * Reports transaction-buffer recovery as already finished; no check is performed.
     *
     * @param isTxnEnabled ignored by this implementation
     * @return an already-completed future
     */
    @Override
    public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
        final CompletableFuture<Void> alreadyDone = new CompletableFuture<>();
        alreadyDone.complete(null);
        return alreadyDone;
    }

    /**
     * Attaches a new consumer to this topic, creating the subscription on first use.
     *
     * <p>After the namespace-ownership check the request is rejected when: the topic has seen
     * batch messages and the connection cannot handle them, the subscription name starts with
     * the replicator prefix, {@code readCompacted} is requested, or the topic is fenced.
     *
     * <p>{@code startMessageId}, {@code resetStartMessageBackInSec} and
     * {@code replicateSubscriptionState} are not used by this implementation; the consumer is
     * always created with {@code MessageId.latest}.
     *
     * @param cnx              connection the consumer is attached to
     * @param subscriptionName subscription to join (created if absent)
     * @param isDurable        passed to the subscription when it has to be created
     * @return a future holding the registered consumer, or failed with the rejection reason
     */
    @Override
    public CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, CommandSubscribe.InitialPosition initialPosition, long resetStartMessageBackInSec, boolean replicateSubscriptionState, KeySharedMeta keySharedMeta) {
        return this.brokerService.checkTopicNsOwnership(this.getName()).thenCompose(__ -> {
            CompletableFuture future = new CompletableFuture();
            if (this.hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Consumer doesn't support batch-message {}", (Object)this.topic, (Object)subscriptionName);
                }
                future.completeExceptionally(new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
                return future;
            }
            // Names starting with the replicator prefix are reserved.
            if (subscriptionName.startsWith(this.replicatorPrefix)) {
                log.warn("[{}] Failed to create subscription for {}", (Object)this.topic, (Object)subscriptionName);
                future.completeExceptionally(new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
                return future;
            }
            if (readCompacted) {
                future.completeExceptionally(new BrokerServiceException.NotAllowedException("readCompacted only valid on persistent topics"));
                return future;
            }
            // The fence check and the consumer-added bookkeeping happen under the read lock;
            // close() and delete() change the fence flag under the write lock.
            this.lock.readLock().lock();
            try {
                if (this.isFenced) {
                    log.warn("[{}] Attempting to subscribe to a fenced topic", (Object)this.topic);
                    future.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                    CompletableFuture completableFuture = future;
                    return completableFuture;
                }
                this.handleConsumerAdded(subscriptionName, consumerName);
            }
            finally {
                this.lock.readLock().unlock();
            }
            NonPersistentSubscription subscription = this.subscriptions.computeIfAbsent(subscriptionName, name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
            Consumer consumer = new Consumer(subscription, subType, this.topic, consumerId, priorityLevel, consumerName, 0, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
            ((CompletableFuture)this.addConsumerToSubscription(subscription, consumer).thenRun(() -> {
                // The connection may have gone away while the consumer was being added.
                if (!cnx.isActive()) {
                    try {
                        consumer.close();
                    }
                    catch (BrokerServiceException e) {
                        if (e instanceof BrokerServiceException.ConsumerBusyException) {
                            log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, subscriptionName, consumerId, consumerName});
                        } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                            log.warn("[{}][{}] {}", new Object[]{this.topic, subscriptionName, e.getMessage()});
                        }
                        // Closing failed: decrement the usage count here.
                        this.decrementUsageCount();
                        future.completeExceptionally(e);
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", new Object[]{this.topic, subscriptionName, consumer.consumerName(), this.currentUsageCount()});
                    }
                    future.completeExceptionally(new BrokerServiceException("Connection was closed while the opening the cursor "));
                } else {
                    log.info("[{}][{}] Created new subscription for {}", new Object[]{this.topic, subscriptionName, consumerId});
                    future.complete(consumer);
                }
            })).exceptionally(e -> {
                // Callers receive the cause of the failure rather than the wrapper passed to this handler.
                Throwable throwable = e.getCause();
                if (throwable instanceof BrokerServiceException.ConsumerBusyException) {
                    log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, subscriptionName, consumerId, consumerName});
                } else if (throwable instanceof BrokerServiceException.SubscriptionBusyException) {
                    log.warn("[{}][{}] {}", new Object[]{this.topic, subscriptionName, e.getMessage()});
                }
                this.decrementUsageCount();
                future.completeExceptionally(throwable);
                return null;
            });
            return future;
        });
    }

    /**
     * Builds a new durable subscription object for this topic.
     *
     * <p>The subscription is only returned, not added to the topic's subscription map.
     *
     * @param subscriptionName          name of the subscription
     * @param initialPosition           not used by this implementation
     * @param replicateSubscriptionState not used by this implementation
     * @return an already-completed future holding the new subscription
     */
    @Override
    public CompletableFuture<Subscription> createSubscription(String subscriptionName, CommandSubscribe.InitialPosition initialPosition, boolean replicateSubscriptionState) {
        final boolean durable = true;
        final Subscription created = new NonPersistentSubscription(this, subscriptionName, durable);
        return CompletableFuture.completedFuture(created);
    }

    /**
     * Deletes the topic without disconnecting clients first; fails if the topic is still in use.
     *
     * @return a future that completes when the topic has been deleted
     */
    @Override
    public CompletableFuture<Void> delete() {
        final boolean failIfHasSubscriptions = false;
        final boolean closeIfClientsConnected = false;
        final boolean deleteSchema = false;
        return delete(failIfHasSubscriptions, closeIfClientsConnected, deleteSchema);
    }

    /**
     * Deletes the topic after first disconnecting its replicators, producers and subscriptions.
     *
     * @return a future that completes when the topic has been deleted
     */
    @Override
    public CompletableFuture<Void> deleteForcefully() {
        final boolean failIfHasSubscriptions = false;
        final boolean closeIfClientsConnected = true;
        final boolean deleteSchema = false;
        return delete(failIfHasSubscriptions, closeIfClientsConnected, deleteSchema);
    }

    /**
     * Deletes this topic.
     *
     * <p>The fence check and the wiring of the asynchronous steps happen under the topic write
     * lock; callbacks that fire after the method has returned run without it. The topic is
     * fenced only once the usage count is zero, and the fence is lifted again on every
     * failure path visible here.
     *
     * @param failIfHasSubscriptions  fail with {@code TopicBusyException} instead of deleting
     *                                existing subscriptions
     * @param closeIfClientsConnected disconnect replicators, producers and subscriptions
     *                                before checking the usage count
     * @param deleteSchema            also delete the topic schema
     * @return a future completed when the topic has been removed from the broker's topic
     *         cache, or failed with the reason the deletion was refused
     */
    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected, boolean deleteSchema) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<Void>();
        this.lock.writeLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", (Object)this.topic);
                deleteFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                CompletableFuture<Void> completableFuture = deleteFuture;
                return completableFuture;
            }
            // Step 1 (optional): disconnect every client so the usage count can drop to zero.
            CompletableFuture closeClientFuture = new CompletableFuture();
            if (closeIfClientsConnected) {
                ArrayList futures = Lists.newArrayList();
                this.replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
                this.producers.values().forEach(producer -> futures.add(producer.disconnect()));
                this.subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
                ((CompletableFuture)FutureUtil.waitForAll(futures).thenRun(() -> closeClientFuture.complete(null))).exceptionally(ex -> {
                    log.error("[{}] Error closing clients", (Object)this.topic, ex);
                    this.isFenced = false;
                    closeClientFuture.completeExceptionally((Throwable)ex);
                    return null;
                });
            } else {
                closeClientFuture.complete(null);
            }
            // Step 2: fence and delete, but only if nothing is using the topic any more.
            ((CompletableFuture)closeClientFuture.thenAccept(delete -> {
                if (this.currentUsageCount() == 0L) {
                    this.isFenced = true;
                    ArrayList<CompletionStage> futures = Lists.newArrayList();
                    if (failIfHasSubscriptions) {
                        if (!this.subscriptions.isEmpty()) {
                            // Refuse and lift the fence again so the topic stays usable.
                            this.isFenced = false;
                            deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                            return;
                        }
                    } else {
                        this.subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
                    }
                    if (deleteSchema) {
                        futures.add(this.deleteSchema().thenApply(schemaVersion -> null));
                    }
                    futures.add(this.deleteTopicPolicies());
                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
                        if (ex != null) {
                            log.error("[{}] Error deleting topic", (Object)this.topic, ex);
                            this.isFenced = false;
                            deleteFuture.completeExceptionally((Throwable)ex);
                        } else {
                            // Cache removal and completion are handed to the broker executor.
                            this.brokerService.executor().execute(() -> {
                                this.brokerService.removeTopicFromCache(this.topic);
                                log.info("[{}] Topic deleted", (Object)this.topic);
                                deleteFuture.complete(null);
                            });
                        }
                    });
                } else {
                    deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has " + this.currentUsageCount() + " connected producers/consumers"));
                }
            })).exceptionally(ex -> {
                // Reached when closing the clients failed; the original cause is not propagated.
                deleteFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Failed to close clients before deleting topic."));
                return null;
            });
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return deleteFuture;
    }

    /**
     * Fences the topic, disconnects its clients and removes it from the broker's topic cache.
     *
     * <p>A topic that is already fenced is rejected unless
     * {@code closeWithoutWaitingClientDisconnect} is set. If closing fails, the fence is lifted
     * again.
     *
     * @param closeWithoutWaitingClientDisconnect when {@code true}, disconnects are still
     *        initiated but the topic is closed without waiting for them to finish
     * @return a future completed once the topic has been removed from the cache, or failed
     *         with {@code TopicFencedException} / the disconnect error
     */
    @Override
    public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        lock.writeLock().lock();
        try {
            if (isFenced && !closeWithoutWaitingClientDisconnect) {
                log.warn("[{}] Topic is already being closed or deleted", topic);
                closeFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return closeFuture;
            }
            isFenced = true;
        } finally {
            lock.writeLock().unlock();
        }

        // Kick off all disconnects; rate limiters are released along the way.
        final ArrayList<CompletableFuture<Void>> disconnects = Lists.newArrayList();
        replicators.forEach((cluster, replicator) -> disconnects.add(replicator.disconnect()));
        producers.values().forEach(producer -> disconnects.add(producer.disconnect()));
        if (topicPublishRateLimiter != null) {
            topicPublishRateLimiter.close();
        }
        subscriptions.forEach((subName, sub) -> disconnects.add(sub.disconnect()));
        if (resourceGroupPublishLimiter != null) {
            resourceGroupPublishLimiter.unregisterRateLimitFunction(getName());
        }

        final CompletableFuture<Void> clientCloseFuture;
        if (closeWithoutWaitingClientDisconnect) {
            clientCloseFuture = CompletableFuture.completedFuture(null);
        } else {
            clientCloseFuture = FutureUtil.waitForAll(disconnects);
        }

        clientCloseFuture.thenRun(() -> {
            log.info("[{}] Topic closed", topic);
            // Cache removal and completion are handed to the broker executor.
            brokerService.executor().execute(() -> {
                brokerService.removeTopicFromCache(topic);
                closeFuture.complete(null);
            });
        }).exceptionally(exception -> {
            log.error("[{}] Error closing topic", topic, exception);
            isFenced = false;
            closeFuture.completeExceptionally(exception);
            return null;
        });
        return closeFuture;
    }

    /**
     * Disconnects every replicator of this topic.
     *
     * @return a future that completes when all replicator disconnects have completed
     */
    public CompletableFuture<Void> stopReplProducers() {
        final ArrayList<CompletableFuture<Void>> pendingDisconnects = Lists.newArrayList();
        replicators.forEach((cluster, repl) -> {
            pendingDisconnects.add(repl.disconnect());
        });
        return FutureUtil.waitForAll(pendingDisconnects);
    }

    /**
     * Reconciles the running replicators with the replication clusters configured in the
     * namespace policies: starts replicators for newly configured remote clusters and removes
     * those whose cluster is no longer configured.
     *
     * <p>Nothing is done for non-global topics or for topics in the broker's heartbeat
     * namespace.
     *
     * @return a future that completes when all start/remove operations have completed; failed
     *         with {@code ServerMetadataException} when the namespace has no policies
     */
    @Override
    public CompletableFuture<Void> checkReplication() {
        final TopicName name = TopicName.get(topic);
        if (!name.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }

        final NamespaceName heartbeatNamespace = brokerService.pulsar().getHeartbeatNamespaceV2();
        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
            return CompletableFuture.completedFuture(null);
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", name);
        }

        return brokerService.pulsar().getPulsarResources().getNamespaceResources()
                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
                .thenCompose(optPolicies -> {
                    if (!optPolicies.isPresent()) {
                        return FutureUtil.failedFuture(new BrokerServiceException.ServerMetadataException(new MetadataStoreException.NotFoundException()));
                    }

                    final Policies policies = optPolicies.get();
                    final Set<String> configuredClusters;
                    if (policies.replication_clusters != null) {
                        configuredClusters = policies.replication_clusters;
                    } else {
                        configuredClusters = Collections.emptySet();
                    }
                    final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                    final ArrayList<CompletableFuture<Void>> pending = Lists.newArrayList();

                    // Start replicators for configured remote clusters that have none yet.
                    for (String configured : configuredClusters) {
                        if (!configured.equals(localCluster) && !replicators.containsKey(configured)) {
                            pending.add(startReplicator(configured));
                        }
                    }

                    // Remove replicators whose remote cluster is no longer configured.
                    replicators.forEach((cluster, replicator) -> {
                        final boolean isRemote = !cluster.equals(localCluster);
                        if (isRemote && !configuredClusters.contains(cluster)) {
                            pending.add(removeReplicator(cluster));
                        }
                    });

                    return FutureUtil.waitForAll(pending);
                });
    }

    /**
     * Starts replication from the local cluster to {@code remoteCluster}.
     *
     * @param remoteCluster name of the cluster to replicate to
     * @return the future returned by {@code addReplicationCluster}
     */
    CompletableFuture<Void> startReplicator(String remoteCluster) {
        log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
        final ServiceConfiguration configuration = brokerService.pulsar().getConfiguration();
        return addReplicationCluster(remoteCluster, this, configuration.getClusterName());
    }

    /**
     * Validates the topic, obtains a replication client for {@code remoteCluster} and registers
     * a replicator for that cluster unless one is already present.
     *
     * <p>A failure to construct the replicator is logged, not propagated: the returned future
     * still completes normally and no replicator is left registered for the cluster.
     *
     * @param remoteCluster      cluster to replicate to
     * @param nonPersistentTopic topic whose name is validated
     * @param localCluster       name of the local cluster, passed to the replicator
     * @return a future that completes after the registration attempt
     */
    protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
        return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
                        .getClusterAsync(remoteCluster)
                        .thenApply(clusterData -> brokerService.getReplicationClient(remoteCluster, clusterData)))
                .thenAccept(replicationClient -> {
                    replicators.computeIfAbsent(remoteCluster, cluster -> {
                        try {
                            return new NonPersistentReplicator(this, localCluster, remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
                        } catch (PulsarServerException e) {
                            log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
                        }
                        return null;
                    });

                    // Clean up the entry if replicator startup failed and a null value was stored.
                    final boolean startupFailed =
                            replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null;
                    if (startupFailed) {
                        replicators.remove(remoteCluster);
                    }
                });
    }

    /**
     * Disconnects the replicator for {@code remoteCluster} and removes it from this topic.
     *
     * <p>The returned future is completed on every path. It used to be completed only on
     * failure, which left {@code checkReplication} waiting forever after a successful removal.
     * The replicator is dropped from the replicator map once its disconnect has succeeded, so
     * later replication checks no longer see it.
     *
     * @param remoteCluster name of the cluster whose replicator should be removed
     * @return a future completed when the replicator has been disconnected and removed
     *         (immediately if no replicator is registered for the cluster), or failed with
     *         the disconnect error
     */
    CompletableFuture<Void> removeReplicator(String remoteCluster) {
        log.info("[{}] Removing replicator to {}", topic, remoteCluster);
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);

        final NonPersistentReplicator replicator = replicators.get(remoteCluster);
        if (replicator == null) {
            // Already gone (e.g. removed concurrently): nothing to disconnect.
            future.complete(null);
            return future;
        }

        replicator.disconnect().thenRun(() -> {
            log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
            replicators.remove(remoteCluster);
            future.complete(null);
        }).exceptionally(e -> {
            log.error("[{}] Failed to close replication producer {} {}", new Object[]{topic, name, e.getMessage(), e});
            future.completeExceptionally(e);
            return null;
        });
        return future;
    }

    /**
     * Runs {@code checkReplication} and, if it fails, schedules another attempt in 60 seconds.
     *
     * <p>The returned future reflects only this attempt: it fails even though a retry has been
     * scheduled, and the outcome of the retry is not reported through it.
     *
     * @return a future completed when this attempt succeeds, or failed with its error
     */
    private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        final long retryDelaySeconds = 60L;

        checkReplication().thenAccept(ignored -> {
            log.info("[{}] Policies updated successfully", topic);
            outcome.complete(null);
        }).exceptionally(failure -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", new Object[]{topic, failure.getMessage(), retryDelaySeconds, failure});
            brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure, retryDelaySeconds, TimeUnit.SECONDS);
            outcome.completeExceptionally(failure);
            return null;
        });
        return outcome;
    }

    /**
     * No-op: this implementation performs no message-expiry processing.
     */
    @Override
    public void checkMessageExpiry() {
        // Nothing to do for this topic type.
    }

    /**
     * Counts the consumers attached to this topic across all of its subscriptions.
     *
     * @return the sum of the consumer-list sizes of every subscription
     */
    @Override
    public int getNumberOfConsumers() {
        return subscriptions.values().stream()
                .mapToInt(subscription -> subscription.getConsumers().size())
                .sum();
    }

    /**
     * Counts the consumers of this topic that are connected from the given client address.
     *
     * @param clientAddress address to match
     * @return the count computed by the two-argument overload over all current subscriptions
     */
    @Override
    public int getNumberOfSameAddressConsumers(String clientAddress) {
        return getNumberOfSameAddressConsumers(clientAddress, subscriptions.values());
    }

    /**
     * Exposes the live subscription map (no copy is made).
     *
     * @return the map of subscriptions keyed by subscription name
     */
    public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
        return subscriptions;
    }

    /**
     * Exposes the live replicator map (no copy is made).
     *
     * @return the map of replicators keyed by remote cluster name
     */
    public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
        return replicators;
    }

    /**
     * Looks up a subscription by name.
     *
     * @param subscription subscription name
     * @return whatever the subscription map returns for that name
     */
    @Override
    public Subscription getSubscription(String subscription) {
        final NonPersistentSubscription found = subscriptions.get(subscription);
        return found;
    }

    /**
     * Looks up the replicator for a remote cluster.
     *
     * @param remoteCluster remote cluster name
     * @return whatever the replicator map returns for that cluster
     */
    public Replicator getPersistentReplicator(String remoteCluster) {
        final NonPersistentReplicator found = replicators.get(remoteCluster);
        return found;
    }

    /**
     * Refreshes the rate statistics of this topic, writes them to {@code topicStatsStream} as
     * an object keyed by the topic name, and adds the topic's totals to the namespace and
     * bundle aggregates.
     *
     * <p>The writer calls are strictly nested (publishers list, replication object,
     * subscriptions object, then topic-level pairs), so statement order matters.
     *
     * @param nsStats           namespace aggregate, updated in place
     * @param bundleStats       bundle aggregate, updated in place
     * @param topicStatsStream  streaming writer that receives the per-topic output
     * @param replStats         not used by this implementation
     * @param namespace         not used by this implementation
     * @param hydratePublishers whether per-publisher entries are written to the stream
     */
    @Override
    public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
        // Reuse the per-thread scratch object instead of allocating a new one per call.
        TopicStats topicStats = threadLocalTopicStats.get();
        topicStats.reset();
        this.replicators.forEach((region, replicator) -> replicator.updateRates());
        nsStats.producerCount += this.producers.size();
        bundleStats.producerCount += this.producers.size();
        topicStatsStream.startObject(this.topic);
        // --- publishers ---
        topicStatsStream.startList("publishers");
        this.producers.values().forEach(producer -> {
            producer.updateRates();
            PublisherStatsImpl publisherStats = producer.getStats();
            topicStats.aggMsgRateIn += publisherStats.msgRateIn;
            topicStats.aggMsgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            }
            if (hydratePublishers) {
                StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
            }
        });
        topicStatsStream.endList();
        // --- replication: the object is written empty; only the namespace counter is updated ---
        topicStatsStream.startObject("replication");
        nsStats.replicatorCount += topicStats.remotePublishersStats.size();
        topicStatsStream.endObject();
        // --- subscriptions ---
        topicStatsStream.startObject("subscriptions");
        nsStats.subsCount = (int)((long)nsStats.subsCount + this.subscriptions.size());
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            double subMsgRateOut = 0.0;
            double subMsgThroughputOut = 0.0;
            double subMsgRateRedeliver = 0.0;
            double subMsgAckRate = 0.0;
            try {
                topicStatsStream.startObject((String)subscriptionName);
                topicStatsStream.startList("consumers");
                for (Consumer consumer : subscription.getConsumers()) {
                    ++nsStats.consumerCount;
                    ++bundleStats.consumerCount;
                    consumer.updateRates();
                    ConsumerStatsImpl consumerStats = consumer.getStats();
                    subMsgRateOut += consumerStats.msgRateOut;
                    subMsgThroughputOut += consumerStats.msgThroughputOut;
                    subMsgRateRedeliver += consumerStats.msgRateRedeliver;
                    subMsgAckRate += consumerStats.messageAckRate;
                    StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
                }
                topicStatsStream.endList();
                topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
                topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                topicStatsStream.writePair("msgRateOut", subMsgRateOut);
                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                topicStatsStream.writePair("type", subscription.getTypeString());
                // The drop rate is only available once the subscription has a dispatcher.
                if (subscription.getDispatcher() != null) {
                    subscription.getDispatcher().getMessageDropRate().calculateRate();
                    topicStatsStream.writePair("msgDropRate", subscription.getDispatcher().getMessageDropRate().getValueRate());
                }
                topicStatsStream.endObject();
                topicStats.aggMsgRateOut += subMsgRateOut;
                topicStats.aggMsgThroughputOut += subMsgThroughputOut;
                nsStats.msgBacklog += (double)subscription.getNumberOfEntriesInBacklog(false);
            }
            catch (Exception e) {
                // A failure in one subscription is logged and does not stop the others.
                // NOTE(review): the subscription object opened above may be left unclosed
                // in the stream when this path is taken -- confirm the writer tolerates that.
                log.error("Got exception when creating consumer stats for subscription {}: {}", new Object[]{subscriptionName, e.getMessage(), e});
            }
        });
        topicStatsStream.endObject();
        // --- topic-level totals ---
        // Average message size = inbound throughput / inbound rate, guarded against division by zero.
        topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0 : topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn;
        topicStatsStream.writePair("producerCount", this.producers.size());
        topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
        topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
        topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
        topicStatsStream.writePair("msgThroughputIn", topicStats.aggMsgThroughputIn);
        topicStatsStream.writePair("msgThroughputOut", topicStats.aggMsgThroughputOut);
        topicStatsStream.writePair("msgInCount", this.getMsgInCounter());
        topicStatsStream.writePair("bytesInCount", this.getBytesInCounter());
        topicStatsStream.writePair("msgOutCount", this.getMsgOutCounter());
        topicStatsStream.writePair("bytesOutCount", this.getBytesOutCounter());
        // Fold this topic's totals into the namespace and bundle aggregates.
        nsStats.msgRateIn += topicStats.aggMsgRateIn;
        nsStats.msgRateOut += topicStats.aggMsgRateOut;
        nsStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
        nsStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
        bundleStats.msgRateIn += topicStats.aggMsgRateIn;
        bundleStats.msgRateOut += topicStats.aggMsgRateOut;
        bundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
        bundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
        // Publish the add-latency buckets to the namespace, then reset them.
        this.addEntryLatencyStatsUsec.refresh();
        NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
        this.addEntryLatencyStatsUsec.reset();
        topicStatsStream.endObject();
    }

    /**
     * Builds a fresh statistics snapshot for this topic.
     *
     * <p>Publisher rates are summed into the inbound totals. Stats of remote producers are set aside, keyed by
     * their remote cluster, and copied into the replicator entry for the same cluster. Outbound rates combine
     * every subscription and every replicator; the outbound counters start from the totals retained for removed
     * subscriptions and then add each live subscription's counters.
     *
     * @param getPreciseBacklog not read by this implementation
     * @param subscriptionBacklogSize not read by this implementation
     * @return the populated stats object
     */
    @Override
    public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
        NonPersistentTopicStatsImpl snapshot = new NonPersistentTopicStatsImpl();
        ObjectObjectHashMap<String, PublisherStatsImpl> remoteByCluster = new ObjectObjectHashMap<>();

        // Inbound side: local publishers are listed directly, remote ones are remembered per cluster.
        for (Producer producer : this.producers.values()) {
            NonPersistentPublisherStatsImpl producerStats = (NonPersistentPublisherStatsImpl) producer.getStats();
            snapshot.msgRateIn += producerStats.msgRateIn;
            snapshot.msgThroughputIn += producerStats.msgThroughputIn;
            if (!producer.isRemote()) {
                snapshot.getPublishers().add(producerStats);
                continue;
            }
            remoteByCluster.put(producer.getRemoteCluster(), producerStats);
        }

        // Guard against dividing by a zero inbound rate.
        if (snapshot.msgRateIn == 0.0) {
            snapshot.averageMsgSize = 0.0;
        } else {
            snapshot.averageMsgSize = snapshot.msgThroughputIn / snapshot.msgRateIn;
        }
        snapshot.msgInCounter = this.getMsgInCounter();
        snapshot.bytesInCounter = this.getBytesInCounter();
        snapshot.waitingPublishers = this.getWaitingProducersCount();

        // Seed the outbound counters with what removed subscriptions had already delivered.
        snapshot.bytesOutCounter = this.bytesOutFromRemovedSubscriptions.longValue();
        snapshot.msgOutCounter = this.msgOutFromRemovedSubscriptions.longValue();

        this.subscriptions.forEach((subscriptionName, subscription) -> {
            NonPersistentSubscriptionStatsImpl subscriptionStats = subscription.getStats();
            snapshot.msgRateOut += subscriptionStats.msgRateOut;
            snapshot.msgThroughputOut += subscriptionStats.msgThroughputOut;
            snapshot.bytesOutCounter += subscriptionStats.bytesOutCounter;
            snapshot.msgOutCounter += subscriptionStats.msgOutCounter;
            snapshot.getSubscriptions().put((String) subscriptionName, subscriptionStats);
        });

        this.replicators.forEach((cluster, replicator) -> {
            NonPersistentReplicatorStatsImpl outbound = replicator.getStats();
            PublisherStatsImpl inbound = remoteByCluster.get(replicator.getRemoteCluster());
            if (inbound != null) {
                // Pair the replicator with the remote producer publishing from the same cluster.
                outbound.msgRateIn = inbound.msgRateIn;
                outbound.msgThroughputIn = inbound.msgThroughputIn;
                outbound.inboundConnection = inbound.getAddress();
                outbound.inboundConnectedSince = inbound.getConnectedSince();
            }
            snapshot.msgRateOut += outbound.msgRateOut;
            snapshot.msgThroughputOut += outbound.msgThroughputOut;
            snapshot.getReplication().put(replicator.getRemoteCluster(), outbound);
        });

        snapshot.topicEpoch = this.topicEpoch.orElse(null);
        return snapshot;
    }

    /**
     * Produces the internal stats view of this topic.
     *
     * <p>The result carries the current entries-added counter and one empty {@code CursorStats} placeholder per
     * subscription name and per replicator key, held in a sorted map.
     *
     * @param includeLedgerMetadata not read by this implementation
     * @return an already-completed future holding the stats
     */
    @Override
    public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
        PersistentTopicInternalStats internalStats = new PersistentTopicInternalStats();
        internalStats.entriesAddedCounter = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
        internalStats.cursors = Maps.newTreeMap();
        this.subscriptions.forEach((subscriptionName, ignoredSubscription) -> {
            internalStats.cursors.put(subscriptionName, new ManagedLedgerInternalStats.CursorStats());
        });
        this.replicators.forEach((replicatorKey, ignoredReplicator) -> {
            internalStats.cursors.put(replicatorKey, new ManagedLedgerInternalStats.CursorStats());
        });
        return CompletableFuture.completedFuture(internalStats);
    }

    /**
     * Tells whether this topic is currently in use.
     *
     * <p>A global topic is active when it has any subscription or any local producer. Any other topic is
     * active when its usage count is non-zero or it has any subscription.
     *
     * @return {@code true} if the topic is considered active
     */
    public boolean isActive() {
        if (!TopicName.get(this.topic).isGlobal()) {
            if (this.currentUsageCount() != 0L) {
                return true;
            }
            return !this.subscriptions.isEmpty();
        }
        if (!this.subscriptions.isEmpty()) {
            return true;
        }
        return this.hasLocalProducers();
    }

    /**
     * Deletes this topic when it has stayed inactive for longer than the configured maximum duration.
     *
     * <p>Nothing happens unless deletion of inactive topics is enabled. While the topic is active the
     * last-active timestamp is refreshed. Once the inactivity window has elapsed, and only for global topics,
     * the replication producers are stopped, the topic is deleted and the partitioned-topic metadata cleanup
     * is attempted.
     *
     * <p>The metadata cleanup is chained with {@code thenCompose} so that the success message is logged only
     * after the cleanup future completes, and a cleanup failure reaches the failure handler instead of being
     * silently dropped.
     *
     * <p>If the deletion fails because the topic is busy, the replicator producers are started again; any other
     * failure is logged as a warning.
     */
    @Override
    public void checkGC() {
        if (!this.isDeleteWhileInactive()) {
            return;
        }
        int maxInactiveDurationInSec = this.inactiveTopicPolicies.getMaxInactiveDurationSeconds();
        if (this.isActive()) {
            this.lastActive = System.nanoTime();
            return;
        }
        boolean inactivityExpired =
                System.nanoTime() - this.lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec);
        if (!inactivityExpired || !TopicName.get(this.topic).isGlobal()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", this.topic, maxInactiveDurationInSec);
        }
        this.stopReplProducers()
                .thenCompose(v -> this.delete(true, false, true))
                // Compose (rather than accept) so the returned future is awaited and its failure propagates.
                .thenCompose(__ -> this.tryToDeletePartitionedMetadata())
                .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", this.topic))
                .exceptionally(e -> {
                    Throwable cause = e.getCause();
                    if (cause instanceof BrokerServiceException.TopicBusyException) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Did not delete busy topic: {}", this.topic, cause.getMessage());
                        }
                        // The topic stays alive, so bring the replication producers back.
                        this.replicators.forEach((region, replicator) -> replicator.startProducer());
                    } else {
                        log.warn("[{}] Inactive topic deletion failed", this.topic, e);
                    }
                    return null;
                });
    }

    /**
     * Attempts to remove the partitioned-topic metadata that belongs to this topic.
     *
     * <p>Completes immediately without doing anything when this topic is a partition and
     * {@code deletePartitionedTopicMetadataWhileInactive()} is {@code false}, or when no partitioned-topic
     * metadata exists under the partitioned topic name. Any exception thrown while looking up or deleting the
     * metadata is converted into a failed future.
     *
     * @return a future for the metadata deletion, or an already-completed future when there is nothing to do
     */
    private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
        TopicName thisTopic = TopicName.get(this.topic);
        boolean keepMetadata = thisTopic.isPartitioned() && !this.deletePartitionedTopicMetadataWhileInactive();
        if (keepMetadata) {
            return CompletableFuture.completedFuture(null);
        }
        TopicName partitionedName = TopicName.get(thisTopic.getPartitionedTopicName());
        try {
            NamespaceResources.PartitionedTopicResources resources = this.brokerService.pulsar()
                    .getPulsarResources()
                    .getNamespaceResources()
                    .getPartitionedTopicResources();
            if (resources.partitionedTopicExists(partitionedName)) {
                return resources.deletePartitionedTopicAsync(partitionedName);
            }
            return CompletableFuture.completedFuture(null);
        }
        catch (Exception failure) {
            return FutureUtil.failedFuture(failure);
        }
    }

    /**
     * Deletes subscriptions that have been idle for longer than the subscription expiration time.
     *
     * <p>The expiration time comes from the namespace policies when set there, otherwise from the broker
     * configuration; a non-positive value disables the check. Subscriptions that have a connected consumer or
     * are replicated are skipped.
     *
     * <p>This is a best-effort check: a failure to read the policies is logged at debug level (including the
     * cause) and a failed subscription deletion is logged as a warning; neither is propagated to the caller.
     */
    @Override
    public void checkInactiveSubscriptions() {
        TopicName name = TopicName.get(this.topic);
        try {
            Policies policies = this.brokerService.pulsar().getPulsarResources().getNamespaceResources()
                    .getPolicies(name.getNamespaceObject())
                    .orElseThrow(MetadataStoreException.NotFoundException::new);
            int defaultExpirationTime = this.brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes();
            Integer nsExpirationTime = policies.subscription_expiration_time_minutes;
            long expirationTimeMillis = TimeUnit.MINUTES.toMillis(
                    nsExpirationTime == null ? (long) defaultExpirationTime : (long) nsExpirationTime.intValue());
            if (expirationTimeMillis <= 0L) {
                return;
            }
            this.subscriptions.forEach((subName, sub) -> {
                boolean consumerConnected = sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected();
                if (consumerConnected || sub.isReplicated()) {
                    return;
                }
                if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) {
                    sub.delete()
                            .thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration", this.topic, subName))
                            .exceptionally(ex -> {
                                // Surface the failure instead of leaving the future unobserved.
                                log.warn("[{}][{}] Failed to delete expired subscription", this.topic, subName, ex);
                                return null;
                            });
                }
            });
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                // Pass the exception so the cause is visible when debugging.
                log.debug("[{}] Error getting policies", this.topic, e);
            }
        }
    }

    /**
     * Intentionally empty: this implementation performs no backlogged-cursor check.
     */
    @Override
    public void checkBackloggedCursors() {
    }

    /**
     * Intentionally empty: this implementation performs no deduplication-snapshot check.
     */
    @Override
    public void checkDeduplicationSnapshot() {
    }

    /**
     * Applies updated namespace policies to this topic.
     *
     * <p>The encryption, schema-compatibility, schema auto-update and schema-validation settings are applied
     * right away. Then every producer has its permissions re-checked followed by its encryption check; once all
     * of those complete, every consumer of every subscription has its permissions re-checked. Finally the
     * inactive-topic policies are taken from {@code data}, or reset from the broker configuration when
     * {@code data} carries none, and the replication check is run.
     *
     * @param data the new policies
     * @return a future that completes when all checks have finished; on failure the error is logged and the
     *         future completes exceptionally with the wrapped cause
     */
    @Override
    public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] isEncryptionRequired changes: {} -> {}", this.topic, this.isEncryptionRequired, data.encryption_required);
        }
        this.isEncryptionRequired = data.encryption_required;
        this.setSchemaCompatibilityStrategy(data);
        this.isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
        this.schemaValidationEnforced = data.schema_validation_enforced;

        ArrayList<CompletableFuture<Void>> producerChecks = new ArrayList<>(this.producers.size());
        for (Producer producer : this.producers.values()) {
            producerChecks.add(producer.checkPermissionsAsync().thenRun(producer::checkEncryption));
        }

        CompletableFuture<Void> allChecks = FutureUtil.waitForAll(producerChecks).thenCompose(producersDone -> {
            ArrayList<CompletableFuture<Void>> consumerChecks = new ArrayList<>();
            this.subscriptions.forEach((subName, sub) -> {
                for (Consumer consumer : sub.getConsumers()) {
                    consumerChecks.add(consumer.checkPermissionsAsync());
                }
            });
            return FutureUtil.waitForAll(consumerChecks).thenCompose(consumersDone -> {
                if (data.inactive_topic_policies == null) {
                    // No namespace-level override: fall back to the broker-wide settings.
                    ServiceConfiguration brokerConfig = this.brokerService.getPulsar().getConfiguration();
                    this.resetInactiveTopicPolicies(
                            brokerConfig.getBrokerDeleteInactiveTopicsMode(),
                            brokerConfig.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
                            brokerConfig.isBrokerDeleteInactiveTopicsEnabled());
                } else {
                    this.inactiveTopicPolicies = data.inactive_topic_policies;
                }
                return this.checkReplicationAndRetryOnFailure();
            });
        });

        return allChecks.exceptionally(failure -> {
            log.error("[{}] update namespace polices : {} error", this.getName(), data, failure);
            throw FutureUtil.wrapToCompletionException(failure);
        });
    }

    /**
     * Not supported by this implementation.
     *
     * @param backlogQuotaType ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
        throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
    }

    /**
     * Reports whether the backlog quota is exceeded; this implementation always answers {@code false}.
     *
     * @param producerName ignored
     * @param backlogQuotaType ignored
     * @return always {@code false}
     */
    @Override
    public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
        return false;
    }

    /**
     * Reports whether this topic counts as replicated.
     *
     * @return {@code true} when the replicators map holds more than one entry
     */
    @Override
    public boolean isReplicated() {
        return this.replicators.size() > 1L;
    }

    /**
     * Removes the named subscription from this topic.
     *
     * <p>The removal is submitted to the broker service executor rather than run on the calling thread. When a
     * subscription was actually removed, its bytes-out and messages-out counters are added to the running
     * totals kept for removed subscriptions.
     *
     * @param subscriptionName name of the subscription to remove
     * @return a future that completes once the removal task has run
     */
    @Override
    public CompletableFuture<Void> unsubscribe(String subscriptionName) {
        Runnable removal = () -> {
            NonPersistentSubscription removed = this.subscriptions.remove(subscriptionName);
            if (removed == null) {
                return;
            }
            // Keep the cumulative outbound counters of the subscription that just went away.
            NonPersistentSubscriptionStatsImpl finalStats = removed.getStats();
            this.bytesOutFromRemovedSubscriptions.add(finalStats.bytesOutCounter);
            this.msgOutFromRemovedSubscriptions.add(finalStats.msgOutCounter);
        };
        return CompletableFuture.runAsync(removal, this.brokerService.executor());
    }

    /**
     * Not supported by this implementation.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public Position getLastPosition() {
        throw new UnsupportedOperationException("getLastPosition is not supported on non-persistent topic");
    }

    /**
     * Not supported by this implementation.
     *
     * <p>Note that the exception is thrown synchronously from the call itself; no future is returned.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletableFuture<MessageId> getLastMessageId() {
        throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
    }

    /**
     * Registers the schema when the topic is idle, otherwise verifies it is compatible.
     *
     * <p>The topic is treated as in use when it already has a schema, has any producer, has any consumer on any
     * subscription, or has a non-zero entries-added counter. In that case the schema is checked for consumer
     * compatibility; otherwise it is added.
     *
     * @param schema the schema to add or check
     * @return a future that completes when the schema has been added or the compatibility check has finished
     */
    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return this.hasSchema().thenCompose(schemaExists -> {
            int connectedConsumers = 0;
            for (NonPersistentSubscription subscription : this.subscriptions.values()) {
                connectedConsumers += subscription.getConsumers().size();
            }
            boolean topicInUse = schemaExists.booleanValue()
                    || !this.producers.isEmpty()
                    || connectedConsumers != 0
                    || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0L;
            if (!topicInUse) {
                return this.addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
            }
            return this.checkSchemaCompatibleForConsumer(schema);
        });
    }

    /**
     * Not supported by this implementation.
     *
     * @param txnID ignored
     * @param headersAndPayload ignored
     * @param publishContext ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
        throw new UnsupportedOperationException("PublishTxnMessage is not supported by non-persistent topic");
    }

    /**
     * Not supported by this implementation; no argument is consulted.
     *
     * @param txnID ignored
     * @param txnAction ignored
     * @param lowWaterMark ignored
     * @return the result of {@code FutureUtil.failedFuture} for an {@link Exception} describing the
     *         unsupported operation
     */
    @Override
    public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
        return FutureUtil.failedFuture(new Exception("Unsupported operation endTxn in non-persistent topic."));
    }

    /**
     * Not supported by this implementation.
     *
     * @return the result of {@code FutureUtil.failedFuture} for a
     *         {@code BrokerServiceException.NotAllowedException}
     */
    @Override
    public CompletableFuture<Void> truncate() {
        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Unsupported truncate"));
    }

    /**
     * Reports whether this topic is terminated.
     *
     * @return always {@code false} in this implementation
     */
    @Override
    protected boolean isTerminated() {
        return false;
    }

    /**
     * Reports whether this topic is persistent.
     *
     * @return always {@code false} in this implementation
     */
    @Override
    public boolean isPersistent() {
        return false;
    }

    /**
     * Mutable holder for per-topic aggregate rates, plus a map of publisher stats keyed by a string.
     * All numeric fields are zero and the map is empty after construction or {@link #reset()}.
     */
    private static class TopicStats {
        /** Average message size. */
        public double averageMsgSize;
        /** Aggregated inbound message rate. */
        public double aggMsgRateIn;
        /** Aggregated inbound throughput. */
        public double aggMsgThroughputIn;
        /** Aggregated outbound message rate. */
        public double aggMsgRateOut;
        /** Aggregated outbound throughput. */
        public double aggMsgThroughputOut;
        /** Publisher stats of remote publishers. */
        public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

        /** Creates a holder in its cleared state. */
        public TopicStats() {
            this.reset();
        }

        /** Clears the publisher map and zeroes every aggregate. */
        public void reset() {
            this.remotePublishersStats.clear();
            this.aggMsgThroughputOut = 0.0;
            this.aggMsgRateOut = 0.0;
            this.aggMsgThroughputIn = 0.0;
            this.aggMsgRateIn = 0.0;
            this.averageMsgSize = 0.0;
        }
    }
}

