/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket.proxy;

import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.google.gson.reflect.TypeToken;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.proxy.SimpleConsumerSocket;
import org.apache.pulsar.websocket.proxy.SimpleProducerSocket;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.apache.pulsar.websocket.stats.ProxyTopicStat;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"websocket"})
public class ProxyPublishConsumeTest
extends ProducerConsumerBase {
    // NOTE(review): presumably the name of the running test method; it is not assigned in the visible part of this class — confirm usage.
    protected String methodName;
    // WebSocket proxy under test; created in setup() and stopped in cleanup().
    private ProxyServer proxyServer;
    // Mockito spy wrapping the WebSocketService handed to the proxy; closed in cleanup().
    private WebSocketService service;
    // Backlog quota check interval in seconds; equals the interval configured on the broker in setup().
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);

    /**
     * Prepares the fixtures for every test method: configures the backlog quota check
     * interval, runs the inherited broker/producer setup, and then starts a WebSocket
     * proxy (web service port 0, cluster "test") backed by a spied {@link WebSocketService}
     * whose metadata store is stubbed with the mocked global ZooKeeper.
     *
     * @throws Exception if the inherited setup or the proxy start-up fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        // Broker-side configuration must be in place before the inherited setup runs.
        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
        super.internalSetup();
        super.producerBaseSetup();

        WebSocketProxyConfiguration proxyConfig = new WebSocketProxyConfiguration();
        proxyConfig.setWebServicePort(Optional.of(0));
        proxyConfig.setClusterName("test");
        proxyConfig.setConfigurationStoreServers("GLOBAL_DUMMY_VALUE");

        // Spy on the service so metadata-store creation can be redirected to the mock ZooKeeper.
        service = Mockito.spy(new WebSocketService(proxyConfig));
        Mockito.doReturn(new ZKMetadataStore(mockZooKeeperGlobal))
                .when(service)
                .createMetadataStore(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());

        proxyServer = new ProxyServer(proxyConfig);
        WebSocketServiceStarter.start(proxyServer, service);
        log.info("Proxy Server Started");
    }

    /**
     * Tears down the fixtures created by {@link #setup()}: resets the configuration, runs
     * the inherited cleanup, then closes the WebSocket service and stops the proxy server.
     *
     * <p>The proxy-side resources are released in {@code finally} blocks so that a failure
     * in the inherited cleanup (or in closing the service) does not leave the proxy running
     * for subsequent test methods.
     *
     * @throws Exception if any teardown step fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        try {
            super.resetConfig();
            super.internalCleanup();
        } finally {
            try {
                if (this.service != null) {
                    this.service.close();
                }
            } finally {
                if (this.proxyServer != null) {
                    this.proxyServer.stop();
                }
            }
        }
        log.info("Finished Cleaning Up Test setup");
    }

    /**
     * Connects two consumers on the same Failover subscription, a reader and a producer to
     * one topic through the proxy, waits until at least one consumer and the reader have
     * each received 10 messages, and then checks that the producer's buffer matches the
     * buffer of the busier consumer and of the reader.
     *
     * @throws Exception on connection failure, or {@link IllegalStateException} when the
     *                   messages have not arrived after the retry budget is exhausted
     */
    @Test(timeOut=10000L)
    public void socketTest() throws Exception {
        final String base = "ws://localhost:" + proxyServer.getListenPortHTTP().get();
        final URI consumeUri = URI.create(base + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic1/my-sub1?subscriptionType=Failover");
        final URI readUri = URI.create(base + "/ws/v2/reader/persistent/my-property/my-ns/my-topic1");
        final URI produceUri = URI.create(base + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/");

        final WebSocketClient consumeClient1 = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
        final WebSocketClient consumeClient2 = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
        final WebSocketClient readClient = new WebSocketClient();
        final SimpleConsumerSocket readSocket = new SimpleConsumerSocket();
        final WebSocketClient produceClient = new WebSocketClient();
        final SimpleProducerSocket produceSocket = new SimpleProducerSocket();

        try {
            consumeClient1.start();
            consumeClient2.start();
            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, new ClientUpgradeRequest());
            Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, new ClientUpgradeRequest());
            log.info("Connecting to : {}", consumeUri);

            readClient.start();
            Future<Session> readerFuture = readClient.connect(readSocket, readUri, new ClientUpgradeRequest());
            log.info("Connecting to : {}", readUri);

            Assert.assertTrue(consumerFuture1.get().isOpen());
            Assert.assertTrue(consumerFuture2.get().isOpen());
            Assert.assertTrue(readerFuture.get().isOpen());

            // Give the consumers and the reader a moment before the producer connects.
            Thread.sleep(500L);

            produceClient.start();
            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, new ClientUpgradeRequest());
            Assert.assertTrue(producerFuture.get().isOpen());

            // Poll until (either consumer has 10 messages) AND (the reader has 10 messages).
            final int maxRetry = 400;
            int attempts = 0;
            while ((consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10)
                    || readSocket.getReceivedMessagesCount() < 10) {
                Thread.sleep(10L);
                if (attempts++ > maxRetry) {
                    String msg = String.format("Consumer still has not received the message after %s ms", maxRetry * 10);
                    log.warn(msg);
                    throw new IllegalStateException(msg);
                }
            }

            Assert.assertTrue(consumerFuture1.get().isOpen());
            Assert.assertTrue(consumerFuture2.get().isOpen());
            Assert.assertTrue(produceSocket.getBuffer().size() > 0);

            // Compare against whichever consumer holds more messages.
            if (consumeSocket1.getBuffer().size() > consumeSocket2.getBuffer().size()) {
                Assert.assertEquals(produceSocket.getBuffer(), consumeSocket1.getBuffer());
            } else {
                Assert.assertEquals(produceSocket.getBuffer(), consumeSocket2.getBuffer());
            }
            Assert.assertEquals(produceSocket.getBuffer(), readSocket.getBuffer());
        } finally {
            stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient);
        }
    }

    /**
     * Exercises the consumer's end-of-topic query in pull mode: publishes 20 messages,
     * pulls them with explicit permits, and checks that the last buffered reply is
     * {@code {"endOfTopic":false}} until the topic is terminated through the admin API,
     * after which it must be {@code {"endOfTopic":true}}.
     *
     * @throws Exception on connection or admin failure
     */
    @Test(timeOut=10000L)
    public void socketTestEndOfTopic() throws Exception {
        final String topic = "my-property/my-ns/my-topic8";
        final String subscription = "my-sub";
        final URI consumeUri = URI.create(String.format(
                "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
                proxyServer.getListenPortHTTP().get(), topic, subscription));
        final URI produceUri = URI.create(String.format(
                "ws://localhost:%d/ws/v2/producer/persistent/%s",
                proxyServer.getListenPortHTTP().get(), topic));

        final WebSocketClient consumeClient = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        final WebSocketClient produceClient = new WebSocketClient();
        final SimpleProducerSocket produceSocket = new SimpleProducerSocket();

        try {
            consumeClient.start();
            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, new ClientUpgradeRequest());
            log.info("Connecting to : {}", consumeUri);
            Assert.assertTrue(consumerFuture.get().isOpen());

            produceClient.start();
            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, new ClientUpgradeRequest());
            Assert.assertTrue(producerFuture.get().isOpen());

            // Publish 20 messages, then pull only the first 10.
            produceSocket.sendMessage(20);
            consumeSocket.sendPermits(10);
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), 10));

            // The end-of-topic reply is appended to the same buffer as the messages.
            consumeSocket.isEndOfTopic();
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(consumeSocket.getBuffer().size(), 11));
            Assert.assertEquals(
                    consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1),
                    "{\"endOfTopic\":false}");

            // Pull the remainder; the topic is still open afterwards.
            consumeSocket.sendPermits(20);
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), 31));
            consumeSocket.isEndOfTopic();
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), 32));
            Assert.assertEquals(
                    consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1),
                    "{\"endOfTopic\":false}");

            // After termination the query must report end of topic.
            admin.topics().terminateTopicAsync(topic).get();
            consumeSocket.isEndOfTopic();
            Awaitility.await().untilAsserted(
                    () -> Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), 33));
            Assert.assertEquals(
                    consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1),
                    "{\"endOfTopic\":true}");
        } finally {
            stopWebSocketClient(consumeClient, produceClient);
        }
    }

    /**
     * Connects a consumer through the proxy, verifies that its subscription is listed by
     * the admin API, sends an unsubscribe request over the socket and verifies that the
     * subscription list is empty one second later.
     *
     * @throws Exception on connection or admin failure
     */
    @Test
    public void unsubscribeTest() throws Exception {
        final String topic = "my-property/my-ns/my-topic7";
        final String subscription = "my-sub";

        // NOTE(review): the partitioned topic is created under a unique name, but the consumer
        // URI and the subscription lookups below use the fixed topic name — confirm intent.
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + topic);
        admin.topics().createPartitionedTopic(topicName, 3);

        final URI consumeUri = URI.create("ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/" + topic + "/" + subscription);
        final WebSocketClient consumeClient = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        Thread.sleep(500L);

        try {
            consumeClient.start();
            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, new ClientUpgradeRequest());
            consumerFuture.get();

            List<String> subscriptionsBefore = admin.topics().getSubscriptions(topic);
            Assert.assertEquals(subscriptionsBefore.size(), 1);
            Assert.assertEquals(subscriptionsBefore.get(0), subscription);

            consumeSocket.unsubscribe();
            // Fixed wait for the unsubscribe request to be processed.
            Thread.sleep(1000L);

            List<String> subscriptionsAfter = admin.topics().getSubscriptions(topic);
            Assert.assertEquals(subscriptionsAfter.size(), 0);
        } finally {
            stopWebSocketClient(consumeClient);
        }
    }

    /**
     * Verifies that a consumer URI with an empty subscription name is rejected during the
     * WebSocket upgrade with HTTP status 400.
     *
     * <p>The client is stopped in a {@code finally} block so that it is released exactly
     * once on every path, including when {@code Assert.fail} raises an
     * {@link AssertionError} (which is not caught by {@code catch (Exception)}).
     */
    @Test(timeOut=10000L)
    public void emptySubscriptionConsumerTest() {
        String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic2/?subscriptionType=Exclusive";
        URI consumeUri = URI.create(consumerUri);
        WebSocketClient consumeClient1 = new WebSocketClient();
        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
        try {
            consumeClient1.start();
            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
            consumerFuture1.get();
            Assert.fail("should fail: empty subscription");
        } catch (Exception e) {
            // The failed upgrade surfaces as the cause of the exception thrown by Future.get().
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 400);
        } finally {
            stopWebSocketClient(consumeClient1);
        }
    }

    /**
     * Verifies that a second consumer on the same Exclusive subscription is rejected during
     * the WebSocket upgrade with HTTP status 409 while the first consumer is connected.
     *
     * <p>Each client is stopped in its own {@code finally} block, so the second client is
     * released even when {@code Assert.fail} raises an {@link AssertionError}, and the first
     * client is released on every path.
     *
     * @throws Exception if the first consumer cannot be started or connected
     */
    @Test(timeOut=10000L)
    public void conflictingConsumerTest() throws Exception {
        String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
        URI consumeUri = URI.create(consumerUri);
        WebSocketClient consumeClient1 = new WebSocketClient();
        WebSocketClient consumeClient2 = new WebSocketClient();
        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
        try {
            // First consumer takes the exclusive subscription.
            consumeClient1.start();
            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
            consumerFuture1.get();
            try {
                // Second consumer on the same subscription must be refused.
                consumeClient2.start();
                ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
                Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
                consumerFuture2.get();
                Assert.fail("should fail: conflicting subscription name");
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 409);
            } finally {
                stopWebSocketClient(consumeClient2);
            }
        } finally {
            stopWebSocketClient(consumeClient1);
        }
    }

    /**
     * Verifies that a second producer using the same {@code producerName} on the same topic
     * is rejected during the WebSocket upgrade with HTTP status 409 while the first producer
     * is connected.
     *
     * <p>Each client is stopped in its own {@code finally} block, so the second client is
     * released even when {@code Assert.fail} raises an {@link AssertionError}, and the first
     * client is released on every path.
     *
     * @throws Exception if the first producer cannot be started or connected
     */
    @Test(timeOut=10000L)
    public void conflictingProducerTest() throws Exception {
        String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";
        URI produceUri = URI.create(producerUri);
        WebSocketClient produceClient1 = new WebSocketClient();
        WebSocketClient produceClient2 = new WebSocketClient();
        SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
        SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
        try {
            // First producer claims the producer name.
            produceClient1.start();
            ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest();
            Future<Session> producerFuture1 = produceClient1.connect(produceSocket1, produceUri, produceRequest1);
            producerFuture1.get();
            try {
                // Second producer with the same name must be refused.
                produceClient2.start();
                ClientUpgradeRequest produceRequest2 = new ClientUpgradeRequest();
                Future<Session> producerFuture2 = produceClient2.connect(produceSocket2, produceUri, produceRequest2);
                producerFuture2.get();
                Assert.fail("should fail: conflicting producer name");
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof UpgradeException);
                Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 409);
            } finally {
                stopWebSocketClient(produceClient2);
            }
        } finally {
            stopWebSocketClient(produceClient1);
        }
    }

    /**
     * Verifies that a producer is rejected during the WebSocket upgrade with HTTP status 503
     * once the namespace backlog quota (10 bytes, {@code producer_request_hold}) is exceeded.
     *
     * <p>Sequence: connect and disconnect a consumer, publish 100 messages from a first
     * producer, wait longer than the backlog quota check interval, then attempt to connect
     * a second producer.
     *
     * <p>The final cleanup (stopping the client, skipping the backlog, deleting the topic,
     * removing the quota and deleting the namespace) runs in a single {@code finally} block
     * so that it executes exactly once on every path; running it a second time would act on
     * a topic and namespace that have already been deleted.
     *
     * @throws Exception on connection or admin failure
     */
    @Test
    public void producerBacklogQuotaExceededTest() throws Exception {
        final String namespace = "my-property/ns-ws-quota";
        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitSize(10L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
                .build());

        final String topic = namespace + "/my-topic5";
        final String subscription = "my-sub";
        final String base = "ws://localhost:" + proxyServer.getListenPortHTTP().get();
        final URI consumeUri = URI.create(base + "/ws/v2/consumer/persistent/" + topic + "/" + subscription);
        final URI produceUri = URI.create(base + "/ws/v2/producer/persistent/" + topic);

        WebSocketClient consumeClient = new WebSocketClient();
        WebSocketClient produceClient1 = new WebSocketClient();
        WebSocketClient produceClient2 = new WebSocketClient();
        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
        SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();

        // Connect a consumer once and disconnect it again.
        // NOTE(review): presumably this creates the subscription so that backlog accumulates — confirm.
        try {
            consumeClient.start();
            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
            consumerFuture.get();
        } finally {
            stopWebSocketClient(consumeClient);
        }

        // Publish messages with the first producer.
        try {
            produceClient1.start();
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            Future<Session> producerFuture = produceClient1.connect(produceSocket1, produceUri, produceRequest);
            producerFuture.get();
            produceSocket1.sendMessage(100);
        } finally {
            stopWebSocketClient(produceClient1);
        }

        // Wait one second longer than the backlog quota check interval (6000 ms).
        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000L);

        try {
            produceClient2.start();
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            Future<Session> producerFuture = produceClient2.connect(produceSocket2, produceUri, produceRequest);
            producerFuture.get();
            Assert.fail("should fail: backlog quota exceeded");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 503);
        } finally {
            stopWebSocketClient(produceClient2);
            admin.topics().skipAllMessages("persistent://" + topic, subscription);
            admin.topics().delete("persistent://" + topic);
            admin.namespaces().removeBacklogQuota(namespace);
            admin.namespaces().deleteNamespace(namespace);
        }
    }

    /**
     * Verifies that, in a namespace where automatic topic creation is disabled, both a
     * producer and a consumer for a non-existent topic are rejected during the WebSocket
     * upgrade with HTTP status 404.
     *
     * <p>Each client is stopped in a {@code finally} block so that it is released exactly
     * once on every path, including when {@code Assert.fail} raises an
     * {@link AssertionError}.
     *
     * @throws Exception on admin failure
     */
    @Test(timeOut=10000L)
    public void topicDoesNotExistTest() throws Exception {
        final String namespace = "my-property/ns-topic-creation-not-allowed";
        admin.namespaces().createNamespace(namespace);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder()
                .allowAutoTopicCreation(false)
                .topicType(TopicType.NON_PARTITIONED.toString())
                .build());

        final String topic = namespace + "/my-topic";
        final String subscription = "my-sub";
        final String base = "ws://localhost:" + proxyServer.getListenPortHTTP().get();
        final URI produceUri = URI.create(base + "/ws/v2/producer/persistent/" + topic);
        final URI consumeUri = URI.create(base + "/ws/v2/consumer/persistent/" + topic + "/" + subscription);

        WebSocketClient produceClient = new WebSocketClient();
        WebSocketClient consumeClient = new WebSocketClient();
        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();

        try {
            produceClient.start();
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
            producerFuture.get();
            Assert.fail("should fail: topic does not exist");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 404);
        } finally {
            stopWebSocketClient(produceClient);
        }

        try {
            consumeClient.start();
            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
            consumerFuture.get();
            Assert.fail("should fail: topic does not exist");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 404);
        } finally {
            stopWebSocketClient(consumeClient);
        }

        admin.namespaces().deleteNamespace(namespace);
    }

    /**
     * Verifies that a WebSocket producer is rejected during the upgrade with HTTP status 409
     * while a native client producer holds the topic in {@code Exclusive} access mode.
     *
     * <p>The WebSocket client is stopped and the native producer is closed in a
     * {@code finally} block so that both are released exactly once on every path, including
     * when {@code Assert.fail} raises an {@link AssertionError}.
     *
     * @throws Exception if the exclusive producer cannot be created or closed
     */
    @Test(timeOut=10000L)
    public void producerFencedTest() throws Exception {
        final String topic = "my-property/my-ns/producer-fenced-test";
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://" + topic)
                .accessMode(ProducerAccessMode.Exclusive)
                .create();

        String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/producer/persistent/" + topic;
        URI produceUri = URI.create(producerUri);
        WebSocketClient produceClient = new WebSocketClient();
        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
        try {
            produceClient.start();
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
            producerFuture.get();
            Assert.fail("should fail: producer fenced");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 409);
        } finally {
            stopWebSocketClient(produceClient);
            producer.close();
        }
    }

    /**
     * Verifies that a WebSocket producer for a terminated topic is rejected during the
     * upgrade with HTTP status 503.
     *
     * <p>The client is stopped and the topic deleted in a single {@code finally} block so
     * that the cleanup executes exactly once on every path; a second delete would target a
     * topic that no longer exists.
     *
     * @throws Exception on admin failure
     */
    @Test(timeOut=10000L)
    public void topicTerminatedTest() throws Exception {
        final String topic = "my-property/my-ns/topic-terminated-test";
        final String persistentTopic = "persistent://" + topic;
        admin.topics().createNonPartitionedTopic(persistentTopic);
        admin.topics().terminateTopic(persistentTopic);

        String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/producer/persistent/" + topic;
        URI produceUri = URI.create(producerUri);
        WebSocketClient produceClient = new WebSocketClient();
        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
        try {
            produceClient.start();
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
            producerFuture.get();
            Assert.fail("should fail: topic terminated");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof UpgradeException);
            Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), 503);
        } finally {
            stopWebSocketClient(produceClient);
            admin.topics().delete(persistentTopic);
        }
    }

    /**
     * Verifies the statistics endpoints exposed by the WebSocket proxy.
     *
     * <p>A failover consumer, a reader and a producer are attached to the same topic through the
     * proxy. Once the consumer has received at least two messages (or the polling budget is
     * exhausted), the metrics, proxy-stats and per-topic stats REST resources are queried and
     * validated by the {@code verify*} helpers.
     *
     * @throws Exception if a client fails to connect or any verification fails
     */
    @Test(timeOut = 10000L)
    public void testProxyStats() throws Exception {
        final String topic = "my-property/my-ns/my-topic6";
        final String wsBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/";
        final String consumerUri = wsBase + "consumer/persistent/" + topic + "/my-sub?subscriptionType=Failover";
        final String producerUri = wsBase + "producer/persistent/" + topic + "/";
        final String readerUri = wsBase + "reader/persistent/" + topic;
        log.info("{}, {}", consumerUri, producerUri);

        final URI consumeUri = URI.create(consumerUri);
        final URI produceUri = URI.create(producerUri);
        final URI readUri = URI.create(readerUri);

        final WebSocketClient consumeClient1 = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
        final WebSocketClient produceClient = new WebSocketClient();
        final SimpleProducerSocket produceSocket = new SimpleProducerSocket();
        final WebSocketClient readClient = new WebSocketClient();
        final SimpleConsumerSocket readSocket = new SimpleConsumerSocket();

        try {
            consumeClient1.start();
            final Future<Session> consumerFuture1 =
                    consumeClient1.connect(consumeSocket1, consumeUri, new ClientUpgradeRequest());
            log.info("Connecting to : {}", consumeUri);

            readClient.start();
            final Future<Session> readerFuture =
                    readClient.connect(readSocket, readUri, new ClientUpgradeRequest());
            log.info("Connecting to : {}", readUri);

            Assert.assertTrue(consumerFuture1.get().isOpen());
            Assert.assertTrue(readerFuture.get().isOpen());

            Thread.sleep(500L);

            final ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            produceClient.start();
            final Future<Session> producerFuture =
                    produceClient.connect(produceSocket, produceUri, produceRequest);
            Assert.assertTrue(producerFuture.get().isOpen());

            // Poll in 10 ms steps until the consumer has seen two messages; give up (with a
            // warning only) once the retry budget is spent so the stats checks below still run.
            final int maxRetry = 400;
            for (int retry = 0; consumeSocket1.getReceivedMessagesCount() < 2; retry++) {
                Thread.sleep(10L);
                if (retry > maxRetry) {
                    final String msg = String.format(
                            "Consumer still has not received the message after %s ms", maxRetry * 10);
                    log.warn(msg);
                    break;
                }
            }

            // The REST client holds connections/threads of its own, so it is closed explicitly
            // once the verifications are done (or have failed).
            final Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
            try {
                // Reuse the broker's web address but point it at the proxy's HTTP port.
                final String brokerPort = Integer.toString(pulsar.getConfiguration().getWebServicePort().get());
                final String proxyPort = Integer.toString(proxyServer.getListenPortHTTP().get());
                final String baseUrl = pulsar.getSafeWebServiceAddress().replace(brokerPort, proxyPort)
                        + "/admin/v2/proxy-stats/";

                verifyProxyMetrics(client, baseUrl);
                verifyProxyStats(client, baseUrl, topic);
                verifyTopicStat(client, baseUrl + "persistent/", topic);
            } finally {
                client.close();
            }
        } finally {
            stopWebSocketClient(consumeClient1, produceClient, readClient);
        }
    }

    /**
     * Creates a three-partition topic, publishes to it through the proxy's producer endpoint and
     * then checks that a consumer can connect to the same partitioned topic.
     *
     * @throws Exception if topic creation, either connection, or the publish call fails
     */
    @Test(timeOut = 10000L)
    public void consumeMessagesInPartitionedTopicTest() throws Exception {
        final String topic = "my-property/my-ns/my-topic7";
        admin.topics().createPartitionedTopic("persistent://" + topic, 3);

        final String subscription = "my-sub";
        final URI consumeUri = URI.create("ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/" + topic + "/" + subscription);
        final URI produceUri = URI.create("ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/producer/persistent/" + topic);

        final WebSocketClient consumeClient = new WebSocketClient();
        final WebSocketClient produceClient = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        final SimpleProducerSocket produceSocket = new SimpleProducerSocket();

        // Phase 1: publish, then shut the producer client down regardless of the outcome.
        try {
            produceClient.start();
            final Future<Session> producerSession =
                    produceClient.connect(produceSocket, produceUri, new ClientUpgradeRequest());
            producerSession.get();
            produceSocket.sendMessage(100);
        } finally {
            stopWebSocketClient(produceClient);
        }

        Thread.sleep(500L);

        // Phase 2: the consumer only needs to complete its connection attempt.
        try {
            consumeClient.start();
            final Future<Session> consumerSession =
                    consumeClient.connect(consumeSocket, consumeUri, new ClientUpgradeRequest());
            consumerSession.get();
        } finally {
            stopWebSocketClient(consumeClient);
        }
    }

    /**
     * Exercises pull mode on a shared subscription: two consumers connect with
     * {@code pullMode=true}, must receive nothing until they send permits, and afterwards must
     * have received exactly as many messages as the permits they issued (3 and 2+2+2).
     *
     * @throws Exception if a connection fails or a message count does not match
     */
    @Test(timeOut = 10000L)
    public void socketPullModeTest() throws Exception {
        final String topic = "my-property/my-ns/my-topic8";
        final String subscription = "my-sub";
        final URI consumeUri = URI.create(String.format(
                "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
                proxyServer.getListenPortHTTP().get(), topic, subscription));
        final URI produceUri = URI.create(String.format(
                "ws://localhost:%d/ws/v2/producer/persistent/%s",
                proxyServer.getListenPortHTTP().get(), topic));

        final WebSocketClient firstConsumerClient = new WebSocketClient();
        final SimpleConsumerSocket firstConsumer = new SimpleConsumerSocket();
        final WebSocketClient secondConsumerClient = new WebSocketClient();
        final SimpleConsumerSocket secondConsumer = new SimpleConsumerSocket();
        final WebSocketClient producerClient = new WebSocketClient();
        final SimpleProducerSocket producerSocket = new SimpleProducerSocket();

        try {
            firstConsumerClient.start();
            secondConsumerClient.start();
            final ClientUpgradeRequest firstRequest = new ClientUpgradeRequest();
            final ClientUpgradeRequest secondRequest = new ClientUpgradeRequest();
            final Future<Session> firstSession =
                    firstConsumerClient.connect(firstConsumer, consumeUri, firstRequest);
            final Future<Session> secondSession =
                    secondConsumerClient.connect(secondConsumer, consumeUri, secondRequest);
            log.info("Connecting to : {}", consumeUri);
            Assert.assertTrue(firstSession.get().isOpen());
            Assert.assertTrue(secondSession.get().isOpen());

            final ClientUpgradeRequest producerRequest = new ClientUpgradeRequest();
            producerClient.start();
            final Future<Session> producerSession =
                    producerClient.connect(producerSocket, produceUri, producerRequest);
            Assert.assertTrue(producerSession.get().isOpen());

            producerSocket.sendMessage(100);
            Thread.sleep(500L);

            // No permits have been issued yet, so nothing may have been delivered.
            Assert.assertEquals(firstConsumer.getReceivedMessagesCount(), 0);
            Assert.assertEquals(secondConsumer.getReceivedMessagesCount(), 0);

            firstConsumer.sendPermits(3);
            for (int batch = 0; batch < 3; batch++) {
                secondConsumer.sendPermits(2);
            }
            Thread.sleep(500L);

            Assert.assertEquals(firstConsumer.getReceivedMessagesCount(), 3);
            Assert.assertEquals(secondConsumer.getReceivedMessagesCount(), 6);
        } finally {
            stopWebSocketClient(firstConsumerClient, secondConsumerClient, producerClient);
        }
    }

    /**
     * Checks negative acknowledgement with a dead-letter topic: the first consumer nacks every
     * message it gets and is configured with {@code maxRedeliverCount=1}, so after one published
     * message it is expected to have received two deliveries while a second consumer on the
     * dead-letter topic receives one.
     *
     * @throws Exception if a connection fails or the expected counts are not reached in time
     */
    @Test(timeOut = 20000L)
    public void nackMessageTest() throws Exception {
        final String subscription = "my-sub";
        final String dlqTopic = "my-property/my-ns/nack-msg-dlq-" + UUID.randomUUID();
        final String consumerTopic = "my-property/my-ns/nack-msg-" + UUID.randomUUID();
        final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/" + dlqTopic + "/" + subscription
                + "?subscriptionType=Shared";
        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/" + consumerTopic + "/" + subscription
                + "?deadLetterTopic=" + dlqTopic
                + "&maxRedeliverCount=1&subscriptionType=Shared&negativeAckRedeliveryDelay=1000";
        final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/producer/persistent/" + consumerTopic;

        final WebSocketClient nackingClient = new WebSocketClient();
        final SimpleConsumerSocket nackingSocket = new SimpleConsumerSocket();
        final WebSocketClient dlqClient = new WebSocketClient();
        final SimpleConsumerSocket dlqSocket = new SimpleConsumerSocket();
        final WebSocketClient producerClient = new WebSocketClient();
        final SimpleProducerSocket producerSocket = new SimpleProducerSocket(0);

        // Reply to every delivery with a negativeAcknowledge command instead of an ack.
        nackingSocket.setMessageHandler((id, data) -> {
            final JsonObject reply = new JsonObject();
            reply.add("messageId", new JsonPrimitive(id));
            reply.add("type", new JsonPrimitive("negativeAcknowledge"));
            return reply.toString();
        });

        try {
            nackingClient.start();
            dlqClient.start();
            final ClientUpgradeRequest nackingRequest = new ClientUpgradeRequest();
            final ClientUpgradeRequest dlqRequest = new ClientUpgradeRequest();
            final Future<Session> nackingSession =
                    nackingClient.connect(nackingSocket, URI.create(consumerUri), nackingRequest);
            final Future<Session> dlqSession =
                    dlqClient.connect(dlqSocket, URI.create(dlqUri), dlqRequest);
            Assert.assertTrue(nackingSession.get().isOpen());
            Assert.assertTrue(dlqSession.get().isOpen());

            final ClientUpgradeRequest producerRequest = new ClientUpgradeRequest();
            producerClient.start();
            final Future<Session> producerSession =
                    producerClient.connect(producerSocket, URI.create(producerUri), producerRequest);
            Assert.assertTrue(producerSession.get().isOpen());

            Assert.assertEquals(nackingSocket.getReceivedMessagesCount(), 0);
            Assert.assertEquals(dlqSocket.getReceivedMessagesCount(), 0);

            producerSocket.sendMessage(1);

            Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(
                    () -> Assert.assertEquals(nackingSocket.getReceivedMessagesCount(), 2));
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(
                    () -> Assert.assertEquals(dlqSocket.getReceivedMessagesCount(), 1));
        } finally {
            stopWebSocketClient(nackingClient, dlqClient, producerClient);
        }
    }

    /**
     * Checks the {@code negativeAckRedeliveryDelay} consumer parameter: a consumer that nacks
     * every message should see the single published message once promptly and a second time
     * after the configured delay has elapsed.
     *
     * @throws Exception if a connection fails or the expected counts are not reached in time
     */
    @Test(timeOut = 20000L)
    public void nackRedeliveryDelayTest() throws Exception {
        final String uriBase = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2";
        final String topic = "my-property/my-ns/nack-redelivery-delay-" + UUID.randomUUID();
        final String sub = "my-sub";
        final int delayTime = 5000;
        final String consumerUri = String.format(
                "%s/consumer/persistent/%s/%s?negativeAckRedeliveryDelay=%d", uriBase, topic, sub, delayTime);
        final String producerUri = String.format("%s/producer/persistent/%s", uriBase, topic);

        final WebSocketClient consumerClient = new WebSocketClient();
        final SimpleConsumerSocket consumerSocket = new SimpleConsumerSocket();
        final WebSocketClient producerClient = new WebSocketClient();
        final SimpleProducerSocket producerSocket = new SimpleProducerSocket(0);

        // Answer each delivery with a negativeAcknowledge command.
        consumerSocket.setMessageHandler((mid, data) -> {
            final JsonObject reply = new JsonObject();
            reply.add("type", new JsonPrimitive("negativeAcknowledge"));
            reply.add("messageId", new JsonPrimitive(mid));
            return reply.toString();
        });

        try {
            consumerClient.start();
            final Future<Session> consumerSession = consumerClient.connect(
                    consumerSocket, URI.create(consumerUri), new ClientUpgradeRequest());
            Assert.assertTrue(consumerSession.get().isOpen());

            producerClient.start();
            final Future<Session> producerSession = producerClient.connect(
                    producerSocket, URI.create(producerUri), new ClientUpgradeRequest());
            Assert.assertTrue(producerSession.get().isOpen());

            Assert.assertEquals(consumerSocket.getReceivedMessagesCount(), 0);

            producerSocket.sendMessage(1);

            // First delivery.
            Awaitility.await().atMost(4000L, TimeUnit.MILLISECONDS).untilAsserted(
                    () -> Assert.assertEquals(consumerSocket.getReceivedMessagesCount(), 1));

            // Wait out the redelivery delay before expecting the second delivery.
            Thread.sleep(delayTime);

            Awaitility.await().atMost(4000L, TimeUnit.MILLISECONDS).untilAsserted(
                    () -> Assert.assertEquals(consumerSocket.getReceivedMessagesCount(), 2));
        } finally {
            stopWebSocketClient(consumerClient, producerClient);
        }
    }

    /**
     * Publishes a batch of messages with a native Pulsar producer and consumes them through the
     * WebSocket proxy, then waits for the subscription backlog to drop to zero.
     *
     * <p>The producer is managed by try-with-resources so it is closed when the test finishes,
     * whether it passes or fails; previously it was never closed.
     *
     * @throws Exception if the producer or consumer cannot be set up or an assertion fails
     */
    @Test(timeOut = 20000L)
    public void ackBatchMessageTest() throws Exception {
        final String subscription = "my-sub";
        final String topic = "my-property/my-ns/ack-batch-message" + UUID.randomUUID();
        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
                + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
        final int messages = 10;

        final WebSocketClient consumerClient = new WebSocketClient();
        final SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();

        try (Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .batchingMaxPublishDelay(1L, TimeUnit.SECONDS)
                .create()) {
            try {
                consumerClient.start();
                final Future<Session> consumerFuture = consumerClient.connect(
                        consumeSocket, URI.create(consumerUri), new ClientUpgradeRequest());
                Assert.assertTrue(consumerFuture.get().isOpen());
                Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), 0);

                // Queue the sends asynchronously and flush once afterwards.
                for (int i = 0; i < messages; i++) {
                    producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                }
                producer.flush();

                consumeSocket.sendPermits(messages);
                Awaitility.await().untilAsserted(
                        () -> Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
                Awaitility.await().untilAsserted(
                        () -> Assert.assertEquals(
                                admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(),
                                0L));
            } finally {
                stopWebSocketClient(consumerClient);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void consumeEncryptedMessages() throws Exception {
        String subscription = "my-sub";
        String topic = "my-property/my-ns/encrypted" + UUID.randomUUID();
        String consumerUri = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/" + topic + "/" + "my-sub" + "?cryptoFailureAction=CONSUME";
        int messages = 10;
        WebSocketClient consumerClient = new WebSocketClient();
        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
        String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnS
lM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
        Producer producer = this.pulsarClient.newProducer().topic(topic).enableBatching(false).defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==").addEncryptionKey("ws-consumer-a").create();
        try {
            consumerClient.start();
            ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
            Future consumerFuture = consumerClient.connect((Object)consumeSocket, URI.create(consumerUri), consumerRequest);
            Assert.assertTrue((boolean)((Session)consumerFuture.get()).isOpen());
            Assert.assertEquals((int)consumeSocket.getReceivedMessagesCount(), (int)0);
            for (int i = 0; i < 10; ++i) {
                producer.sendAsync((Object)String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }
            producer.flush();
            consumeSocket.sendPermits(10);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)consumeSocket.getReceivedMessagesCount(), (int)10));
            for (JsonObject msg : consumeSocket.messages) {
                Assert.assertTrue((boolean)msg.has("encryptionContext"));
                JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
                JsonObject keys = encryptionCtx.getAsJsonObject("keys");
                Assert.assertTrue((boolean)keys.has("ws-consumer-a"));
                Assert.assertTrue((boolean)keys.getAsJsonObject("ws-consumer-a").has("keyValue"));
            }
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("my-sub")).getMsgBacklog(), (long)0L));
        }
        catch (Throwable throwable) {
            this.stopWebSocketClient(consumerClient);
            throw throwable;
        }
        this.stopWebSocketClient(consumerClient);
    }

    /**
     * Fetches {@code <baseUrl><topic>/stats} as JSON and asserts that the decoded
     * {@link ProxyTopicStat} lists at least one producer and at least one consumer.
     *
     * @param client  REST client used for the request
     * @param baseUrl URL prefix the topic name is appended to
     * @param topic   topic name placed between {@code baseUrl} and {@code /stats}
     */
    private void verifyTopicStat(Client client, String baseUrl, String topic) {
        final String json = client.target(baseUrl + topic + "/stats")
                .request("application/json")
                .get()
                .readEntity(String.class);
        final ProxyTopicStat topicStat = new Gson().fromJson(json, ProxyTopicStat.class);
        Assert.assertFalse(topicStat.producerStats.isEmpty());
        Assert.assertFalse(topicStat.consumerStats.isEmpty());
    }

    /**
     * Regenerates the proxy stats and fetches {@code <baseUrl>metrics} twice, asserting on each
     * round that the returned JSON decodes to a non-empty list of {@link Metrics}.
     *
     * @param client  REST client used for the requests
     * @param baseUrl URL prefix that {@code metrics} is appended to
     */
    private void verifyProxyMetrics(Client client, String baseUrl) {
        final WebTarget metricsTarget = client.target(baseUrl + "metrics");
        final Gson parser = new Gson();

        // Two identical rounds: the metrics must be present after each regeneration.
        for (int round = 0; round < 2; round++) {
            service.getProxyStats().generate();
            final String json = metricsTarget.request("application/json").get().readEntity(String.class);
            final List<Metrics> metrics = parser.fromJson(json, new TypeToken<List<Metrics>>(){}.getType());
            Assert.assertFalse(metrics.isEmpty());
        }
    }

    /**
     * Fetches {@code <baseUrl>stats} and asserts that it reports exactly one topic, keyed as
     * {@code persistent://<topic>}, with two consumer entries and one producer entry. The first
     * consumer and producer entries returned by iteration must show a positive message count
     * and a non-null remote connection.
     *
     * @param client  REST client used for the request
     * @param baseUrl URL prefix that {@code stats} is appended to
     * @param topic   expected topic name without the {@code persistent://} scheme
     */
    private void verifyProxyStats(Client client, String baseUrl, String topic) {
        final String json = client.target(baseUrl + "stats")
                .request("application/json")
                .get()
                .readEntity(String.class);
        final Map<String, ProxyTopicStat> statsByTopic =
                new Gson().fromJson(json, new TypeToken<Map<String, ProxyTopicStat>>(){}.getType());

        Assert.assertEquals(statsByTopic.size(), 1);
        final Map.Entry<String, ProxyTopicStat> onlyEntry = statsByTopic.entrySet().iterator().next();
        Assert.assertEquals(onlyEntry.getKey(), "persistent://" + topic);

        final ProxyTopicStat topicStat = onlyEntry.getValue();

        Assert.assertEquals(topicStat.consumerStats.size(), 2);
        final ProxyTopicStat.ConsumerStats firstConsumer = topicStat.consumerStats.iterator().next();
        Assert.assertTrue(firstConsumer.numberOfMsgDelivered > 0L);
        Assert.assertNotNull(firstConsumer.remoteConnection);

        Assert.assertEquals(topicStat.producerStats.size(), 1);
        final ProxyTopicStat.ProducerStats firstProducer = topicStat.producerStats.iterator().next();
        Assert.assertTrue(firstProducer.numberOfMsgPublished > 0L);
        Assert.assertNotNull(firstProducer.remoteConnection);
    }

    /**
     * Stops the given WebSocket clients on a helper thread, waiting at most two seconds for the
     * whole batch so that a hanging {@code stop()} cannot stall the calling test.
     *
     * <p>This is best-effort cleanup: a failure to stop an individual client, a timeout, or an
     * interruption is logged (with its stack trace) and never propagated. If the calling thread
     * is interrupted while waiting, its interrupt status is restored before returning.
     *
     * @param clients the clients to stop, in order
     */
    private void stopWebSocketClient(WebSocketClient ... clients) {
        final ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.submit(() -> {
                for (WebSocketClient client : clients) {
                    try {
                        client.stop();
                    } catch (Exception e) {
                        // Keep going: one client failing to stop must not prevent the others.
                        log.error("failed to stop proxy client", e);
                    }
                }
                log.info("proxy clients are stopped successfully");
            }).get(2L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            log.error("failed to close proxy clients", e);
        } catch (Exception e) {
            log.error("failed to close proxy clients", e);
        } finally {
            // Also interrupts the stop task if it is still running after the timeout.
            executor.shutdownNow();
        }
    }
}

