/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class SimpleProducerConsumerTest
extends ProducerConsumerBase {
    /** Logger used by the tests in this class. */
    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
    /**
     * Receive timeout, in seconds.
     * NOTE(review): the tests visible here pass the literal {@code 3} with {@code TimeUnit.SECONDS}
     * to {@code receive}; presumably those literals are inlined copies of this constant - confirm.
     */
    private static final int RECEIVE_TIMEOUT_SECONDS = 3;
    /** Short receive timeout, in milliseconds (going by the name; usage is not visible in this part of the file). */
    private static final int RECEIVE_TIMEOUT_SHORT_MILLIS = 100;
    /** Medium receive timeout, in milliseconds (going by the name; usage is not visible in this part of the file). */
    private static final int RECEIVE_TIMEOUT_MEDIUM_MILLIS = 500;

    /**
     * Prepares the fixture before each test method by running the base-class
     * {@code internalSetup()} followed by {@code producerBaseSetup()}.
     *
     * @throws Exception if either base-class step fails
     */
    @BeforeMethod(alwaysRun = true)
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Supplies every combination of two boolean flags for each of the sizes 10 and 100.
     *
     * <p>Rows are emitted size-major: all four flag combinations for 10 (true/true, true/false,
     * false/true, false/false), then the same four for 100.
     *
     * @return eight rows of {@code {boolean, boolean, int}}
     */
    @DataProvider
    public static Object[][] variationsForExpectedPos() {
        final boolean[] toggles = {true, false};
        final int[] sizes = {10, 100};
        final List<Object[]> rows = new ArrayList<>();
        for (int size : sizes) {
            for (boolean first : toggles) {
                for (boolean second : toggles) {
                    rows.add(new Object[]{first, second, size});
                }
            }
        }
        return rows.toArray(new Object[0][]);
    }

    /**
     * Supplies both values of the ack-receipt flag.
     *
     * @return two single-column rows: {@code true}, then {@code false}
     */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        final Object[] enabled = {true};
        final Object[] disabled = {false};
        return new Object[][]{enabled, disabled};
    }

    /**
     * Tears the fixture down after each test method by delegating to the base-class
     * {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies publish and event timestamps for a producer with batching disabled.
     *
     * <p>The client is built with a fake {@link Clock} whose {@code millis()} returns 1, 2, 3, ...
     * on successive calls. Five messages are sent with event time {@code (i + 1) * 100}; message
     * {@code i} is expected to come back with publish time {@code i + 1} and its own event time.
     *
     * @throws Exception if the client, consumer or producer cannot be created or closed
     */
    @Test
    public void testPublishTimestampBatchDisabled() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final AtomicLong ticker = new AtomicLong(0L);
        final Clock clock = new Clock() {

            @Override
            public ZoneId getZone() {
                return ZoneId.systemDefault();
            }

            @Override
            public Clock withZone(ZoneId zone) {
                return this;
            }

            @Override
            public Instant instant() {
                return Instant.ofEpochMilli(this.millis());
            }

            @Override
            public long millis() {
                // Every read advances the fake time by exactly one millisecond.
                return ticker.incrementAndGet();
            }
        };
        final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
        final int numMessages = 5;
        // try-with-resources closes in reverse order (producer, consumer, client) and keeps the
        // test's own failure as the primary exception if a close() also throws.
        try (PulsarClient newPulsarClient = PulsarClient.builder()
                     .serviceUrl(this.lookupUrl.toString())
                     .clock(clock)
                     .build();
             Consumer<byte[]> consumer = newPulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("my-sub")
                     .subscribe();
             Producer<byte[]> producer = newPulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .create()) {
            for (int i = 0; i < numMessages; ++i) {
                producer.newMessage()
                        .value(("value-" + i).getBytes(StandardCharsets.UTF_8))
                        .eventTime((i + 1) * 100L)
                        .sendAsync();
            }
            producer.flush();
            for (int i = 0; i < numMessages; ++i) {
                Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                // receive() yields null on timeout; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                log.info("Received message '{}'.", new String(msg.getValue(), StandardCharsets.UTF_8));
                Assert.assertEquals(msg.getPublishTime(), 1L + i);
                Assert.assertEquals(msg.getEventTime(), 100L * (i + 1));
            }
        }
    }

    /**
     * Verifies publish and event timestamps for a producer with batching enabled.
     *
     * <p>The client is built with a fake {@link Clock} whose {@code millis()} returns 1, 2, 3, ...
     * on successive calls. Five messages are sent with event time {@code (i + 1) * 100} through a
     * producer batching up to 50 messages; every received message is expected to carry publish
     * time {@code 1} while keeping its own event time.
     *
     * @throws Exception if the client, consumer or producer cannot be created or closed
     */
    @Test
    public void testPublishTimestampBatchEnabled() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final AtomicLong ticker = new AtomicLong(0L);
        final Clock clock = new Clock() {

            @Override
            public ZoneId getZone() {
                return ZoneId.systemDefault();
            }

            @Override
            public Clock withZone(ZoneId zone) {
                return this;
            }

            @Override
            public Instant instant() {
                return Instant.ofEpochMilli(this.millis());
            }

            @Override
            public long millis() {
                // Every read advances the fake time by exactly one millisecond.
                return ticker.incrementAndGet();
            }
        };
        final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
        final int numMessages = 5;
        // try-with-resources closes in reverse order (producer, consumer, client) and keeps the
        // test's own failure as the primary exception if a close() also throws.
        try (PulsarClient newPulsarClient = PulsarClient.builder()
                     .serviceUrl(this.lookupUrl.toString())
                     .clock(clock)
                     .build();
             Consumer<byte[]> consumer = newPulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("my-sub")
                     .subscribe();
             Producer<byte[]> producer = newPulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(true)
                     .batchingMaxMessages(50)
                     .create()) {
            for (int i = 0; i < numMessages; ++i) {
                producer.newMessage()
                        .value(("value-" + i).getBytes(StandardCharsets.UTF_8))
                        .eventTime((i + 1) * 100L)
                        .sendAsync();
            }
            producer.flush();
            for (int i = 0; i < numMessages; ++i) {
                Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                // receive() yields null on timeout; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                log.info("Received message '{}'.", new String(msg.getValue(), StandardCharsets.UTF_8));
                Assert.assertEquals(msg.getPublishTime(), 1L);
                Assert.assertEquals(msg.getEventTime(), 100L * (i + 1));
            }
        }
    }

    /**
     * Supplies every combination of batch publish delay (0 or 1000) and the ack-receipt flag.
     *
     * <p>The previous table repeated {@code {0, true}} and {@code {1000, false}} twice, so the
     * combinations {@code {1000, true}} and {@code {0, false}} were never exercised. The row
     * count and column types are unchanged.
     *
     * @return four rows of {@code {int batchMessageDelayMs, boolean ackReceiptEnabled}}
     */
    @DataProvider(name = "batchAndAckReceipt")
    public Object[][] codecProviderWithAckReceipt() {
        return new Object[][]{
            {0, true},
            {1000, true},
            {0, false},
            {1000, false},
        };
    }

    /**
     * Supplies the two batch publish delays used by the {@code "batch"} tests: 0 and 1000.
     * The tests in this class turn batching on only when the value is non-zero.
     *
     * @return two single-column rows: {@code 0}, then {@code 1000}
     */
    @DataProvider(name = "batch")
    public Object[][] codecProvider() {
        final Object[] withoutDelay = {0};
        final Object[] withDelay = {1000};
        return new Object[][]{withoutDelay, withDelay};
    }

    /**
     * Publishes ten messages synchronously and checks they are received in order without
     * duplicates, then acknowledges them cumulatively.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; batching (max 5 messages
     *                            per batch) is configured only when this is non-zero
     * @throws Exception if producing, consuming or closing fails
     */
    @Test(dataProvider = "batch")
    public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic1";
        final int numMessages = 10;
        // Both resources are now closed even when an assertion fails; previously the producer
        // was never closed by this test at all.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .subscribe()) {
            ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
            if (batchMessageDelayMs != 0) {
                producerBuilder.enableBatching(true);
                producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
                producerBuilder.batchingMaxMessages(5);
            }
            try (Producer<byte[]> producer = producerBuilder.create()) {
                for (int i = 0; i < numMessages; ++i) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes(StandardCharsets.UTF_8));
                }
                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; ++i) {
                    msg = consumer.receive(3, TimeUnit.SECONDS);
                    // receive() yields null on timeout; fail clearly instead of with an NPE.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
                    log.debug("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }
                // Acknowledging the last message cumulatively covers everything before it.
                consumer.acknowledgeCumulative(msg);
            }
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Publishes ten messages asynchronously, checks they are received in order without
     * duplicates, then acknowledges them cumulatively through the async API.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; batching (max 5 messages
     *                            per batch) is configured only when this is non-zero
     * @param ackReceiptEnabled   value passed to the consumer builder's {@code isAckReceiptEnabled}
     * @throws Exception if producing, consuming, acknowledging or closing fails
     */
    @Test(dataProvider = "batchAndAckReceipt")
    public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, boolean ackReceiptEnabled) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic2";
        final int numMessages = 10;
        // Both resources are now closed even when an assertion fails; previously the producer
        // was never closed by this test at all.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .isAckReceiptEnabled(ackReceiptEnabled)
                .subscriptionName("my-subscriber-name")
                .subscribe()) {
            ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
            if (batchMessageDelayMs != 0) {
                producerBuilder.enableBatching(true);
                producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
                producerBuilder.batchingMaxMessages(5);
            }
            try (Producer<byte[]> producer = producerBuilder.create()) {
                List<Future<MessageId>> futures = new ArrayList<>();
                for (int i = 0; i < numMessages; ++i) {
                    String message = "my-message-" + i;
                    futures.add(producer.sendAsync(message.getBytes(StandardCharsets.UTF_8)));
                }
                log.info("Waiting for async publish to complete");
                for (Future<MessageId> future : futures) {
                    future.get();
                }
                Message<byte[]> msg = null;
                Set<String> messageSet = new HashSet<>();
                for (int i = 0; i < numMessages; ++i) {
                    msg = consumer.receive(3, TimeUnit.SECONDS);
                    // receive() yields null on timeout; fail clearly instead of with an NPE.
                    Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                    String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
                    log.info("Received message: [{}]", receivedMessage);
                    String expectedMessage = "my-message-" + i;
                    this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
                }
                CompletableFuture<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
                log.info("Waiting for async ack to complete");
                ackFuture.get();
            }
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Publishes 100 messages asynchronously and waits until a message listener has seen and
     * acknowledged every one of them.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; batching (max 5 messages
     *                            per batch) is configured only when this is non-zero
     * @throws Exception if producing, consuming or closing fails
     */
    @Test(dataProvider = "batch", timeOut = 100000L)
    public void testMessageListener(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic3";
        final int numMessages = 100;
        final CountDownLatch latch = new CountDownLatch(numMessages);
        // Both resources are now closed even when an assertion fails; previously the producer
        // was never closed by this test at all.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    c1.acknowledgeAsync(msg);
                    latch.countDown();
                })
                .subscribe()) {
            ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
            if (batchMessageDelayMs != 0) {
                producerBuilder.enableBatching(true);
                producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
                producerBuilder.batchingMaxMessages(5);
            }
            try (Producer<byte[]> producer = producerBuilder.create()) {
                List<Future<MessageId>> futures = new ArrayList<>();
                for (int i = 0; i < numMessages; ++i) {
                    String message = "my-message-" + i;
                    futures.add(producer.sendAsync(message.getBytes(StandardCharsets.UTF_8)));
                }
                log.info("Waiting for async publish to complete");
                for (Future<MessageId> future : futures) {
                    future.get();
                }
                log.info("Waiting for message listener to ack all messages");
                Assert.assertTrue(latch.await(numMessages, TimeUnit.SECONDS),
                        "Timed out waiting for message listener acks");
            }
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Checks that a paused listener-based consumer stops being handed messages and picks up again
     * after {@code resume()}.
     *
     * <p>The consumer has a receiver queue of 20 and is paused before 40 messages are published.
     * The test waits for the first 20 deliveries, sleeps two seconds to confirm the count stays
     * at 20, then resumes and waits for the remaining 20.
     *
     * @throws Exception if producing, consuming or closing fails
     */
    @Test(timeOut = 100000L)
    public void testPauseAndResume() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic-pr";
        final int receiverQueueSize = 20;
        final int totalMessages = receiverQueueSize * 2;
        // The latch is swapped for a fresh one before resuming, hence the AtomicReference.
        final AtomicReference<CountDownLatch> pending =
                new AtomicReference<>(new CountDownLatch(receiverQueueSize));
        final AtomicInteger deliveredCount = new AtomicInteger();

        final MessageListener<byte[]> listener = (c1, msg) -> {
            Assert.assertNotNull(msg, "Message cannot be null");
            log.debug("Received message [{}] in the listener", new String(msg.getData()));
            c1.acknowledgeAsync(msg);
            deliveredCount.incrementAndGet();
            pending.get().countDown();
        };
        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .receiverQueueSize(receiverQueueSize)
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .messageListener(listener)
                .subscribe();
        final Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create();

        consumer.pause();
        int sent = 0;
        while (sent < totalMessages) {
            producer.send(("my-message-" + sent).getBytes());
            sent++;
        }

        log.info("Waiting for message listener to ack " + receiverQueueSize + " messages");
        boolean firstHalfArrived = pending.get().await(receiverQueueSize, TimeUnit.SECONDS);
        Assert.assertTrue(firstHalfArrived, "Timed out waiting for message listener acks");

        log.info("Giving message listener an opportunity to receive messages while paused");
        Thread.sleep(2000L);
        Assert.assertEquals(deliveredCount.intValue(), receiverQueueSize, "Consumer received messages while paused");

        pending.set(new CountDownLatch(receiverQueueSize));
        consumer.resume();
        log.info("Waiting for message listener to ack all messages");
        boolean secondHalfArrived = pending.get().await(receiverQueueSize, TimeUnit.SECONDS);
        Assert.assertTrue(secondHalfArrived, "Timed out waiting for message listener acks");

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Checks that a paused listener-based consumer stays paused across a topic unload.
     *
     * <p>The consumer (receiver queue 20) is paused, 40 unbatched messages are published, and the
     * test waits for the first 20 deliveries. The topic is then unloaded through the admin API;
     * after a two-second pause the delivered count must still be 20. Resuming must deliver the
     * remaining 20.
     *
     * @throws Exception if producing, consuming, unloading or closing fails
     */
    @Test(timeOut = 30000L)
    public void testPauseAndResumeWithUnloading() throws Exception {
        final String topicName = "persistent://my-property/my-ns/pause-and-resume-with-unloading";
        final String subName = "sub";
        final int receiverQueueSize = 20;
        // The latch is swapped for a fresh one before resuming, hence the AtomicReference.
        final AtomicReference<CountDownLatch> pending =
                new AtomicReference<>(new CountDownLatch(receiverQueueSize));
        final AtomicInteger deliveredCount = new AtomicInteger();

        final MessageListener<byte[]> listener = (c1, msg) -> {
            Assert.assertNotNull(msg, "Message cannot be null");
            c1.acknowledgeAsync(msg);
            deliveredCount.incrementAndGet();
            pending.get().countDown();
        };
        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(receiverQueueSize)
                .messageListener(listener)
                .subscribe();
        consumer.pause();

        final Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .create();
        final int totalMessages = receiverQueueSize * 2;
        int sent = 0;
        while (sent < totalMessages) {
            producer.send(("my-message-" + sent).getBytes());
            sent++;
        }

        boolean firstHalfArrived = pending.get().await(receiverQueueSize, TimeUnit.SECONDS);
        Assert.assertTrue(firstHalfArrived, "Timed out waiting for message listener acks");

        this.admin.topics().unload(topicName);
        Thread.sleep(2000L);
        Assert.assertEquals(deliveredCount.intValue(), receiverQueueSize, "Consumer received messages while paused");

        pending.set(new CountDownLatch(receiverQueueSize));
        consumer.resume();
        boolean secondHalfArrived = pending.get().await(receiverQueueSize, TimeUnit.SECONDS);
        Assert.assertTrue(secondHalfArrived, "Timed out waiting for message listener acks");

        consumer.unsubscribe();
        producer.close();
    }

    /**
     * Checks that unacknowledged messages are delivered again after the broker restarts.
     *
     * <p>Ten messages are published and received without being acknowledged, the broker is
     * restarted, and the same consumer must then receive ten messages again before
     * acknowledging cumulatively.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; batching (max 5 messages
     *                            per batch) is enabled when non-zero and explicitly disabled
     *                            otherwise
     * @throws Exception if producing, consuming, restarting the broker or closing fails
     */
    @Test(dataProvider = "batch")
    public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic4";
        final int numMessages = 10;
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .startMessageIdInclusive()
                .subscribe();
        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer<byte[]> producer = producerBuilder.create();
        for (int i = 0; i < numMessages; ++i) {
            producer.sendAsync(("my-message-" + i).getBytes(StandardCharsets.UTF_8)).thenApply(msgId -> {
                log.info("Published message id: {}", msgId);
                return msgId;
            });
        }
        producer.flush();
        for (int i = 0; i < numMessages; ++i) {
            Message<byte[]> received = consumer.receive(3, TimeUnit.SECONDS);
            // receive() yields null on timeout; fail clearly instead of with an NPE.
            Assert.assertNotNull(received, "Message cannot be null");
            log.info("Received: [{}]", new String(received.getData(), StandardCharsets.UTF_8));
        }
        log.info("-- Restarting broker --");
        this.restartBroker();
        Message<byte[]> msg = null;
        log.info("Receiving duplicate messages..");
        for (int i = 0; i < numMessages; ++i) {
            msg = consumer.receive(3, TimeUnit.SECONDS);
            // The null check must come before the message is dereferenced; it used to sit after
            // msg.getData(), where it could never fire.
            Assert.assertNotNull(msg, "Message cannot be null");
            log.info("Received: [{}]", new String(msg.getData(), StandardCharsets.UTF_8));
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Checks that a send issued while the broker is stopped fails, and that the failed message
     * is not delivered once the broker is started again.
     *
     * <p>The producer uses a one-second send timeout. After the broker comes back, a receive
     * with a three-second timeout must return {@code null}.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; batching (max 5 messages
     *                            per batch) is configured only when this is non-zero
     * @throws Exception if setup, broker stop/start or closing fails
     */
    @Test(dataProvider = "batch")
    public void testSendTimeout(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic5";
        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .subscribe();
        final ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(1, TimeUnit.SECONDS);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true);
            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
            producerBuilder.batchingMaxMessages(5);
        }
        final Producer<byte[]> producer = producerBuilder.create();
        final String message = "my-message";

        this.stopBroker();
        final CompletableFuture<MessageId> pendingSend = producer.sendAsync(message.getBytes());
        try {
            pendingSend.get();
            Assert.fail("Send operation should have failed");
        } catch (ExecutionException expected) {
            // Expected: the send cannot complete while the broker is down.
        }
        this.startBroker();

        // Nothing was published successfully, so the receive must time out.
        final Message<byte[]> shouldBeAbsent = consumer.receive(3, TimeUnit.SECONDS);
        Assert.assertNull(shouldBeAbsent);
        consumer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Exercises API calls made in an invalid order: building a consumer or producer from a
     * closed client, acknowledging a message that was only built locally, and using a consumer
     * or producer after it has been closed.
     *
     * @throws Exception if creating or closing the live client's producer or consumer fails
     */
    @Test
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic6";
        final String subscription = "my-subscriber-name";

        // A client that has already been closed must reject new consumers and producers.
        final PulsarClient closedClient = PulsarClient.builder()
                .serviceUrl(this.pulsar.getWebServiceAddress())
                .build();
        closedClient.close();
        try {
            closedClient.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
        }
        try {
            closedClient.newProducer().topic(topic).create();
            Assert.fail("Should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
        }

        final Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create();
        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscribe();

        // Acknowledge a message that was built but never sent. An InvalidMessageException is
        // tolerated here; the test does not fail if none is thrown.
        try {
            TypedMessageBuilder<byte[]> builder = producer.newMessage().value("InvalidMessage".getBytes());
            Message<byte[]> unsent = ((TypedMessageBuilderImpl<byte[]>) builder).getMessage();
            consumer.acknowledge(unsent);
        } catch (PulsarClientException.InvalidMessageException tolerated) {
            // Accepted outcome.
        }

        // Once closed, the consumer must reject both receive and unsubscribe.
        consumer.close();
        try {
            consumer.receive(3, TimeUnit.SECONDS);
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // Expected.
        }
        try {
            consumer.unsubscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // Expected.
        }

        // Once closed, the producer must reject send.
        producer.close();
        try {
            producer.send("message".getBytes());
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // Expected.
        }
    }

    /**
     * Feeds invalid arguments to the client, producer and consumer builders and checks that each
     * one is rejected with the expected exception type.
     */
    @Test
    public void testSillyUser() {
        final String validTopic = "persistent://my-property/my-ns/my-topic7";

        // Service URL with an unknown scheme.
        try {
            PulsarClient.builder().serviceUrl("invalid://url").build();
            Assert.fail("should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.InvalidServiceURL);
        }

        // Negative send timeout.
        try {
            this.pulsarClient.newProducer().sendTimeout(-1, TimeUnit.SECONDS);
            Assert.fail("should fail");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }

        // Producer on a malformed topic name.
        try {
            this.pulsarClient.newProducer().topic("invalid://topic").create();
            Assert.fail("should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.InvalidTopicNameException);
        }

        // Null message listener.
        try {
            this.pulsarClient.newConsumer().messageListener(null);
            Assert.fail("should fail");
        } catch (NullPointerException expected) {
            // Expected.
        }

        // Null subscription type.
        try {
            this.pulsarClient.newConsumer().subscriptionType(null);
            Assert.fail("should fail");
        } catch (NullPointerException expected) {
            // Expected.
        }

        // Negative receiver queue size.
        try {
            this.pulsarClient.newConsumer().receiverQueueSize(-1);
            Assert.fail("should fail");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }

        // Null subscription name: must be exactly IllegalArgumentException.
        try {
            this.pulsarClient.newConsumer().topic(validTopic).subscriptionName(null).subscribe();
            Assert.fail("Should fail");
        } catch (IllegalArgumentException | PulsarClientException e) {
            Assert.assertEquals(e.getClass(), IllegalArgumentException.class);
        }

        // Empty subscription name.
        try {
            this.pulsarClient.newConsumer().topic(validTopic).subscriptionName("").subscribe();
            Assert.fail("Should fail");
        } catch (IllegalArgumentException | PulsarClientException e) {
            Assert.assertTrue(e instanceof IllegalArgumentException);
        }

        // Consumer on a malformed topic name.
        try {
            this.pulsarClient.newConsumer().topic("invalid://topic7").subscriptionName("my-subscriber-name").subscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.InvalidTopicNameException);
        }
    }

    /**
     * Runs concurrent {@code receive} calls against one consumer around broker restarts and
     * checks the consumer's available permits and queued-message count at each stage.
     *
     * <p>Each round submits ten tasks that rendezvous with the test thread on a shared barrier
     * and then call {@code receive} with a three-second timeout. Expected (permits, queued)
     * values after each stage: (10, 90) after the first round, a broker restart and 100
     * published messages; (20, 80) after the second round; (0, 0) once the queue has been
     * drained; (10, 90) after the third round and another restart.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; batching (max 5 messages
     *                            per batch) is configured only when this is non-zero
     * @throws Exception if consuming, producing, restarting the broker or closing fails
     */
    @Test(dataProvider = "batch", groups = {"quarantine"})
    public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
        final String topic = "persistent://my-property/my-ns/my-topic7";
        final int recvQueueSize = 100;
        final int numConsumersThreads = 10;
        final String subName = UUID.randomUUID().toString();
        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .startMessageIdInclusive()
                .receiverQueueSize(recvQueueSize)
                .subscribe();
        final ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // One extra party so the test thread takes part in every rendezvous.
            final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
            final java.util.concurrent.Callable<Void> receiveAfterBarrier = () -> {
                barrier.await();
                consumer.receive(3, TimeUnit.SECONDS);
                return null;
            };

            // Round 1: receivers start against an empty topic, then the broker restarts.
            for (int t = 0; t < numConsumersThreads; t++) {
                executor.submit(receiveAfterBarrier);
            }
            barrier.await();
            this.restartBroker();

            final ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
            if (batchMessageDelayMs != 0) {
                producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
                producerBuilder.batchingMaxMessages(5);
                producerBuilder.enableBatching(true);
            }
            final Producer<byte[]> producer = producerBuilder.create();
            for (int n = 0; n < recvQueueSize; n++) {
                String message = "my-message-" + n;
                producer.send(message.getBytes());
            }

            final ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 10);
                Assert.assertEquals(consumerImpl.numMessagesInQueue(), 90);
            });

            // Round 2: ten more concurrent receives against the filled queue.
            barrier.reset();
            for (int t = 0; t < numConsumersThreads; t++) {
                executor.submit(receiveAfterBarrier);
            }
            barrier.await();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 20);
                Assert.assertEquals(consumerImpl.numMessagesInQueue(), 80);
            });

            // Drain everything that is left; a null return means the receive timed out.
            Message<byte[]> drained;
            do {
                drained = consumer.receive(3, TimeUnit.SECONDS);
            } while (drained != null);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
                Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
            });

            // Round 3: receivers start on the drained consumer, then the broker restarts again.
            barrier.reset();
            for (int t = 0; t < numConsumersThreads; t++) {
                executor.submit(receiveAfterBarrier);
            }
            barrier.await();
            this.restartBroker();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumerImpl.getAvailablePermits(), 10);
                Assert.assertEquals(consumerImpl.numMessagesInQueue(), 90);
            });
            consumer.close();
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Checks that sending a payload of 0x500001 bytes (one byte more than 5 MiB) is
     * rejected with {@link PulsarClientException.InvalidMessageException}.
     *
     * @throws Exception if the producer cannot be created or closed
     */
    @Test
    public void testSendBigMessageSize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topic = "persistent://my-property/my-ns/bigMsg";
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        try {
            // Only builds a message of exactly 0x500000 bytes; nothing is sent here.
            producer.newMessage().value((Object)new byte[0x500000]);
            try {
                producer.send((Object)new byte[0x500001]);
                Assert.fail((String)"Should have thrown exception");
            }
            catch (PulsarClientException.InvalidMessageException expected) {
                // Expected: the oversized payload is rejected by the producer.
            }
        }
        finally {
            // Release the producer even when the assertion above fails.
            producer.close();
        }
    }

    /**
     * Sends payloads larger than 0x500000 bytes through producers that differ in their
     * batching and compression settings. As asserted below, sends with LZ4 compression
     * succeed, while sends with {@link CompressionType#NONE} are expected to fail with
     * {@link PulsarClientException.InvalidMessageException}.
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testSendBigMessageSizeButCompressed() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topic = "persistent://my-property/my-ns/bigMsg";

        // LZ4, batching disabled: the send must succeed.
        Producer lz4Unbatched = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .compressionType(CompressionType.LZ4)
                .create();
        lz4Unbatched.send((Object)new byte[0x500001]);
        lz4Unbatched.close();

        // LZ4, batching enabled: the send must succeed as well.
        Producer lz4Batched = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .compressionType(CompressionType.LZ4)
                .create();
        lz4Batched.send((Object)new byte[0x500001]);
        lz4Batched.close();

        // No compression, batching disabled: the send must be rejected.
        Producer plainUnbatched = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .compressionType(CompressionType.NONE)
                .create();
        try {
            plainUnbatched.send((Object)new byte[0x500001]);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (PulsarClientException.InvalidMessageException expected) {
            // Expected rejection of the uncompressed oversized payload.
        }
        plainUnbatched.close();

        // LZ4 round trip: the consumer must get back exactly the bytes that were sent.
        Producer roundTripProducer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .compressionType(CompressionType.LZ4)
                .create();
        Consumer consumer = this.pulsarClient.newConsumer()
                .topic(new String[]{topic})
                .subscriptionName("sub1")
                .subscribe();
        byte[] content = new byte[0x50000A];
        roundTripProducer.send((Object)content);
        Assert.assertEquals((byte[])consumer.receive(3, TimeUnit.SECONDS).getData(), (byte[])content);
        roundTripProducer.close();

        // No compression, batching enabled: the send must be rejected.
        Producer plainBatched = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(true)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .compressionType(CompressionType.NONE)
                .create();
        try {
            plainBatched.send((Object)new byte[0x500001]);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (PulsarClientException.InvalidMessageException expected) {
            // Expected rejection of the uncompressed oversized payload.
        }
        plainBatched.close();
        consumer.close();
    }

    /**
     * Replaces the managed ledger's {@code entryCache} field with a Mockito spy and checks
     * how the cache behaves while subscriptions come and go:
     * <ol>
     *   <li>with a single subscriber that consumes everything, {@code invalidateEntries}
     *       must have been called at least once;</li>
     *   <li>after a second subscriber that never consumes is added, the cache size must
     *       become non-zero;</li>
     *   <li>after that second subscriber is closed, the cache size must drop to zero.</li>
     * </ol>
     *
     * @throws Exception if reflection, the client, or the broker lookup fails
     */
    @Test
    public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        long batchMessageDelayMs = 100L;
        int receiverSize = 10;
        String sub1 = "faster-sub1";
        String sub2 = "slower-sub2";
        String topic = "persistent://my-property/my-ns/cache-topic";

        Consumer<byte[]> subscriber1 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(sub1).subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
        producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
        producerBuilder.batchingMaxMessages(5);
        producerBuilder.enableBatching(true);
        Producer<byte[]> producer = producerBuilder.create();

        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topic).get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)topicRef.getManagedLedger();

        // Swap the ledger's entry cache for a spy; the FINAL modifier bit is cleared first
        // so that the field can be overwritten through reflection.
        java.lang.reflect.Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        java.lang.reflect.Field modifiersField = java.lang.reflect.Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(cacheField, cacheField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
        EntryCacheImpl entryCache = (EntryCacheImpl)Mockito.spy((Object)((EntryCacheImpl)cacheField.get(ledger)));
        cacheField.set(ledger, entryCache);

        // (1) Single subscriber: produce and fully consume 30 messages.
        for (int i = 0; i < 30; ++i) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        for (int i = 0; i < 30; ++i) {
            Message<byte[]> msg = subscriber1.receive(3, TimeUnit.SECONDS);
            subscriber1.acknowledge(msg);
        }
        ((EntryCacheImpl)Mockito.verify((Object)entryCache, (VerificationMode)Mockito.atLeastOnce())).invalidateEntries((PositionImpl)Mockito.any());
        Thread.sleep(1000L);
        producer.send("message".getBytes());
        subscriber1.receive(3, TimeUnit.SECONDS);

        // (2) Add a second subscriber that never consumes anything.
        Consumer<byte[]> subscriber2 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(sub2).subscribe();
        for (int i = 0; i < 20; ++i) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        for (int i = 0; i < 20; ++i) {
            Message<byte[]> msg = subscriber1.receive(3, TimeUnit.SECONDS);
            subscriber1.acknowledge(msg);
        }
        Thread.sleep(1000L);
        producer.send("message".getBytes());
        subscriber1.receive(3, TimeUnit.SECONDS);
        // Compare against a long zero: getSize() is boxed to Long here, and a Long never
        // equals an Integer, so comparing with an int 0 would make this check always pass.
        Awaitility.await().untilAsserted(() -> Assert.assertNotEquals((Object)entryCache.getSize(), (Object)0L));

        // (3) Closing the idle subscriber must let the cache drain completely.
        subscriber2.close();
        SimpleProducerConsumerTest.retryStrategically(test -> entryCache.getSize() == 0L, 5, 100L);
        Assert.assertEquals((long)entryCache.getSize(), (long)0L);
        subscriber1.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Checks which cursors the managed ledger reports as active before and after a
     * backlogged subscription catches up. Two shared subscriptions are created; only the
     * first one consumes initially. After {@code checkBackloggedCursors()} the first
     * subscription must be active and the second must not; once the second has consumed
     * as well and the check is repeated, both must be active.
     *
     * @param ackReceiptEnabled passed to {@code isAckReceiptEnabled} on both consumers
     * @throws Exception if any client or broker operation fails
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        long batchMessageDelayMs = 100L;
        int receiverSize = 10;
        String topic = "persistent://my-property/my-ns/cache-topic";
        String sub1 = "faster-sub1";
        String sub2 = "slower-sub2";

        Consumer<byte[]> subscriber1 = this.pulsarClient.newConsumer()
                .topic(new String[]{topic})
                .subscriptionName(sub1)
                .isAckReceiptEnabled(ackReceiptEnabled)
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(receiverSize)
                .subscribe();
        Consumer<byte[]> subscriber2 = this.pulsarClient.newConsumer()
                .topic(new String[]{topic})
                .subscriptionName(sub2)
                .isAckReceiptEnabled(ackReceiptEnabled)
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(receiverSize)
                .subscribe();

        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
        producerBuilder.enableBatching(true)
                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(5);
        Producer<byte[]> producer = producerBuilder.create();

        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topic).get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)topicRef.getManagedLedger();
        long maxMessageCacheRetentionTimeMillis = this.conf.getManagedLedgerCacheEvictionTimeThresholdMillis();
        long maxActiveCursorBacklogEntries = this.conf.getManagedLedgerCursorBackloggedThreshold();

        // Publish just past the backlog threshold plus one receiver queue's worth.
        int totalMsgs = (int)maxActiveCursorBacklogEntries + receiverSize + 1;
        for (int sent = 0; sent < totalMsgs; ++sent) {
            String message = "my-message-" + sent;
            producer.send(message.getBytes());
        }

        // Only the first subscription consumes at this stage.
        for (int taken = 0; taken < totalMsgs; ++taken) {
            Message<byte[]> received = subscriber1.receive(100, TimeUnit.MILLISECONDS);
            subscriber1.acknowledgeAsync(received);
        }
        Thread.sleep(maxMessageCacheRetentionTimeMillis);
        topicRef.checkBackloggedCursors();
        Thread.sleep(100L);

        Set<String> activeSubscriber = Sets.newHashSet();
        ledger.getActiveCursors().forEach(cursor -> activeSubscriber.add(cursor.getName()));
        Assert.assertTrue((boolean)activeSubscriber.contains(sub1));
        Assert.assertFalse((boolean)activeSubscriber.contains(sub2));

        // Let the second subscription catch up, then re-evaluate the cursors.
        for (int taken = 0; taken < totalMsgs; ++taken) {
            Message<byte[]> received = subscriber2.receive(100, TimeUnit.MILLISECONDS);
            subscriber2.acknowledgeAsync(received);
        }
        topicRef.checkBackloggedCursors();
        activeSubscriber.clear();
        ledger.getActiveCursors().forEach(cursor -> activeSubscriber.add(cursor.getName()));
        Assert.assertTrue((boolean)activeSubscriber.contains(sub1));
        Assert.assertTrue((boolean)activeSubscriber.contains(sub2));
    }

    /**
     * Publishes 100 messages synchronously, consumes them through the recursive
     * {@code receiveAsync} helper, and verifies that every published payload was
     * received. The whole test must finish within the 5-second TestNG timeout.
     *
     * @throws Exception if any client operation fails or the wait is interrupted
     */
    @Test(timeOut=5000L)
    public void testAsyncProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        Set<String> produceMsgs = Sets.newHashSet();
        Set<String> consumeMsgs = Sets.newHashSet();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(new String[]{"persistent://my-property/my-ns/my-topic1"})
                .subscriptionName("my-subscriber-name")
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/my-topic1")
                .create();
        for (int sent = 0; sent < totalMsg; ++sent) {
            String message = "my-message-" + sent;
            producer.send(message.getBytes());
            produceMsgs.add(message);
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(totalMsg);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            this.receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
            // The latch reaches zero once every message has been handled.
            latch.await();
            Assert.assertEquals((int)produceMsgs.size(), (int)totalMsg);
            produceMsgs.removeAll(consumeMsgs);
            Assert.assertTrue((boolean)produceMsgs.isEmpty());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Publishes 100 messages, drains them through the recursive {@code receiveAsync}
     * helper, and checks that nothing that was published is missing from the received
     * set. Bounded by a 5-second TestNG timeout.
     *
     * <p>NOTE(review): despite the method name, the consumer below is built without a
     * {@code receiverQueueSize(0)} call, so this body is the same scenario as
     * {@code testAsyncProducerAndConsumer} — confirm whether a zero queue was intended.
     *
     * @throws Exception if any client operation fails or the wait is interrupted
     */
    @Test(timeOut=5000L)
    public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final int totalMsg = 100;
        final String topic = "persistent://my-property/my-ns/my-topic1";
        Set<String> published = Sets.newHashSet();
        Set<String> received = Sets.newHashSet();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("my-subscriber-name").subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create();

        int index = 0;
        while (index < totalMsg) {
            String message = "my-message-" + index;
            producer.send(message.getBytes());
            published.add(message);
            ++index;
        }

        log.info(" start receiving messages :");
        CountDownLatch remaining = new CountDownLatch(totalMsg);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            this.receiveAsync(consumer, totalMsg, 0, remaining, received, executor);
            remaining.await();
            Assert.assertEquals((int)published.size(), (int)totalMsg);
            // Whatever is left after removing the received payloads was lost in transit.
            published.removeAll(received);
            Assert.assertTrue((boolean)published.isEmpty());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Stops the broker, issues three asynchronous sends with explicit sequence ids
     * 0, 1 and 2 on a producer with a 1-second send timeout, waits three seconds, and
     * then attaches a callback to each future that compares the sequence id carried by
     * the failure with the id that was sent.
     *
     * <p>NOTE(review): the futures returned by {@code exceptionally(...)} are discarded,
     * so an assertion error (or any other exception) thrown inside a callback is captured
     * by that discarded future and does not fail this test — confirm whether the checks
     * are meant to be enforced.
     *
     * @throws Exception if the producer cannot be created or the sleep is interrupted
     */
    @Test
    public void testSendCallBackReturnSequenceId() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .enableBatching(false)
                .topic("persistent://my-property/my-ns/my-topic5")
                .sendTimeout(1, TimeUnit.SECONDS)
                .create();
        String message = "my-message";
        this.stopBroker();

        int sendCount = 3;
        List<CompletableFuture<MessageId>> pendingSends = new ArrayList<CompletableFuture<MessageId>>();
        for (int seq = 0; seq < sendCount; ++seq) {
            pendingSends.add(producer.newMessage().sequenceId((long)seq).value(message.getBytes()).sendAsync());
        }

        // Give the 1-second send timeout ample time to expire for every pending send.
        Thread.sleep(3000L);

        for (int seq = 0; seq < sendCount; ++seq) {
            final long expectedSequenceId = seq;
            pendingSends.get(seq).exceptionally(ex -> {
                long sequenceId = ((PulsarClientException)ex.getCause()).getSequenceId();
                Assert.assertEquals((long)sequenceId, (long)expectedSequenceId);
                return null;
            });
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Sends 100 messages asynchronously and, for each one, waits for the completion
     * callback and checks that it ran on the success path: the callback records the
     * payload length only when no exception was reported, and that recorded length must
     * equal the payload length.
     *
     * @throws Exception if the producer cannot be created/closed or a wait is interrupted
     */
    @Test
    public void testSendCallBack() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            for (int i = 0; i < totalMsg; ++i) {
                String message = "my-message-" + i;
                byte[] payload = message.getBytes();
                int len = payload.length;
                AtomicInteger msgLength = new AtomicInteger();
                CompletableFuture<Void> callback = producer.sendAsync(payload).handle((msgId, ex) -> {
                    if (ex != null) {
                        log.error("Message send failed:", ex);
                    } else {
                        msgLength.set(len);
                    }
                    return null;
                });
                // Block until the callback has run so the assertion sees its result.
                callback.get();
                Assert.assertEquals((int)len, (int)msgLength.get());
            }
        }
        finally {
            // Release the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Subscribes two consumers to the same shared subscription, lets each receive five of
     * ten published messages, and then acknowledges every message through the
     * <em>other</em> consumer. After requesting redelivery of unacknowledged messages on
     * both consumers, neither of them may receive anything further.
     *
     * @param ackReceiptEnabled passed to {@code isAckReceiptEnabled} on the consumer builder
     * @throws Exception if any client operation (including an acknowledgement) fails
     */
    @Test(dataProvider="ackReceiptEnabled", timeOut=30000L)
    public void testSharedConsumerAckDifferentConsumer(boolean ackReceiptEnabled) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).isAckReceiptEnabled(ackReceiptEnabled).acknowledgmentGroupTime(0L, TimeUnit.SECONDS);
        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1").create();
        try {
            for (int i = 0; i < 10; ++i) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            Set<Message<byte[]>> consumerMsgSet1 = Sets.newHashSet();
            Set<Message<byte[]>> consumerMsgSet2 = Sets.newHashSet();
            for (int i = 0; i < 5; ++i) {
                Message<byte[]> msg = consumer1.receive(3, TimeUnit.SECONDS);
                // Fail here with a clear message instead of later inside acknowledge().
                Assert.assertNotNull(msg, "consumer1 received no message within the timeout");
                consumerMsgSet1.add(msg);
                msg = consumer2.receive(3, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "consumer2 received no message within the timeout");
                consumerMsgSet2.add(msg);
            }

            // Cross-acknowledge; a PulsarClientException propagates with its cause intact.
            for (Message<byte[]> m : consumerMsgSet1) {
                consumer2.acknowledge(m);
            }
            for (Message<byte[]> m : consumerMsgSet2) {
                consumer1.acknowledge(m);
            }

            consumer1.redeliverUnacknowledgedMessages();
            consumer2.redeliverUnacknowledgedMessages();
            Thread.sleep(1000L);
            if (consumer1.receive(100, TimeUnit.MILLISECONDS) != null || consumer2.receive(100, TimeUnit.MILLISECONDS) != null) {
                Assert.fail();
            }
        }
        finally {
            consumer1.close();
            consumer2.close();
            producer.close();
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Receives messages one at a time through {@code consumer.receiveAsync()}, chaining
     * the next receive from the completion callback of the previous one until
     * {@code totalMessage} messages have been handled.
     *
     * <p>For each successfully received message the payload is added to
     * {@code consumeMsg}, the message is acknowledged, the next receive is scheduled on
     * {@code executor}, and {@code latch} is counted down. A failed receive is logged and
     * ends the chain without counting down the latch.
     *
     * @param consumer       consumer to receive from
     * @param totalMessage   number of messages to receive in total
     * @param currentMessage zero-based index of the message this call is responsible for
     * @param latch          counted down once per successfully handled message
     * @param consumeMsg     collects the received payloads as strings
     * @param executor       runs the follow-up receive for the next message
     * @throws PulsarClientException declared for callers; failures inside the callback are
     *                               reported through {@code Assert.fail} or the log instead
     */
    private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int currentMessage, CountDownLatch latch, Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
        if (currentMessage >= totalMessage) {
            return;
        }
        CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
        future.handle((msg, exception) -> {
            if (exception != null) {
                // Surface the failure instead of silently dropping it; the caller would
                // otherwise only see its latch never reaching zero.
                log.error("Message receive failed:", exception);
                return null;
            }
            consumeMsg.add(new String(msg.getData()));
            try {
                consumer.acknowledge(msg);
            }
            catch (PulsarClientException e1) {
                Assert.fail((String)"message acknowledge failed", (Throwable)e1);
            }
            // Schedule the next receive BEFORE counting down: once the latch hits zero the
            // caller may shut the executor down, and execute() would then be rejected.
            executor.execute(() -> {
                try {
                    this.receiveAsync(consumer, totalMessage, currentMessage + 1, latch, consumeMsg, executor);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"message receive failed", (Throwable)e);
                }
            });
            latch.countDown();
            return null;
        });
    }

    /**
     * Sets the per-consumer unacknowledged-message limit to 500, publishes 600 messages,
     * and checks that a shared consumer receives exactly 500 before acknowledging
     * anything. After acknowledging those, the remaining messages must arrive so that 600
     * messages have been received in total. The original limit is restored afterwards.
     *
     * @param ackReceiptEnabled passed to {@code isAckReceiptEnabled} on the consumer
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testConsumerBlockingWithUnAckedMessages(boolean ackReceiptEnabled) {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 500;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 600;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
            Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").isAckReceiptEnabled(ackReceiptEnabled).receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            // Phase 1: receive without acknowledging until the consumer stops delivering.
            List<Message<byte[]>> messages = Lists.newArrayList();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)unAckedMessagesBufferSize);

            // Phase 2: acknowledge everything received so far, then drain the rest.
            messages.forEach(m -> consumer.acknowledgeAsync(m));
            int remainingMessages = totalProducedMsgs - messages.size();
            for (int i = 0; i < remainingMessages; ++i) {
                Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    continue;
                }
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)totalProducedMsgs, (int)messages.size());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the failure is diagnosable.
            Assert.fail("Test failed with an unexpected exception", e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /**
     * Sets the per-consumer unacknowledged-message limit to 500, publishes 1500 messages,
     * and runs three rounds in which a shared consumer must receive exactly 500 messages
     * before it stops getting more; each round's messages are then acknowledged. In total
     * 1500 messages must have been received. The original limit is restored afterwards.
     *
     * @param ackReceiptEnabled passed to {@code isAckReceiptEnabled} on the consumer
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ackReceiptEnabled) {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 500;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 1500;
            int totalReceiveIteration = 3;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
            Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).isAckReceiptEnabled(ackReceiptEnabled).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            int totalReceivedMessages = 0;
            for (int round = 0; round < totalReceiveIteration; ++round) {
                // Receive without acknowledging until the consumer stops delivering.
                List<Message<byte[]>> messages = Lists.newArrayList();
                for (int i = 0; i < totalProducedMsgs; ++i) {
                    Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                    if (msg == null) {
                        break;
                    }
                    messages.add(msg);
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)unAckedMessagesBufferSize);

                // Acknowledge this round so that the next batch can be delivered.
                for (Message<byte[]> m : messages) {
                    try {
                        consumer.acknowledge(m);
                    }
                    catch (PulsarClientException e) {
                        Assert.fail((String)"ack failed", (Throwable)e);
                    }
                }
                totalReceivedMessages += messages.size();
            }
            Assert.assertEquals((int)totalReceivedMessages, (int)totalProducedMsgs);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the failure is diagnosable.
            Assert.fail("Test failed with an unexpected exception", e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /**
     * Sets the per-consumer unacknowledged-message limit to 20 and attaches two shared
     * consumers (the second one created from a separate client) to the same
     * subscription. With 100 messages published, the first consumer must receive exactly
     * 20 and is never acknowledged; the second must also receive exactly 20, and after
     * acknowledging them it must be able to drain the remainder while acknowledging as it
     * goes, so that 100 messages are received overall. The original limit is restored
     * afterwards.
     *
     * @param ackReceiptEnabled passed to {@code isAckReceiptEnabled} on both consumers
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testMultipleSharedConsumerBlockingWithUnActedMessages(boolean ackReceiptEnabled) {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
            Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").isAckReceiptEnabled(ackReceiptEnabled).receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
            PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").isAckReceiptEnabled(ackReceiptEnabled).receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
                Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").create();
                for (int i = 0; i < totalProducedMsgs; ++i) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                // (1) consumer1 receives until it stops getting messages; nothing is acked.
                List<Message<byte[]>> messages = Lists.newArrayList();
                for (int i = 0; i < totalProducedMsgs; ++i) {
                    Message<byte[]> msg = consumer1.receive(3, TimeUnit.SECONDS);
                    if (msg == null) {
                        break;
                    }
                    messages.add(msg);
                    ++totalReceiveMessages;
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)maxUnackedMessages);

                // (2) consumer2 receives until it stops getting messages as well.
                messages.clear();
                for (int i = 0; i < totalProducedMsgs - maxUnackedMessages; ++i) {
                    Message<byte[]> msg = consumer2.receive(3, TimeUnit.SECONDS);
                    if (msg == null) {
                        break;
                    }
                    messages.add(msg);
                    ++totalReceiveMessages;
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)maxUnackedMessages);
                for (Message<byte[]> m : messages) {
                    try {
                        consumer2.acknowledge(m);
                    }
                    catch (PulsarClientException e) {
                        Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                    }
                }

                // (3) consumer2 drains the rest, acknowledging each message immediately.
                messages.clear();
                for (int i = 0; i < totalProducedMsgs - 2 * maxUnackedMessages; ++i) {
                    Message<byte[]> msg = consumer2.receive(3, TimeUnit.SECONDS);
                    if (msg == null) {
                        break;
                    }
                    messages.add(msg);
                    ++totalReceiveMessages;
                    consumer2.acknowledge(msg);
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)totalProducedMsgs, (int)totalReceiveMessages);
                producer.close();
                consumer1.close();
                consumer2.close();
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (newPulsarClient != null) {
                    newPulsarClient.close();
                }
            }
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the failure is diagnosable.
            Assert.fail("Test failed with an unexpected exception", e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /**
     * Publishes 100 messages to a shared consumer configured with a receiver queue of 20
     * and a 1-second ack timeout. Without receiving anything, the consumer's queue must
     * hold 20 messages after one second and still hold 20 two seconds later; afterwards
     * all 100 messages must be receivable and acknowledgeable. The broker's per-consumer
     * unacknowledged-message limit is read up front and written back in {@code finally}.
     */
    @Test
    public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int totalReceiveMsg = 0;
        try {
            int receiverQueueSize = 20;
            int totalProducedMsgs = 100;
            ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(receiverQueueSize).ackTimeout(1L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            // The queue must be full shortly after publishing ...
            Thread.sleep(1000L);
            Assert.assertEquals((int)consumer.numMessagesInQueue(), (int)receiverQueueSize);
            // ... and must still be full once the 1-second ack timeout has passed.
            Thread.sleep(2000L);
            Assert.assertEquals((int)consumer.numMessagesInQueue(), (int)receiverQueueSize);

            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                consumer.acknowledge(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)totalProducedMsgs, (int)totalReceiveMsg);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the failure is diagnosable.
            Assert.fail("Test failed with an unexpected exception", e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises redelivery on a Shared subscription while the broker's per-consumer unacked limit is lowered.
     * <p>
     * The limit is set to 20 for the duration of the test and restored in {@code finally}. 100 messages are
     * published; the consumer first receives without acknowledging until a 3 second receive times out, then
     * calls {@code redeliverUnacknowledgedMessages()}, waits one second, and receives again while acknowledging
     * every message. The test passes when the total number of receives equals the number of published messages
     * plus the number received in the first (unacknowledged) pass.
     *
     * @param ackReceiptEnabled value passed to the consumer builder's {@code isAckReceiptEnabled}
     */
    @Test(dataProvider="ackReceiptEnabled")
    public void testUnackBlockRedeliverMessages(boolean ackReceiptEnabled) {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int totalReceiveMsg = 0;
        try {
            final int unAckedMessagesBufferSize = 20;
            final int receiverQueueSize = 10;
            final int totalProducedMsgs = 100;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer()
                    .topic(new String[]{"persistent://my-property/my-ns/unacked-topic"})
                    .subscriptionName("subscriber-1")
                    .isAckReceiptEnabled(ackReceiptEnabled)
                    .receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            Producer producer = this.pulsarClient.newProducer()
                    .topic("persistent://my-property/my-ns/unacked-topic")
                    .create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }

            // Pass 1: receive without acknowledging until a receive times out.
            int alreadyConsumedMessages = 0;
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                ++alreadyConsumedMessages;
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }

            consumer.redeliverUnacknowledgedMessages();
            Thread.sleep(1000L);

            // Pass 2: receive and acknowledge everything that is delivered.
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                consumer.acknowledge(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(totalReceiveMsg, totalProducedMsgs + alreadyConsumedMessages);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the cause attached so the report shows why the test broke.
            Assert.fail("testUnackBlockRedeliverMessages failed with an unexpected exception", (Throwable)e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a Shared consumer is throttled by the broker's unacked-message limit and resumes after acking,
     * with and without producer-side batching.
     * <p>
     * The limit is set to 20 for the duration of the test and restored in {@code finally}. 100 messages are
     * sent asynchronously and awaited. The consumer first receives without acknowledging until a 3 second
     * receive times out; that pass must not yield all 100 messages. Those messages are then acknowledged and
     * the consumer receives again, acknowledging asynchronously, until a receive times out. The two passes
     * together must add up to exactly 100 receives.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; {@code 0} disables batching, any other
     *                            value enables batching with at most 5 messages per batch
     * @param ackReceiptEnabled   value passed to the consumer builder's {@code isAckReceiptEnabled}
     */
    @Test(dataProvider="batchAndAckReceipt")
    public void testUnackedBlockAtBatch(int batchMessageDelayMs, boolean ackReceiptEnabled) {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            final int maxUnackedMessages = 20;
            final int receiverQueueSize = 10;
            final int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
            Consumer consumer1 = this.pulsarClient.newConsumer()
                    .topic(new String[]{"persistent://my-property/my-ns/unacked-topic"})
                    .subscriptionName("subscriber-1")
                    .isAckReceiptEnabled(ackReceiptEnabled)
                    .receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            ProducerBuilder producerBuidler = this.pulsarClient.newProducer()
                    .topic("persistent://my-property/my-ns/unacked-topic");
            if (batchMessageDelayMs != 0) {
                producerBuidler.enableBatching(true);
                producerBuidler.batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS);
                producerBuidler.batchingMaxMessages(5);
            } else {
                producerBuidler.enableBatching(false);
            }
            Producer producer = producerBuidler.create();

            List<CompletableFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                futures.add(producer.sendAsync((Object)message.getBytes()));
            }
            FutureUtil.waitForAll((List)futures).get();

            // Pass 1: receive without acknowledging until a receive times out.
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer1.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            // The consumer must not have been handed every message while nothing was acked.
            Assert.assertNotEquals((Object)messages.size(), (Object)totalProducedMsgs);

            messages.forEach(m -> {
                try {
                    consumer1.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });
            messages.clear();

            // Pass 2: receive the rest, acknowledging as we go.
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer1.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages.add(msg);
                ++totalReceiveMessages;
                consumer1.acknowledgeAsync(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(totalReceiveMessages, totalProducedMsgs);
            producer.close();
            consumer1.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the cause attached so the report shows why the test broke.
            Assert.fail("testUnackedBlockAtBatch failed with an unexpected exception", (Throwable)e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks the unacked-message limit when messages delivered to one consumer are acknowledged through a
     * second consumer on the same Shared subscription.
     * <p>
     * The broker limit is set to 20 for the duration of the test and restored in {@code finally}. Two consumers
     * are created from the same builder and 100 messages are published. {@code consumer1} receives without
     * acknowledging until a 3 second receive times out and must have obtained exactly 20 messages. Those are
     * acknowledged via {@code consumer2}; {@code consumer1} then keeps receiving (each message acknowledged via
     * {@code consumer2}) until a timeout, after which {@code consumer2} receives until a timeout. All receives
     * together must total 100.
     */
    @Test
    public void testBlockUnackConsumerAckByDifferentConsumer() {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            final int maxUnackedMessages = 20;
            final int receiverQueueSize = 10;
            final int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer()
                    .topic(new String[]{"persistent://my-property/my-ns/unacked-topic"})
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared);
            Consumer consumer1 = consumerBuilder.subscribe();
            Consumer consumer2 = consumerBuilder.subscribe();
            Producer producer = this.pulsarClient.newProducer()
                    .topic("persistent://my-property/my-ns/unacked-topic")
                    .create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }

            // consumer1 receives without acknowledging until a receive times out.
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer1.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(messages.size(), maxUnackedMessages);

            // Acknowledge consumer1's messages through consumer2.
            messages.forEach(m -> {
                try {
                    consumer2.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });

            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer1.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                ++totalReceiveMessages;
                consumer2.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer2.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(totalReceiveMessages, totalProducedMsgs);
            producer.close();
            consumer1.close();
            consumer2.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the cause attached so the report shows why the test broke.
            Assert.fail("testBlockUnackConsumerAckByDifferentConsumer failed with an unexpected exception", (Throwable)e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /**
     * Publishes 10 messages through a batching producer (max 5 messages per batch, 300 ms publish delay) and
     * verifies, via {@code testMessageOrderAndDuplicates}, that the consumer receives them in publish order
     * without duplicates. The last message is acknowledged cumulatively before producer and consumer are closed.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testEnabledChecksumClient() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final int totalMsg = 10;
        final int batchMessageDelayMs = 300;
        Consumer consumer = this.pulsarClient.newConsumer()
                .topic(new String[]{"persistent://my-property/my-ns/my-topic1"})
                .subscriptionName("my-subscriber-name")
                .subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/my-topic1");
        producerBuilder.enableBatching(true)
                .batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(5);
        Producer producer = producerBuilder.create();
        for (int i = 0; i < totalMsg; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        Set<String> messageSet = new HashSet<>();
        for (int i = 0; i < totalMsg; ++i) {
            msg = consumer.receive(3, TimeUnit.SECONDS);
            // A timed-out receive returns null; fail with a clear message instead of an NPE below.
            Assert.assertNotNull((Object)msg, "timed out waiting for message " + i);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        // Release the producer as well; it was previously left open.
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks redelivery of specific message ids for a Shared consumer blocked by the unacked-message limit,
     * with the producer pausing briefly between sends.
     * <p>
     * The broker limit is set to 10 for the duration of the test and restored in {@code finally}; the consumer's
     * receiver queue is 20. 20 messages are published with a 10 ms pause after each. The consumer receives
     * without acknowledging until a 3 second receive times out and must have obtained exactly 10 messages.
     * Redelivery is requested for exactly those message ids; after one second the consumer receives again
     * (still without acknowledging) and the second pass must contain the same number of ids, all of which were
     * in the redelivery request.
     */
    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            final int unAckedMessagesBufferSize = 10;
            final int receiverQueueSize = 20;
            final int totalProducedMsgs = 20;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer()
                    .topic(new String[]{"persistent://my-property/my-ns/unacked-topic"})
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            Producer producer = this.pulsarClient.newProducer()
                    .topic("persistent://my-property/my-ns/unacked-topic")
                    .create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
                Thread.sleep(10L);
            }

            // Pass 1: receive without acknowledging until a receive times out.
            List<Message> messages1 = new ArrayList<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(messages1.size(), unAckedMessagesBufferSize);

            Set<MessageIdImpl> redeliveryMessages = messages1.stream()
                    .map(m -> (MessageIdImpl)m.getMessageId())
                    .collect(Collectors.toSet());
            // Pass a copy so the set used for the comparison below is not shared with the client.
            consumer.redeliverUnacknowledgedMessages((Set)Sets.newHashSet(redeliveryMessages));
            Thread.sleep(1000L);

            // Pass 2: collect the ids that come back after the redelivery request.
            Set<MessageIdImpl> messages2 = new HashSet<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages2.add((MessageIdImpl)msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(messages1.size(), messages2.size());
            // Nothing outside the redelivery request may have been delivered.
            messages2.removeAll(redeliveryMessages);
            Assert.assertEquals(messages2.size(), 0);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the cause attached so the report shows why the test broke.
            Assert.fail("testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause failed with an unexpected exception", (Throwable)e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks redelivery of specific message ids when the consumer was closed while messages were published.
     * <p>
     * The broker's unacked-message limit is set to 10 for the duration of the test and restored in
     * {@code finally}. A Shared consumer subscribes and is closed straight away (presumably so that the
     * subscription exists before publishing starts). 50 messages are published with a 10 ms pause after each,
     * then a new consumer subscribes with the same subscription name. It receives without acknowledging until a
     * 3 second receive times out, requests redelivery of exactly those message ids, waits one second and
     * receives again. The second pass must contain the same number of ids as the first, all of which were in
     * the redelivery request.
     */
    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            final int unAckedMessagesBufferSize = 10;
            final int receiverQueueSize = 20;
            final int totalProducedMsgs = 50;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer()
                    .topic(new String[]{"persistent://my-property/my-ns/unacked-topic"})
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            consumer.close();
            Producer producer = this.pulsarClient.newProducer()
                    .topic("persistent://my-property/my-ns/unacked-topic")
                    .create();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
                Thread.sleep(10L);
            }
            consumer = (ConsumerImpl)this.pulsarClient.newConsumer()
                    .topic(new String[]{"persistent://my-property/my-ns/unacked-topic"})
                    .subscriptionName("subscriber-1")
                    .receiverQueueSize(receiverQueueSize)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();

            // Pass 1: receive without acknowledging until a receive times out.
            List<Message> messages1 = new ArrayList<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }

            Set<MessageIdImpl> redeliveryMessages = messages1.stream()
                    .map(m -> (MessageIdImpl)m.getMessageId())
                    .collect(Collectors.toSet());
            // Pass a copy so the set used for the comparison below is not shared with the client.
            consumer.redeliverUnacknowledgedMessages((Set)Sets.newHashSet(redeliveryMessages));
            Thread.sleep(1000L);

            // Pass 2: collect the ids that come back after the redelivery request.
            Set<MessageIdImpl> messages2 = new HashSet<>();
            for (int i = 0; i < totalProducedMsgs; ++i) {
                Message msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                messages2.add((MessageIdImpl)msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals(messages1.size(), messages2.size());
            // Nothing outside the redelivery request may have been delivered.
            messages2.removeAll(redeliveryMessages);
            Assert.assertEquals(messages2.size(), 0);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            // Keep the cause attached so the report shows why the test broke.
            Assert.fail("testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce failed with an unexpected exception", (Throwable)e);
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a lower-priority consumer on a Shared subscription gets nothing while higher-priority
     * consumers can still take messages.
     * <p>
     * Three consumers subscribe with priority level 1 and a fourth with priority level 2, each with a receiver
     * queue of 5 and (apart from the first) each on its own client. 15 messages are sent asynchronously and
     * awaited. The first two consumers then each attempt 20 receives with a 100 ms timeout, 5 more messages are
     * sent, and the test asserts that the priority-2 consumer receives nothing within 100 ms. The extra clients
     * are closed in {@code finally} blocks.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testPriorityConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final String topic = "persistent://my-property/my-ns/my-topic2";
        final String subscription = "my-subscriber-name";
        Consumer firstLevelOne = this.pulsarClient.newConsumer()
                .topic(new String[]{topic})
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(5)
                .priorityLevel(1)
                .subscribe();
        PulsarClient secondClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer secondLevelOne = secondClient.newConsumer()
                    .topic(new String[]{topic})
                    .subscriptionName(subscription)
                    .subscriptionType(SubscriptionType.Shared)
                    .receiverQueueSize(5)
                    .priorityLevel(1)
                    .subscribe();
            PulsarClient thirdClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer thirdLevelOne = thirdClient.newConsumer()
                        .topic(new String[]{topic})
                        .subscriptionName(subscription)
                        .subscriptionType(SubscriptionType.Shared)
                        .receiverQueueSize(5)
                        .priorityLevel(1)
                        .subscribe();
                PulsarClient fourthClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer levelTwo = fourthClient.newConsumer()
                            .topic(new String[]{topic})
                            .subscriptionName(subscription)
                            .subscriptionType(SubscriptionType.Shared)
                            .receiverQueueSize(5)
                            .priorityLevel(2)
                            .subscribe();
                    Producer publisher = this.pulsarClient.newProducer().topic(topic).create();

                    ArrayList pendingSends = Lists.newArrayList();
                    for (int seq = 0; seq < 15; seq++) {
                        pendingSends.add(publisher.sendAsync((Object)("my-message-" + seq).getBytes()));
                    }
                    log.info("Waiting for async publish to complete");
                    for (Object pending : pendingSends) {
                        ((Future)pending).get();
                    }

                    // Let the first two priority-1 consumers take whatever they are offered.
                    for (int round = 0; round < 20; round++) {
                        firstLevelOne.receive(100, TimeUnit.MILLISECONDS);
                        secondLevelOne.receive(100, TimeUnit.MILLISECONDS);
                    }

                    // Publish a few more; these sends are queued but not awaited.
                    for (int seq = 0; seq < 5; seq++) {
                        pendingSends.add(publisher.sendAsync((Object)("my-message-" + seq).getBytes()));
                    }

                    Assert.assertNull((Object)levelTwo.receive(100, TimeUnit.MILLISECONDS));
                    publisher.close();
                    firstLevelOne.close();
                    secondLevelOne.close();
                    thirdLevelOne.close();
                    levelTwo.close();
                    log.info("-- Exiting {} test --", (Object)this.methodName);
                }
                finally {
                    if (fourthClient != null) {
                        fourthClient.close();
                    }
                }
            }
            finally {
                if (thirdClient != null) {
                    thirdClient.close();
                }
            }
        }
        finally {
            if (secondClient != null) {
                secondClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks message distribution between same-priority consumers on a Shared subscription when each consumer
     * is capped by the broker's unacked-message limit.
     * <p>
     * The limit is set to 5 (equal to each consumer's receiver queue size) for the duration of the test and
     * restored in {@code finally}. 500 messages are sent asynchronously and awaited. Consumers {@code c1} and
     * {@code c2} receive without acknowledging until a 500 ms receive times out and together must have obtained
     * exactly 10 messages. Three more consumers then subscribe; {@code c4} and {@code c5} receive without
     * acknowledging, and {@code c3} receives and acknowledges until a timeout. The total number of messages
     * received across all consumers must be 500. Each consumer uses its own client, closed in a {@code finally}
     * block.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeOut=30000L)
    public void testSharedSamePriorityConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final int queueSize = 5;
        final int totalPublishMessages = 500;
        // Read the same setting that is overridden below, so the restore puts back the right value.
        int maxUnAckMsgs = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
        try {
            Producer producer = this.pulsarClient.newProducer()
                    .topic("persistent://my-property/my-ns/my-topic2")
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();
            PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer c1 = newPulsarClient.newConsumer()
                        .topic(new String[]{"persistent://my-property/my-ns/my-topic2"})
                        .subscriptionName("my-subscriber-name")
                        .subscriptionType(SubscriptionType.Shared)
                        .receiverQueueSize(queueSize)
                        .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                        .subscribe();
                PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer c2 = newPulsarClient1.newConsumer()
                            .topic(new String[]{"persistent://my-property/my-ns/my-topic2"})
                            .subscriptionName("my-subscriber-name")
                            .subscriptionType(SubscriptionType.Shared)
                            .receiverQueueSize(queueSize)
                            .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                            .subscribe();

                    List<CompletableFuture<?>> futures = new ArrayList<>();
                    for (int i = 0; i < totalPublishMessages; ++i) {
                        String message = "my-message-" + i;
                        futures.add(producer.sendAsync((Object)message.getBytes()));
                    }
                    log.info("Waiting for async publish to complete");
                    for (CompletableFuture<?> future : futures) {
                        future.get();
                    }

                    // c1 and c2 never acknowledge, so each stops once a receive times out.
                    List<Message> messages = new ArrayList<>();
                    for (int i = 0; i < totalPublishMessages; ++i) {
                        Message msg = c1.receive(500, TimeUnit.MILLISECONDS);
                        if (msg == null) {
                            break;
                        }
                        messages.add(msg);
                    }
                    for (int i = 0; i < totalPublishMessages; ++i) {
                        Message msg = c2.receive(500, TimeUnit.MILLISECONDS);
                        if (msg == null) {
                            break;
                        }
                        messages.add(msg);
                    }
                    Assert.assertEquals(messages.size(), 2 * queueSize);

                    PulsarClient newPulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                    try {
                        Consumer c3 = newPulsarClient2.newConsumer()
                                .topic(new String[]{"persistent://my-property/my-ns/my-topic2"})
                                .subscriptionName("my-subscriber-name")
                                .subscriptionType(SubscriptionType.Shared)
                                .receiverQueueSize(queueSize)
                                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                                .subscribe();
                        PulsarClient newPulsarClient3 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                        try {
                            Consumer c4 = newPulsarClient3.newConsumer()
                                    .topic(new String[]{"persistent://my-property/my-ns/my-topic2"})
                                    .subscriptionName("my-subscriber-name")
                                    .subscriptionType(SubscriptionType.Shared)
                                    .receiverQueueSize(queueSize)
                                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                                    .subscribe();
                            PulsarClient newPulsarClient4 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                            try {
                                Consumer c5 = newPulsarClient4.newConsumer()
                                        .topic(new String[]{"persistent://my-property/my-ns/my-topic2"})
                                        .subscriptionName("my-subscriber-name")
                                        .subscriptionType(SubscriptionType.Shared)
                                        .receiverQueueSize(queueSize)
                                        .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                                        .subscribe();
                                for (int i = 0; i < totalPublishMessages; ++i) {
                                    Message msg = c4.receive(500, TimeUnit.MILLISECONDS);
                                    if (msg == null) {
                                        break;
                                    }
                                    messages.add(msg);
                                }
                                for (int i = 0; i < totalPublishMessages; ++i) {
                                    Message msg = c5.receive(500, TimeUnit.MILLISECONDS);
                                    if (msg == null) {
                                        break;
                                    }
                                    messages.add(msg);
                                }
                                // c3 acknowledges as it goes, so it keeps receiving until the topic is drained.
                                for (int i = 0; i < totalPublishMessages; ++i) {
                                    Message msg = c3.receive(500, TimeUnit.MILLISECONDS);
                                    if (msg == null) {
                                        break;
                                    }
                                    messages.add(msg);
                                    c3.acknowledge(msg);
                                }
                                Assert.assertEquals(messages.size(), totalPublishMessages);
                                producer.close();
                                c1.close();
                                c2.close();
                                c3.close();
                                c4.close();
                                c5.close();
                                log.info("-- Exiting {} test --", (Object)this.methodName);
                            }
                            finally {
                                if (newPulsarClient4 != null) {
                                    newPulsarClient4.close();
                                }
                            }
                        }
                        finally {
                            if (newPulsarClient3 != null) {
                                newPulsarClient3.close();
                            }
                        }
                    }
                    finally {
                        if (newPulsarClient2 != null) {
                            newPulsarClient2.close();
                        }
                    }
                }
                finally {
                    if (newPulsarClient1 != null) {
                        newPulsarClient1.close();
                    }
                }
            }
            finally {
                if (newPulsarClient != null) {
                    newPulsarClient.close();
                }
            }
        }
        finally {
            // Restore on every exit path so a failure here cannot leak the lowered limit into other tests.
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
        }
    }

    /**
     * Exercises {@code redeliverUnacknowledgedMessages()} on a Failover subscription whose messages are
     * acknowledged as they are received.
     * <p>
     * 10 messages are published (10 ms pause after each). Twice in a row the consumer receives and acknowledges
     * up to 4 messages, asserts that it got exactly 4, requests redelivery and waits one second. 10 further
     * messages are then published (100 ms pause after each) and the consumer must receive and acknowledge
     * exactly the remaining {@code 20 - 2 * 4} messages.
     *
     * @param ackReceiptEnabled value passed to the consumer builder's {@code isAckReceiptEnabled}
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="ackReceiptEnabled", groups={"quarantine"})
    public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final String topic = "persistent://my-property/my-ns/unacked-topic";
        final int batchPerPass = 4;
        ConsumerImpl failoverConsumer = (ConsumerImpl)this.pulsarClient.newConsumer()
                .topic(new String[]{topic})
                .subscriptionName("subscriber-1")
                .receiverQueueSize(10)
                .subscriptionType(SubscriptionType.Failover)
                .isAckReceiptEnabled(ackReceiptEnabled)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Producer publisher = this.pulsarClient.newProducer().topic(topic).create();

        for (int seq = 0; seq < 10; seq++) {
            publisher.send((Object)("my-message-" + seq).getBytes());
            Thread.sleep(10L);
        }

        ArrayList received = Lists.newArrayList();

        // Two identical passes: take a few messages, ack them, then ask for redelivery.
        for (int pass = 0; pass < 2; pass++) {
            received.clear();
            while (received.size() < batchPerPass) {
                Message next = failoverConsumer.receive(3, TimeUnit.SECONDS);
                if (next == null) {
                    break;
                }
                received.add(next);
                failoverConsumer.acknowledge(next);
                log.info("Received message: " + new String(next.getData()));
            }
            Assert.assertEquals((int)received.size(), (int)batchPerPass);
            failoverConsumer.redeliverUnacknowledgedMessages();
            Thread.sleep(1000L);
        }

        for (int seq = 0; seq < 10; seq++) {
            publisher.send((Object)("my-message-" + seq).getBytes());
            Thread.sleep(100L);
        }

        // Everything published so far, minus what the two passes above already acknowledged.
        final int outstanding = 20 - 2 * batchPerPass;
        received.clear();
        while (received.size() < outstanding) {
            Message next = failoverConsumer.receive(3, TimeUnit.SECONDS);
            if (next == null) {
                break;
            }
            received.add(next);
            failoverConsumer.acknowledge(next);
            log.info("Received message: " + new String(next.getData()));
        }
        Assert.assertEquals((int)received.size(), (int)outstanding);

        publisher.close();
        failoverConsumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Verifies that {@code receiveAsync()} on an already-closed consumer completes exceptionally with
     * {@code PulsarClientException.AlreadyClosedException}, first for a consumer on a plain topic and then for
     * a consumer on a topic created with 4 partitions.
     *
     * @throws Exception if any client or admin operation fails, or if the future does not complete within
     *                   one second
     */
    @Test(timeOut=10000L)
    public void testFailReceiveAsyncOnConsumerClose() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);

        // Case 1: consumer on a non-partitioned topic.
        Consumer closedConsumer = this.pulsarClient.newConsumer()
                .topic(new String[]{"persistent://my-property/my-ns/failAsyncReceive-1"})
                .subscriptionName("my-subscriber-name")
                .subscribe();
        closedConsumer.close();
        try {
            closedConsumer.receiveAsync().get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"it should have failed because consumer is already closed");
        }
        catch (ExecutionException expected) {
            Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof PulsarClientException.AlreadyClosedException);
        }

        // Case 2: consumer on a partitioned topic.
        int partitionCount = 4;
        String partitionedTopic = TopicName.get((String)"persistent://my-property/my-ns/failAsyncReceive-2").toString();
        this.admin.topics().createPartitionedTopic(partitionedTopic, partitionCount);
        Consumer closedPartitionedConsumer = this.pulsarClient.newConsumer()
                .topic(new String[]{partitionedTopic})
                .subscriptionName("my-partitioned-subscriber")
                .subscribe();
        closedPartitionedConsumer.close();
        try {
            closedPartitionedConsumer.receiveAsync().get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"it should have failed because consumer is already closed");
        }
        catch (ExecutionException expected) {
            Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof PulsarClientException.AlreadyClosedException);
        }

        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Publishes 10 messages encrypted with the {@code client-ecdsa.pem} key and verifies who can read them.
     * <p>
     * A consumer without a {@code CryptoKeyReader} must receive nothing within 500 ms, while a consumer
     * configured with the key reader must receive all 10 messages in publish order without duplicates (checked
     * through {@code testMessageOrderAndDuplicates}). Keys are read from
     * {@code ./src/test/resources/certificate/}; a missing or unreadable key file fails the test.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/myecdsa-topic1-" + System.currentTimeMillis();
        final int totalMsg = 10;
        Set<String> messageSet = new HashSet<>();
        class EncKeyReader
        implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/private-key." + keyName);
            }

            /** Reads the key file into {@code keyInfo}; fails the test when the file cannot be read. */
            private EncryptionKeyInfo loadKey(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail((String)("Certificate file " + certFilePath + " is not present or not readable."));
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    return this.keyInfo;
                }
                catch (IOException e) {
                    // Attach the I/O error so the report shows why the read failed.
                    Assert.fail((String)("Failed to read certificate from " + certFilePath), (Throwable)e);
                    return null;
                }
            }
        }
        Consumer cryptoConsumer = this.pulsarClient.newConsumer()
                .topic(new String[]{topicName})
                .subscriptionName("my-subscriber-name")
                .cryptoKeyReader((CryptoKeyReader)new EncKeyReader())
                .subscribe();
        Consumer normalConsumer = this.pulsarClient.newConsumer()
                .topic(new String[]{topicName})
                .subscriptionName("my-subscriber-name-normal")
                .subscribe();
        Producer cryptoProducer = this.pulsarClient.newProducer()
                .topic(topicName)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader((CryptoKeyReader)new EncKeyReader())
                .create();
        for (int i = 0; i < totalMsg; ++i) {
            String message = "my-message-" + i;
            cryptoProducer.send((Object)message.getBytes());
        }

        // The consumer without a key reader must not be handed any message.
        Message msg = normalConsumer.receive(500, TimeUnit.MILLISECONDS);
        Assert.assertNull((Object)msg);

        for (int i = 0; i < totalMsg; ++i) {
            msg = cryptoConsumer.receive(3, TimeUnit.SECONDS);
            // A timed-out receive returns null; fail with a clear message instead of an NPE below.
            Assert.assertNotNull((Object)msg, "timed out waiting for encrypted message " + i);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        cryptoConsumer.acknowledgeCumulative(msg);
        // Release every client resource opened above; producer and plain consumer were previously left open.
        cryptoProducer.close();
        normalConsumer.close();
        cryptoConsumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Verifies RSA end-to-end encryption with two producers sharing the same key.
     *
     * <p>Both producers, the decrypting consumer and the key-less consumer all use the same
     * per-run unique topic. The key-less consumer must receive nothing, while the consumer
     * configured with a {@link CryptoKeyReader} must receive all messages in order, each one
     * carrying an encryption context.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testRSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        // Unique topic per run so leftovers from other tests cannot leak into this one.
        String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
        int totalMsg = 10;
        Set<String> messageSet = Sets.newHashSet();

        /** Reads PEM key material from the test resources directory. */
        class EncKeyReader implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/private-key." + keyName);
            }

            /** Loads the key file into {@code keyInfo}; fails the test if it cannot be read. */
            private EncryptionKeyInfo loadKey(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + certFilePath, e);
                    return null;
                }
            }
        }

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .cryptoKeyReader(new EncKeyReader())
                .subscribe();
        Consumer<byte[]> normalConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name-normal")
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .addEncryptionKey("client-rsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName)
                .addEncryptionKey("client-rsa.pem")
                .cryptoKeyReader(new EncKeyReader())
                .create();

        for (int i = 0; i < totalMsg; ++i) {
            producer.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
        }
        for (int i = totalMsg; i < totalMsg * 2; ++i) {
            producer2.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
        }

        // A consumer without a key reader must not be handed any encrypted message.
        Message<byte[]> undecryptable = normalConsumer.receive(500, TimeUnit.MILLISECONDS);
        Assert.assertNull(undecryptable);

        MessageImpl<byte[]> msg = null;
        for (int i = 0; i < totalMsg * 2; ++i) {
            msg = (MessageImpl<byte[]>) consumer.receive(3, TimeUnit.SECONDS);
            // receive() with a timeout returns null on expiry; fail clearly instead of with an NPE.
            Assert.assertNotNull(msg, "reading message index #" + i + " failed");
            msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
            String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
            log.debug("Received message: [{}]", receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        // Acknowledge everything consumed so far in one call.
        consumer.acknowledgeCumulative(msg);

        producer.close();
        producer2.close();
        normalConsumer.close();
        consumer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testDefaultCryptoKeyReader() throws Exception {
        int i;
        int i2;
        String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis();
        String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
        String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
        String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K";
        String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K";
        String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem";
        String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem";
        String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
        String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnS
lM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
        int numMsg = 10;
        HashMap privateKeyFileMap = Maps.newHashMap();
        privateKeyFileMap.put("client-ecdsa.pem", "file:./src/test/resources/certificate/private-key.client-ecdsa.pem");
        privateKeyFileMap.put("client-rsa.pem", "file:./src/test/resources/certificate/private-key.client-rsa.pem");
        HashMap privateKeyDataMap = Maps.newHashMap();
        privateKeyDataMap.put("client-ecdsa.pem", "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K");
        privateKeyDataMap.put("client-rsa.pem", "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVM
N3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==");
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub1").defaultCryptoKeyReader("file:./src/test/resources/certificate/private-key.client-ecdsa.pem").subscribe();
        Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub2").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K").subscribe();
        Consumer consumer3 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub3").defaultCryptoKeyReader((Map)privateKeyFileMap).subscribe();
        Consumer consumer4 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub4").defaultCryptoKeyReader((Map)privateKeyDataMap).subscribe();
        Producer producer1 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-ecdsa.pem").create();
        Producer producer2 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K").create();
        for (i2 = 0; i2 < 10; ++i2) {
            producer1.send((Object)("my-message-" + i2).getBytes());
        }
        for (i2 = 10; i2 < 20; ++i2) {
            producer2.send((Object)("my-message-" + i2).getBytes());
        }
        producer1.close();
        producer2.close();
        for (Consumer consumer : Lists.newArrayList((Object[])new Consumer[]{consumer1, consumer2})) {
            MessageImpl msg = null;
            for (int i3 = 0; i3 < 20; ++i3) {
                msg = (MessageImpl)consumer.receive(3, TimeUnit.SECONDS);
                msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i3));
            }
            consumer.acknowledgeCumulative(msg);
        }
        consumer1.unsubscribe();
        consumer2.unsubscribe();
        Producer producer3 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem").create();
        Producer producer4 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==").create();
        for (i = 20; i < 30; ++i) {
            producer3.send((Object)("my-message-" + i).getBytes());
        }
        for (i = 30; i < 40; ++i) {
            producer4.send((Object)("my-message-" + i).getBytes());
        }
        producer3.close();
        producer4.close();
        for (Consumer consumer : Lists.newArrayList((Object[])new Consumer[]{consumer3, consumer4})) {
            MessageImpl msg = null;
            for (int i4 = 0; i4 < 40; ++i4) {
                msg = (MessageImpl)consumer.receive(3, TimeUnit.SECONDS);
                msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i4));
            }
            consumer.acknowledgeCumulative(msg);
        }
        consumer3.unsubscribe();
        consumer4.unsubscribe();
    }

    /**
     * Checks that encrypted messages on a shared subscription end up at the only consumer able
     * to decrypt them.
     *
     * <p>Three consumers, each on its own client, share the subscription with a one-second ack
     * timeout: one with a working key reader, one whose key reader always returns {@code null},
     * and one with no key reader. The two that cannot decrypt must never return a message, while
     * the first must eventually receive all published messages.
     *
     * @throws Exception if any client operation fails
     */
    @Test(groups={"quarantine"})
    public void testRedeliveryOfFailedMessages() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        String encryptionKeyName = "client-rsa.pem";
        String encryptionKeyVersion = "1.0";
        final Map<String, String> metadata = Maps.newHashMap();
        metadata.put("version", encryptionKeyVersion);
        String topicName = "persistent://my-property/my-ns/myrsa-topic1";

        /** Reads PEM key material from the test resources directory and tags it with {@code metadata}. */
        class EncKeyReader implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/private-key." + keyName);
            }

            /** Loads the key file into {@code keyInfo}; fails the test if it cannot be read. */
            private EncryptionKeyInfo loadKey(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    this.keyInfo.setMetadata(metadata);
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + certFilePath, e);
                    return null;
                }
            }
        }

        /** Key reader that never finds a key. */
        class InvalidKeyReader implements CryptoKeyReader {
            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return null;
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return null;
            }
        }

        // Resources are declared in the original creation order and closed in reverse,
        // whether the test passes or fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName)
                     .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
                     .enableBatching(false).cryptoKeyReader(new EncKeyReader()).create();
             PulsarClient validKeyClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
             Consumer<byte[]> consumer1 = validKeyClient.newConsumer().topicsPattern(topicName)
                     .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
                     .subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
             PulsarClient invalidKeyClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
             Consumer<byte[]> consumer2 = invalidKeyClient.newConsumer().topicsPattern(topicName)
                     .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
                     .subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
             PulsarClient noKeyClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
             Consumer<byte[]> consumer3 = noKeyClient.newConsumer().topicsPattern(topicName)
                     .subscriptionName("my-subscriber-name")
                     .subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe()) {

            int numberOfMessages = 100;
            String message = "my-message";
            Set<String> messages = new HashSet<>();
            for (int i = 0; i < numberOfMessages; ++i) {
                producer.send((message + i).getBytes());
            }

            // Consumers that cannot decrypt must not surface any message.
            Message<byte[]> m = consumer2.receive(3, TimeUnit.SECONDS);
            Assert.assertNull(m);
            m = consumer3.receive(3, TimeUnit.SECONDS);
            Assert.assertNull(m);

            // NOTE(review): presumably gives the 1s ack timeout a chance to trigger redelivery - confirm.
            Thread.sleep(1000L);

            for (int i = 0; i < numberOfMessages; ++i) {
                m = consumer1.receive(3, TimeUnit.SECONDS);
                Assert.assertNotNull(m, "reading message index #" + i + " failed");
                messages.add(new String(m.getData()));
                consumer1.acknowledge(m);
            }

            m = consumer2.receive(3, TimeUnit.SECONDS);
            Assert.assertNull(m);
            m = consumer3.receive(3, TimeUnit.SECONDS);
            Assert.assertNull(m);

            // Order is not guaranteed on a shared subscription, so only check completeness.
            for (int i = 0; i < numberOfMessages; ++i) {
                Assert.assertTrue(messages.contains(message + i));
            }
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Walks through the failure modes of message encryption.
     *
     * <ol>
     *   <li>Creating a producer whose encryption key cannot be loaded must throw.</li>
     *   <li>A consumer without a key reader (default failure action) receives nothing.</li>
     *   <li>With {@code ConsumerCryptoFailureAction.CONSUME} the consumer gets a message whose
     *       payload differs from the plain text.</li>
     *   <li>With a key reader the consumer decrypts the following messages in order, leaving the
     *       last one unread.</li>
     *   <li>With {@code ConsumerCryptoFailureAction.DISCARD} and no key reader nothing is
     *       received.</li>
     * </ol>
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testEncryptionFailure() throws Exception {
        /** Reads PEM key material from the test resources; returns {@code null} when unavailable. */
        class EncKeyReader implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/private-key." + keyName);
            }

            /** Returns the loaded key, or {@code null} if the file is missing or unreadable. */
            private EncryptionKeyInfo loadKey(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    // Deliberately not a test failure: step 1 of the test relies on a missing key.
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    return this.keyInfo;
                } catch (IOException e) {
                    log.error("Failed to read certificate from {}", certFilePath, e);
                    return null;
                }
            }
        }

        log.info("-- Starting {} test --", this.methodName);
        String topicName = "persistent://my-property/use/myenc-ns/myenc-topic1";
        int totalMsg = 10;
        Set<String> messageSet = Sets.newHashSet();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topicName)
                .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();

        // 1. Invalid key name: producer creation is expected to throw.
        try {
            this.pulsarClient.newProducer().topic(topicName).addEncryptionKey("client-non-existant-rsa.pem")
                    .cryptoKeyReader(new EncKeyReader()).create();
            Assert.fail("Producer creation should not suceed if failing to read key");
        } catch (Exception ignored) {
            // Expected. Assert.fail raises an AssertionError, which this clause does not catch.
        }

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName)
                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader())
                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < totalMsg; ++i) {
            producer.send(("my-message-" + i).getBytes());
        }

        // 2. No key reader, default failure action.
        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(3, TimeUnit.SECONDS);
        Assert.assertNull(msg, "Receive should have failed with no keyreader");
        consumer.close();

        // 3. CONSUME: the still-encrypted payload is handed over.
        consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        int msgNum = 0;
        try {
            msg = (MessageImpl<byte[]>) consumer.receive(3, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            String expectedMessage = "my-message-" + msgNum++;
            Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage + " should not match the expected message " + expectedMessage);
            consumer.acknowledgeCumulative(msg);
        } catch (Exception e) {
            // Keep the cause attached to the failure instead of printing it to stderr.
            Assert.fail("Failed to receive message even aftet ConsumerCryptoFailureAction.CONSUME is set.", e);
        }
        consumer.close();

        // 4. Key reader present: decrypt the next messages, leaving the last one for step 5.
        consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL).cryptoKeyReader(new EncKeyReader())
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        for (int i = msgNum; i < totalMsg - 1; ++i) {
            msg = (MessageImpl<byte[]>) consumer.receive(3, TimeUnit.SECONDS);
            // receive() with a timeout returns null on expiry; fail clearly instead of with an NPE.
            Assert.assertNotNull(msg, "reading message index #" + i + " failed");
            msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();

        // 5. DISCARD: the remaining message cannot be decrypted and must not be delivered.
        consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                .cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        msg = (MessageImpl<byte[]>) consumer.receive(3, TimeUnit.SECONDS);
        Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Verifies that a consumer built with {@code ConsumerCryptoFailureAction.CONSUME} and without
     * a {@code CryptoKeyReader} still receives an encrypted message, and that the original text
     * can be recovered by decrypting the payload by hand through {@code decryptMessage}.
     *
     * @throws Exception if producing, consuming or decrypting fails
     */
    @Test
    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final String topicName = "persistent://my-property/my-ns/myrsa-topic1";
        final String encryptionKeyName = "client-rsa.pem";
        final String encryptionKeyVersion = "1.0";
        final Map<String, String> metadata = Maps.newHashMap();
        metadata.put("version", encryptionKeyVersion);

        // Key reader backed by the certificate files under src/test/resources; every returned key
        // carries the "version" metadata that decryptMessage later asserts on.
        class EncKeyReader implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.loadKey("./src/test/resources/certificate/private-key." + keyName);
            }

            // Shared loader for both key kinds; fails the test when the file is missing or unreadable.
            private EncryptionKeyInfo loadKey(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    this.keyInfo.setMetadata(metadata);
                    return this.keyInfo;
                }
                catch (IOException e) {
                    // Keep the I/O error as the cause so the report shows why the read failed.
                    Assert.fail("Failed to read certificate from " + certFilePath, e);
                }
                return null;
            }
        }

        // try-with-resources closes the consumer and then the producer, even when an assertion fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .addEncryptionKey(encryptionKeyName)
                     .compressionType(CompressionType.LZ4)
                     .cryptoKeyReader(new EncKeyReader())
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topicsPattern(topicName)
                     .subscriptionName("my-subscriber-name")
                     .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                     .subscribe()) {
            String message = "my-message";
            producer.send(message.getBytes());

            Message<byte[]> received = consumer.receive(3, TimeUnit.SECONDS);
            // Fail with a clear message instead of a NullPointerException when nothing arrives in time.
            Assert.assertNotNull(received, "No message received within 3 seconds");
            TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>)received;

            String receivedMessage = this.decryptMessage(msg, encryptionKeyName, new EncKeyReader());
            Assert.assertEquals(receivedMessage, message);
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Decrypts and decompresses the payload of an encrypted message by hand, using the
     * encryption context attached to the message and the supplied key reader.
     *
     * <p>The method also asserts that the context is present, that it holds exactly one key,
     * that this key is registered under {@code encryptionKeyName}, and that its {@code "version"}
     * metadata equals {@code "1.0"}.
     *
     * @param msg message whose raw data is still encrypted
     * @param encryptionKeyName name under which the data key is stored in the encryption context
     * @param reader key reader handed to the crypto implementation for decryption
     * @return the decrypted payload (first entry when the message is a batch), decoded with the
     *         platform default charset to mirror the {@code getBytes()} call used when sending
     * @throws Exception if decryption, decompression or batch de-serialization fails
     */
    private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader) throws Exception {
        Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
        Assert.assertTrue(ctx.isPresent());
        EncryptionContext encryptionCtx = ctx.orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));

        Map<String, EncryptionContext.EncryptionKey> keys = encryptionCtx.getKeys();
        Assert.assertEquals(keys.size(), 1);
        EncryptionContext.EncryptionKey encryptionKey = keys.get(encryptionKeyName);
        // A single key registered under another name would otherwise surface as a bare NPE below.
        Assert.assertNotNull(encryptionKey, "No encryption key named " + encryptionKeyName + " in encryption context");
        byte[] dataKey = encryptionKey.getKeyValue();
        Map<String, String> metadata = encryptionKey.getMetadata();
        Assert.assertEquals(metadata.get("version"), "1.0");

        CompressionType compressionType = encryptionCtx.getCompressionType();
        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
        byte[] encrParam = encryptionCtx.getParam();
        String encAlgo = encryptionCtx.getAlgorithm();
        int batchSize = encryptionCtx.getBatchSize().orElse(0);

        // Rebuild just enough message metadata for the crypto layer; producer name, sequence id
        // and publish time are placeholder values.
        ByteBuffer payloadBuf = ByteBuffer.wrap(msg.getData());
        MessageCryptoBc crypto = new MessageCryptoBc("test", false);
        MessageMetadata messageMetadata = new MessageMetadata()
                .setEncryptionParam(encrParam)
                .setProducerName("test")
                .setSequenceId(123L)
                .setPublishTime(12333453454L)
                .setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType))
                .setUncompressedSize(uncompressedSize);
        messageMetadata.addEncryptionKey().setKey(encryptionKeyName).setValue(dataKey);
        if (encAlgo != null) {
            messageMetadata.setEncryptionAlgo(encAlgo);
        }

        ByteBuffer decryptedPayload = ByteBuffer.allocate(crypto.getMaxOutputSize(payloadBuf.remaining()));
        crypto.decrypt(() -> messageMetadata, payloadBuf, decryptedPayload, reader);

        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
        ByteBuf uncompressedPayload = codec.decode(Unpooled.wrappedBuffer(decryptedPayload), uncompressedSize);
        try {
            if (batchSize > 0) {
                // Batched message: only the first entry of the batch is extracted.
                SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
                uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadata, 0, batchSize);
            }
            byte[] data = new byte[uncompressedPayload.readableBytes()];
            uncompressedPayload.readBytes(data);
            return new String(data);
        }
        finally {
            // Release on failure as well, so a failed read does not leave the buffer unreleased.
            uncompressedPayload.release();
        }
    }

    /**
     * Publishes five messages, opens three subscriptions (default, explicit Latest, explicit
     * Earliest), publishes five more, and checks which message each subscription sees first.
     */
    @Test
    public void testConsumerSubscriptionInitialize() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String topic = "persistent://my-property/my-ns/test-subscription-initialize-topic";
        Producer producer = pulsarClient.newProducer().topic(topic).create();

        // First half is published before any subscription exists.
        int sequence = 0;
        while (sequence < 5) {
            producer.send(("my-message-" + sequence).getBytes());
            sequence++;
        }

        Consumer defaultConsumer = pulsarClient.newConsumer()
                .topic(topic)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscriptionName("test-subscription-default")
                .subscribe();
        Consumer latestConsumer = pulsarClient.newConsumer()
                .topic(topic)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscriptionName("test-subscription-latest")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .subscribe();
        Consumer earliestConsumer = pulsarClient.newConsumer()
                .topic(topic)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscriptionName("test-subscription-earliest")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // Second half is published once all three subscriptions are in place.
        while (sequence < 10) {
            producer.send(("my-message-" + sequence).getBytes());
            sequence++;
        }

        // Default and Latest start at the first message sent after subscribing; Earliest at the very first.
        byte[] firstForDefault = defaultConsumer.receive(3, TimeUnit.SECONDS).getData();
        Assert.assertEquals(firstForDefault, "my-message-5".getBytes());
        byte[] firstForLatest = latestConsumer.receive(3, TimeUnit.SECONDS).getData();
        Assert.assertEquals(firstForLatest, "my-message-5".getBytes());
        byte[] firstForEarliest = earliestConsumer.receive(3, TimeUnit.SECONDS).getData();
        Assert.assertEquals(firstForEarliest, "my-message-0".getBytes());

        defaultConsumer.close();
        latestConsumer.close();
        earliestConsumer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Checks that a paused multi-topics consumer stays paused after the partition count of its
     * topic grows: only what already sits in the receiver queue can be read while paused, and
     * every remaining message is delivered once the consumer is resumed.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testMultiTopicsConsumerImplPauseForPartitionNumberChange() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final String topicName = "persistent://my-property/my-ns/partition-topic";
        final int receiverQueueSize = 1;
        this.admin.topics().createPartitionedTopic(topicName, 1);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).autoUpdatePartitionsInterval(2, TimeUnit.SECONDS).create();
        for (int i = 0; i < 5; ++i) {
            producer.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
        }

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).receiverQueueSize(receiverQueueSize).autoUpdatePartitionsInterval(2, TimeUnit.SECONDS).subscriptionName("test-multi-topic-consumer").subscribe();

        // Read everything from the first batch except what is left sitting in the receiver queue.
        int counter = 0;
        for (; counter < 5 - receiverQueueSize; ++counter) {
            Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
            Assert.assertNotNull(msg, "Expected my-message-" + counter + " before pausing");
            Assert.assertEquals(msg.getData(), ("my-message-" + counter).getBytes(StandardCharsets.UTF_8));
        }

        consumer.pause();
        this.admin.topics().updatePartitionedTopic(topicName, 3);

        // Wait for the client to pick up the new partitions, but never hang forever: this test
        // declares no timeOut, so an unbounded poll would block the whole suite.
        long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(30L);
        while (((MultiTopicsConsumerImpl<?>)consumer).getConsumers().size() <= 1) {
            if (System.nanoTime() - deadlineNanos > 0L) {
                Assert.fail("Consumer did not pick up the new partitions within 30 seconds");
            }
            Thread.sleep(10L);
        }

        for (int i = 5; i < 10; ++i) {
            producer.send(("my-message-" + i).getBytes(StandardCharsets.UTF_8));
        }

        // While paused, only the messages already held in the receiver queue can be drained.
        for (int i = 0; i < receiverQueueSize; ++i) {
            Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
            Assert.assertNotNull(msg, "Expected my-message-" + counter + " from the receiver queue");
            Assert.assertEquals(msg.getData(), ("my-message-" + counter).getBytes(StandardCharsets.UTF_8));
            ++counter;
        }
        Assert.assertNull(consumer.receive(3, TimeUnit.SECONDS));

        // After resuming, every remaining message must arrive.
        consumer.resume();
        while (consumer.receive(3, TimeUnit.SECONDS) != null) {
            ++counter;
        }
        Assert.assertEquals(counter, 10);
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Pauses a consumer that listens on two topics, manually subscribes it to a third topic while
     * paused, and checks that only the receiver-queue content is readable until it is resumed.
     */
    @Test
    public void testMultiTopicsConsumerImplPauseForManualSubscription() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String topicNameBase = "persistent://my-property/my-ns/my-topic-";

        // One non-batching producer per topic suffix "1", "2", "3", created in that order.
        final Producer[] producers = new Producer[3];
        for (int idx = 0; idx < producers.length; idx++) {
            producers[idx] = pulsarClient.newProducer().topic(topicNameBase + (idx + 1)).enableBatching(false).create();
        }

        for (int seq = 0; seq < 5; seq++) {
            byte[] payload = ("my-message-" + seq).getBytes(StandardCharsets.UTF_8);
            for (Producer producer : producers) {
                producer.send(payload);
            }
        }

        final int receiverQueueSize = 1;
        Consumer consumer = pulsarClient.newConsumer()
                .topics(Lists.newArrayList(topicNameBase + "1", topicNameBase + "2"))
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .receiverQueueSize(receiverQueueSize)
                .subscriptionName("test-multi-topic-consumer")
                .subscribe();

        // Consume from the two subscribed topics, leaving one message per topic unread.
        final int readableBeforePause = 2 * (5 - receiverQueueSize);
        int counter = 0;
        while (counter < readableBeforePause) {
            String text = new String(consumer.receive(3, TimeUnit.SECONDS).getData(), StandardCharsets.UTF_8);
            Assertions.assertThat(text).startsWith("my-message-");
            counter++;
        }

        consumer.pause();
        ((MultiTopicsConsumerImpl)consumer).subscribeAsync(topicNameBase + "3", true).join();

        for (int seq = 5; seq < 10; seq++) {
            byte[] payload = ("my-message-" + seq).getBytes(StandardCharsets.UTF_8);
            for (Producer producer : producers) {
                producer.send(payload);
            }
        }

        // While paused, only two receiver queues' worth of messages may still be read.
        for (int drained = 0; drained < 2 * receiverQueueSize; drained++) {
            String text = new String(consumer.receive(3, TimeUnit.SECONDS).getData(), StandardCharsets.UTF_8);
            Assertions.assertThat(text).startsWith("my-message-");
            counter++;
        }
        Assert.assertNull(consumer.receive(3, TimeUnit.SECONDS));

        consumer.resume();
        while (consumer.receive(3, TimeUnit.SECONDS) != null) {
            counter++;
        }
        // 3 topics x 10 messages each.
        Assert.assertEquals(counter, 30);

        for (Producer producer : producers) {
            producer.close();
        }
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Sends ten messages asynchronously through a batching producer, calls {@code flush()}, and
     * checks that the consumer receives all ten in order and without duplicates.
     */
    @Test
    public void testFlushBatchEnabled() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String topic = "persistent://my-property/my-ns/test-flush-enabled";
        final int totalMessages = 10;

        Consumer consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .subscribe();

        // Batch limits (1 hour / 10000 messages) are far above what is sent here, so presumably
        // the explicit flush() is what actually publishes the batch.
        try (Producer producer = pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(true)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .batchingMaxMessages(10000)
                .create()) {
            int sent = 0;
            while (sent < totalMessages) {
                producer.sendAsync(("my-message-" + sent).getBytes());
                sent++;
            }
            producer.flush();
        }

        HashSet<String> seen = Sets.newHashSet();
        Message<?> last = null;
        for (int expectedIndex = 0; expectedIndex < totalMessages; expectedIndex++) {
            last = consumer.receive(3, TimeUnit.SECONDS);
            String actual = new String(last.getData());
            log.debug("Received message: [{}]", actual);
            testMessageOrderAndDuplicates(seen, actual, "my-message-" + expectedIndex);
        }

        // A single cumulative ack on the last message covers everything received above.
        consumer.acknowledgeCumulative(last);
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Sends ten messages asynchronously through a non-batching producer, calls {@code flush()},
     * and checks that the consumer receives all ten in order and without duplicates.
     */
    @Test
    public void testFlushBatchDisabled() throws Exception {
        log.info("-- Starting {} test --", methodName);
        final String topic = "persistent://my-property/my-ns/test-flush-disabled";
        final int totalMessages = 10;

        Consumer consumer = pulsarClient.newConsumer()
                .topic(topic)
                .startMessageIdInclusive()
                .subscriptionName("my-subscriber-name")
                .subscribe();

        try (Producer producer = pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create()) {
            int sent = 0;
            while (sent < totalMessages) {
                producer.sendAsync(("my-message-" + sent).getBytes());
                sent++;
            }
            producer.flush();
        }

        HashSet<String> seen = Sets.newHashSet();
        Message<?> last = null;
        for (int expectedIndex = 0; expectedIndex < totalMessages; expectedIndex++) {
            last = consumer.receive(3, TimeUnit.SECONDS);
            String actual = new String(last.getData());
            log.debug("Received message: [{}]", actual);
            testMessageOrderAndDuplicates(seen, actual, "my-message-" + expectedIndex);
        }

        // A single cumulative ack on the last message covers everything received above.
        consumer.acknowledgeCumulative(last);
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Subscribes a listener-based consumer to an already terminated topic and checks that the
     * end-of-topic callback has fired exactly once after one second.
     */
    @Test
    public void testReachedEndOfTopic() throws Exception {
        final String topic = "persistent://my-property/my-ns/testReachedEndOfTopic";

        // Open and immediately close a producer on the topic, then terminate the topic.
        pulsarClient.newProducer().topic(topic).enableBatching(false).create().close();
        admin.topics().terminateTopicAsync(topic).get();

        // Sized for two signals on purpose: a second callback would bring the count to zero.
        final CountDownLatch endOfTopicSignals = new CountDownLatch(2);
        MessageListener listener = new MessageListener(){

            @Override
            public void reachedEndOfTopic(Consumer consumer) {
                log.info("called reachedEndOfTopic  {}", SimpleProducerConsumerTest.this.methodName);
                endOfTopicSignals.countDown();
            }

            @Override
            public void received(Consumer consumer, Message message) {
                // Nothing is ever published to the topic, so there is nothing to handle here.
            }
        };
        Consumer consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .messageListener(listener)
                .subscribe();

        boolean reachedZero = endOfTopicSignals.await(1L, TimeUnit.SECONDS);
        Assert.assertFalse(reachedZero);
        Assert.assertEquals(endOfTopicSignals.getCount(), 1L);
        consumer.close();
    }

    /**
     * Checks how a Failover subscription on a nine-partition topic assigns active consumers:
     * first that one of two equal-priority consumers does not own every partition, then that
     * three priority-0 consumers end up owning three partitions each while the priority-1
     * consumers own none.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testFailOverConsumerPriority() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final String topicName = "persistent://my-property/my-ns/priority-topic";
        final String subscriptionName = "my-sub";
        final int noOfPartitions = 9;
        this.admin.topics().createPartitionedTopic(topicName, noOfPartitions);

        Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).consumerName("aaa").subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).priorityLevel(1).subscribe();
        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).consumerName("bbb1").subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).priorityLevel(1);
        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();

        // Compare plain Integers: the previous AtomicInteger-vs-Integer comparison could never
        // be equal, so the assertion was vacuous.
        int ownedByFirstConsumer = this.countPartitionsPerActiveConsumer(topicName).getOrDefault("aaa", 0);
        Assert.assertNotEquals(Integer.valueOf(ownedByFirstConsumer), Integer.valueOf(noOfPartitions), "consumer 'aaa' must not be active on every partition");

        // Re-subscribe "bbb1" at priority 0 and add two more priority-0 consumers plus one at priority 1.
        consumer2.close();
        consumer2 = consumerBuilder.priorityLevel(0).subscribe();
        Consumer<byte[]> consumer3 = consumerBuilder.consumerName("bbb2").priorityLevel(0).subscribe();
        Consumer<byte[]> consumer4 = consumerBuilder.consumerName("bbb3").priorityLevel(0).subscribe();
        Consumer<byte[]> consumer5 = consumerBuilder.consumerName("bbb4").priorityLevel(1).subscribe();

        final Integer evenDistributionCount = noOfPartitions / 3;
        SimpleProducerConsumerTest.retryStrategically(test -> {
            try {
                Map<String, Integer> owners = this.countPartitionsPerActiveConsumer(topicName);
                // equals() is called on the expected value so a missing name yields false, not an NPE.
                return owners.size() == 3
                        && evenDistributionCount.equals(owners.get("bbb1"))
                        && evenDistributionCount.equals(owners.get("bbb2"))
                        && evenDistributionCount.equals(owners.get("bbb3"));
            }
            catch (PulsarAdminException ignored) {
                // Deliberate best effort: a failed stats call just means "not yet", so retry.
                return false;
            }
        }, 5, 100L);

        Map<String, Integer> subsCount = this.countPartitionsPerActiveConsumer(topicName);
        Assert.assertEquals(subsCount.size(), 3);
        Assert.assertEquals(subsCount.getOrDefault("bbb1", 0).intValue(), evenDistributionCount.intValue(), "partitions owned by bbb1");
        Assert.assertEquals(subsCount.getOrDefault("bbb2", 0).intValue(), evenDistributionCount.intValue(), "partitions owned by bbb2");
        Assert.assertEquals(subsCount.getOrDefault("bbb3", 0).intValue(), evenDistributionCount.intValue(), "partitions owned by bbb3");

        consumer1.close();
        consumer2.close();
        consumer3.close();
        consumer4.close();
        consumer5.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Counts, per active consumer name, the partitions of {@code topicName} on which that
     * consumer is reported as active. Only the first subscription listed in each partition's
     * stats is inspected.
     *
     * @param topicName partitioned topic whose per-partition stats are queried
     * @return map from active consumer name to number of partitions
     * @throws PulsarAdminException if the stats cannot be fetched
     */
    private Map<String, Integer> countPartitionsPerActiveConsumer(String topicName) throws PulsarAdminException {
        Map<String, Integer> counts = Maps.newHashMap();
        this.admin.topics().getPartitionedStats(topicName, true).getPartitions().forEach((p, stats) -> {
            String activeConsumerName = ((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getActiveConsumerName();
            counts.merge(activeConsumerName, 1, Integer::sum);
        });
        return counts;
    }

    /**
     * Checks that a partitioned topic with a single partition can be used both through its
     * partitioned name and through the name of its only partition: one producer and one consumer
     * are attached under each name, each producer sends ten messages, and each consumer (on its
     * own subscription) must receive all twenty and nothing more.
     *
     * <p>All clients are opened in one try-with-resources statement, so they are closed in
     * reverse order of creation and a failure while closing is attached as a suppressed
     * exception rather than replacing the original failure.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testPartitionedTopicWithOnePartition() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
        final String partitionName = topicName + "-partition-0";
        final String subscriptionName = "my-sub-";
        final int numMessages = 10;
        this.admin.topics().createPartitionedTopic(topicName, 1);
        Assert.assertEquals(this.admin.topics().getPartitionedTopicMetadata(topicName).partitions, 1);

        try (Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName + "1").consumerName("aaa").subscribe();
             Consumer<byte[]> consumer2 = this.pulsarClient.newConsumer().topic(partitionName).subscriptionName(subscriptionName + "2").consumerName("bbb").subscribe();
             Producer<byte[]> producer1 = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
             Producer<byte[]> producer2 = this.pulsarClient.newProducer().topic(partitionName).enableBatching(false).create()) {
            log.info("Consumer1 created. topic: {}", (Object)consumer1.getTopic());
            log.info("Consumer2 created. topic: {}", (Object)consumer2.getTopic());
            log.info("Producer1 created. topic: {}", (Object)producer1.getTopic());
            log.info("Producer2 created. topic: {}", (Object)producer2.getTopic());

            for (int i = 0; i < numMessages; ++i) {
                producer1.newMessage().value(("one-partitioned-topic-value-producer1-" + i).getBytes(StandardCharsets.UTF_8)).send();
                producer2.newMessage().value(("one-partitioned-topic-value-producer2-" + i).getBytes(StandardCharsets.UTF_8)).send();
            }

            // Both producers write to the same single partition, so each subscription sees 2 * numMessages.
            for (int i = 0; i < 2 * numMessages; ++i) {
                Message<byte[]> msg = consumer1.receive(500, TimeUnit.MILLISECONDS);
                Assert.assertNotNull(msg);
                log.info("Consumer1 Received message '{}'.", (Object)new String(msg.getValue(), StandardCharsets.UTF_8));
                msg = consumer2.receive(500, TimeUnit.MILLISECONDS);
                Assert.assertNotNull(msg);
                log.info("Consumer2 Received message '{}'.", (Object)new String(msg.getValue(), StandardCharsets.UTF_8));
            }

            // Nothing beyond the expected messages may be delivered.
            Assert.assertNull(consumer1.receive(500, TimeUnit.MILLISECONDS));
            Assert.assertNull(consumer2.receive(500, TimeUnit.MILLISECONDS));
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
    }

    /**
     * Publishes {@code numOfMessages} messages, seeks a new consumer to the id of a randomly
     * chosen one, and checks that consumption starts at that message (inclusive mode) or at the
     * one after it (exclusive mode) and then runs to the end without gaps or duplicates.
     *
     * @param batching whether the producer batches messages
     * @param startInclusive whether the consumer is built with {@code startMessageIdInclusive()}
     * @param numOfMessages number of messages to publish; must be positive
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="variationsForExpectedPos")
    public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) throws Exception {
        final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
        final int resetIndex = new Random().nextInt(numOfMessages);
        final int firstMessage = startInclusive ? resetIndex : resetIndex + 1;
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(batching).create();

        CountDownLatch latch = new CountDownLatch(numOfMessages);
        AtomicReference<MessageId> resetPos = new AtomicReference<>();
        AtomicReference<Throwable> sendFailure = new AtomicReference<>();
        for (int i = 0; i < numOfMessages; ++i) {
            final int index = i;
            producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((messageId, e) -> {
                if (e != null) {
                    // Only record the failure here. Throwing from the callback would skip the
                    // countDown() below and leave the test thread blocked in latch.await().
                    sendFailure.compareAndSet(null, e);
                } else {
                    log.info("send msg with id {}", (Object)messageId);
                    if (index == resetIndex) {
                        resetPos.set(messageId);
                    }
                }
                latch.countDown();
            });
        }
        latch.await();
        Throwable failure = sendFailure.get();
        if (failure != null) {
            Assert.fail("send msg failed due to " + failure.getMessage(), failure);
        }

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topicName);
        if (startInclusive) {
            consumerBuilder.startMessageIdInclusive();
        }
        Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
        consumer.seek(resetPos.get());
        log.info("reset cursor to {}", (Object)resetPos.get());

        HashSet<String> messageSet = Sets.newHashSet();
        for (int i = firstMessage; i < numOfMessages; ++i) {
            Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
            // Fail with a clear message instead of a NullPointerException on timeout.
            Assert.assertNotNull(message, "No message received for index " + i);
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        Assert.assertEquals(((ConsumerImpl<?>)consumer).numMessagesInQueue(), 0);
        Assert.assertEquals(messageSet.size(), numOfMessages - firstMessage);
        consumer.close();
        producer.close();
    }

    /**
     * Sends more oversized messages than {@code maxPendingMessages} allows through a producer
     * with {@code blockIfQueueFull(true)} and checks that every send fails with
     * {@code InvalidMessageException}. The loop runs past the pending-message limit, so it can
     * only finish if failed sends give their pending-message slot back.
     *
     * @throws Exception if the producer cannot be created or closed, or the thread is interrupted
     */
    @Test
    public void testReleaseSemaphoreOnFailMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        final int maxPendingMessages = 10;
        // try-with-resources so the producer is closed even when an assertion fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .enableBatching(false)
                .blockIfQueueFull(true)
                .maxPendingMessages(maxPendingMessages)
                .topic("persistent://my-property/my-ns/my-topic2")
                .create()) {
            // One byte larger than the maximum message size, so every send must be rejected.
            byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
            for (int i = 0; i < maxPendingMessages + 10; ++i) {
                CompletableFuture<MessageId> future = producer.sendAsync(message);
                try {
                    future.get();
                    Assert.fail("should fail with InvalidMessageException");
                }
                catch (ExecutionException e) {
                    Assert.assertTrue(e.getCause() instanceof PulsarClientException.InvalidMessageException);
                }
            }
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /**
     * Checks that a pending asynchronous receive fails with a message containing
     * "cleaning and closing the consumers" when the consumer is closed concurrently.
     *
     * <p>The same scenario runs three times: {@code receiveAsync()} on a plain topic,
     * {@code batchReceiveAsync()} on the same topic, and {@code batchReceiveAsync()} on a
     * three-partition topic. Each run starts a worker thread that first starts a second thread
     * to close the consumer and then blocks on the pending future; the test thread waits on a
     * latch that the worker counts down after its assertion on the exception message.
     *
     * <p>NOTE(review): the locals {@code topic}, {@code partitionedTopic} and {@code errorMsg}
     * are never read; the literals are repeated inline below.
     */
    @Test(timeOut=10000L)
    public void testReceiveAsyncCompletedWhenClosing() throws Exception {
        String topic = "persistent://my-property/my-ns/testCompletedWhenClosing";
        String partitionedTopic = "persistent://my-property/my-ns/testCompletedWhenClosing-partitioned";
        String errorMsg = "cleaning and closing the consumers";
        // The same batch receive policy is shared by all three consumers below.
        BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(10240).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build();
        // Scenario 1: receiveAsync() on a non-partitioned topic.
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/testCompletedWhenClosing"}).subscriptionName("my-subscriber-name").batchReceivePolicy(batchReceivePolicy).subscribe();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(() -> {
            try {
                // Close from a separate thread while this thread blocks on the pending receive.
                new Thread(() -> {
                    try {
                        consumer.close();
                    }
                    catch (PulsarClientException pulsarClientException) {
                        // empty catch block
                    }
                }).start();
                consumer.receiveAsync().get();
                // NOTE(review): presumably throws AssertionError, which catch (Exception) below
                // would not catch; the latch would then only be released by the test timeOut - confirm.
                Assert.fail((String)"should be interrupted");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)e.getMessage().contains("cleaning and closing the consumers"));
                countDownLatch.countDown();
            }
        }).start();
        // No timeout on the await itself; the timeOut on @Test is the only bound.
        countDownLatch.await();
        // Scenario 2: batchReceiveAsync() on the same topic and subscription.
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Consumer consumer2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/testCompletedWhenClosing"}).subscriptionName("my-subscriber-name").batchReceivePolicy(batchReceivePolicy).subscribe();
        new Thread(() -> {
            try {
                new Thread(() -> {
                    try {
                        consumer2.close();
                    }
                    catch (PulsarClientException pulsarClientException) {
                        // empty catch block
                    }
                }).start();
                consumer2.batchReceiveAsync().get();
                Assert.fail((String)"should be interrupted");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)e.getMessage().contains("cleaning and closing the consumers"));
                countDownLatch2.countDown();
            }
        }).start();
        countDownLatch2.await();
        // Scenario 3: batchReceiveAsync() on a partitioned topic with three partitions.
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testCompletedWhenClosing-partitioned", 3);
        Consumer partitionedTopicConsumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/testCompletedWhenClosing-partitioned"}).subscriptionName("my-subscriber-name-partitionedTopic").batchReceivePolicy(batchReceivePolicy).subscribe();
        new Thread(() -> {
            try {
                new Thread(() -> {
                    try {
                        partitionedTopicConsumer.close();
                    }
                    catch (PulsarClientException pulsarClientException) {
                        // empty catch block
                    }
                }).start();
                partitionedTopicConsumer.batchReceiveAsync().get();
                Assert.fail((String)"should be interrupted");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)e.getMessage().contains("cleaning and closing the consumers"));
                countDownLatch3.countDown();
            }
        }).start();
        countDownLatch3.await();
    }

    /**
     * Verifies that resetting a subscription cursor through the admin API repositions the next
     * consumer as expected, for both the blocking and the asynchronous calls.
     *
     * <p>Fifty messages are published and the first ten are received and acknowledged. The cursor
     * is then reset to the id of the tenth message four times. After the three-argument calls a
     * new consumer must first receive that same message; after the calls that pass an additional
     * {@code true} argument it must first receive the entry immediately following it.
     *
     * @throws Exception if any client or admin call fails
     */
    @Test(timeOut = 20000L)
    public void testResetPosition() throws Exception {
        final String topicName = "persistent://my-property/my-ns/testResetPosition";
        final String subName = "my-sub";

        Message<String> lastMsg = null;
        // Resources are closed in reverse order: consumer first, then producer.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topicName)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            for (int i = 0; i < 50; ++i) {
                producer.send("msg" + i);
            }
            for (int i = 0; i < 10; ++i) {
                lastMsg = consumer.receive(3, TimeUnit.SECONDS);
                Assert.assertNotNull(lastMsg);
                consumer.acknowledge(lastMsg);
            }
        }
        final MessageIdImpl lastMessageId = (MessageIdImpl) lastMsg.getMessageId();

        // Blocking reset without the extra flag: the reset message itself is redelivered.
        this.admin.topics().resetCursor(topicName, subName, lastMessageId);
        Assert.assertEquals(this.receiveFirstAfterCursorReset(topicName, subName).getMessageId(), lastMessageId);

        // Blocking reset with the flag set: delivery resumes at the following entry.
        this.admin.topics().resetCursor(topicName, subName, lastMessageId, true);
        MessageIdImpl firstId = (MessageIdImpl) this.receiveFirstAfterCursorReset(topicName, subName).getMessageId();
        Assert.assertNotEquals(firstId, lastMessageId);
        Assert.assertEquals(firstId.getEntryId() - 1L, lastMessageId.getEntryId());

        // Asynchronous reset with the flag set.
        this.admin.topics().resetCursorAsync(topicName, subName, lastMessageId, true).get(3L, TimeUnit.SECONDS);
        firstId = (MessageIdImpl) this.receiveFirstAfterCursorReset(topicName, subName).getMessageId();
        Assert.assertNotEquals(firstId, lastMessageId);
        Assert.assertEquals(firstId.getEntryId() - 1L, lastMessageId.getEntryId());

        // Asynchronous reset without the extra flag.
        this.admin.topics().resetCursorAsync(topicName, subName, lastMessageId).get(3L, TimeUnit.SECONDS);
        Assert.assertEquals(this.receiveFirstAfterCursorReset(topicName, subName).getMessageId(), lastMessageId);
    }

    /**
     * Subscribes a short-lived consumer, returns the first message it receives within three
     * seconds and closes the consumer again, failing the test if nothing arrives.
     */
    private Message<String> receiveFirstAfterCursorReset(String topicName, String subName) throws Exception {
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe()) {
            Message<String> first = consumer.receive(3, TimeUnit.SECONDS);
            // Fail with an assertion rather than a later NullPointerException.
            Assert.assertNotNull(first);
            return first;
        }
    }

    /**
     * Checks the last-disconnected timestamp reported by a producer and a consumer: it is zero
     * right after both are created and greater than zero once {@code this.pulsar} has been closed.
     *
     * @throws Exception if creating the clients or closing {@code this.pulsar} fails
     */
    @Test
    public void testGetLastDisconnectedTimestamp() throws Exception {
        final String topicName = "persistent://my-property/my-ns/testGetLastDisconnectedTimestamp";
        final String subName = "my-sub";

        final Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topicName)
                .create();
        final Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();

        // Before the shutdown both report zero.
        Assert.assertEquals(producer.getLastDisconnectedTimestamp(), 0L);
        Assert.assertEquals(consumer.getLastDisconnectedTimestamp(), 0L);

        this.pulsar.close();

        // After the shutdown both report a positive timestamp.
        Assert.assertTrue(producer.getLastDisconnectedTimestamp() > 0L);
        Assert.assertTrue(consumer.getLastDisconnectedTimestamp() > 0L);
    }

    /**
     * Same check as for a single topic, but against a topic created with three partitions: the
     * producer and consumer report a last-disconnected timestamp of zero after creation and a
     * positive one once {@code this.pulsar} has been closed.
     *
     * @throws Exception if creating the topic or clients, or closing {@code this.pulsar}, fails
     */
    @Test
    public void testGetLastDisconnectedTimestampForPartitionedTopic() throws Exception {
        final String topicName =
                "persistent://my-property/my-ns/testGetLastDisconnectedTimestampForPartitionedTopic";
        final String subName = "my-sub";

        this.admin.topics().createPartitionedTopic(topicName, 3);
        final Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topicName)
                .create();
        final Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();

        // Before the shutdown both report zero.
        Assert.assertEquals(producer.getLastDisconnectedTimestamp(), 0L);
        Assert.assertEquals(consumer.getLastDisconnectedTimestamp(), 0L);

        this.pulsar.close();

        // After the shutdown both report a positive timestamp.
        Assert.assertTrue(producer.getLastDisconnectedTimestamp() > 0L);
        Assert.assertTrue(consumer.getLastDisconnectedTimestamp() > 0L);
    }

    /**
     * Verifies the receiver-queue statistics of a consumer subscribed to a single, uniquely named
     * topic: the per-sub-consumer map is {@code null}, the queue count starts at zero and
     * eventually reaches the number of messages published.
     *
     * <p>The dedicated client, the producer and the consumer are managed by try-with-resources so
     * that all three are closed (consumer, then producer, then client) even when an assertion
     * fails.
     *
     * @throws Exception if any client call fails
     */
    @Test
    public void testGetStats() throws Exception {
        final String topicName = "persistent://my-property/my-ns/testGetStats" + UUID.randomUUID();
        final String subName = "my-sub";
        final int receiveQueueSize = 100;

        try (PulsarClient client = this.newPulsarClient(this.lookupUrl.toString(), 100);
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topicName)
                     .create();
             ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .receiverQueueSize(receiveQueueSize)
                     .subscriptionName(subName)
                     .subscribe()) {
            Assert.assertNull(consumer.getStats().getMsgNumInSubReceiverQueue());
            // The (int) cast pins the assertEquals(int, int) overload.
            Assert.assertEquals((int) consumer.getStats().getMsgNumInReceiverQueue(), 0);

            for (int i = 0; i < receiveQueueSize; ++i) {
                producer.sendAsync("msg" + i);
            }

            // Delivery is asynchronous, so poll until the queue count matches.
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals((int) consumer.getStats().getMsgNumInReceiverQueue(), receiveQueueSize));
        }
    }

    /**
     * Verifies the receiver-queue statistics of a consumer subscribed to a topic created with
     * three partitions: the per-sub-consumer map has three entries that all start at zero, and the
     * overall queue count starts at zero and eventually reaches the number of messages published.
     *
     * <p>The dedicated client, the producer and the consumer are managed by try-with-resources so
     * that all three are closed (consumer, then producer, then client) even when an assertion
     * fails.
     *
     * @throws Exception if any client or admin call fails
     */
    @Test
    public void testGetStatsForPartitionedTopic() throws Exception {
        final String topicName = "persistent://my-property/my-ns/testGetStatsForPartitionedTopic";
        final String subName = "my-sub";
        final int receiveQueueSize = 100;

        this.admin.topics().createPartitionedTopic(topicName, 3);
        try (PulsarClient client = this.newPulsarClient(this.lookupUrl.toString(), 100);
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topicName)
                     .create();
             MultiTopicsConsumerImpl<String> consumer = (MultiTopicsConsumerImpl<String>) client.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .receiverQueueSize(receiveQueueSize)
                     .subscriptionName(subName)
                     .subscribe()) {
            Assert.assertEquals(consumer.getStats().getMsgNumInSubReceiverQueue().size(), 3);
            // The (int) casts pin the assertEquals(int, int) overload.
            Assert.assertEquals((int) consumer.getStats().getMsgNumInReceiverQueue(), 0);
            consumer.getStats().getMsgNumInSubReceiverQueue()
                    .forEach((key, value) -> Assert.assertEquals((int) value, 0));

            for (int i = 0; i < receiveQueueSize; ++i) {
                producer.sendAsync("msg" + i);
            }

            // Delivery is asynchronous, so poll until the queue count matches.
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals((int) consumer.getStats().getMsgNumInReceiverQueue(), receiveQueueSize));
        }
    }

    /**
     * Data provider registered under the name {@code "partitioned"}.
     *
     * @return a two-element array holding {@code false} followed by {@code true}
     */
    @DataProvider(name = "partitioned")
    public static Object[] isPartitioned() {
        final Object[] flags = {Boolean.FALSE, Boolean.TRUE};
        return flags;
    }

    /**
     * Verifies that a consumer's incoming-message size becomes positive after messages have been
     * published and drops back to zero once every message has been received and acknowledged.
     *
     * <p>The consumer and producer are managed by try-with-resources, so the producer is closed
     * first and the consumer second, even when an assertion fails.
     *
     * @param isPartitioned when {@code true} the topic is first created with three partitions
     * @throws Exception if any client or admin call fails
     */
    @Test(dataProvider = "partitioned")
    public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
        final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" + UUID.randomUUID().toString();
        final String subName = "my-sub";
        final int messages = 100;

        if (isPartitioned) {
            this.admin.topics().createPartitionedTopic(topicName, 3);
        }
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .create()) {
            final CompletableFuture<?>[] pendingSends = new CompletableFuture<?>[messages];
            for (int i = 0; i < messages; ++i) {
                pendingSends[i] = producer.newMessage()
                        .key(i + "")
                        // Explicit charset so the payload does not depend on the platform default.
                        .value(("Message-" + i).getBytes(StandardCharsets.UTF_8))
                        .sendAsync();
            }
            // Block until every send has completed; a failed send surfaces as an exception here.
            CompletableFuture.allOf(pendingSends).get();

            Awaitility.await().untilAsserted(() -> {
                long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
                log.info("Check the incoming message size should greater that 0, current size is {}", size);
                Assert.assertTrue(size > 0L);
            });

            for (int i = 0; i < messages; ++i) {
                consumer.acknowledge(consumer.receive(3, TimeUnit.SECONDS));
            }

            Awaitility.await().untilAsserted(() -> {
                long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
                log.info("Check the incoming message size should be 0, current size is {}", size);
                Assert.assertEquals(size, 0L);
            });
        }
    }

    /**
     * Data provider registered under the name {@code "enableBatching"}.
     *
     * @return a two-element array holding {@code false} followed by {@code true}
     */
    @DataProvider(name = "enableBatching")
    public static Object[] isEnableBatching() {
        final Object[] flags = {Boolean.FALSE, Boolean.TRUE};
        return flags;
    }

    /**
     * Publishes one LZ4-compressed {@link MyBean} through a producer created without a schema,
     * supplying the Avro schema only on the message itself, and checks that an
     * {@code AUTO_CONSUME} consumer decodes it as an Avro record with the single expected field.
     *
     * @param enableBatching whether the producer is created with batching enabled
     * @throws Exception if any client call fails
     */
    @Test(dataProvider = "enableBatching")
    public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
        final String expectedValue = "aaaaaaaaaaaaaaaaaaaaaaaaa";

        Consumer<GenericRecord> consumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                .topic(topic)
                .subscriptionName("testsub")
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(enableBatching)
                .compressionType(CompressionType.LZ4)
                .create();

        MyBean payload = new MyBean();
        payload.setField(expectedValue);
        producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();
        producer.close();

        GenericRecord res = consumer.receive(3, TimeUnit.SECONDS).getValue();
        consumer.close();

        Assert.assertEquals(SchemaType.AVRO, res.getSchemaType());
        org.apache.avro.generic.GenericRecord nativeRecord =
                (org.apache.avro.generic.GenericRecord) res.getNativeObject();
        // The result is not inspected; the call is retained from the original statement order.
        org.apache.avro.Schema schema = nativeRecord.getSchema();

        for (Field recordField : res.getFields()) {
            final String name = recordField.getName();
            log.info("field {} {}", name, res.getField(recordField));
            Assert.assertEquals("field", name);
            Assert.assertEquals(expectedValue, res.getField(recordField));
            Assert.assertEquals(expectedValue, nativeRecord.get(recordField.getName()).toString());
        }
        Assert.assertEquals(1, res.getFields().size());
    }

    /**
     * Data provider registered under the name {@code "avroSchemaProvider"}.
     *
     * @return a two-element array holding the result of {@code Schema.AVRO(MyBean.class)}
     *         followed by the result of {@code Schema.JSON(MyBean.class)}
     */
    @DataProvider(name = "avroSchemaProvider")
    public static Object[] avroSchemaProvider() {
        final Object avroSchema = Schema.AVRO(MyBean.class);
        final Object jsonSchema = Schema.JSON(MyBean.class);
        return new Object[]{avroSchema, jsonSchema};
    }

    /**
     * Publishes one {@link MyBean} with the supplied schema and checks that an
     * {@code AUTO_CONSUME} consumer exposes both the generic field view and the native record.
     *
     * <p>For an Avro schema the native object must be an Avro record whose {@code field} schema is
     * a union containing a string type and a null type; otherwise the native object is treated as
     * a JSON node whose {@code field} is a string node.
     *
     * @param schema the producer schema supplied by the {@code avroSchemaProvider} data provider
     * @throws Exception if any client call fails
     */
    @Test(dataProvider = "avroSchemaProvider")
    public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/my-ns/accessSchema";
        final String expectedValue = "aaaaaaaaaaaaaaaaaaaaaaaaa";

        final GenericRecord res;
        try (Consumer<GenericRecord> consumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                .topic(topic)
                .subscriptionName("testsub")
                .subscribe()) {
            // The producer is closed right after the send, before the message is received.
            try (Producer<MyBean> producer = this.pulsarClient.newProducer(schema).topic(topic).create()) {
                MyBean payload = new MyBean();
                payload.setField(expectedValue);
                producer.send(payload);
            }
            res = consumer.receive(3, TimeUnit.SECONDS).getValue();
        }

        Assert.assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());

        org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
        JsonNode nativeJsonRecord = null;
        if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
            nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject();
            Assert.assertNotNull(nativeAvroRecord);
        } else {
            nativeJsonRecord = (JsonNode) res.getNativeObject();
            Assert.assertNotNull(nativeJsonRecord);
        }

        for (Field f : res.getFields()) {
            log.info("field {} {}", f.getName(), res.getField(f));
            Assert.assertEquals("field", f.getName());
            Assert.assertEquals(expectedValue, res.getField(f));
            if (nativeAvroRecord != null) {
                // Avro's Schema is spelled out in full so it cannot be confused with the
                // generic Schema type used for this method's parameter.
                org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName());
                Assert.assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType());
                Assert.assertTrue(fieldDetails.schema().getTypes().stream()
                        .anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING));
                Assert.assertTrue(fieldDetails.schema().getTypes().stream()
                        .anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL));
            } else {
                Assert.assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType());
            }
        }
        Assert.assertEquals(1, res.getFields().size());
    }

    /**
     * Verifies that subscribing with retry enabled fails when automatic topic creation is turned
     * off, and that the failure names the missing {@code <topic>-sub-RETRY} topic.
     *
     * <p>The test environment is restarted with {@code allowAutoTopicCreation} set to
     * {@code false}. The flag is set back to {@code true} in a {@code finally} block that also
     * covers the restart and the topic creation, so a failure there cannot leave it disabled.
     *
     * @throws Exception if restarting the environment or creating the partitioned topic fails
     */
    @Test
    public void testTopicDoesNotExists() throws Exception {
        this.cleanup();
        this.conf.setAllowAutoTopicCreation(false);
        try {
            this.setup();
            final String topic = "persistent://my-property/my-ns/none" + UUID.randomUUID();
            this.admin.topics().createPartitionedTopic(topic, 3);

            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .enableRetry(true)
                    .topic(topic)
                    .subscriptionName("sub")
                    .subscribe()) {
                // Assert.fail throws an Error, so the catch below does not swallow it.
                Assert.fail("should fail");
            } catch (Exception e) {
                final String retryTopic = topic + "-sub-RETRY";
                final String message = e.getMessage();
                // Guard against a null message so the test fails with a readable assertion.
                Assert.assertTrue(
                        message != null && message.contains("Topic " + retryTopic + " does not exist"),
                        "unexpected failure: " + e);
            }
        } finally {
            this.conf.setAllowAutoTopicCreation(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void testPartitionTopicsOnSeparateListener() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
        String subscriptionName = "my-sub-";
        PulsarClient pulsarClient = PulsarClient.builder().listenerThreads(10).serviceUrl(this.lookupUrl.toString()).build();
        try {
            int partitions = 10;
            this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/one-partitioned-topic", partitions);
            Assert.assertEquals((int)this.admin.topics().getPartitionedTopicMetadata((String)"persistent://my-property/my-ns/one-partitioned-topic").partitions, (int)partitions);
            int totalMessages = partitions * 2;
            CountDownLatch latch = new CountDownLatch(1);
            AtomicInteger count = new AtomicInteger();
            Set listenerThreads = Sets.newConcurrentHashSet();
            MessageListener & Serializable messageListener = (MessageListener & Serializable)(c, m) -> {
                if (count.incrementAndGet() == totalMessages) {
                    latch.countDown();
                }
                listenerThreads.add(Thread.currentThread().getName());
            };
            Consumer consumer1 = pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/one-partitioned-topic"}).messageListener((MessageListener)messageListener).subscriptionName("my-sub-1").consumerName("aaa").subscribe();
            try {
                log.info("Consumer1 created. topic: {}", (Object)consumer1.getTopic());
                Producer producer1 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/one-partitioned-topic").messageRoutingMode(MessageRoutingMode.RoundRobinPartition).enableBatching(false).create();
                try {
                    log.info("Producer1 created. topic: {}", (Object)producer1.getTopic());
                    for (int i = 0; i < totalMessages; ++i) {
                        producer1.newMessage().value((Object)("one-partitioned-topic-value-producer1-" + i).getBytes(StandardCharsets.UTF_8)).send();
                    }
                    latch.await();
                    Assert.assertTrue((listenerThreads.size() >= 1 ? 1 : 0) != 0);
                    log.info("-- Exiting {} test --", (Object)this.methodName);
                }
                finally {
                    if (Collections.singletonList(producer1).get(0) != null) {
                        producer1.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(pulsarClient).get(0) != null) {
                pulsarClient.close();
            }
        }
    }

    /**
     * Verifies that two listener-based consumers on the same {@code Shared} subscription both
     * receive messages and that together they see every published value.
     *
     * <p>Consumer {@code c1} subscribes first, 200 integers are published, and only then does
     * {@code c2} join. Each listener records the value, counts it, acknowledges it and sleeps for
     * 10 ms. The test passes once both counters are at least one and the set of received values
     * has 200 entries. The producer and both consumers are closed by try-with-resources
     * ({@code c2}, then {@code c1}, then the producer), even when an assertion fails.
     *
     * @throws Exception if any client call fails
     */
    @Test(timeOut = 30000L)
    public void testShareConsumerWithMessageListener() throws Exception {
        final String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
        final int total = 200;
        final Set<Integer> resultSet = Sets.newConcurrentHashSet();
        final AtomicInteger r1 = new AtomicInteger(0);
        final AtomicInteger r2 = new AtomicInteger(0);

        try (Producer<Integer> producer = this.pulsarClient.newProducer(Schema.INT32)
                     .topic(topic)
                     .maxPendingMessages(500)
                     .enableBatching(false)
                     .create();
             Consumer<Integer> c1 = this.pulsarClient.newConsumer(Schema.INT32)
                     .topic(topic)
                     .subscriptionName("shared")
                     .subscriptionType(SubscriptionType.Shared)
                     .receiverQueueSize(10)
                     .consumerName("c1")
                     .messageListener((consumer, msg) -> {
                         log.info("c1 received : {}", msg.getValue());
                         try {
                             resultSet.add(msg.getValue());
                             r1.incrementAndGet();
                             consumer.acknowledge(msg);
                             Thread.sleep(10L);
                         } catch (InterruptedException e) {
                             // Keep the interrupt visible to the thread that runs the listener.
                             Thread.currentThread().interrupt();
                         } catch (PulsarClientException ex) {
                             log.error("c1 acknowledge error", ex);
                         }
                     })
                     .subscribe()) {
            for (int i = 0; i < total; ++i) {
                producer.newMessage().value(i).send();
            }

            // c2 joins only after everything has been published.
            try (Consumer<Integer> c2 = this.pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("shared")
                    .subscriptionType(SubscriptionType.Shared)
                    .receiverQueueSize(10)
                    .consumerName("c2")
                    .messageListener((consumer, msg) -> {
                        log.info("c2 received : {}", msg.getValue());
                        try {
                            resultSet.add(msg.getValue());
                            r2.incrementAndGet();
                            consumer.acknowledge(msg);
                            Thread.sleep(10L);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the thread that runs the listener.
                            Thread.currentThread().interrupt();
                        } catch (PulsarClientException ex) {
                            log.error("c2 acknowledge error", ex);
                        }
                    })
                    .subscribe()) {
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertTrue(r1.get() >= 1);
                    Assert.assertTrue(r2.get() >= 1);
                    Assert.assertEquals(resultSet.size(), total);
                });
            }
        }
    }

    /**
     * Mutable bean with a single nullable string property, used as the message payload by the
     * schema tests in this class. Equality, hashing and the string form are based on that
     * property alone.
     */
    public static class MyBean {
        private String field;

        /** Returns the current value of {@code field}, which may be {@code null}. */
        public String getField() {
            return this.field;
        }

        /** Replaces the value of {@code field}; {@code null} is accepted. */
        public void setField(String field) {
            this.field = field;
        }

        /** Returns {@code SimpleProducerConsumerTest.MyBean(field=<value>)}. */
        @Override
        public String toString() {
            return "SimpleProducerConsumerTest.MyBean(field=" + this.getField() + ")";
        }

        /**
         * Two beans are equal when the other object is a {@code MyBean}, it agrees via
         * {@link #canEqual(Object)}, and both {@code field} values are equal or both are
         * {@code null}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof MyBean)) {
                return false;
            }
            MyBean other = (MyBean) o;
            // Lets a subclass veto equality with instances of this class.
            if (!other.canEqual(this)) {
                return false;
            }
            String mine = this.getField();
            String theirs = other.getField();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /** Returns whether {@code other} is an instance this class may be compared with. */
        protected boolean canEqual(Object other) {
            return other instanceof MyBean;
        }

        /**
         * Hash consistent with {@link #equals(Object)}: {@code 59 + hash(field)}, where a
         * {@code null} field contributes 43.
         */
        @Override
        public int hashCode() {
            String value = this.getField();
            return 59 + (value == null ? 43 : value.hashCode());
        }
    }
}

