/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.InternalServerErrorException;
import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.zookeeper.KeeperException;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class AuthenticatedProducerConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);

    // Test fixture locations. All paths are relative ("./src/test/resources/..."), so they are
    // resolved against the working directory the tests are launched from.
    // These are true constants, hence static rather than one copy per test instance.

    /** CA certificate used as the TLS trust store. */
    private static final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
    /** Certificate presented by the broker on its TLS listeners. */
    private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    /** Private key matching {@link #TLS_SERVER_CERT_FILE_PATH}. */
    private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    /** Client certificate used for TLS authentication. */
    private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    /** Private key matching {@link #TLS_CLIENT_CERT_FILE_PATH}. */
    private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    /** Credentials file published to the Basic provider via the {@code pulsar.auth.basic.conf} system property. */
    private static final String BASIC_CONF_FILE_PATH = "./src/test/resources/authentication/basic/.htpasswd";

    /**
     * Prepares the broker configuration before every test method and then calls the
     * inherited {@code init()}.
     *
     * <p>The configuration enables authentication and authorization, sets both TLS ports to
     * {@code 0}, installs the TLS and Basic authentication providers, registers the super-user
     * roles, and makes the broker's own client authenticate with the TLS client certificate.
     * An anonymous role is configured only for {@code testAnonymousSyncProducerAndConsumer}.
     *
     * @throws Exception if the inherited initialisation fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // Only the anonymous-access test maps unauthenticated clients onto a role.
        if (methodName.equals("testAnonymousSyncProducerAndConsumer")) {
            conf.setAnonymousUserRole("anonymousUser");
        }

        conf.setAuthenticationEnabled(true);
        conf.setAuthorizationEnabled(true);

        // TLS listeners on port 0 (presumably "pick any free port" — confirm against the broker).
        conf.setBrokerServicePortTls(Optional.of(0));
        conf.setWebServicePortTls(Optional.of(0));
        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        conf.setTlsAllowInsecureConnection(true);

        Set<String> superUserRoles = Sets.newHashSet("localhost", "superUser", "superUser2", "admin");
        conf.setSuperUserRoles(superUserRoles);

        // The broker's internal client authenticates with the TLS client certificate.
        conf.setBrokerClientTlsEnabled(true);
        conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        conf.setBrokerClientAuthenticationParameters(
                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);

        // The Basic provider's credentials file is handed over through a system property.
        System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
        Set<String> providers = Sets.newHashSet(
                AuthenticationProviderTls.class.getName(),
                AuthenticationProviderBasic.class.getName());
        conf.setAuthenticationProviders(providers);

        conf.setClusterName("test");
        conf.setNumExecutorThreadPoolSize(5);

        super.init();
    }

    /**
     * Replaces the admin client and the Pulsar client with TLS-enabled instances that
     * authenticate with {@code auth}.
     *
     * <p>The admin client previously stored in {@code admin} (if any) is closed before the field
     * is reassigned, so it is not leaked; {@code testAnonymousSyncProducerAndConsumer} follows
     * the same close-before-replace pattern.
     *
     * <p>The Pulsar client is pointed at the broker's TLS binary service URL, except for
     * {@code testBasicCryptSyncProducerAndConsumer}, which uses the TLS web service address.
     *
     * @param auth authentication plugin used by both the admin client and the Pulsar client
     * @throws Exception if either client cannot be built or the old admin client fails to close
     */
    protected final void internalSetup(Authentication auth) throws Exception {
        if (admin != null) {
            admin.close();
        }
        admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(brokerUrlTls.toString())
                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(auth)
                .build());

        final String lookupUrl;
        if (methodName.equals("testBasicCryptSyncProducerAndConsumer")) {
            // This test goes through the HTTPS web service address instead of the binary one.
            lookupUrl = pulsar.getWebServiceAddressTls();
        } else {
            lookupUrl = pulsar.getBrokerServiceUrlTls();
        }

        replacePulsarClient(PulsarClient.builder()
                .serviceUrl(lookupUrl)
                .statsInterval(0L, TimeUnit.SECONDS)
                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                .allowTlsInsecureConnection(true)
                .authentication(auth)
                .enableTls(true));
    }

    /**
     * Tears down after every test method by delegating to the inherited
     * {@code internalCleanup()}. Registered with {@code alwaysRun=true}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the {@code batchMessageDelayMs} argument for the tests bound to the
     * {@code "batch"} data provider.
     *
     * @return two single-element rows: {@code 0} (the producer's batching settings are left
     *         untouched by {@code testSyncProducerAndConsumer}) and {@code 1000} (batching is
     *         enabled with a 1000 ms maximum publish delay)
     */
    @DataProvider(name = "batch")
    public Object[][] codecProvider() {
        Object[] batchingUntouched = {0};
        Object[] batchingWithDelay = {1000};
        return new Object[][] {batchingUntouched, batchingWithDelay};
    }

    /**
     * Publishes ten messages ({@code my-message-0} .. {@code my-message-9}) to
     * {@code persistent://my-property/my-ns/my-topic} and checks that the subscription
     * {@code my-subscriber-name} receives them in order and without duplicates, then
     * acknowledges them cumulatively.
     *
     * <p>The consumer is subscribed before the producer is created. Both are closed when the
     * method exits, including when an assertion fails.
     *
     * @param batchMessageDelayMs when non-zero, batching is enabled on the producer with this
     *                            maximum publish delay (milliseconds) and at most 5 messages per
     *                            batch; when zero the producer's batching settings are untouched
     * @throws Exception if subscribing, producing, receiving or acknowledging fails
     */
    private void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        final String topicName = "persistent://my-property/my-ns/my-topic";
        final int messageCount = 10;

        // Creating the builder has no broker-side effect; the producer itself is created
        // only after the consumer has subscribed.
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        }

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .subscribe();
             Producer<byte[]> producer = producerBuilder.create()) {

            for (int i = 0; i < messageCount; i++) {
                String message = "my-message-" + i;
                producer.send(message.getBytes(StandardCharsets.UTF_8));
            }

            Message<byte[]> msg = null;
            Set<String> messageSet = Sets.newHashSet();
            for (int i = 0; i < messageCount; i++) {
                msg = consumer.receive(5, TimeUnit.SECONDS);
                // A timed-out receive yields null; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            }

            // Acknowledge everything up to and including the last received message at once.
            consumer.acknowledgeCumulative(msg);
        }
    }

    /**
     * Runs the produce/consume round trip with clients authenticated by the TLS client
     * certificate.
     *
     * <p>Creates cluster {@code test}, tenant {@code my-property} (admin roles {@code appid1}
     * and {@code appid2}) and namespace {@code my-property/my-ns} before the round trip.
     *
     * @param batchMessageDelayMs value from the {@code "batch"} data provider, forwarded to
     *                            {@code testSyncProducerAndConsumer}
     * @throws Exception if setup or the round trip fails
     */
    @Test(dataProvider = "batch")
    public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", methodName);

        HashMap<String, String> tlsParams = new HashMap<>();
        tlsParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        tlsParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls tlsAuth = new AuthenticationTls();
        tlsAuth.configure(tlsParams);
        internalSetup(tlsAuth);

        admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
        admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

        testSyncProducerAndConsumer(batchMessageDelayMs);

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Runs the produce/consume round trip with clients using Basic authentication as user
     * {@code superUser}.
     *
     * <p>For this test method {@code internalSetup} points the Pulsar client at the TLS web
     * service address. Cluster {@code test}, tenant {@code my-property} (no admin roles) and
     * namespace {@code my-property/my-ns} are created before the round trip.
     *
     * @param batchMessageDelayMs value from the {@code "batch"} data provider, forwarded to
     *                            {@code testSyncProducerAndConsumer}
     * @throws Exception if setup or the round trip fails
     */
    @Test(dataProvider = "batch")
    public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", methodName);

        AuthenticationBasic basicAuth = new AuthenticationBasic();
        basicAuth.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}");
        internalSetup(basicAuth);

        admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
        admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

        testSyncProducerAndConsumer(batchMessageDelayMs);

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Runs the produce/consume round trip with clients using Basic authentication as user
     * {@code superUser2}.
     *
     * <p>Cluster {@code test}, tenant {@code my-property} (no admin roles) and namespace
     * {@code my-property/my-ns} are created before the round trip.
     *
     * @param batchMessageDelayMs value from the {@code "batch"} data provider, forwarded to
     *                            {@code testSyncProducerAndConsumer}
     * @throws Exception if setup or the round trip fails
     */
    @Test(dataProvider = "batch")
    public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", methodName);

        AuthenticationBasic basicAuth = new AuthenticationBasic();
        basicAuth.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}");
        internalSetup(basicAuth);

        admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
        admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

        testSyncProducerAndConsumer(batchMessageDelayMs);

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Runs the produce/consume round trip with clients that present no credentials.
     *
     * <p>The cluster and the tenant (admin role {@code anonymousUser}) are created with the
     * TLS-authenticated admin client. That client is then closed and replaced by a plain-HTTP
     * admin client without authentication, which creates the namespace and grants
     * {@code anonymousUser} every {@link AuthAction} on {@code my-topic}. The Pulsar client is
     * likewise replaced by an unauthenticated one with a one-second operation timeout.
     *
     * @param batchMessageDelayMs value from the {@code "batch"} data provider, forwarded to
     *                            {@code testSyncProducerAndConsumer}
     * @throws Exception if setup, the extra subscription or the round trip fails
     */
    @Test(dataProvider = "batch")
    public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", methodName);

        HashMap<String, String> tlsParams = new HashMap<>();
        tlsParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        tlsParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls tlsAuth = new AuthenticationTls();
        tlsAuth.configure(tlsParams);
        internalSetup(tlsAuth);

        admin.clusters().createCluster("test", ClusterData.builder()
                .serviceUrl(brokerUrl.toString())
                .serviceUrlTls(brokerUrlTls.toString())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
                .build());
        admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet("anonymousUser"), Sets.newHashSet("test")));

        // From here on use an admin client without TLS or credentials.
        admin.close();
        admin = Mockito.spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
        admin.topics().grantPermission(
                "persistent://my-property/my-ns/my-topic", "anonymousUser", EnumSet.allOf(AuthAction.class));

        // Unauthenticated Pulsar client on the non-TLS service URL.
        replacePulsarClient(PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .operationTimeout(1, TimeUnit.SECONDS));

        // Subscribe to a second topic for which no explicit permission was granted above;
        // any exception thrown here fails the test.
        // NOTE(review): the returned consumer is neither kept nor closed here — presumably it
        // is released when the client is closed during cleanup; confirm.
        pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/other-topic")
                .subscriptionName("my-subscriber-name")
                .subscribe();

        testSyncProducerAndConsumer(batchMessageDelayMs);

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Creates cluster {@code test} through the TLS-authenticated admin client and, if that call
     * fails with a {@link PulsarAdminException}, checks that the cause is an
     * {@link InternalServerErrorException}.
     *
     * <p>A successful call is tolerated: the test does not assert that an exception is thrown.
     *
     * @throws Exception if client setup fails or an unexpected exception type is raised
     */
    @Test
    public void testAuthenticationFilterNegative() throws Exception {
        log.info("-- Starting {} test --", methodName);

        HashMap<String, String> tlsParams = new HashMap<>();
        tlsParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        tlsParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls tlsAuth = new AuthenticationTls();
        tlsAuth.configure(tlsParams);
        internalSetup(tlsAuth);

        final String cluster = "test";
        final ClusterData clusterData = ClusterData.builder()
                .serviceUrl(brokerUrl.toString())
                .serviceUrlTls(brokerUrlTls.toString())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
                .build();

        try {
            admin.clusters().createCluster(cluster, clusterData);
        } catch (PulsarAdminException adminFailure) {
            // Only the failure path is checked; the cause must be an HTTP 500.
            boolean isInternalServerError = adminFailure.getCause() instanceof InternalServerErrorException;
            Assert.assertTrue(isInternalServerError);
        }

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Checks how partitioned-metadata and lookup requests fail while the mocked global
     * ZooKeeper is forced to return {@code SESSIONEXPIRED} and the broker has no super-user
     * roles configured.
     *
     * <p>For each of the two admin calls, a {@link PulsarAdminException} (if thrown) must have an
     * {@link InternalServerErrorException} as its cause; a successful call is tolerated. The
     * requested topic lives in {@code my-property/my-ns1}, which is not the namespace created
     * by this test.
     *
     * <p>The always-fail mode is switched off in a {@code finally} block so that a failing
     * assertion cannot leave the mock store in the failing state.
     *
     * @throws Exception if setup fails or an unexpected exception type is raised
     */
    @Test
    public void testInternalServerExceptionOnLookup() throws Exception {
        log.info("-- Starting {} test --", methodName);

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);
        internalSetup(authTls);

        admin.clusters().createCluster("test", ClusterData.builder()
                .serviceUrl(brokerUrl.toString())
                .serviceUrlTls(brokerUrlTls.toString())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
                .build());
        admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        String namespace = "my-property/my-ns";
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));

        // Resolves to persistent://my-property/my-ns1/topic1.
        String topic = "persistent://" + namespace + "1/topic1";

        mockZooKeeperGlobal.setAlwaysFail(KeeperException.Code.SESSIONEXPIRED);
        try {
            // Remove every super-user role so the request cannot bypass authorization.
            pulsar.getConfiguration().setSuperUserRoles(Sets.newHashSet());

            try {
                admin.topics().getPartitionedTopicMetadata(topic);
            } catch (PulsarAdminException e) {
                Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
            }

            try {
                admin.lookups().lookupTopic(topic);
            } catch (PulsarAdminException e) {
                Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
            }
        } finally {
            // Always restore the mock, even when an assertion above fails.
            mockZooKeeperGlobal.unsetAlwaysFail();
        }
    }

    /**
     * Verifies that topic-level permission entries are removed from the namespace policies
     * when the topic is deleted, for both a non-partitioned and a partitioned topic.
     *
     * <p>For each topic the test grants {@code consume} to {@code test-user}, waits until the
     * entry shows up under {@code auth_policies.getTopicAuthentication()}, deletes the topic and
     * waits until the entry is gone. For the partitioned topic the same is checked for each of
     * its five partitions. The namespace, tenant and cluster are deleted at the end.
     *
     * @throws Exception if any admin call fails or an awaited condition is not met
     */
    @Test
    public void testDeleteAuthenticationPoliciesOfTopic() throws Exception {
        HashMap<String, String> tlsParams = new HashMap<>();
        tlsParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        tlsParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls tlsAuth = new AuthenticationTls();
        tlsAuth.configure(tlsParams);
        internalSetup(tlsAuth);

        admin.clusters().createCluster("test", ClusterData.builder().build());
        admin.tenants().createTenant("p1",
                new TenantInfoImpl(Collections.emptySet(), new HashSet<String>(admin.clusters().getClusters())));
        admin.namespaces().createNamespace("p1/ns1");

        // Path of the namespace policies node; the same value is used by every check below.
        final String policiesPath = PulsarWebResource.path("policies", NamespaceName.get("p1/ns1").toString());

        // --- non-partitioned topic ---
        String topic = "persistent://p1/ns1/topic";
        admin.topics().createNonPartitionedTopic(topic);
        admin.topics().grantPermission(topic, "test-user", EnumSet.of(AuthAction.consume));
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(hasTopicAuthPolicy(policiesPath, topic)));

        admin.topics().delete(topic);
        // This check reads the policies through getAsync(...) rather than get(...).
        Awaitility.await().untilAsserted(() -> Assert.assertFalse(
                pulsar.getPulsarResources().getNamespaceResources()
                        .getAsync(policiesPath).get().get()
                        .auth_policies.getTopicAuthentication().containsKey(topic)));

        // --- partitioned topic ---
        String partitionedTopic = "persistent://p1/ns1/partitioned-topic";
        int numPartitions = 5;
        admin.topics().createPartitionedTopic(partitionedTopic, numPartitions);
        admin.topics().grantPermission(partitionedTopic, "test-user", EnumSet.of(AuthAction.consume));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(hasTopicAuthPolicy(policiesPath, partitionedTopic));
            for (int partition = 0; partition < numPartitions; partition++) {
                String partitionName = TopicName.get(partitionedTopic).getPartition(partition).toString();
                Assert.assertTrue(hasTopicAuthPolicy(policiesPath, partitionName));
            }
        });

        admin.topics().deletePartitionedTopic(partitionedTopic);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(hasTopicAuthPolicy(policiesPath, partitionedTopic));
            for (int partition = 0; partition < numPartitions; partition++) {
                String partitionName = TopicName.get(partitionedTopic).getPartition(partition).toString();
                Assert.assertFalse(hasTopicAuthPolicy(policiesPath, partitionName));
            }
        });

        admin.namespaces().deleteNamespace("p1/ns1");
        admin.tenants().deleteTenant("p1");
        admin.clusters().deleteCluster("test");
    }

    /**
     * Reads the namespace policies stored at {@code policiesPath} and reports whether their
     * topic-authentication map has an entry for {@code topicName}.
     */
    private boolean hasTopicAuthPolicy(String policiesPath, String topicName) throws Exception {
        return pulsar.getPulsarResources().getNamespaceResources()
                .get(policiesPath).get()
                .auth_policies.getTopicAuthentication().containsKey(topicName);
    }
}

