/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PersistentTopicTest
extends BrokerTestBase {
    /**
     * Per-test fixture setup; delegates entirely to the superclass's {@code baseSetup()}.
     *
     * <p>{@code alwaysRun=true} asks TestNG to run this configuration method regardless of
     * which test groups were selected for the run.
     *
     * @throws Exception if the superclass setup fails
     */
    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Per-test fixture teardown; delegates entirely to the superclass's {@code internalCleanup()}.
     *
     * <p>{@code alwaysRun=true} asks TestNG to run this method even when the preceding test
     * method failed or was skipped.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that a topic is no longer referenced by the broker after its namespace bundle
     * has been unloaded, even though the managed ledger's current ledger handle never
     * completes its close.
     *
     * <p>The managed ledger's {@code currentLedger} field is replaced via reflection with a
     * Mockito mock whose {@code asyncClose} does nothing, so the close callback passed to it
     * is never invoked.
     *
     * @throws Exception if producer creation, the reflective field swap, or the unload fails
     */
    @Test
    public void testCleanFailedUnloadTopic() throws Exception {
        final String topicName = "persistent://prop/ns-abc/failedUnload";

        // try-with-resources guarantees the producer is released even when an assertion fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {

            PersistentTopic topicRef =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            org.testng.Assert.assertNotNull(topicRef);

            // Swap the live ledger handle for a mock whose asyncClose() is a no-op.
            ManagedLedger ml = topicRef.ledger;
            LedgerHandle ledger = Mockito.mock(LedgerHandle.class);
            Field handleField = ml.getClass().getDeclaredField("currentLedger");
            handleField.setAccessible(true);
            handleField.set(ml, ledger);
            Mockito.doNothing().when(ledger).asyncClose(ArgumentMatchers.any(), ArgumentMatchers.any());

            NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
            this.pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 5L, TimeUnit.SECONDS).get();

            // Poll for the topic reference to disappear before making the final assertion.
            PersistentTopicTest.retryStrategically(
                    test -> !this.pulsar.getBrokerService().getTopicReference(topicName).isPresent(), 5, 500L);
            org.testng.Assert.assertFalse(
                    this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        }
    }

    /**
     * Verifies that {@code checkAndUnblockIfStuck()} gets a shared and a failover subscription
     * delivering messages again after their dispatchers were left with {@code havePendingRead}
     * set to {@code true}.
     *
     * <p>Flow: subscribe once so both subscriptions exist, close those consumers, force
     * {@code havePendingRead = true} on both dispatchers, publish two messages, re-subscribe and
     * confirm nothing is delivered, then clear the flag, run the unblock check twice, and
     * confirm each consumer now receives a message.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Test
    public void testUnblockStuckSubscription() throws Exception {
        final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
        final String sharedSubName = "shared";
        final String failoverSubName = "failOver";

        // Initial subscriptions: they only exist so the broker creates both dispatchers.
        Consumer<String> initialShared = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(sharedSubName)
                .subscribe();
        Consumer<String> initialFailover = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Failover)
                .subscriptionName(failoverSubName)
                .subscribe();

        try (Producer<String> producer =
                this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {

            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            PersistentSubscription sharedSub = topic.getSubscription(sharedSubName);
            PersistentSubscription failOverSub = topic.getSubscription(failoverSubName);
            PersistentDispatcherMultipleConsumers sharedDispatcher =
                    (PersistentDispatcherMultipleConsumers) sharedSub.getDispatcher();
            PersistentDispatcherSingleActiveConsumer failOverDispatcher =
                    (PersistentDispatcherSingleActiveConsumer) failOverSub.getDispatcher();

            initialShared.close();
            initialFailover.close();

            // Mark both dispatchers as having a read in flight although none was issued.
            sharedDispatcher.havePendingRead = true;
            failOverDispatcher.havePendingRead = true;

            producer.newMessage().value("test").eventTime(5L).send();
            producer.newMessage().value("test").eventTime(5L).send();

            try (Consumer<String> sharedConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                         .topic(topicName)
                         .subscriptionType(SubscriptionType.Shared)
                         .subscriptionName(sharedSubName)
                         .subscribe();
                 Consumer<String> failoverConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                         .topic(topicName)
                         .subscriptionType(SubscriptionType.Failover)
                         .subscriptionName(failoverSubName)
                         .subscribe()) {

                // With the flag forced on, neither subscription should deliver anything.
                Message<String> msg = sharedConsumer.receive(2, TimeUnit.SECONDS);
                org.testng.Assert.assertNull(msg);
                msg = failoverConsumer.receive(2, TimeUnit.SECONDS);
                org.testng.Assert.assertNull(msg);

                sharedDispatcher.havePendingRead = false;
                failOverDispatcher.havePendingRead = false;

                // The check is deliberately run twice; only the second pass is asserted.
                // NOTE(review): presumably the first pass records the read position and the
                // second detects that it has not moved — confirm against the dispatcher code.
                sharedSub.checkAndUnblockIfStuck();
                failOverDispatcher.checkAndUnblockIfStuck();
                org.testng.Assert.assertTrue(sharedSub.checkAndUnblockIfStuck());
                org.testng.Assert.assertTrue(failOverDispatcher.checkAndUnblockIfStuck());

                msg = sharedConsumer.receive(5, TimeUnit.SECONDS);
                org.testng.Assert.assertNotNull(msg);
                msg = failoverConsumer.receive(5, TimeUnit.SECONDS);
                org.testng.Assert.assertNotNull(msg);
            }
        }
    }

    /**
     * Checks how {@code onPoliciesUpdate} reacts to the {@code deleted} flag on the policies:
     * with {@code deleted = true} it must not call {@code checkReplicationAndRetryOnFailure()},
     * and with {@code deleted = false} it must call it exactly once.
     *
     * <p>The assertions are made on a Mockito spy wrapping the topic obtained from the broker.
     *
     * @throws Exception if namespace/topic creation or any admin call fails
     */
    @Test
    public void testDeleteNamespaceInfiniteRetry() throws Exception {
        // Randomised namespace name for this run.
        final String namespace = "prop/ns" + UUID.randomUUID();
        final String topicUnderTest = "persistent://" + namespace + "/testDeleteNamespaceInfiniteRetry";

        this.admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        this.conf.setForceDeleteNamespaceAllowed(true);

        // Create the topic by opening and immediately closing a producer on it.
        this.pulsarClient.newProducer().topic(topicUnderTest).create().close();

        this.admin.namespaces().setMaxConsumersPerTopic(namespace, 0);
        Awaitility.await()
                .atMost(3L, TimeUnit.SECONDS)
                .until(() -> this.admin.namespaces().getMaxConsumersPerTopic(namespace) == 0);

        PersistentTopic loadedTopic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topicUnderTest).get().get();
        PersistentTopic spiedTopic = Mockito.spy(loadedTopic);

        Policies update = new Policies();

        // Policies marked deleted: the replication check must not be invoked.
        update.deleted = true;
        spiedTopic.onPoliciesUpdate(update);
        Mockito.verify(spiedTopic, Mockito.never()).checkReplicationAndRetryOnFailure();

        // Policies not deleted: the replication check is invoked exactly once.
        update.deleted = false;
        spiedTopic.onPoliciesUpdate(update);
        Mockito.verify(spiedTopic, Mockito.times(1)).checkReplicationAndRetryOnFailure();
    }

    /**
     * Verifies that the topic's accumulated in/out byte and message counters start at zero,
     * become positive after traffic, and keep the same values after both consumers have
     * unsubscribed and all producers have been removed.
     *
     * <p>Receives use a bounded timeout so that a missing message fails the following
     * {@code assertNotNull} instead of blocking the test indefinitely.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Test
    public void testAccumulativeStats() throws Exception {
        final String topicName = "persistent://prop/ns-abc/aTopic";
        final String sharedSubName = "shared";
        final String failoverSubName = "failOver";

        Consumer<String> consumer1 = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(sharedSubName)
                .subscribe();
        Consumer<String> consumer2 = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Failover)
                .subscriptionName(failoverSubName)
                .subscribe();
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create();

        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();

        // No traffic yet: every counter must be zero.
        TopicStatsImpl stats = topic.getStats(false, false);
        org.testng.Assert.assertEquals(stats.getBytesInCounter(), 0L);
        org.testng.Assert.assertEquals(stats.getMsgInCounter(), 0L);
        org.testng.Assert.assertEquals(stats.getBytesOutCounter(), 0L);
        org.testng.Assert.assertEquals(stats.getMsgOutCounter(), 0L);

        producer.newMessage().value("test").eventTime(5L).send();
        producer.newMessage().value("test").eventTime(5L).send();

        // Bounded receive: returns null on timeout, which makes the assertion meaningful.
        Message<String> msg = consumer1.receive(5, TimeUnit.SECONDS);
        org.testng.Assert.assertNotNull(msg);
        msg = consumer2.receive(5, TimeUnit.SECONDS);
        org.testng.Assert.assertNotNull(msg);

        TopicStatsImpl statsBeforeUnsubscribe = topic.getStats(false, false);
        org.testng.Assert.assertTrue(statsBeforeUnsubscribe.getBytesInCounter() > 0L);
        org.testng.Assert.assertTrue(statsBeforeUnsubscribe.getMsgInCounter() > 0L);
        org.testng.Assert.assertTrue(statsBeforeUnsubscribe.getBytesOutCounter() > 0L);
        org.testng.Assert.assertTrue(statsBeforeUnsubscribe.getMsgOutCounter() > 0L);

        consumer1.unsubscribe();
        consumer2.unsubscribe();
        producer.close();

        // Explicitly drop any producer still registered on the topic before re-reading stats.
        topic.getProducers().values().forEach(topic::removeProducer);
        org.testng.Assert.assertEquals(topic.getProducers().size(), 0);

        // Counters must be unchanged by the removal of subscriptions and producers.
        TopicStatsImpl statsAfterUnsubscribe = topic.getStats(false, false);
        org.testng.Assert.assertEquals(
                statsAfterUnsubscribe.getBytesInCounter(), statsBeforeUnsubscribe.getBytesInCounter());
        org.testng.Assert.assertEquals(
                statsAfterUnsubscribe.getMsgInCounter(), statsBeforeUnsubscribe.getMsgInCounter());
        org.testng.Assert.assertEquals(
                statsAfterUnsubscribe.getBytesOutCounter(), statsBeforeUnsubscribe.getBytesOutCounter());
        org.testng.Assert.assertEquals(
                statsAfterUnsubscribe.getMsgOutCounter(), statsBeforeUnsubscribe.getMsgOutCounter());
    }

    /**
     * Exercises unloading a namespace bundle while a producer is attached to a partitioned
     * topic in a namespace created with more than one bundle.
     *
     * <p>Before the unload it checks that the partitioned-topic name is absent from the
     * broker's topic map, is present after {@code getTopicIfExists} has been called for it,
     * and still yields no topic reference. The test passes if the unload completes within
     * the requested timeout without throwing.
     *
     * @throws Exception if namespace/topic creation, producer creation, or the unload fails
     */
    @Test
    public void testPersistentPartitionedTopicUnload() throws Exception {
        final String topicName = "persistent://prop/ns/failedUnload";
        final String ns = "prop/ns";
        final int partitions = 5;
        final int producers = 1;
        // The namespace is created with more than one bundle.
        final int bundles = 2;

        this.admin.namespaces().createNamespace(ns, bundles);
        this.admin.topics().createPartitionedTopic(topicName, partitions);

        ArrayList<Producer<String>> producerSet = new ArrayList<>();
        try {
            for (int i = 0; i < producers; i++) {
                producerSet.add(this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
            }

            org.testng.Assert.assertFalse(this.pulsar.getBrokerService().getTopics().containsKey(topicName));
            this.pulsar.getBrokerService().getTopicIfExists(topicName).get();
            org.testng.Assert.assertTrue(this.pulsar.getBrokerService().getTopics().containsKey(topicName));

            // The partitioned-topic name itself must not resolve to a topic reference.
            org.testng.Assert.assertFalse(
                    this.pulsar.getBrokerService().getTopicReference(topicName).isPresent());

            NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
            this.pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 5L, TimeUnit.SECONDS).get();
        } finally {
            // Release every producer that was created, even if an assertion or the unload failed.
            for (Producer<String> producer : producerSet) {
                producer.close();
            }
        }
    }
}

