/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;

public class TestMaxSizeWorkersQueue
extends BookKeeperClusterTestCase {
    BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;

    public TestMaxSizeWorkersQueue() {
        super(1);
        this.baseConf.setNumReadWorkerThreads(1);
        this.baseConf.setNumAddWorkerThreads(1);
        this.baseConf.setMaxPendingReadRequestPerThread(1);
        this.baseConf.setMaxPendingAddRequestPerThread(1);
    }

    @Test
    public void testReadRejected() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(1, 1, this.digestType, new byte[0]);
        byte[] content = new byte[100];
        int n = 1000;
        for (int i = 0; i < 1000; ++i) {
            lh.addEntry(content);
        }
        final CountDownLatch counter = new CountDownLatch(2);
        final AtomicInteger rcFirstReadOperation = new AtomicInteger();
        lh.asyncReadEntries(0L, 0L, new AsyncCallback.ReadCallback(){

            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                rcFirstReadOperation.set(rc);
                counter.countDown();
            }
        }, (Object)lh);
        final AtomicInteger rcSecondReadOperation = new AtomicInteger();
        lh.asyncReadEntries(0L, 999L, new AsyncCallback.ReadCallback(){

            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                rcSecondReadOperation.set(rc);
                counter.countDown();
            }
        }, (Object)lh);
        counter.await();
        Assert.assertEquals((long)0L, (long)rcFirstReadOperation.get());
        Assert.assertEquals((long)-105L, (long)rcSecondReadOperation.get());
    }

    @Test
    public void testAddRejected() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(1, 1, this.digestType, new byte[0]);
        byte[] content = new byte[100];
        int n = 1000;
        final CountDownLatch counter = new CountDownLatch(1000);
        final AtomicBoolean receivedTooManyRequestsException = new AtomicBoolean();
        for (int i = 0; i < 1000; ++i) {
            lh.asyncAddEntry(content, new AsyncCallback.AddCallback(){

                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                    if (rc == -6) {
                        receivedTooManyRequestsException.set(true);
                    }
                    counter.countDown();
                }
            }, null);
        }
        counter.await();
        Assert.assertTrue((boolean)receivedTooManyRequestsException.get());
    }

    @Test
    public void testRecoveryNotRejected() throws Exception {
        final LedgerHandle lh = this.bkc.createLedger(1, 1, this.digestType, new byte[0]);
        byte[] content = new byte[100];
        int numEntriesToRead = 1000;
        for (int i = 0; i < 1000; ++i) {
            lh.addEntry(content);
        }
        int numLedgersToRecover = 10;
        ArrayList ledgersToRecover = Lists.newArrayList();
        for (int i = 0; i < 10; ++i) {
            LedgerHandle lhr = this.bkc.createLedger(1, 1, this.digestType, new byte[0]);
            lhr.addEntry(content);
            ledgersToRecover.add(lhr.getId());
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ArrayList futures = Lists.newArrayList();
        futures.add(executor.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                barrier.await();
                try {
                    lh.readEntries(0L, 999L);
                    Assert.fail((String)"Should have thrown exception");
                }
                catch (Exception exception) {
                    // empty catch block
                }
                return null;
            }
        }));
        Iterator iterator = ledgersToRecover.iterator();
        while (iterator.hasNext()) {
            final long ledgerId = (Long)iterator.next();
            futures.add(executor.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    barrier.await();
                    TestMaxSizeWorkersQueue.this.bkc.openLedger(ledgerId, TestMaxSizeWorkersQueue.this.digestType, new byte[0]);
                    return null;
                }
            }));
        }
        for (Future future : futures) {
            future.get();
        }
    }
}

