/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class InactiveTopicDeleteTest
extends BrokerTestBase {
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.resetConfig();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testDeleteWhenNoSubscriptions() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions").create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"}).subscriptionName("sub").subscribe();
        consumer.close();
        producer.close();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions")));
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", "sub");
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions")));
    }

    @Test
    public void testDeleteAndCleanZkNode() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", 5);
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions").create().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"}).subscriptionName("sub").subscribe().close();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions")));
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", "sub");
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions")));
    }

    @Test
    public void testWhenSubPartitionNotDelete() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";
        TopicName topicName = TopicName.get((String)"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions");
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", 5);
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions").create().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"}).subscriptionName("sub").subscribe().close();
        String managedPath = String.format("/managed-ledgers/%s/%s", topicName.getNamespace(), topicName.getDomain().value());
        String partition0 = topicName.getPartition(0).getLocalName();
        Set cacheSet = (Set)Mockito.mock(Set.class);
        LocalZooKeeperCacheService localZooKeeperCacheService = (LocalZooKeeperCacheService)Mockito.spy((Object)this.pulsar.getLocalZkCacheService());
        ZooKeeperManagedLedgerCache zooKeeperManagedLedgerCache = (ZooKeeperManagedLedgerCache)Mockito.spy((Object)localZooKeeperCacheService.managedLedgerListCache());
        ((PulsarService)Mockito.doReturn((Object)localZooKeeperCacheService).when((Object)this.pulsar)).getLocalZkCacheService();
        ((LocalZooKeeperCacheService)Mockito.doReturn((Object)zooKeeperManagedLedgerCache).when((Object)localZooKeeperCacheService)).managedLedgerListCache();
        ((ZooKeeperManagedLedgerCache)Mockito.doReturn((Object)cacheSet).when((Object)zooKeeperManagedLedgerCache)).get(managedPath);
        ((Set)Mockito.doReturn((Object)true).when((Object)cacheSet)).contains(ArgumentMatchers.argThat(x -> x.equals(partition0)));
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", "sub");
        Thread.sleep(2000L);
        Assert.assertTrue((boolean)this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        ((Set)Mockito.verify((Object)cacheSet, (VerificationMode)Mockito.times((int)5))).contains(partition0);
    }

    @Test
    public void testNotEnabledDeleteZkNode() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        super.baseSetup();
        String namespace = "prop/ns-abc";
        String topic = "persistent://prop/ns-abc/testNotEnabledDeleteZkNode1";
        String topic2 = "persistent://prop/ns-abc/testNotEnabledDeleteZkNode2";
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1", 5);
        this.admin.topics().createNonPartitionedTopic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2");
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1").create().close();
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2").create().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testNotEnabledDeleteZkNode1"}).subscriptionName("sub").subscribe().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testNotEnabledDeleteZkNode2"}).subscriptionName("sub2").subscribe().close();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2")));
        Assert.assertTrue((boolean)this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1"));
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1", "sub");
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2", "sub2");
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2")));
        Assert.assertTrue((boolean)this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1"));
        Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2"));
    }

    @Test(timeOut=20000L)
    public void testTopicPolicyUpdateAndClean() throws Exception {
        String namespace = "prop/ns-abc";
        String namespace2 = "prop/ns-abc2";
        String namespace3 = "prop/ns-abc3";
        List<String> namespaceList = Arrays.asList("prop/ns-abc2", "prop/ns-abc3");
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(Integer.valueOf(1000));
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        InactiveTopicPolicies defaultPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1000, true);
        super.baseSetup();
        for (String ns : namespaceList) {
            this.admin.namespaces().createNamespace(ns);
            this.admin.namespaces().setNamespaceReplicationClusters(ns, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        }
        String topic = "persistent://prop/ns-abc/testDeletePolicyUpdate";
        String topic2 = "persistent://prop/ns-abc2/testDeletePolicyUpdate";
        String topic3 = "persistent://prop/ns-abc3/testDeletePolicyUpdate";
        List<String> topics = Arrays.asList("persistent://prop/ns-abc/testDeletePolicyUpdate", "persistent://prop/ns-abc2/testDeletePolicyUpdate", "persistent://prop/ns-abc3/testDeletePolicyUpdate");
        for (String tp : topics) {
            this.admin.topics().createNonPartitionedTopic(tp);
        }
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc2", inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc3", inactiveTopicPolicies);
        Awaitility.await().until(() -> {
            InactiveTopicPolicies temp = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies;
            return temp.isDeleteWhileInactive();
        });
        InactiveTopicPolicies policies = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies;
        Assert.assertTrue((boolean)policies.isDeleteWhileInactive());
        Assert.assertEquals((Object)policies.getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_no_subscriptions);
        Assert.assertEquals((int)policies.getMaxInactiveDurationSeconds(), (int)1);
        Assert.assertEquals((Object)policies, (Object)this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc"));
        this.admin.namespaces().removeInactiveTopicPolicies("prop/ns-abc");
        Awaitility.await().until(() -> {
            InactiveTopicPolicies temp = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies;
            return temp.getMaxInactiveDurationSeconds() == 1000;
        });
        Assert.assertEquals((Object)((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies, (Object)defaultPolicy);
        policies = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc2/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies;
        Assert.assertTrue((boolean)policies.isDeleteWhileInactive());
        Assert.assertEquals((Object)policies.getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        Assert.assertEquals((int)policies.getMaxInactiveDurationSeconds(), (int)1);
        Assert.assertEquals((Object)policies, (Object)this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc2"));
        this.admin.namespaces().removeInactiveTopicPolicies("prop/ns-abc2");
        Awaitility.await().until(() -> {
            InactiveTopicPolicies temp = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies;
            return temp.getMaxInactiveDurationSeconds() == 1000;
        });
        Assert.assertEquals((Object)((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc2/testDeletePolicyUpdate", (boolean)false).get()).get()).inactiveTopicPolicies, (Object)defaultPolicy);
    }

    @Test(timeOut=20000L)
    public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {
        String namespace = "prop/ns-abc";
        String namespace2 = "prop/ns-abc2";
        String namespace3 = "prop/ns-abc3";
        List<String> namespaceList = Arrays.asList("prop/ns-abc2", "prop/ns-abc3");
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        for (String ns : namespaceList) {
            this.admin.namespaces().createNamespace(ns);
            this.admin.namespaces().setNamespaceReplicationClusters(ns, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        }
        String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig";
        String topic2 = "persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig";
        String topic3 = "persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig";
        List<String> topics = Arrays.asList("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig", "persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig", "persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig");
        HashMap<String, String> topicToSub = new HashMap<String, String>();
        for (String tp : topics) {
            Producer producer = this.pulsarClient.newProducer().topic(tp).create();
            String subName = "sub" + System.currentTimeMillis();
            topicToSub.put(tp, subName);
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{tp}).subscriptionName(subName).subscribe();
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)"Pulsar".getBytes());
            }
            consumer.close();
            producer.close();
            Thread.sleep(1L);
        }
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc2", inactiveTopicPolicies);
        Awaitility.await().until(() -> {
            InactiveTopicPolicies temp = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)"persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig", (boolean)false).get()).get()).inactiveTopicPolicies;
            return temp.isDeleteWhileInactive();
        });
        Thread.sleep(2000L);
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc2").contains("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc3").contains("persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        this.admin.topics().skipAllMessages("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig", (String)topicToSub.remove("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc2").contains("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig")));
        for (Map.Entry entry : topicToSub.entrySet()) {
            this.admin.topics().deleteSubscription((String)entry.getKey(), (String)entry.getValue());
        }
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig")));
        Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc3").contains("persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig"));
    }

    @Test
    public void testDeleteWhenNoBacklogs() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/testDeleteWhenNoBacklogs";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoBacklogs").create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoBacklogs"}).subscriptionName("sub").subscribe();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)"Pulsar".getBytes());
        }
        consumer.close();
        producer.close();
        Thread.sleep(2000L);
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoBacklogs"));
        this.admin.topics().skipAllMessages("persistent://prop/ns-abc/testDeleteWhenNoBacklogs", "sub");
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoBacklogs")));
    }

    @Test
    public void testMaxInactiveDuration() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(Integer.valueOf(5));
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/testMaxInactiveDuration";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMaxInactiveDuration").create();
        producer.close();
        Thread.sleep(2000L);
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testMaxInactiveDuration"));
        Thread.sleep(4000L);
        Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testMaxInactiveDuration"));
        super.internalCleanup();
    }

    @Test(timeOut=20000L)
    public void testTopicLevelInActiveTopicApi() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.baseSetup();
        String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(topicName, 3);
        this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("my-sub").subscribe().close();
        TopicName topic = TopicName.get((String)topicName);
        InactiveTopicPolicies inactiveTopicPolicies = this.admin.topics().getInactiveTopicPolicies(topicName);
        Assert.assertNull((Object)inactiveTopicPolicies);
        InactiveTopicPolicies policies = new InactiveTopicPolicies();
        policies.setDeleteWhileInactive(true);
        policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        policies.setMaxInactiveDurationSeconds(10);
        this.admin.topics().setInactiveTopicPolicies(topicName, policies);
        Awaitility.await().until(() -> this.admin.topics().getInactiveTopicPolicies(topicName) != null);
        Assert.assertEquals((Object)this.admin.topics().getInactiveTopicPolicies(topicName), (Object)policies);
        this.admin.topics().removeInactiveTopicPolicies(topicName);
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getInactiveTopicPolicies(topicName)));
    }

    @Test(timeOut=30000L)
    public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(Integer.valueOf(1000));
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        InactiveTopicPolicies defaultPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1000, true);
        super.baseSetup();
        String namespace = "prop/ns-abc";
        String topic = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString();
        String topic2 = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString();
        String topic3 = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString();
        List<String> topics = Arrays.asList(topic, topic2, topic3);
        for (String tp : topics) {
            this.admin.topics().createNonPartitionedTopic(tp);
        }
        for (String tp : topics) {
            this.pulsarClient.newConsumer().topic(new String[]{tp}).subscriptionName("my-sub").subscribe().close();
            TopicName topicName = TopicName.get((String)tp);
        }
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies);
        Awaitility.await().until(() -> this.admin.topics().getInactiveTopicPolicies(topic) != null);
        InactiveTopicPolicies policies = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)topic, (boolean)false).get()).get()).inactiveTopicPolicies;
        Assert.assertTrue((boolean)policies.isDeleteWhileInactive());
        Assert.assertEquals((Object)policies.getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_no_subscriptions);
        Assert.assertEquals((int)policies.getMaxInactiveDurationSeconds(), (int)1);
        Assert.assertEquals((Object)policies, (Object)this.admin.topics().getInactiveTopicPolicies(topic));
        this.admin.topics().removeInactiveTopicPolicies(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)topic, (boolean)false).get()).get()).inactiveTopicPolicies, (Object)defaultPolicy));
        policies = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)topic2, (boolean)false).get()).get()).inactiveTopicPolicies;
        Assert.assertTrue((boolean)policies.isDeleteWhileInactive());
        Assert.assertEquals((Object)policies.getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        Assert.assertEquals((int)policies.getMaxInactiveDurationSeconds(), (int)1);
        Assert.assertEquals((Object)policies, (Object)this.admin.topics().getInactiveTopicPolicies(topic2));
        inactiveTopicPolicies.setMaxInactiveDurationSeconds(999);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies);
        Awaitility.await().until(() -> {
            InactiveTopicPolicies tempPolicies = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)topic, (boolean)false).get()).get()).inactiveTopicPolicies;
            return inactiveTopicPolicies.equals((Object)tempPolicies);
        });
        this.admin.topics().removeInactiveTopicPolicies(topic2);
        Awaitility.await().untilAsserted(() -> {
            InactiveTopicPolicies nsPolicies = ((PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic((String)topic2, (boolean)false).get()).get()).inactiveTopicPolicies;
            Assert.assertEquals((int)nsPolicies.getMaxInactiveDurationSeconds(), (int)999);
        });
    }

    @Test(timeOut=30000L)
    public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Exception {
        String namespace = "prop/ns-abc";
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
        String topic2 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
        String topic3 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
        List<String> topics = Arrays.asList(topic, topic2, topic3);
        HashMap<String, String> topicToSub = new HashMap<String, String>();
        for (String tp : topics) {
            Producer producer = this.pulsarClient.newProducer().topic(tp).create();
            String subName = "sub" + System.currentTimeMillis();
            topicToSub.put(tp, subName);
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{tp}).subscriptionName(subName).subscribe();
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)"Pulsar".getBytes());
            }
            consumer.close();
            producer.close();
            Thread.sleep(1L);
        }
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
        Awaitility.await().until(() -> this.admin.topics().getInactiveTopicPolicies(topic2) != null);
        Thread.sleep(2000L);
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains(topic));
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains(topic2));
        Assert.assertTrue((boolean)this.admin.topics().getList("prop/ns-abc").contains(topic3));
        this.admin.topics().skipAllMessages(topic2, (String)topicToSub.remove(topic2));
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains(topic2)));
        for (Map.Entry entry : topicToSub.entrySet()) {
            this.admin.topics().deleteSubscription((String)entry.getKey(), (String)entry.getValue());
        }
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains(topic3)));
        Assert.assertFalse((boolean)this.admin.topics().getList("prop/ns-abc").contains(topic));
    }

    @Test(timeOut=30000L)
    public void testInactiveTopicApplied() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.baseSetup();
        String namespace = "prop/ns-abc";
        String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
        this.pulsarClient.newProducer().topic(topic).create().close();
        Assert.assertNull((Object)this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc"));
        Assert.assertNull((Object)this.admin.topics().getInactiveTopicPolicies(topic));
        InactiveTopicPolicies brokerLevelPolicy = new InactiveTopicPolicies(this.conf.getBrokerDeleteInactiveTopicsMode(), this.conf.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), this.conf.isBrokerDeleteInactiveTopicsEnabled());
        Assert.assertEquals((Object)this.admin.topics().getInactiveTopicPolicies(topic, true), (Object)brokerLevelPolicy);
        InactiveTopicPolicies namespaceLevelPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 20, false);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", namespaceLevelPolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc")));
        InactiveTopicPolicies policyFromBroker = this.admin.topics().getInactiveTopicPolicies(topic, true);
        Assert.assertEquals((int)policyFromBroker.getMaxInactiveDurationSeconds(), (int)20);
        Assert.assertFalse((boolean)policyFromBroker.isDeleteWhileInactive());
        Assert.assertEquals((Object)policyFromBroker.getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_no_subscriptions);
        InactiveTopicPolicies topicLevelPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, 30, false);
        this.admin.topics().setInactiveTopicPolicies(topic, topicLevelPolicy);
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getInactiveTopicPolicies(topic)));
        policyFromBroker = this.admin.topics().getInactiveTopicPolicies(topic, true);
        Assert.assertEquals((int)policyFromBroker.getMaxInactiveDurationSeconds(), (int)30);
        Assert.assertFalse((boolean)policyFromBroker.isDeleteWhileInactive());
        Assert.assertEquals((Object)policyFromBroker.getInactiveTopicDeleteMode(), (Object)InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.topics().removeInactiveTopicPolicies(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getInactiveTopicPolicies(topic, true), (Object)namespaceLevelPolicy));
        this.admin.namespaces().removeInactiveTopicPolicies("prop/ns-abc");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((Object)this.admin.topics().getInactiveTopicPolicies(topic, true), (Object)brokerLevelPolicy));
    }
}

