/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import io.netty.channel.ChannelHandlerContext;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MockBrokerService;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class ClientErrorsTest {
    MockBrokerService mockBrokerService;
    private static final int ASYNC_EVENT_COMPLETION_WAIT = 100;
    private final String ASSERTION_ERROR = "AssertionError";

    @BeforeClass(alwaysRun=true)
    public void setup() {
        this.mockBrokerService = new MockBrokerService();
        this.mockBrokerService.start();
    }

    @AfterClass(alwaysRun=true)
    public void teardown() {
        if (this.mockBrokerService != null) {
            this.mockBrokerService.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMockBrokerService() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            try {
                Consumer consumer = client.newConsumer().topic(new String[]{"persistent://prop/use/ns/t1"}).subscriptionName("sub1").subscribe();
                Producer producer = client.newProducer().topic("persistent://prop/use/ns/t1").create();
                Thread.sleep(100L);
                producer.send((Object)"message".getBytes());
                Thread.sleep(100L);
                consumer.unsubscribe();
                producer.close();
                consumer.close();
            }
            catch (Exception e) {
                Assert.fail((String)"None of the mocked operations should throw a client side exception");
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testProducerCreateFailWithoutRetry() throws Exception {
        this.producerCreateFailWithoutRetry("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedProducerCreateFailWithoutRetry() throws Exception {
        this.producerCreateFailWithoutRetry("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void producerCreateFailWithoutRetry(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                if (counter.incrementAndGet() == 2) {
                    ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.UnknownError, (String)"AssertionError"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.AuthorizationError, (String)"msg"));
            });
            try {
                client.newProducer().topic(topic).create();
            }
            catch (Exception e) {
                if (e.getMessage().equals("AssertionError")) {
                    Assert.fail((String)"Producer create should not retry on auth error");
                }
                Assert.assertTrue((boolean)(e instanceof PulsarClientException.AuthorizationException));
            }
            this.mockBrokerService.resetHandleProducer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testProducerCreateSuccessAfterRetry() throws Exception {
        this.producerCreateSuccessAfterRetry("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedProducerCreateSuccessAfterRetry() throws Exception {
        this.producerCreateSuccessAfterRetry("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void producerCreateSuccessAfterRetry(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                if (counter.incrementAndGet() == 2) {
                    ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)producer.getRequestId(), (String)"default-producer", (SchemaVersion)SchemaVersion.Empty));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.ServiceNotReady, (String)"msg"));
            });
            try {
                client.newProducer().topic(topic).create();
            }
            catch (Exception e) {
                Assert.fail((String)"Should not fail");
            }
            this.mockBrokerService.resetHandleProducer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testProducerCreateFailAfterRetryTimeout() throws Exception {
        this.producerCreateFailAfterRetryTimeout("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedProducerCreateFailAfterRetryTimeout() throws Exception {
        this.producerCreateFailAfterRetryTimeout("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void producerCreateFailAfterRetryTimeout(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).operationTimeout(1, TimeUnit.SECONDS).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            AtomicInteger closeProducerCounter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                if (counter.incrementAndGet() == 2) {
                    try {
                        Thread.sleep(2000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.ServiceNotReady, (String)"msg"));
            });
            this.mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> closeProducerCounter.incrementAndGet());
            try {
                client.newProducer().topic(topic).create();
                Assert.fail((String)"Should have failed");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e instanceof PulsarClientException));
            }
            Awaitility.await().until(() -> closeProducerCounter.get() == 1);
            this.mockBrokerService.resetHandleProducer();
            this.mockBrokerService.resetHandleCloseProducer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testCreatedProducerSendsCloseProducerAfterTimeout() throws Exception {
        this.producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
    }

    @Test
    public void testCreatedPartitionedProducerSendsCloseProducerAfterTimeout() throws Exception {
        this.producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).operationTimeout(1, TimeUnit.SECONDS).build();
        try {
            AtomicInteger producerCounter = new AtomicInteger(0);
            AtomicInteger closeProducerCounter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                int producerCount = producerCounter.incrementAndGet();
                if (producerCount == 1) {
                    ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)producer.getRequestId(), (String)"producer1", (SchemaVersion)SchemaVersion.Empty));
                    ctx.writeAndFlush((Object)Commands.newCloseProducer((long)producer.getProducerId(), (long)-1L));
                } else if (producerCount != 2) {
                    ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)producer.getRequestId(), (String)"producer1", (SchemaVersion)SchemaVersion.Empty));
                }
            });
            this.mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
                closeProducerCounter.incrementAndGet();
                ctx.writeAndFlush((Object)Commands.newSuccess((long)closeProducer.getRequestId()));
            });
            client.newProducer().topic(topic).create();
            Awaitility.await().until(() -> closeProducerCounter.get() == 1);
            Awaitility.await().until(() -> producerCounter.get() == 2 || producerCounter.get() == 3);
            this.mockBrokerService.resetHandleProducer();
            this.mockBrokerService.resetHandleCloseProducer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testProducerFailDoesNotFailOtherProducer() throws Exception {
        this.producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
    }

    @Test
    public void testPartitionedProducerFailDoesNotFailOtherProducer() throws Exception {
        this.producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/part-t1", "persistent://prop/use/ns/part-t2");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void producerFailDoesNotFailOtherProducer(String topic1, String topic2) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                if (counter.incrementAndGet() == 2) {
                    ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.AuthorizationError, (String)"msg"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)producer.getRequestId(), (String)"default-producer", (SchemaVersion)SchemaVersion.Empty));
            });
            ProducerBase producer1 = (ProducerBase)client.newProducer().topic(topic1).create();
            ProducerBase producer2 = null;
            try {
                producer2 = (ProducerBase)client.newProducer().topic(topic2).create();
                Assert.fail((String)"Should have failed");
            }
            catch (Exception exception) {
                // empty catch block
            }
            Assert.assertTrue((boolean)producer1.isConnected());
            Assert.assertFalse((producer2 != null && producer2.isConnected() ? 1 : 0) != 0);
            this.mockBrokerService.resetHandleProducer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testProducerContinuousRetryAfterSendFail() throws Exception {
        this.producerContinuousRetryAfterSendFail("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedProducerContinuousRetryAfterSendFail() throws Exception {
        this.producerContinuousRetryAfterSendFail("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void producerContinuousRetryAfterSendFail(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                int i = counter.incrementAndGet();
                if (i == 1 || i == 5) {
                    ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)producer.getRequestId(), (String)"default-producer", (SchemaVersion)SchemaVersion.Empty));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.PersistenceError, (String)"msg"));
            });
            AtomicInteger msgCounter = new AtomicInteger(0);
            this.mockBrokerService.setHandleSend((ctx, send, headersAndPayload) -> {
                if (msgCounter.incrementAndGet() == 1) {
                    ctx.writeAndFlush((Object)Commands.newSendError((long)0L, (long)0L, (ServerError)ServerError.PersistenceError, (String)"Send Failed"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newSendReceipt((long)0L, (long)0L, (long)0L, (long)1L, (long)1L));
            });
            try {
                Producer producer2 = client.newProducer().topic(topic).create();
                producer2.send((Object)"message".getBytes());
            }
            catch (Exception e) {
                Assert.fail((String)"Should not fail");
            }
            this.mockBrokerService.resetHandleProducer();
            this.mockBrokerService.resetHandleSend();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testSubscribeFailWithoutRetry() throws Exception {
        this.subscribeFailWithoutRetry("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedSubscribeFailWithoutRetry() throws Exception {
        this.subscribeFailWithoutRetry("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLookupWithDisconnection() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            String topic = "persistent://prop/use/ns/t1";
            this.mockBrokerService.setHandlePartitionLookup((ctx, lookup) -> ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((int)0, (long)lookup.getRequestId())));
            this.mockBrokerService.setHandleLookup((ctx, lookup) -> {
                if (counter.incrementAndGet() == 1) {
                    ctx.close();
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newLookupResponse((String)this.mockBrokerService.getBrokerAddress(), null, (boolean)true, (CommandLookupTopicResponse.LookupType)CommandLookupTopicResponse.LookupType.Connect, (long)lookup.getRequestId(), (boolean)false));
            });
            try {
                client.newConsumer().topic(new String[]{topic}).subscriptionName("sub1").subscribe();
            }
            catch (Exception e) {
                if (e.getMessage().equals("AssertionError")) {
                    Assert.fail((String)"Subscribe should not retry on persistence error");
                }
                Assert.assertTrue((boolean)(e instanceof PulsarClientException.BrokerPersistenceException));
            }
            this.mockBrokerService.resetHandlePartitionLookup();
            this.mockBrokerService.resetHandleLookup();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeFailWithoutRetry(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
                if (counter.incrementAndGet() == 2) {
                    ctx.writeAndFlush((Object)Commands.newError((long)subscribe.getRequestId(), (ServerError)ServerError.UnknownError, (String)"AssertionError"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newError((long)subscribe.getRequestId(), (ServerError)ServerError.PersistenceError, (String)"msg"));
            });
            try {
                client.newConsumer().topic(new String[]{topic}).subscriptionName("sub1").subscribe();
            }
            catch (Exception e) {
                if (e.getMessage().equals("AssertionError")) {
                    Assert.fail((String)"Subscribe should not retry on persistence error");
                }
                Assert.assertTrue((boolean)(e instanceof PulsarClientException.BrokerPersistenceException));
            }
            this.mockBrokerService.resetHandleSubscribe();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testSubscribeSuccessAfterRetry() throws Exception {
        this.subscribeSuccessAfterRetry("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedSubscribeSuccessAfterRetry() throws Exception {
        this.subscribeSuccessAfterRetry("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeSuccessAfterRetry(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
                if (counter.incrementAndGet() == 2) {
                    ctx.writeAndFlush((Object)Commands.newSuccess((long)subscribe.getRequestId()));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newError((long)subscribe.getRequestId(), (ServerError)ServerError.ServiceNotReady, (String)"msg"));
            });
            try {
                client.newConsumer().topic(new String[]{topic}).subscriptionName("sub1").subscribe();
            }
            catch (Exception e) {
                Assert.fail((String)"Should not fail");
            }
            this.mockBrokerService.resetHandleSubscribe();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testSubscribeFailAfterRetryTimeout() throws Exception {
        this.subscribeFailAfterRetryTimeout("persistent://prop/use/ns/t1");
    }

    @Test
    public void testPartitionedSubscribeFailAfterRetryTimeout() throws Exception {
        this.subscribeFailAfterRetryTimeout("persistent://prop/use/ns/part-t1");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeFailAfterRetryTimeout(String topic) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).operationTimeout(200, TimeUnit.MILLISECONDS).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
                if (counter.incrementAndGet() == 2) {
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                ctx.writeAndFlush((Object)Commands.newError((long)subscribe.getRequestId(), (ServerError)ServerError.ServiceNotReady, (String)"msg"));
            });
            try {
                client.newConsumer().topic(new String[]{topic}).subscriptionName("sub1").subscribe();
                Assert.fail((String)"Should have failed");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e instanceof PulsarClientException));
            }
            this.mockBrokerService.resetHandleSubscribe();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    @Test
    public void testSubscribeFailDoesNotFailOtherConsumer() throws Exception {
        this.subscribeFailDoesNotFailOtherConsumer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
    }

    @Test
    public void testPartitionedSubscribeFailDoesNotFailOtherConsumer() throws Exception {
        this.subscribeFailDoesNotFailOtherConsumer("persistent://prop/use/ns/part-t1", "persistent://prop/use/ns/part-t2");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            AtomicInteger counter = new AtomicInteger(0);
            this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
                if (counter.incrementAndGet() == 2) {
                    ctx.writeAndFlush((Object)Commands.newError((long)subscribe.getRequestId(), (ServerError)ServerError.AuthorizationError, (String)"msg"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newSuccess((long)subscribe.getRequestId()));
            });
            ConsumerBase consumer1 = (ConsumerBase)client.newConsumer().topic(new String[]{topic1}).subscriptionName("sub1").subscribe();
            ConsumerBase consumer2 = null;
            try {
                consumer2 = (ConsumerBase)client.newConsumer().topic(new String[]{topic2}).subscriptionName("sub1").subscribe();
                Assert.fail((String)"Should have failed");
            }
            catch (Exception exception) {
                // empty catch block
            }
            Assert.assertTrue((boolean)consumer1.isConnected());
            Assert.assertFalse((consumer2 != null && consumer2.isConnected() ? 1 : 0) != 0);
            this.mockBrokerService.resetHandleSubscribe();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOneProducerFailShouldCloseAllProducersInPartitionedProducer() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getHttpAddress()).build();
        try {
            AtomicInteger producerCounter = new AtomicInteger(0);
            AtomicInteger closeCounter = new AtomicInteger(0);
            this.mockBrokerService.setHandleProducer((ctx, producer) -> {
                if (producerCounter.incrementAndGet() == 3) {
                    ctx.writeAndFlush((Object)Commands.newError((long)producer.getRequestId(), (ServerError)ServerError.AuthorizationError, (String)"msg"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)producer.getRequestId(), (String)"default-producer", (SchemaVersion)SchemaVersion.Empty));
            });
            this.mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
                ctx.writeAndFlush((Object)Commands.newSuccess((long)closeProducer.getRequestId()));
                closeCounter.incrementAndGet();
            });
            try {
                client.newProducer().topic("persistent://prop/use/ns/multi-part-t1").create();
                Assert.fail((String)"Should have failed with an authorization error");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e instanceof PulsarClientException.AuthorizationException));
                Assert.assertEquals((int)closeCounter.get(), (int)3);
            }
            this.mockBrokerService.resetHandleProducer();
            this.mockBrokerService.resetHandleCloseProducer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOneConsumerFailShouldCloseAllConsumersInPartitionedConsumer() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getHttpAddress()).build();
        try {
            AtomicInteger subscribeCounter = new AtomicInteger(0);
            AtomicInteger closeCounter = new AtomicInteger(0);
            this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
                System.err.println("subscribeCounter: " + subscribeCounter.get());
                if (subscribeCounter.incrementAndGet() == 3) {
                    ctx.writeAndFlush((Object)Commands.newError((long)subscribe.getRequestId(), (ServerError)ServerError.AuthorizationError, (String)"msg"));
                    return;
                }
                ctx.writeAndFlush((Object)Commands.newSuccess((long)subscribe.getRequestId()));
            });
            this.mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
                ctx.writeAndFlush((Object)Commands.newSuccess((long)closeConsumer.getRequestId()));
                closeCounter.incrementAndGet();
            });
            try {
                client.newConsumer().topic(new String[]{"persistent://prop/use/ns/multi-part-t1"}).subscriptionName("sub1").subscribe();
                Assert.fail((String)"Should have failed with an authorization error");
            }
            catch (PulsarClientException.AuthorizationException authorizationException) {
                // empty catch block
            }
            Assert.assertEquals((int)closeCounter.get(), (int)3);
            this.mockBrokerService.resetHandleSubscribe();
            this.mockBrokerService.resetHandleCloseConsumer();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFlowSendWhenPartitionedSubscribeCompletes() throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getHttpAddress()).build();
        try {
            AtomicInteger subscribed = new AtomicInteger();
            AtomicBoolean fail = new AtomicBoolean(false);
            this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
                subscribed.incrementAndGet();
                ctx.writeAndFlush((Object)Commands.newSuccess((long)subscribe.getRequestId()));
            });
            this.mockBrokerService.setHandleFlow((ctx, sendFlow) -> {
                if (subscribed.get() != 4) {
                    fail.set(true);
                }
            });
            client.newConsumer().topic(new String[]{"persistent://prop/use/ns/multi-part-t1"}).subscriptionName("sub1").subscribe();
            if (fail.get()) {
                Assert.fail((String)"Flow command should have been sent after all 4 partitions subscribe successfully");
            }
            this.mockBrokerService.resetHandleSubscribe();
            this.mockBrokerService.resetHandleFlow();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(invocationCount=10, groups={"broker-api"})
    public void testProducerReconnect() throws Exception {
        AtomicInteger numOfConnections = new AtomicInteger();
        AtomicReference channelCtx = new AtomicReference();
        AtomicBoolean msgSent = new AtomicBoolean();
        this.mockBrokerService.setHandleConnect((ctx, connect) -> {
            channelCtx.set(ctx);
            ctx.writeAndFlush((Object)Commands.newConnected((int)connect.getProtocolVersion()));
            if (numOfConnections.incrementAndGet() == 2) {
                ctx.channel().close();
            }
        });
        this.mockBrokerService.setHandleProducer((ctx, produce) -> ctx.writeAndFlush((Object)Commands.newProducerSuccess((long)produce.getRequestId(), (String)"default-producer", (SchemaVersion)SchemaVersion.Empty)));
        this.mockBrokerService.setHandleSend((ctx, sendCmd, headersAndPayload) -> {
            msgSent.set(true);
            ctx.writeAndFlush((Object)Commands.newSendReceipt((long)0L, (long)0L, (long)0L, (long)1L, (long)1L));
        });
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            Producer producer = client.newProducer().topic("persistent://prop/use/ns/t1").create();
            ((ChannelHandlerContext)channelCtx.get()).channel().close().get();
            producer.send((Object)new byte[0]);
            Assert.assertTrue((boolean)msgSent.get());
            Assert.assertTrue((numOfConnections.get() >= 3 ? 1 : 0) != 0);
            this.mockBrokerService.resetHandleConnect();
            this.mockBrokerService.resetHandleProducer();
            this.mockBrokerService.resetHandleSend();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerReconnect() throws Exception {
        AtomicInteger numOfConnections = new AtomicInteger();
        AtomicReference channelCtx = new AtomicReference();
        CountDownLatch latch = new CountDownLatch(1);
        this.mockBrokerService.setHandleConnect((ctx, connect) -> {
            channelCtx.set(ctx);
            ctx.writeAndFlush((Object)Commands.newConnected((int)connect.getProtocolVersion()));
            if (numOfConnections.incrementAndGet() == 2) {
                ctx.channel().close();
            }
            if (numOfConnections.get() == 3) {
                latch.countDown();
            }
        });
        this.mockBrokerService.setHandleSubscribe((ctx, subscribe) -> ctx.writeAndFlush((Object)Commands.newSuccess((long)subscribe.getRequestId())));
        PulsarClient client = PulsarClient.builder().serviceUrl(this.mockBrokerService.getBrokerAddress()).build();
        try {
            client.newConsumer().topic(new String[]{"persistent://prop/use/ns/t1"}).subscriptionName("sub1").subscribe();
            ((ChannelHandlerContext)channelCtx.get()).channel().close();
            latch.await(5L, TimeUnit.SECONDS);
            Assert.assertEquals((int)numOfConnections.get(), (int)3);
            this.mockBrokerService.resetHandleConnect();
            this.mockBrokerService.resetHandleSubscribe();
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }
}

