/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.websocket.proxy;

import java.net.URI;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.proxy.SimpleConsumerSocket;
import org.apache.pulsar.websocket.proxy.SimpleProducerSocket;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Exercises the WebSocket proxy's producer and consumer endpoints while the
 * proxy's metadata store is stubbed to a {@link ZKMetadataStore} backed by the
 * mock ZooKeeper instance supplied by the base test class.
 */
@Test(groups = {"websocket"})
public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeWithoutZKTest.class);

    protected String methodName;
    private ProxyServer proxyServer;
    private WebSocketService service;

    /**
     * Runs the base-class setup, then starts a WebSocket proxy on an ephemeral
     * port (port 0) pointed at the test broker's web service addresses.
     *
     * @throws Exception if the base setup or the proxy start-up fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();

        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
        config.setWebServicePort(Optional.of(0));
        config.setClusterName("test");
        config.setServiceUrl(this.pulsar.getSafeWebServiceAddress());
        config.setServiceUrlTls(this.pulsar.getWebServiceAddressTls());

        // Spy on the real service so that only metadata-store creation is replaced.
        this.service = Mockito.spy(new WebSocketService(config));
        Mockito.doReturn(new ZKMetadataStore((ZooKeeper) this.mockZooKeeper))
                .when(this.service)
                .createMetadataStore(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());

        this.proxyServer = new ProxyServer(config);
        WebSocketServiceStarter.start(this.proxyServer, this.service);
        log.info("Proxy Server Started");
    }

    /**
     * Tears down the base test fixture, then the WebSocket service and the
     * proxy server. Each step runs even if an earlier one throws, so a failure
     * in one teardown stage cannot leak the remaining resources.
     *
     * @throws Exception if any teardown step fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            try {
                if (this.service != null) {
                    this.service.close();
                }
            } finally {
                if (this.proxyServer != null) {
                    this.proxyServer.stop();
                }
            }
        }
        log.info("Finished Cleaning Up Test setup");
    }

    /**
     * Connects one consumer socket and one producer socket through the proxy,
     * waits until the consumer reports at least 10 received messages, and then
     * checks that the producer buffer is non-empty and equal to the consumer
     * buffer.
     *
     * @throws Exception if a client fails to start or connect, or the wait is interrupted
     */
    @Test(timeOut = 30000L)
    public void socketTest() throws Exception {
        String proxyBase = "ws://localhost:" + this.proxyServer.getListenPortHTTP().get();
        URI consumeUri = URI.create(proxyBase + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-sub");
        URI produceUri = URI.create(proxyBase + "/ws/v2/producer/persistent/my-property/my-ns/my-topic/");

        WebSocketClient consumeClient = new WebSocketClient();
        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
        WebSocketClient produceClient = new WebSocketClient();
        SimpleProducerSocket produceSocket = new SimpleProducerSocket();

        try {
            consumeClient.start();
            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
            log.info("Connecting to : {}", consumeUri);

            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
            produceClient.start();
            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);

            Assert.assertTrue(consumerFuture.get().isOpen());
            Assert.assertTrue(producerFuture.get().isOpen());

            // No explicit deadline here: the @Test timeOut bounds this polling loop.
            while (consumeSocket.getReceivedMessagesCount() < 10) {
                Thread.sleep(10L);
            }

            Assert.assertTrue(produceSocket.getBuffer().size() > 0);
            Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
        } finally {
            stopClients(consumeClient, produceClient);
        }
    }

    /**
     * Stops both WebSocket clients on a helper thread, waiting at most two
     * seconds for the stop to finish. Failures are logged rather than
     * rethrown so that they do not mask the outcome of the test body.
     */
    private void stopClients(WebSocketClient consumeClient, WebSocketClient produceClient) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.submit(() -> {
                try {
                    consumeClient.stop();
                    produceClient.stop();
                    log.info("proxy clients are stopped successfully");
                } catch (Exception e) {
                    // Keep the original message but attach the throwable so the stack trace is not lost.
                    log.error(e.getMessage(), e);
                }
            }).get(2L, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("failed to close clients ", e);
        } finally {
            // Always release the helper thread, even if the stop task is still running.
            executor.shutdownNow();
        }
    }
}

