/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.buffer.Unpooled;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class EnableProxyProtocolTest
extends BrokerTestBase {
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setHaProxyProtocolEnabled(true);
        super.baseSetup();
    }

    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimpleProduceAndConsume() throws PulsarClientException {
        String namespace = "prop/ns-abc";
        String topicName = "persistent://prop/ns-abc/testSimpleProduceAndConsume";
        String subName = "my-subscriber-name";
        int messages = 100;
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testSimpleProduceAndConsume"}).subscriptionName("my-subscriber-name").subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testSimpleProduceAndConsume").create();
            try {
                for (int i = 0; i < 100; ++i) {
                    producer.send((Object)("Message-" + i).getBytes());
                }
                int received = 0;
                for (int i = 0; i < 100; ++i) {
                    consumer.acknowledge(consumer.receive());
                    ++received;
                }
                Assert.assertEquals((int)received, (int)100);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    @Test
    public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
        String namespace = "prop/ns-abc";
        String topicName = "persistent://prop/ns-abc/testProxyProtocol";
        String subName = "my-subscriber-name";
        PulsarClientImpl client = (PulsarClientImpl)this.pulsarClient;
        CompletableFuture cnx = client.getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", (Integer)this.pulsar.getBrokerListenPort().get()));
        ((ClientCnx)cnx.get()).ctx().channel().writeAndFlush((Object)Unpooled.copiedBuffer((byte[])"PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes()));
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testProxyProtocol"}).subscriptionName("my-subscriber-name").subscribe();
        Consumer c = (Consumer)((Topic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testProxyProtocol").get()).getSubscription("my-subscriber-name").getConsumers().get(0);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)c.cnx().hasHAProxyMessage()));
        TopicStats topicStats = this.admin.topics().getStats("persistent://prop/ns-abc/testProxyProtocol");
        Assert.assertEquals((int)topicStats.getSubscriptions().size(), (int)1);
        SubscriptionStats subscriptionStats = (SubscriptionStats)topicStats.getSubscriptions().get("my-subscriber-name");
        Assert.assertEquals((int)subscriptionStats.getConsumers().size(), (int)1);
        Assert.assertEquals((String)((ConsumerStats)subscriptionStats.getConsumers().get(0)).getAddress(), (String)"198.51.100.22:35646");
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testProxyProtocol").create();
        topicStats = this.admin.topics().getStats("persistent://prop/ns-abc/testProxyProtocol");
        Assert.assertEquals((int)topicStats.getPublishers().size(), (int)1);
        Assert.assertEquals((String)((PublisherStats)topicStats.getPublishers().get(0)).getAddress(), (String)"198.51.100.22:35646");
    }
}

