/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

public class BufferPoolTest {
    private MockTime time = new MockTime();
    private Metrics metrics = new Metrics((Time)this.time);
    String metricGroup = "TestMetrics";
    Map<String, String> metricTags = new LinkedHashMap<String, String>();

    @Test
    public void testSimple() throws Exception {
        int totalMemory = 65536;
        int size = 1024;
        BufferPool pool = new BufferPool((long)totalMemory, size, false, this.metrics, (Time)this.time, this.metricGroup, this.metricTags);
        ByteBuffer buffer = pool.allocate(size);
        Assert.assertEquals((String)"Buffer size should equal requested size.", (long)size, (long)buffer.limit());
        Assert.assertEquals((String)"Unallocated memory should have shrunk", (long)(totalMemory - size), (long)pool.unallocatedMemory());
        Assert.assertEquals((String)"Available memory should have shrunk", (long)(totalMemory - size), (long)pool.availableMemory());
        buffer.putInt(1);
        buffer.flip();
        pool.deallocate(buffer);
        Assert.assertEquals((String)"All memory should be available", (long)totalMemory, (long)pool.availableMemory());
        Assert.assertEquals((String)"But now some is on the free list", (long)(totalMemory - size), (long)pool.unallocatedMemory());
        buffer = pool.allocate(size);
        Assert.assertEquals((String)"Recycled buffer should be cleared.", (long)0L, (long)buffer.position());
        Assert.assertEquals((String)"Recycled buffer should be cleared.", (long)buffer.capacity(), (long)buffer.limit());
        pool.deallocate(buffer);
        Assert.assertEquals((String)"All memory should be available", (long)totalMemory, (long)pool.availableMemory());
        Assert.assertEquals((String)"Still a single buffer on the free list", (long)(totalMemory - size), (long)pool.unallocatedMemory());
        buffer = pool.allocate(2 * size);
        pool.deallocate(buffer);
        Assert.assertEquals((String)"All memory should be available", (long)totalMemory, (long)pool.availableMemory());
        Assert.assertEquals((String)"Non-standard size didn't go to the free list.", (long)(totalMemory - size), (long)pool.unallocatedMemory());
    }

    @Test(expected=IllegalArgumentException.class)
    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
        BufferPool pool = new BufferPool(1024L, 512, true, this.metrics, (Time)this.time, this.metricGroup, this.metricTags);
        ByteBuffer buffer = pool.allocate(1024);
        Assert.assertEquals((long)1024L, (long)buffer.limit());
        pool.deallocate(buffer);
        buffer = pool.allocate(1025);
    }

    @Test
    public void testNonblockingMode() throws Exception {
        BufferPool pool = new BufferPool(2L, 1, false, this.metrics, (Time)this.time, this.metricGroup, this.metricTags);
        pool.allocate(1);
        try {
            pool.allocate(2);
            Assert.fail((String)"The buffer allocated more than it has!");
        }
        catch (BufferExhaustedException bufferExhaustedException) {
            // empty catch block
        }
    }

    @Test
    public void testDelayedAllocation() throws Exception {
        BufferPool pool = new BufferPool(5120L, 1024, true, this.metrics, (Time)this.time, this.metricGroup, this.metricTags);
        ByteBuffer buffer = pool.allocate(1024);
        CountDownLatch doDealloc = this.asyncDeallocate(pool, buffer);
        CountDownLatch allocation = this.asyncAllocate(pool, 5120);
        Assert.assertEquals((String)"Allocation shouldn't have happened yet, waiting on memory.", (long)1L, (long)allocation.getCount());
        doDealloc.countDown();
        allocation.await();
    }

    private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
        final CountDownLatch latch = new CountDownLatch(1);
        new Thread(){

            @Override
            public void run() {
                try {
                    latch.await();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                pool.deallocate(buffer);
            }
        }.start();
        return latch;
    }

    private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
        final CountDownLatch completed = new CountDownLatch(1);
        new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    pool.allocate(size);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    completed.countDown();
                }
            }
        }.start();
        return completed;
    }

    @Test
    public void testStressfulSituation() throws Exception {
        int numThreads = 10;
        int iterations = 50000;
        int poolableSize = 1024;
        int totalMemory = numThreads / 2 * 1024;
        BufferPool pool = new BufferPool((long)totalMemory, 1024, true, this.metrics, (Time)this.time, this.metricGroup, this.metricTags);
        ArrayList<StressTestThread> threads = new ArrayList<StressTestThread>();
        for (int i = 0; i < numThreads; ++i) {
            threads.add(new StressTestThread(pool, 50000));
        }
        for (StressTestThread thread : threads) {
            thread.start();
        }
        for (StressTestThread thread : threads) {
            thread.join();
        }
        for (StressTestThread thread : threads) {
            Assert.assertTrue((String)"Thread should have completed all iterations successfully.", (boolean)thread.success.get());
        }
        Assert.assertEquals((long)totalMemory, (long)pool.availableMemory());
    }

    public static class StressTestThread
    extends Thread {
        private final int iterations;
        private final BufferPool pool;
        public final AtomicBoolean success = new AtomicBoolean(false);

        public StressTestThread(BufferPool pool, int iterations) {
            this.iterations = iterations;
            this.pool = pool;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < this.iterations; ++i) {
                    int size = TestUtils.random.nextBoolean() ? this.pool.poolableSize() : TestUtils.random.nextInt((int)this.pool.totalMemory());
                    ByteBuffer buffer = this.pool.allocate(size);
                    this.pool.deallocate(buffer);
                }
                this.success.set(true);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

