/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class MemoryLimitTest
extends ProducerConsumerBase {
    @DataProvider(name="batching")
    public Object[][] provider() {
        return new Object[][]{{false}, {true}};
    }

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRejectMessages() throws Exception {
        String topic = this.newTopicName();
        PulsarClientImpl client = (PulsarClientImpl)PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).memoryLimit(100L, SizeUnit.KILO_BYTES).build();
        try {
            Producer producer = client.newProducer().topic(topic).blockIfQueueFull(false).create();
            try {
                int n = 101;
                CountDownLatch latch = new CountDownLatch(101);
                for (int i = 0; i < 101; ++i) {
                    producer.sendAsync((Object)new byte[1024]).thenRun(() -> latch.countDown());
                }
                Assert.assertEquals((long)client.getMemoryLimitController().currentUsage(), (long)103424L);
                try {
                    producer.send((Object)new byte[1024]);
                    Assert.fail((String)"should have failed");
                }
                catch (PulsarClientException.MemoryBufferIsFullError memoryBufferIsFullError) {
                    // empty catch block
                }
                latch.await();
                Assert.assertEquals((long)client.getMemoryLimitController().currentUsage(), (long)0L);
                producer.send((Object)new byte[1024]);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRejectMessagesOnMultipleTopics() throws Exception {
        String t1 = this.newTopicName();
        String t2 = this.newTopicName();
        PulsarClientImpl client = (PulsarClientImpl)PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).memoryLimit(100L, SizeUnit.KILO_BYTES).build();
        try {
            Producer p1 = client.newProducer().topic(t1).blockIfQueueFull(false).create();
            try {
                Producer p2 = client.newProducer().topic(t2).blockIfQueueFull(false).create();
                try {
                    int n = 101;
                    CountDownLatch latch = new CountDownLatch(101);
                    for (int i = 0; i < 50; ++i) {
                        p1.sendAsync((Object)new byte[1024]).thenRun(() -> latch.countDown());
                        p2.sendAsync((Object)new byte[1024]).thenRun(() -> latch.countDown());
                    }
                    p1.sendAsync((Object)new byte[1024]).thenRun(() -> latch.countDown());
                    Assert.assertEquals((long)client.getMemoryLimitController().currentUsage(), (long)103424L);
                    try {
                        p1.send((Object)new byte[1024]);
                        Assert.fail((String)"should have failed");
                    }
                    catch (PulsarClientException.MemoryBufferIsFullError memoryBufferIsFullError) {
                        // empty catch block
                    }
                    try {
                        p2.send((Object)new byte[1024]);
                        Assert.fail((String)"should have failed");
                    }
                    catch (PulsarClientException.MemoryBufferIsFullError memoryBufferIsFullError) {
                        // empty catch block
                    }
                    latch.await();
                    Assert.assertEquals((long)client.getMemoryLimitController().currentUsage(), (long)0L);
                    p1.send((Object)new byte[1024]);
                    p2.send((Object)new byte[1024]);
                }
                finally {
                    if (Collections.singletonList(p2).get(0) != null) {
                        p2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(p1).get(0) != null) {
                    p1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }
}

