/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowBookieTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
    final byte[] entry = "Test Entry".getBytes();

    public SlowBookieTest() {
        super(4);
        this.baseConf.setNumAddWorkerThreads(0);
        this.baseConf.setNumReadWorkerThreads(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlowBookie() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(360).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, new byte[0]);
        byte[] entry = "Test Entry".getBytes();
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(entry);
        }
        CountDownLatch b0latch = new CountDownLatch(1);
        CountDownLatch b1latch = new CountDownLatch(1);
        final CountDownLatch addEntrylatch = new CountDownLatch(1);
        List curEns = lh.getCurrentEnsemble();
        try {
            this.sleepBookie((BookieId)curEns.get(0), b0latch);
            for (int i = 0; i < 10; ++i) {
                lh.addEntry(entry);
            }
            this.sleepBookie((BookieId)curEns.get(2), b1latch);
            final AtomicInteger i = new AtomicInteger(-559038737);
            AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback(){

                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                    i.set(rc);
                    addEntrylatch.countDown();
                }
            };
            lh.asyncAddEntry(entry, cb, null);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)"Successfully added entry!", (long)-559038737L, (long)i.get()));
            b0latch.countDown();
            b1latch.countDown();
            addEntrylatch.await(4000L, TimeUnit.MILLISECONDS);
            Assert.assertEquals((String)"Failed to add entry!", (long)0L, (long)i.get());
        }
        finally {
            b0latch.countDown();
            b1latch.countDown();
            addEntrylatch.countDown();
        }
    }

    @Test
    public void testBookieFailureWithSlowBookie() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(5).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
        final AtomicBoolean finished = new AtomicBoolean(false);
        final AtomicBoolean failTest = new AtomicBoolean(false);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    while (!finished.get()) {
                        lh.addEntry(SlowBookieTest.this.entry);
                    }
                }
                catch (Exception e) {
                    LOG.error("Exception in add entry thread", (Throwable)e);
                    failTest.set(true);
                }
            }
        };
        t.start();
        CountDownLatch b0latch = new CountDownLatch(1);
        this.startNewBookie();
        this.sleepBookie(this.getBookie(0), b0latch);
        Thread.sleep(10000L);
        b0latch.countDown();
        finished.set(true);
        t.join();
        Assert.assertFalse((boolean)failTest.get());
        lh.close();
        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
        LedgerChecker lc = new LedgerChecker(bkc);
        final CountDownLatch checklatch = new CountDownLatch(1);
        final AtomicInteger numFragments = new AtomicInteger(-1);
        lc.checkLedger(lh2, (BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>(){

            public void operationComplete(int rc, Set<LedgerFragment> badFragments) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Checked ledgers returned {} {}", (Object)rc, badFragments);
                }
                if (rc == 0) {
                    numFragments.set(badFragments.size());
                }
                checklatch.countDown();
            }
        });
        checklatch.await();
        Assert.assertEquals((String)"There should be no missing fragments", (long)0L, (long)numFragments.get());
    }

    @Test
    public void testSlowBookieAndBackpressureOn() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        ((ClientConfiguration)conf.setReadTimeout(5).setAddEntryTimeout(1).setAddEntryQuorumTimeout(1).setNumChannelsPerBookie(1).setZkServers(this.zkUtil.getZooKeeperConnectString())).setClientWriteBufferLowWaterMark(1).setClientWriteBufferHighWaterMark(this.entry.length - 1).setWaitTimeoutOnBackpressureMillis(5000L);
        boolean expectWriteError = false;
        boolean expectFailedTest = false;
        try (LedgerHandle lh = this.doBackPressureTest(this.entry, conf, false, false, 2000L);){
            Assert.assertTrue((lh.readLastConfirmed() < 5L ? 1 : 0) != 0);
        }
    }

    @Test
    public void testSlowBookieAndFastFailOn() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        ((ClientConfiguration)conf.setReadTimeout(5).setAddEntryTimeout(1).setAddEntryQuorumTimeout(1).setNumChannelsPerBookie(1).setZkServers(this.zkUtil.getZooKeeperConnectString())).setClientWriteBufferLowWaterMark(1).setClientWriteBufferHighWaterMark(2).setWaitTimeoutOnBackpressureMillis(0L);
        boolean expectWriteError = true;
        boolean expectFailedTest = false;
        try (LedgerHandle lh = this.doBackPressureTest(this.entry, conf, true, false, 1000L);){
            Assert.assertTrue((lh.readLastConfirmed() < 5L ? 1 : 0) != 0);
        }
    }

    @Test
    public void testSlowBookieAndNoBackpressure() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        ((ClientConfiguration)conf.setReadTimeout(5).setAddEntryTimeout(1).setAddEntryQuorumTimeout(1).setNumChannelsPerBookie(1).setZkServers(this.zkUtil.getZooKeeperConnectString())).setClientWriteBufferLowWaterMark(1).setClientWriteBufferHighWaterMark(this.entry.length - 1).setWaitTimeoutOnBackpressureMillis(-1L);
        boolean expectWriteError = false;
        boolean expectFailedTest = false;
        try (LedgerHandle lh = this.doBackPressureTest(this.entry, conf, false, false, 4000L);){
            Assert.assertTrue((lh.readLastConfirmed() > 90L ? 1 : 0) != 0);
        }
    }

    private LedgerHandle doBackPressureTest(byte[] entry, ClientConfiguration conf, boolean expectWriteError, boolean expectFailedTest, long sleepInMillis) throws Exception {
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
        lh.addEntry(entry);
        AtomicBoolean finished = new AtomicBoolean(false);
        AtomicBoolean failTest = new AtomicBoolean(false);
        AtomicBoolean writeError = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            try {
                int count = 0;
                while (!finished.get()) {
                    lh.asyncAddEntry(entry, (rc, lh1, entryId, ctx) -> {
                        if (rc != 0) {
                            finished.set(true);
                            writeError.set(true);
                        }
                    }, null);
                    if (++count <= 100) continue;
                    finished.set(true);
                }
            }
            catch (Exception e) {
                LOG.error("Exception in add entry thread", (Throwable)e);
                failTest.set(true);
            }
        });
        CountDownLatch b0latch = new CountDownLatch(1);
        CountDownLatch b0latch2 = new CountDownLatch(1);
        this.sleepBookie(this.getBookie(0), b0latch);
        this.sleepBookie(this.getBookie(1), b0latch2);
        this.setTargetChannelState(bkc, this.getBookie(0), 0L, false);
        this.setTargetChannelState(bkc, this.getBookie(1), 0L, false);
        t.start();
        Thread.sleep(sleepInMillis);
        finished.set(true);
        b0latch.countDown();
        b0latch2.countDown();
        this.setTargetChannelState(bkc, this.getBookie(0), 0L, true);
        this.setTargetChannelState(bkc, this.getBookie(1), 0L, true);
        t.join();
        Assert.assertEquals((String)"write error", (Object)expectWriteError, (Object)writeError.get());
        Assert.assertEquals((String)"test failure", (Object)expectFailedTest, (Object)failTest.get());
        lh.close();
        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
        LedgerChecker lc = new LedgerChecker(bkc);
        CountDownLatch checkLatch = new CountDownLatch(1);
        AtomicInteger numFragments = new AtomicInteger(-1);
        lc.checkLedger(lh2, (rc, fragments) -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Checked ledgers returned {} {}", (Object)rc, fragments);
            }
            if (rc == 0) {
                numFragments.set(fragments.size());
                LOG.error("Checked ledgers returned {} {}", (Object)rc, fragments);
            }
            checkLatch.countDown();
        });
        checkLatch.await();
        Assert.assertEquals((String)"There should be no missing fragments", (long)0L, (long)numFragments.get());
        return lh2;
    }

    private void setTargetChannelState(BookKeeper bkc, BookieId address, long key, boolean writable) throws Exception {
        ((BookieClientImpl)bkc.getBookieClient()).lookupClient(address).obtain((rc, pcbc) -> pcbc.setWritable(writable), key);
    }

    @Test
    public void testWriteSetWriteableCheck() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        try (LedgerHandle lh = bkc.createLedger(4, 2, 2, BookKeeper.DigestType.CRC32, pwd);){
            lh.addEntry(this.entry);
            long entryId = lh.addEntry(this.entry);
            long nextEntryId = entryId + 1L;
            RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(2, 2, 4);
            DistributionSchedule.WriteSet writeSet = schedule.getWriteSet(nextEntryId);
            int slowBookieIndex = writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()));
            List curEns = lh.getCurrentEnsemble();
            bkc.getBookieInfo().get(curEns.get(slowBookieIndex));
            this.setTargetChannelState(bkc, (BookieId)curEns.get(slowBookieIndex), 0L, false);
            boolean isWriteable = lh.waitForWritable(writeSet, 0, 1000L);
            Assert.assertFalse((String)"We should check b2,b3 both are not writeable", (boolean)isWriteable);
        }
    }

    @Test
    public void testManyBookieFailureWithSlowBookies() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(5).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
        final AtomicBoolean finished = new AtomicBoolean(false);
        final AtomicBoolean failTest = new AtomicBoolean(false);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    while (!finished.get()) {
                        lh.addEntry(SlowBookieTest.this.entry);
                        Thread.sleep(1L);
                    }
                }
                catch (Exception e) {
                    LOG.error("Exception in add entry thread", (Throwable)e);
                    failTest.set(true);
                }
            }
        };
        t.start();
        CountDownLatch b0latch = new CountDownLatch(1);
        CountDownLatch b1latch = new CountDownLatch(1);
        this.startNewBookie();
        this.startNewBookie();
        this.sleepBookie(this.getBookie(0), b0latch);
        this.sleepBookie(this.getBookie(1), b1latch);
        Thread.sleep(10000L);
        b0latch.countDown();
        b1latch.countDown();
        finished.set(true);
        t.join();
        Assert.assertFalse((boolean)failTest.get());
        lh.close();
        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
        LedgerChecker lc = new LedgerChecker(bkc);
        final CountDownLatch checklatch = new CountDownLatch(1);
        final AtomicInteger numFragments = new AtomicInteger(-1);
        lc.checkLedger(lh2, (BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>>(){

            public void operationComplete(int rc, Set<LedgerFragment> fragments) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Checked ledgers returned {} {}", (Object)rc, fragments);
                }
                if (rc == 0) {
                    numFragments.set(fragments.size());
                }
                checklatch.countDown();
            }
        });
        checklatch.await();
        Assert.assertEquals((String)"There should be no missing fragments", (long)0L, (long)numFragments.get());
    }

    @Test
    public void testWaitForWritable() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        byte[] pwd = new byte[]{};
        try (LedgerHandle lh = bkc.createLedger(1, 1, 1, BookKeeper.DigestType.CRC32, pwd);){
            long entryId = lh.addEntry(this.entry);
            RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(1, 1, 1);
            DistributionSchedule.WriteSet writeSet = schedule.getWriteSet(entryId);
            int slowBookieIndex = writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()));
            List curEns = lh.getCurrentEnsemble();
            this.setTargetChannelState(bkc, (BookieId)curEns.get(slowBookieIndex), 0L, false);
            AtomicBoolean isWriteable = new AtomicBoolean(false);
            long timeout = 10000L;
            new Thread(() -> isWriteable.set(lh.waitForWritable(writeSet, 0, 10000L))).start();
            Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertFalse((boolean)isWriteable.get()));
            this.setTargetChannelState(bkc, (BookieId)curEns.get(slowBookieIndex), 0L, true);
            Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)isWriteable.get()));
        }
    }
}

