/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for SLA-monitor namespace ownership.
 *
 * <p>{@link #setup()} starts a local ZooKeeper/BookKeeper ensemble and
 * {@value #BROKER_COUNT} in-process brokers on dynamically assigned ports, then creates
 * one {@code sla-monitor/my-cluster/<host>:<webPort>} namespace per broker. The tests
 * check, through the namespace service and the admin API, which broker a topic in each
 * of those namespaces resolves to.
 */
@Test(groups={"broker"})
public class SLAMonitoringTest {
    private static final Logger log = LoggerFactory.getLogger(SLAMonitoringTest.class);
    private static final int BROKER_COUNT = 5;
    private static final String CLUSTER_NAME = "my-cluster";
    private static final String SLA_TENANT = "sla-monitor";

    LocalBookkeeperEnsemble bkEnsemble;
    ExecutorService executor;
    private final int[] brokerWebServicePorts = new int[BROKER_COUNT];
    private final int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
    private final URL[] brokerUrls = new URL[BROKER_COUNT];
    private final PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
    private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
    private final ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];

    /**
     * Starts the bookkeeper ensemble and all brokers, records the ports each broker
     * actually bound to, and creates the cluster, the tenant and one SLA-monitor
     * namespace per broker.
     *
     * @throws Exception if any component fails to start or an admin call fails
     */
    @BeforeClass(alwaysRun=true)
    void setup() throws Exception {
        this.executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        log.info("---- Initializing SLAMonitoringTest -----");
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();
        for (int i = 0; i < BROKER_COUNT; ++i) {
            ServiceConfiguration config = new ServiceConfiguration();
            config.setBrokerShutdownTimeoutMs(0L);
            // Port 0 requests a dynamically assigned port; the real value is read back below.
            config.setBrokerServicePort(Optional.of(0));
            config.setWebServicePort(Optional.of(0));
            config.setClusterName(CLUSTER_NAME);
            config.setAdvertisedAddress("localhost");
            config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            config.setDefaultNumberOfNamespaceBundles(1);
            config.setLoadBalancerEnabled(false);
            this.configurations[i] = config;

            this.pulsarServices[i] = new PulsarService(config);
            this.pulsarServices[i].start();
            this.brokerWebServicePorts[i] = this.pulsarServices[i].getListenPortHTTP().get();
            this.brokerNativeBrokerPorts[i] = this.pulsarServices[i].getBrokerListenPort().get();
            this.brokerUrls[i] = new URL(this.pulsarServices[i].getWebServiceAddress());
            this.pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(this.brokerUrls[i].toString()).build();
        }
        Thread.sleep(100L);
        this.createTenant(this.pulsarAdmins[BROKER_COUNT - 1]);
        for (int i = 0; i < BROKER_COUNT; ++i) {
            String namespace = String.format("%s/%s/%s:%s", SLA_TENANT, CLUSTER_NAME,
                    this.pulsarServices[i].getAdvertisedAddress(), this.brokerWebServicePorts[i]);
            this.pulsarAdmins[0].namespaces().createNamespace(namespace);
        }
    }

    /**
     * Registers the test cluster and creates the {@code sla-monitor} tenant.
     *
     * <p>The cluster is always created through the first broker's admin client, using the
     * service URL of {@code pulsarAdmin}; the tenant is created through {@code pulsarAdmin}.
     *
     * @param pulsarAdmin admin client supplying the cluster service URL and used to create the tenant
     * @throws PulsarAdminException if either admin call fails
     */
    private void createTenant(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
        ClusterData clusterData = ClusterData.builder().serviceUrl(pulsarAdmin.getServiceUrl()).build();
        this.pulsarAdmins[0].clusters().createCluster(CLUSTER_NAME, clusterData);
        HashSet<String> allowedClusters = new HashSet<>();
        allowedClusters.add(CLUSTER_NAME);
        TenantInfo adminConfig = TenantInfo.builder()
                .adminRoles(Collections.singleton(""))
                .allowedClusters(allowedClusters)
                .build();
        pulsarAdmin.tenants().createTenant(SLA_TENANT, adminConfig);
    }

    /**
     * Stops everything {@link #setup()} started.
     *
     * <p>Because this runs with {@code alwaysRun=true}, it may execute after a partially
     * failed setup, so every resource is null-checked, and the bookkeeper ensemble is
     * stopped even if closing a broker throws.
     *
     * @throws Exception if closing a broker or stopping the ensemble fails
     */
    @AfterClass(alwaysRun=true)
    public void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        try {
            for (int i = 0; i < BROKER_COUNT; ++i) {
                if (this.pulsarAdmins[i] != null) {
                    this.pulsarAdmins[i].close();
                }
                if (this.pulsarServices[i] != null) {
                    this.pulsarServices[i].close();
                }
            }
        } finally {
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
            }
        }
    }

    /**
     * Asserts that registering the SLA namespace through the namespace service succeeds.
     */
    @Test
    public void testOwnershipAfterSetup() {
        for (int i = 0; i < BROKER_COUNT; ++i) {
            try {
                // NOTE(review): every iteration queries broker 0 rather than broker i.
                // This looks unintended, but it is preserved here - confirm before changing.
                Assert.assertTrue(this.pulsarServices[0].getNamespaceService().registerSLANamespace());
            } catch (PulsarServerException e) {
                log.error("Exception occurred", e);
                Assert.fail("SLA Namespace should have been owned by the broker, Exception.", e);
            }
        }
    }

    /**
     * Asserts, via every broker's admin client, that all brokers are active and that the
     * first listed broker reports exactly two owned namespaces.
     */
    @Test
    public void testOwnedNamespaces() {
        // Run the lookups first so the SLA namespaces have been resolved to their brokers.
        this.testOwnershipViaAdminAfterSetup();
        try {
            for (int i = 0; i < BROKER_COUNT; ++i) {
                List<String> activeBrokers = this.pulsarAdmins[i].brokers().getActiveBrokers(CLUSTER_NAME);
                Assert.assertNotNull(activeBrokers);
                Assert.assertEquals(activeBrokers.size(), BROKER_COUNT);
                Map<String, ?> ownedNamespaces =
                        this.pulsarAdmins[i].brokers().getOwnedNamespaces(CLUSTER_NAME, activeBrokers.get(0));
                Assert.assertEquals(ownedNamespaces.size(), 2);
            }
        } catch (Exception e) {
            log.error("Failed to query owned namespaces", e);
            Assert.fail("Hearbeat namespace and SLA namespace should be owned by the broker", e);
        }
    }

    /**
     * Asserts that a topic lookup in each broker's SLA-monitor namespace resolves to that
     * broker's native service URL.
     */
    @Test
    public void testOwnershipViaAdminAfterSetup() {
        for (int i = 0; i < BROKER_COUNT; ++i) {
            try {
                String owner = this.pulsarAdmins[0].lookups().lookupTopic(this.slaMonitorTopic(i));
                Assert.assertEquals(owner, this.nativeBrokerUrl(i));
            } catch (Exception e) {
                log.error("Lookup failed for broker index {}", i, e);
                Assert.fail("SLA Namespace should have been owned by the broker(" + this.nativeBrokerUrl(i) + ")", e);
            }
        }
    }

    /**
     * Closes one broker, asserts that its SLA-monitor namespace then resolves to a
     * different broker, restarts the broker with its original configuration, asserts
     * that the namespace resolves to it again, and finally closes it.
     */
    @Test
    public void testUnloadIfBrokerCrashes() {
        int crashIndex = 2;
        log.info("Trying to close the broker at index = {}", crashIndex);
        try {
            this.pulsarServices[crashIndex].close();
        } catch (PulsarServerException e) {
            log.error("Failed to close broker at index {}", crashIndex, e);
            Assert.fail("Should be a able to close the broker index " + crashIndex + " Exception: " + e, e);
        }

        String topic = this.slaMonitorTopic(crashIndex);
        log.info("Lookup for namespace {}", topic);
        String broker = null;
        try {
            broker = this.pulsarAdmins[BROKER_COUNT - 1].lookups().lookupTopic(topic);
            log.info("{} Namespace is owned by {}", topic, broker);
            Assert.assertNotEquals(broker, this.nativeBrokerUrl(crashIndex));
        } catch (PulsarAdminException e) {
            log.error("Lookup failed for {}", topic, e);
            Assert.fail("The SLA Monitor namespace should be owned by some other broker", e);
        }

        try {
            this.pulsarServices[crashIndex] = new PulsarService(this.configurations[crashIndex]);
            this.pulsarServices[crashIndex].start();
            // Re-read the native port from the restarted broker before comparing URLs.
            this.brokerNativeBrokerPorts[crashIndex] = this.pulsarServices[crashIndex].getBrokerListenPort().get();
        } catch (PulsarServerException e) {
            log.error("Failed to restart broker at index {}", crashIndex, e);
            Assert.fail("The broker should be able to start without exception", e);
        }

        try {
            broker = this.pulsarAdmins[0].lookups().lookupTopic(topic);
            log.info("{} Namespace is re-owned by {}", topic, broker);
            Assert.assertEquals(broker, this.nativeBrokerUrl(crashIndex));
        } catch (PulsarAdminException e) {
            log.error("Lookup failed for {}", topic, e);
            Assert.fail("The SLA Monitor namespace should be reowned by the broker" + broker, e);
        }

        try {
            this.pulsarServices[crashIndex].close();
        } catch (PulsarServerException e) {
            log.error("Failed to close broker at index {}", crashIndex, e);
            Assert.fail("The broker should be able to stop without exception", e);
        }
    }

    /** Builds the persistent topic name inside the SLA-monitor namespace of the given broker. */
    private String slaMonitorTopic(int brokerIndex) {
        return String.format("persistent://%s/%s/%s:%s/%s", SLA_TENANT, CLUSTER_NAME,
                this.pulsarServices[brokerIndex].getAdvertisedAddress(),
                this.brokerWebServicePorts[brokerIndex], "my-topic");
    }

    /** Builds the {@code pulsar://host:port} URL from the given broker's recorded native port. */
    private String nativeBrokerUrl(int brokerIndex) {
        return "pulsar://" + this.pulsarServices[brokerIndex].getAdvertisedAddress()
                + ":" + this.brokerNativeBrokerPorts[brokerIndex];
    }
}

