/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link RawReader}: sequential reads, seeking, flow control with many
 * outstanding reads, batch inspection/re-batching through {@link RawBatchConverter},
 * cumulative acknowledgement with cursor properties, and cancellation of pending
 * reads when the reader is closed.
 *
 * <p>Every test publishes to the same topic name and reads through the same
 * subscription name; {@link #setup()} and {@link #cleanup()} run around each test
 * method.
 */
@Test(groups={"flaky"})
public class RawReaderTest
extends MockedPulsarServiceBaseTest {
    private static final String subscription = "foobar-sub";

    /**
     * Runs the base-class setup, then creates the "test" cluster, the "my-property"
     * tenant and the "my-property/my-ns" namespace used by every test.
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
    }

    /** Delegates to the base-class cleanup; runs even when the test method failed. */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes {@code count} non-batched messages to {@code topic}.
     *
     * @return the keys of the published messages
     */
    private Set<String> publishMessages(String topic, int count) throws Exception {
        return this.publishMessages(topic, count, false);
    }

    /**
     * Publishes {@code count} messages keyed {@code "key0" .. "key<count-1>"} with
     * payload {@code "my-message-<i>"} to {@code topic}.
     *
     * @param topic    topic to publish to
     * @param count    number of messages to send; nothing is sent when it is not positive
     * @param batching whether the producer batches messages (at most 10 per batch)
     * @return the set of keys that were sent
     */
    private Set<String> publishMessages(String topic, int count, boolean batching) throws Exception {
        Set<String> keys = new HashSet<>();
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .enableBatching(batching)
                .batchingMaxMessages(10)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .maxPendingMessages(count)
                .topic(topic)
                .create()) {
            Future<MessageId> lastFuture = null;
            for (int i = 0; i < count; ++i) {
                String key = "key" + i;
                byte[] data = ("my-message-" + i).getBytes();
                lastFuture = producer.newMessage().key(key).value(data).sendAsync();
                keys.add(key);
            }
            // Only the last send is awaited; the guard avoids a NullPointerException
            // when no message was sent at all.
            // NOTE(review): this presumes earlier sends complete before the last one — confirm.
            if (lastFuture != null) {
                lastFuture.get();
            }
        }
        return keys;
    }

    /**
     * Parses the message metadata from the raw headers-and-payload buffer of
     * {@code m} and returns its partition key.
     */
    public static String extractKey(RawMessage m) {
        ByteBuf headersAndPayload = m.getHeadersAndPayload();
        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        return msgMetadata.getPartitionKey();
    }

    /** Reads up to the last message id and checks that every published key is seen exactly once. */
    @Test
    public void testRawReader() throws Exception {
        int numKeys = 10;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        Set<String> keys = this.publishMessages(topic, numKeys);
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try {
            MessageId lastMessageId = reader.getLastMessageIdAsync().get();
            while (true) {
                // try-with-resources closes each message, including on the final iteration.
                try (RawMessage m = reader.readNextAsync().get()) {
                    Assert.assertTrue(keys.remove(RawReaderTest.extractKey(m)));
                    if (lastMessageId.compareTo(m.getMessageId()) == 0) {
                        break;
                    }
                }
            }
            Assert.assertTrue(keys.isEmpty());
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Reads everything, seeks back to {@link MessageId#earliest} and checks that the
     * second pass yields the same keys as the first.
     */
    @Test
    public void testSeekToStart() throws Exception {
        int numKeys = 10;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        this.publishMessages(topic, numKeys);
        Set<String> readKeys = new HashSet<>();
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try {
            MessageId lastMessageId = reader.getLastMessageIdAsync().get();
            // First pass: collect every key.
            while (true) {
                try (RawMessage m = reader.readNextAsync().get()) {
                    readKeys.add(RawReaderTest.extractKey(m));
                    if (lastMessageId.compareTo(m.getMessageId()) == 0) {
                        break;
                    }
                }
            }
            Assert.assertEquals(readKeys.size(), numKeys);

            // Second pass after rewinding: every key must be delivered again.
            reader.seekAsync(MessageId.earliest).get();
            while (true) {
                try (RawMessage m = reader.readNextAsync().get()) {
                    Assert.assertTrue(readKeys.remove(RawReaderTest.extractKey(m)));
                    if (lastMessageId.compareTo(m.getMessageId()) == 0) {
                        break;
                    }
                }
            }
            Assert.assertTrue(readKeys.isEmpty());
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Records the keys of the second half of the topic, seeks to the first message of
     * that half and checks that exactly those keys are delivered again.
     */
    @Test
    public void testSeekToMiddle() throws Exception {
        int numKeys = 10;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        this.publishMessages(topic, numKeys);
        Set<String> readKeys = new HashSet<>();
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try {
            int messagesRead = 0;
            MessageId seekTo = null;
            MessageId lastMessageId = reader.getLastMessageIdAsync().get();
            while (true) {
                try (RawMessage m = reader.readNextAsync().get()) {
                    if (++messagesRead > numKeys / 2) {
                        // Remember the first message of the second half as the seek target.
                        if (seekTo == null) {
                            seekTo = m.getMessageId();
                        }
                        readKeys.add(RawReaderTest.extractKey(m));
                    }
                    if (lastMessageId.compareTo(m.getMessageId()) == 0) {
                        break;
                    }
                }
            }
            Assert.assertEquals(readKeys.size(), numKeys / 2);

            reader.seekAsync(seekTo).get();
            while (true) {
                try (RawMessage m = reader.readNextAsync().get()) {
                    Assert.assertTrue(readKeys.remove(RawReaderTest.extractKey(m)));
                    if (lastMessageId.compareTo(m.getMessageId()) == 0) {
                        break;
                    }
                }
            }
            Assert.assertTrue(readKeys.isEmpty());
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Issues one more read than there are messages and checks that every message is
     * delivered exactly once while exactly one read times out.
     */
    @Test
    public void testFlowControl() throws Exception {
        int numMessages = 5000;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        this.publishMessages(topic, numMessages);
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try {
            List<Future<RawMessage>> futures = new ArrayList<>();
            Set<String> keys = new HashSet<>();
            // One read more than there are messages: the extra one can never complete.
            for (int i = 0; i < numMessages + 1; ++i) {
                futures.add(reader.readNextAsync());
            }
            int timeouts = 0;
            for (Future<RawMessage> future : futures) {
                try (RawMessage m = future.get(1L, TimeUnit.SECONDS)) {
                    String key = RawReaderTest.extractKey(m);
                    Assert.assertTrue(keys.add(key),
                            "Received duplicated key '" + key + "' : already received keys = " + keys);
                } catch (TimeoutException te) {
                    ++timeouts;
                }
            }
            Assert.assertEquals(timeouts, 1);
            Assert.assertEquals(keys.size(), numMessages);
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Reads batched messages until a read times out, checking that each entry is a
     * readable batch and that every key across all batches is unique.
     */
    @Test
    public void testFlowControlBatch() throws Exception {
        int numMessages = 5000;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        this.publishMessages(topic, numMessages, true);
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try {
            Set<String> keys = new HashSet<>();
            try {
                // The loop only ends through the TimeoutException of a read that
                // finds no further message.
                while (true) {
                    try (RawMessage m = reader.readNextAsync().get(1L, TimeUnit.SECONDS)) {
                        Assert.assertTrue(RawBatchConverter.isReadableBatch(m));
                        List<ImmutableTriple<MessageId, String, Integer>> batchKeys =
                                RawBatchConverter.extractIdsAndKeysAndSize(m);
                        for (ImmutableTriple<MessageId, String, Integer> entry : batchKeys) {
                            String key = entry.middle;
                            Assert.assertTrue(keys.add(key),
                                    "Received duplicated key '" + key + "' : already received keys = " + keys);
                        }
                    }
                }
            } catch (TimeoutException te) {
                Assert.assertEquals(keys.size(), numMessages);
            }
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Sends three keyed messages through a batching producer (batch size 3), reads one
     * raw message and checks that it yields three entries with increasing message ids
     * and keys {@code key1}, {@code key2}, {@code key3} in that order.
     */
    @Test
    public void testBatchingExtractKeysAndIds() throws Exception {
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            producer.newMessage().key("key1").value("my-content-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-content-2".getBytes()).sendAsync();
            // The third send is synchronous and fills the batch (batchingMaxMessages == 3).
            producer.newMessage().key("key3").value("my-content-3".getBytes()).send();
        }
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try (RawMessage m = reader.readNextAsync().get()) {
            List<ImmutableTriple<MessageId, String, Integer>> idsAndKeys =
                    RawBatchConverter.extractIdsAndKeysAndSize(m);
            Assert.assertEquals(idsAndKeys.size(), 3);
            Assert.assertTrue(idsAndKeys.get(0).getLeft().compareTo(idsAndKeys.get(1).getLeft()) < 0);
            Assert.assertTrue(idsAndKeys.get(1).getLeft().compareTo(idsAndKeys.get(2).getLeft()) < 0);
            Assert.assertEquals(idsAndKeys.get(0).getMiddle(), "key1");
            Assert.assertEquals(idsAndKeys.get(1).getMiddle(), "key2");
            Assert.assertEquals(idsAndKeys.get(2).getMiddle(), "key3");
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Re-batches a three-message batch with a filter that accepts only {@code key2} and
     * checks that the result holds just that key and that, once the re-batched message
     * is closed, the source buffer's reference count is 1.
     */
    @Test
    public void testBatchingRebatch() throws Exception {
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            producer.newMessage().key("key1").value("my-content-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-content-2".getBytes()).sendAsync();
            producer.newMessage().key("key3").value("my-content-3".getBytes()).send();
        }
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try (RawMessage m1 = reader.readNextAsync().get()) {
            // The inner try-with-resources closes m2 even when an assertion fails.
            try (RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get()) {
                List<ImmutableTriple<MessageId, String, Integer>> idsAndKeys =
                        RawBatchConverter.extractIdsAndKeysAndSize(m2);
                Assert.assertEquals(idsAndKeys.size(), 1);
                Assert.assertEquals(idsAndKeys.get(0).getMiddle(), "key2");
            }
            // Checked after m2 has been closed and while m1 is still open.
            Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Reads all messages, acknowledges cumulatively up to the last one with a
     * {@code "foobar"} property and waits until the subscription cursor of the topic's
     * managed ledger reports that property value.
     */
    @Test
    public void testAcknowledgeWithProperties() throws Exception {
        int numKeys = 10;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        Set<String> keys = this.publishMessages(topic, numKeys);
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        try {
            MessageId lastMessageId = reader.getLastMessageIdAsync().get();
            while (true) {
                try (RawMessage m = reader.readNextAsync().get()) {
                    Assert.assertTrue(keys.remove(RawReaderTest.extractKey(m)));
                    if (lastMessageId.compareTo(m.getMessageId()) == 0) {
                        break;
                    }
                }
            }
            Assert.assertTrue(keys.isEmpty());

            // 244837814099658L == 0xdeadbeefdecaL
            Long propertyValue = 244837814099658L;
            HashMap<String, Long> properties = new HashMap<>();
            properties.put("foobar", propertyValue);
            reader.acknowledgeCumulativeAsync(lastMessageId, properties).get();

            PersistentTopic topicRef =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic).get();
            ManagedLedger ledger = topicRef.getManagedLedger();
            // The acknowledgement completing does not by itself show that the cursor
            // already carries the property, so the assertion is retried.
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                    ledger.openCursor(subscription).getProperties().get("foobar"), (Object) propertyValue));
        } finally {
            reader.closeAsync().get();
        }
    }

    /**
     * Issues more reads than there are messages, consumes the ones that can complete,
     * closes the reader and checks that every remaining read fails with
     * {@link CancellationException}.
     */
    @Test
    public void testReadCancellationOnClose() throws Exception {
        int numKeys = 10;
        String topic = "persistent://my-property/my-ns/my-raw-topic";
        this.publishMessages(topic, numKeys / 2);
        RawReader reader = RawReader.create(this.pulsarClient, topic, subscription).get();
        List<Future<RawMessage>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < numKeys; ++i) {
                futures.add(reader.readNextAsync());
            }
            // Only numKeys / 2 messages were published, so only that many reads complete;
            // each received message is closed once it has been obtained.
            for (int i = 0; i < numKeys / 2; ++i) {
                futures.remove(0).get().close();
            }
        } finally {
            reader.closeAsync().get();
        }
        while (!futures.isEmpty()) {
            try {
                futures.remove(0).get();
                Assert.fail("Should have been cancelled");
            } catch (CancellationException expected) {
                // Expected: the read was still pending when the reader was closed.
            }
        }
    }
}

