/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServiceBaseTest {
    private static final String NAMESPACE1 = "system-topic/namespace-1";
    private static final String NAMESPACE2 = "system-topic/namespace-2";
    private static final String NAMESPACE3 = "system-topic/namespace-3";
    private static final TopicName TOPIC1 = TopicName.get((String)"persistent", (NamespaceName)NamespaceName.get((String)"system-topic/namespace-1"), (String)"topic-1");
    private static final TopicName TOPIC2 = TopicName.get((String)"persistent", (NamespaceName)NamespaceName.get((String)"system-topic/namespace-1"), (String)"topic-2");
    private static final TopicName TOPIC3 = TopicName.get((String)"persistent", (NamespaceName)NamespaceName.get((String)"system-topic/namespace-2"), (String)"topic-1");
    private static final TopicName TOPIC4 = TopicName.get((String)"persistent", (NamespaceName)NamespaceName.get((String)"system-topic/namespace-2"), (String)"topic-2");
    private static final TopicName TOPIC5 = TopicName.get((String)"persistent", (NamespaceName)NamespaceName.get((String)"system-topic/namespace-3"), (String)"topic-1");
    private static final TopicName TOPIC6 = TopicName.get((String)"persistent", (NamespaceName)NamespaceName.get((String)"system-topic/namespace-3"), (String)"topic-2");
    private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.internalSetup();
        this.prepareData();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException {
        class TopicPolicyListenerImpl
        implements TopicPolicyListener<TopicPolicies> {
            TopicPolicyListenerImpl() {
            }

            public void onUpdate(TopicPolicies data) {
            }
        }
        TopicName topicName = TopicName.get((String)"test");
        CompletionStage f = CompletableFuture.completedFuture(null).thenRunAsync(() -> {
            for (int i = 0; i < 100; ++i) {
                TopicPolicyListenerImpl listener = new TopicPolicyListenerImpl();
                this.systemTopicBasedTopicPoliciesService.registerListener(topicName, (TopicPolicyListener)listener);
                Assert.assertNotNull(this.systemTopicBasedTopicPoliciesService.listeners.get(topicName));
                Assert.assertTrue((((List)this.systemTopicBasedTopicPoliciesService.listeners.get(topicName)).size() >= 1 ? 1 : 0) != 0);
                this.systemTopicBasedTopicPoliciesService.unregisterListener(topicName, (TopicPolicyListener)listener);
            }
        });
        for (int i = 0; i < 100; ++i) {
            TopicPolicyListenerImpl listener = new TopicPolicyListenerImpl();
            this.systemTopicBasedTopicPoliciesService.registerListener(topicName, (TopicPolicyListener)listener);
            Assert.assertNotNull(this.systemTopicBasedTopicPoliciesService.listeners.get(topicName));
            Assert.assertTrue((((List)this.systemTopicBasedTopicPoliciesService.listeners.get(topicName)).size() >= 1 ? 1 : 0) != 0);
            this.systemTopicBasedTopicPoliciesService.unregisterListener(topicName, (TopicPolicyListener)listener);
        }
        ((CompletableFuture)f).get();
        Assert.assertFalse((boolean)this.systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName));
    }

    @Test
    public void testGetPolicy() throws ExecutionException, InterruptedException, BrokerServiceException.TopicPoliciesCacheNotInitException {
        TopicPolicies initPolicy = TopicPolicies.builder().maxConsumerPerTopic(Integer.valueOf(10)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic(), (int)10));
        TopicPolicies policies1 = TopicPolicies.builder().maxProducerPerTopic(Integer.valueOf(1)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1).get();
        TopicPolicies policies2 = TopicPolicies.builder().maxProducerPerTopic(Integer.valueOf(2)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2).get();
        TopicPolicies policies3 = TopicPolicies.builder().maxProducerPerTopic(Integer.valueOf(3)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, policies3).get();
        TopicPolicies policies4 = TopicPolicies.builder().maxProducerPerTopic(Integer.valueOf(4)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, policies4).get();
        TopicPolicies policies5 = TopicPolicies.builder().maxProducerPerTopic(Integer.valueOf(5)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, policies5).get();
        TopicPolicies policies6 = TopicPolicies.builder().maxProducerPerTopic(Integer.valueOf(6)).build();
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get();
        Awaitility.await().untilAsserted(() -> {
            TopicPolicies policiesGet1 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
            TopicPolicies policiesGet2 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
            TopicPolicies policiesGet3 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3);
            TopicPolicies policiesGet4 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4);
            TopicPolicies policiesGet5 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5);
            TopicPolicies policiesGet6 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6);
            Assert.assertEquals((Object)policiesGet1, (Object)policies1);
            Assert.assertEquals((Object)policiesGet2, (Object)policies2);
            Assert.assertEquals((Object)policiesGet3, (Object)policies3);
            Assert.assertEquals((Object)policiesGet4, (Object)policies4);
            Assert.assertEquals((Object)policiesGet5, (Object)policies5);
            Assert.assertEquals((Object)policiesGet6, (Object)policies6);
        });
        Assert.assertEquals((long)this.systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), (long)6L);
        Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get((String)NAMESPACE1)));
        Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get((String)NAMESPACE2)));
        Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get((String)NAMESPACE3)));
        policies1.setMaxProducerPerTopic(Integer.valueOf(101));
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1);
        policies2.setMaxProducerPerTopic(Integer.valueOf(102));
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2);
        policies2.setMaxProducerPerTopic(Integer.valueOf(103));
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2);
        policies1.setMaxProducerPerTopic(Integer.valueOf(104));
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1);
        policies2.setMaxProducerPerTopic(Integer.valueOf(105));
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2);
        policies1.setMaxProducerPerTopic(Integer.valueOf(106));
        this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1);
        Awaitility.await().untilAsserted(() -> {
            TopicPolicies policiesGet1 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
            TopicPolicies policiesGet2 = this.systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
            Assert.assertEquals((Object)policies1, (Object)policiesGet1);
            Assert.assertEquals((Object)policies2, (Object)policiesGet2);
        });
        Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get((String)NAMESPACE2)));
        Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get((String)NAMESPACE1)));
        Assert.assertTrue((boolean)this.systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get((String)NAMESPACE3)));
        TopicPolicies policiesGet1 = (TopicPolicies)this.systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get();
        Assert.assertEquals((Object)policies1, (Object)policiesGet1);
    }

    @Test
    public void testCacheCleanup() throws Exception {
        String topic = "persistent://system-topic/namespace-1/test" + UUID.randomUUID();
        TopicName topicName = TopicName.get((String)topic);
        this.admin.topics().createPartitionedTopic(topic, 3);
        this.pulsarClient.newProducer().topic(topic).create().close();
        this.admin.topics().setMaxConsumers(topic, 1000);
        Awaitility.await().untilAsserted(() -> AssertJUnit.assertNotNull((Object)this.admin.topics().getMaxConsumers(topic)));
        Map map = this.systemTopicBasedTopicPoliciesService.getPoliciesCache();
        Map listMap = this.systemTopicBasedTopicPoliciesService.getListeners();
        AssertJUnit.assertNotNull(map.get(topicName));
        AssertJUnit.assertEquals((int)((TopicPolicies)map.get(topicName)).getMaxConsumerPerTopic(), (int)1000);
        AssertJUnit.assertNotNull(((List)listMap.get(topicName)).get(0));
        this.admin.topics().deletePartitionedTopic(topic, true);
        this.admin.namespaces().unload(NAMESPACE1);
        AssertJUnit.assertNull(map.get(topicName));
        AssertJUnit.assertNull(listMap.get(topicName));
    }

    @Test
    public void testListenerCleanupByPartition() throws Exception {
        String topic = "persistent://system-topic/namespace-1/test" + UUID.randomUUID();
        TopicName topicName = TopicName.get((String)topic);
        this.admin.topics().createPartitionedTopic(topic, 3);
        this.pulsarClient.newProducer().topic(topic).create().close();
        Map listMap = this.systemTopicBasedTopicPoliciesService.getListeners();
        Awaitility.await().untilAsserted(() -> AssertJUnit.assertEquals((int)((List)listMap.get(topicName)).size(), (int)3));
        this.admin.topics().unload(topicName.getPartition(0).toString());
        AssertJUnit.assertEquals((int)((List)listMap.get(topicName)).size(), (int)2);
        this.admin.topics().unload(topicName.getPartition(1).toString());
        AssertJUnit.assertEquals((int)((List)listMap.get(topicName)).size(), (int)1);
        this.admin.topics().unload(topicName.getPartition(2).toString());
        AssertJUnit.assertNull(listMap.get(topicName));
    }

    private void prepareData() throws PulsarAdminException {
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant("system-topic", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet(), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.namespaces().createNamespace(NAMESPACE2);
        this.admin.namespaces().createNamespace(NAMESPACE3);
        this.admin.lookups().lookupTopic(TOPIC1.toString());
        this.admin.lookups().lookupTopic(TOPIC2.toString());
        this.admin.lookups().lookupTopic(TOPIC3.toString());
        this.admin.lookups().lookupTopic(TOPIC4.toString());
        this.admin.lookups().lookupTopic(TOPIC5.toString());
        this.admin.lookups().lookupTopic(TOPIC6.toString());
        this.systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService)this.pulsar.getTopicPoliciesService();
    }

    @Test
    public void testGetPolicyTimeout() throws Exception {
        SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService)this.pulsar.getTopicPoliciesService();
        Awaitility.await().untilAsserted(() -> AssertJUnit.assertTrue((boolean)((Boolean)service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()))));
        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
        long start = System.currentTimeMillis();
        Backoff backoff = new BackoffBuilder().setInitialTime(500L, TimeUnit.MILLISECONDS).setMandatoryStop(5000L, TimeUnit.MILLISECONDS).setMax(1000L, TimeUnit.MILLISECONDS).create();
        try {
            service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, this.pulsar.getExecutor()).get();
        }
        catch (Exception e) {
            AssertJUnit.assertTrue((boolean)(e.getCause().getCause() instanceof BrokerServiceException.TopicPoliciesCacheNotInitException));
        }
        long cost = System.currentTimeMillis() - start;
        AssertJUnit.assertTrue((String)("actual:" + cost), (cost >= 4000L ? 1 : 0) != 0);
    }

    @Test
    public void testCreatSystemTopicClientWithRetry() throws Exception {
        SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService)Mockito.spy((Object)((SystemTopicBasedTopicPoliciesService)this.pulsar.getTopicPoliciesService()));
        Field field = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("namespaceEventsSystemTopicFactory");
        field.setAccessible(true);
        NamespaceEventsSystemTopicFactory factory = (NamespaceEventsSystemTopicFactory)Mockito.spy((Object)((NamespaceEventsSystemTopicFactory)field.get(service)));
        SystemTopicClient client = (SystemTopicClient)Mockito.mock(TopicPoliciesSystemTopicClient.class);
        ((NamespaceEventsSystemTopicFactory)Mockito.doReturn((Object)client).when((Object)factory)).createTopicPoliciesSystemTopicClient((NamespaceName)ArgumentMatchers.any());
        field.set(service, factory);
        SystemTopicClient.Reader reader = (SystemTopicClient.Reader)Mockito.mock(SystemTopicClient.Reader.class);
        ((SystemTopicClient)Mockito.doThrow((Throwable[])new Throwable[]{new PulsarClientException("test")}).doReturn((Object)reader).when((Object)client)).newReader();
        SystemTopicClient.Reader reader1 = (SystemTopicClient.Reader)service.creatSystemTopicClientWithRetry(null).get();
        AssertJUnit.assertEquals((Object)reader1, (Object)reader);
    }

    @Test
    public void testGetTopicPoliciesWithRetry() throws Exception {
        Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
        initMapField.setAccessible(true);
        Map initMap = (Map)initMapField.get(this.systemTopicBasedTopicPoliciesService);
        initMap.remove(NamespaceName.get((String)NAMESPACE1));
        Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
        readerCaches.setAccessible(true);
        Map readers = (Map)readerCaches.get(this.systemTopicBasedTopicPoliciesService);
        readers.remove(NamespaceName.get((String)NAMESPACE1));
        Backoff backoff = new BackoffBuilder().setInitialTime(500L, TimeUnit.MILLISECONDS).setMandatoryStop(5000L, TimeUnit.MILLISECONDS).setMax(1000L, TimeUnit.MILLISECONDS).create();
        final TopicPolicies initPolicy = TopicPolicies.builder().maxConsumerPerTopic(Integer.valueOf(10)).build();
        ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
        executors.schedule(new Runnable(){

            @Override
            public void run() {
                try {
                    SystemTopicBasedTopicPoliciesServiceTest.this.systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }, 2000L, TimeUnit.MILLISECONDS);
        Awaitility.await().untilAsserted(() -> {
            Optional topicPolicies = (Optional)this.systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, this.pulsar.getExecutor()).get();
            Assert.assertTrue((boolean)topicPolicies.isPresent());
            if (topicPolicies.isPresent()) {
                Assert.assertEquals(topicPolicies.get(), (Object)initPolicy);
            }
        });
    }
}

