/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowBookieTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);

    public SlowBookieTest() {
        super(4);
        this.baseConf.setNumAddWorkerThreads(0);
        this.baseConf.setNumReadWorkerThreads(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlowBookie() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(360).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, new byte[0]);
        byte[] entry = "Test Entry".getBytes();
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(entry);
        }
        CountDownLatch b0latch = new CountDownLatch(1);
        CountDownLatch b1latch = new CountDownLatch(1);
        final CountDownLatch addEntrylatch = new CountDownLatch(1);
        ArrayList curEns = lh.getLedgerMetadata().currentEnsemble;
        try {
            this.sleepBookie((BookieSocketAddress)curEns.get(0), b0latch);
            for (int i = 0; i < 10; ++i) {
                lh.addEntry(entry);
            }
            this.sleepBookie((BookieSocketAddress)curEns.get(2), b1latch);
            final AtomicInteger i = new AtomicInteger(-559038737);
            AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback(){

                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                    i.set(rc);
                    addEntrylatch.countDown();
                }
            };
            lh.asyncAddEntry(entry, cb, null);
            Thread.sleep(3000L);
            Assert.assertEquals((String)"Successfully added entry!", (long)-559038737L, (long)i.get());
            b0latch.countDown();
            b1latch.countDown();
            addEntrylatch.await(4000L, TimeUnit.MILLISECONDS);
            Assert.assertEquals((String)"Failed to add entry!", (long)0L, (long)i.get());
        }
        finally {
            b0latch.countDown();
            b1latch.countDown();
            addEntrylatch.countDown();
        }
    }

    @Test
    public void testBookieFailureWithSlowBookie() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(5).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
        final AtomicBoolean finished = new AtomicBoolean(false);
        final AtomicBoolean failTest = new AtomicBoolean(false);
        final byte[] entry = "Test Entry".getBytes();
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    while (!finished.get()) {
                        lh.addEntry(entry);
                    }
                }
                catch (Exception e) {
                    LOG.error("Exception in add entry thread", (Throwable)e);
                    failTest.set(true);
                }
            }
        };
        t.start();
        CountDownLatch b0latch = new CountDownLatch(1);
        this.startNewBookie();
        this.sleepBookie(this.getBookie(0), b0latch);
        Thread.sleep(10000L);
        b0latch.countDown();
        finished.set(true);
        t.join();
        Assert.assertFalse((boolean)failTest.get());
        lh.close();
        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
        LedgerChecker lc = new LedgerChecker(bkc);
        final CountDownLatch checklatch = new CountDownLatch(1);
        final AtomicInteger numFragments = new AtomicInteger(-1);
        lc.checkLedger(lh2, (BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>(){

            public void operationComplete(int rc, Set<LedgerFragment> badFragments) {
                LOG.debug("Checked ledgers returned {} {}", (Object)rc, badFragments);
                if (rc == 0) {
                    numFragments.set(badFragments.size());
                }
                checklatch.countDown();
            }
        });
        checklatch.await();
        Assert.assertEquals((String)"There should be no missing fragments", (long)0L, (long)numFragments.get());
    }

    @Test
    public void testManyBookieFailureWithSlowBookies() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(5).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
        final AtomicBoolean finished = new AtomicBoolean(false);
        final AtomicBoolean failTest = new AtomicBoolean(false);
        final byte[] entry = "Test Entry".getBytes();
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    while (!finished.get()) {
                        lh.addEntry(entry);
                        Thread.sleep(1L);
                    }
                }
                catch (Exception e) {
                    LOG.error("Exception in add entry thread", (Throwable)e);
                    failTest.set(true);
                }
            }
        };
        t.start();
        CountDownLatch b0latch = new CountDownLatch(1);
        CountDownLatch b1latch = new CountDownLatch(1);
        this.startNewBookie();
        this.startNewBookie();
        this.sleepBookie(this.getBookie(0), b0latch);
        this.sleepBookie(this.getBookie(1), b1latch);
        Thread.sleep(10000L);
        b0latch.countDown();
        b1latch.countDown();
        finished.set(true);
        t.join();
        Assert.assertFalse((boolean)failTest.get());
        lh.close();
        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
        LedgerChecker lc = new LedgerChecker(bkc);
        final CountDownLatch checklatch = new CountDownLatch(1);
        final AtomicInteger numFragments = new AtomicInteger(-1);
        lc.checkLedger(lh2, (BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>(){

            public void operationComplete(int rc, Set<LedgerFragment> fragments) {
                LOG.debug("Checked ledgers returned {} {}", (Object)rc, fragments);
                if (rc == 0) {
                    numFragments.set(fragments.size());
                }
                checklatch.countDown();
            }
        });
        checklatch.await();
        Assert.assertEquals((String)"There should be no missing fragments", (long)0L, (long)numFragments.get());
    }
}

