/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import io.netty.util.IllegalReferenceCountException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookKeeperTest
extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(BookKeeperTest.class);
    private static final long INVALID_LEDGERID = -1L;
    private final BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;

    public BookKeeperTest() {
        super(3);
    }

    @Test
    public void testConstructionZkDelay() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        ((ClientConfiguration)conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri())).setZkTimeout(20000);
        CountDownLatch l = new CountDownLatch(1);
        this.zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l);
        l.await();
        BookKeeper bkc = new BookKeeper(conf);
        bkc.createLedger(this.digestType, "testPasswd".getBytes()).close();
        bkc.close();
    }

    @Test
    public void testConstructionNotConnectedExplicitZk() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        ((ClientConfiguration)conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri())).setZkTimeout(20000);
        CountDownLatch l = new CountDownLatch(1);
        this.zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l);
        l.await();
        ZooKeeper zk = new ZooKeeper(this.zkUtil.getZooKeeperConnectString(), 50, event -> {});
        Assert.assertFalse((String)"ZK shouldn't have connected yet", (boolean)zk.getState().isConnected());
        try {
            BookKeeper bkc = new BookKeeper(conf, zk);
            Assert.fail((String)"Shouldn't be able to construct with unconnected zk");
        }
        catch (IOException cle) {
            Assert.assertTrue((boolean)(cle.getCause() instanceof KeeperException.ConnectionLossException));
        }
    }

    @Test
    public void testBookkeeperDigestPasswordWithAutoDetection() throws Exception {
        this.testBookkeeperDigestPassword(true);
    }

    @Test
    public void testBookkeeperDigestPasswordWithoutAutoDetection() throws Exception {
        this.testBookkeeperDigestPassword(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void testBookkeeperDigestPassword(boolean autodetection) throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        conf.setEnableDigestTypeAutodetection(autodetection);
        BookKeeper bkc = new BookKeeper(conf);
        BookKeeper.DigestType digestCorrect = this.digestType;
        byte[] passwdCorrect = "AAAAAAA".getBytes();
        BookKeeper.DigestType digestBad = this.digestType == BookKeeper.DigestType.MAC ? BookKeeper.DigestType.CRC32 : BookKeeper.DigestType.MAC;
        byte[] passwdBad = "BBBBBBB".getBytes();
        LedgerHandle lh = null;
        try {
            long id;
            block12: {
                lh = bkc.createLedger(digestCorrect, passwdCorrect);
                id = lh.getId();
                for (int i = 0; i < 100; ++i) {
                    lh.addEntry("foobar".getBytes());
                }
                lh.close();
                try {
                    bkc.openLedger(id, digestCorrect, passwdBad);
                    Assert.fail((String)"Shouldn't be able to open with bad passwd");
                }
                catch (BKException.BKUnauthorizedAccessException i) {
                    // empty catch block
                }
                try {
                    bkc.openLedger(id, digestBad, passwdCorrect);
                    if (!autodetection) {
                        Assert.fail((String)"Shouldn't be able to open with bad digest");
                    }
                }
                catch (BKException.BKDigestMatchException bke) {
                    if (!autodetection) break block12;
                    Assert.fail((String)"Should not throw digest match exception if `autodetection` is enabled");
                }
            }
            try {
                bkc.openLedger(id, digestBad, passwdBad);
                Assert.fail((String)"Shouldn't be able to open with bad passwd and digest");
            }
            catch (BKException.BKUnauthorizedAccessException bKUnauthorizedAccessException) {
                // empty catch block
            }
            bkc.openLedger(id, digestCorrect, passwdCorrect).close();
        }
        finally {
            if (lh != null) {
                lh.close();
            }
            bkc.close();
        }
    }

    @Test
    public void testAsyncReadWithError() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "testPasswd".getBytes());
        this.bkc.close();
        final AtomicInteger result = new AtomicInteger(0);
        final CountDownLatch counter = new CountDownLatch(1);
        lh.asyncAddEntry("test".getBytes(), new AsyncCallback.AddCallback(){

            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                result.set(rc);
                counter.countDown();
            }
        }, null);
        counter.await();
        Assert.assertTrue((result.get() != 0 ? 1 : 0) != 0);
    }

    @Test
    public void testCloseDuringOp() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        for (int i = 0; i < 10; ++i) {
            final BookKeeper client = new BookKeeper(conf);
            final CountDownLatch l = new CountDownLatch(1);
            final AtomicBoolean success = new AtomicBoolean(false);
            Thread t = new Thread(){

                @Override
                public void run() {
                    try {
                        LedgerHandle lh = client.createLedger(3, 3, BookKeeperTest.this.digestType, "testPasswd".getBytes());
                        BookKeeperTest.this.startNewBookie();
                        BookKeeperTest.this.killBookie(0);
                        lh.asyncAddEntry("test".getBytes(), new AsyncCallback.AddCallback(){

                            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                            }
                        }, null);
                        client.close();
                        success.set(true);
                        l.countDown();
                    }
                    catch (Exception e) {
                        LOG.error("Error running test", (Throwable)e);
                        success.set(false);
                        l.countDown();
                    }
                }
            };
            t.start();
            Assert.assertTrue((String)"Close never completed", (boolean)l.await(10L, TimeUnit.SECONDS));
            Assert.assertTrue((String)"Close was not successful", (boolean)success.get());
        }
    }

    @Test
    public void testIsClosed() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        LedgerHandle lh = bkc.createLedger(this.digestType, "testPasswd".getBytes());
        Long lId = lh.getId();
        lh.addEntry("000".getBytes());
        boolean result = bkc.isClosed(lId.longValue());
        Assert.assertTrue((String)"Ledger shouldn't be flagged as closed!", (!result ? 1 : 0) != 0);
        lh.close();
        result = bkc.isClosed(lId.longValue());
        Assert.assertTrue((String)"Ledger should be flagged as closed!", (boolean)result);
        bkc.close();
    }

    @Test
    public void testReadFailureCallback() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        BookKeeper bkc = new BookKeeper(conf);
        LedgerHandle lh = bkc.createLedger(this.digestType, "testPasswd".getBytes());
        int numEntries = 10;
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(("entry-" + i).getBytes());
        }
        this.stopBKCluster();
        try {
            lh.readEntries(0L, 9L);
            Assert.fail((String)"Read operation should have failed");
        }
        catch (BKException.BKBookieHandleNotAvailableException i) {
            // empty catch block
        }
        final CountDownLatch counter = new CountDownLatch(1);
        final AtomicInteger receivedResponses = new AtomicInteger(0);
        final AtomicInteger returnCode = new AtomicInteger();
        lh.asyncReadEntries(0L, 9L, new AsyncCallback.ReadCallback(){

            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                returnCode.set(rc);
                receivedResponses.incrementAndGet();
                counter.countDown();
            }
        }, null);
        counter.await();
        Thread.sleep(1000L);
        Assert.assertEquals((long)1L, (long)receivedResponses.get());
        Assert.assertEquals((long)-8L, (long)returnCode.get());
        bkc.close();
    }

    @Test
    public void testAutoCloseableBookKeeper() throws Exception {
        BookKeeper bkc2;
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkc = new BookKeeper(conf);){
            long ledgerId;
            bkc2 = bkc;
            try (LedgerHandle lh = bkc.createLedger(this.digestType, "testPasswd".getBytes());){
                ledgerId = lh.getId();
                for (int i = 0; i < 100; ++i) {
                    lh.addEntry("foobar".getBytes());
                }
            }
            Assert.assertTrue((String)"Ledger should be closed!", (boolean)bkc.isClosed(ledgerId));
        }
        Assert.assertTrue((String)"BookKeeper should be closed!", (boolean)bkc2.closed);
    }

    @Test
    public void testReadAfterLastAddConfirmed() throws Exception {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkWriter = new BookKeeper(clientConfiguration);){
            String entryString;
            LedgerEntry entry;
            int entryId;
            Enumeration entries2;
            LedgerHandle rlh;
            LedgerHandle writeLh = bkWriter.createLedger(this.digestType, "testPasswd".getBytes());
            long ledgerId = writeLh.getId();
            int numOfEntries = 5;
            for (int i = 0; i < numOfEntries; ++i) {
                writeLh.addEntry(("foobar" + i).getBytes());
            }
            try (BookKeeper bkReader = new BookKeeper(clientConfiguration);){
                rlh = bkReader.openLedgerNoRecovery(ledgerId, this.digestType, "testPasswd".getBytes());
                try {
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertFalse((boolean)writeLh.isClosed());
                    entries2 = rlh.readUnconfirmedEntries(0L, (long)(numOfEntries - 1));
                    entryId = 0;
                    while (entries2.hasMoreElements()) {
                        entry = (LedgerEntry)entries2.nextElement();
                        entryString = new String(entry.getEntry());
                        Assert.assertTrue((String)("Expected entry String: " + "foobar" + entryId + " actual entry String: " + entryString), (boolean)entryString.equals("foobar" + entryId));
                        ++entryId;
                    }
                }
                finally {
                    if (rlh != null) {
                        rlh.close();
                    }
                }
            }
            bkReader = new BookKeeper(clientConfiguration);
            try {
                rlh = bkReader.openLedgerNoRecovery(ledgerId, this.digestType, "testPasswd".getBytes());
                try {
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertFalse((boolean)writeLh.isClosed());
                    try {
                        rlh.readEntries(0L, (long)(numOfEntries - 1));
                        Assert.fail((String)("shoud not be able to read up to " + (numOfEntries - 1) + " with readEntries"));
                    }
                    catch (BKException.BKReadException entries2) {
                        // empty catch block
                    }
                    Assert.assertEquals((long)(rlh.getLastAddConfirmed() + 1L), (long)Collections.list(rlh.readEntries(0L, rlh.getLastAddConfirmed())).size());
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertEquals((long)(rlh.getLastAddConfirmed() + 1L), (long)Collections.list(rlh.readUnconfirmedEntries(0L, rlh.getLastAddConfirmed())).size());
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertEquals((long)((long)numOfEntries - rlh.getLastAddConfirmed()), (long)Collections.list(rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), (long)(numOfEntries - 1))).size());
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    try {
                        rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), (long)numOfEntries);
                        Assert.fail((String)("the read tried to access data for unexisting entry id " + numOfEntries));
                    }
                    catch (BKException.BKNoSuchEntryException entries2) {
                        // empty catch block
                    }
                    try {
                        rlh.readEntries(rlh.getLastAddConfirmed(), (long)numOfEntries);
                        Assert.fail((String)("the read tries to access data for unexisting entry id " + numOfEntries));
                    }
                    catch (BKException.BKReadException entries2) {
                        // empty catch block
                    }
                }
                finally {
                    if (rlh != null) {
                        rlh.close();
                    }
                }
            }
            finally {
                bkReader.close();
            }
            this.restartBookies();
            bkReader = new BookKeeper(clientConfiguration);
            try {
                rlh = bkReader.openLedgerNoRecovery(ledgerId, this.digestType, "testPasswd".getBytes());
                try {
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertFalse((boolean)writeLh.isClosed());
                    entries2 = rlh.readUnconfirmedEntries(0L, (long)(numOfEntries - 1));
                    entryId = 0;
                    while (entries2.hasMoreElements()) {
                        entry = (LedgerEntry)entries2.nextElement();
                        entryString = new String(entry.getEntry());
                        Assert.assertTrue((String)("Expected entry String: " + "foobar" + entryId + " actual entry String: " + entryString), (boolean)entryString.equals("foobar" + entryId));
                        ++entryId;
                    }
                }
                finally {
                    if (rlh != null) {
                        rlh.close();
                    }
                }
            }
            finally {
                bkReader.close();
            }
            bkReader = new BookKeeper(clientConfiguration);
            try {
                rlh = bkReader.openLedgerNoRecovery(ledgerId, this.digestType, "testPasswd".getBytes());
                try {
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertFalse((boolean)writeLh.isClosed());
                    try {
                        rlh.readEntries(0L, (long)(numOfEntries - 1));
                        Assert.fail((String)("shoud not be able to read up to " + (numOfEntries - 1) + " with readEntries"));
                    }
                    catch (BKException.BKReadException entries3) {
                        // empty catch block
                    }
                    Assert.assertEquals((long)(rlh.getLastAddConfirmed() + 1L), (long)Collections.list(rlh.readEntries(0L, rlh.getLastAddConfirmed())).size());
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertEquals((long)(rlh.getLastAddConfirmed() + 1L), (long)Collections.list(rlh.readUnconfirmedEntries(0L, rlh.getLastAddConfirmed())).size());
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    Assert.assertEquals((long)((long)numOfEntries - rlh.getLastAddConfirmed()), (long)Collections.list(rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), (long)(numOfEntries - 1))).size());
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 2) ? 1 : 0) != 0);
                    try {
                        rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), (long)numOfEntries);
                        Assert.fail((String)("the read tried to access data for unexisting entry id " + numOfEntries));
                    }
                    catch (BKException.BKNoSuchEntryException entries3) {
                        // empty catch block
                    }
                    try {
                        rlh.readEntries(rlh.getLastAddConfirmed(), (long)numOfEntries);
                        Assert.fail((String)("the read tries to access data for unexisting entry id " + numOfEntries));
                    }
                    catch (BKException.BKReadException entries3) {
                        // empty catch block
                    }
                }
                finally {
                    if (rlh != null) {
                        rlh.close();
                    }
                }
            }
            finally {
                bkReader.close();
            }
            bkReader = new BookKeeper(clientConfiguration);
            try {
                rlh = bkReader.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
                try {
                    Assert.assertTrue((String)("Expected LAC of rlh: " + (numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed()), (rlh.getLastAddConfirmed() == (long)(numOfEntries - 1) ? 1 : 0) != 0);
                    Assert.assertFalse((boolean)writeLh.isClosed());
                    entries2 = rlh.readEntries(0L, (long)(numOfEntries - 1));
                    entryId = 0;
                    while (entries2.hasMoreElements()) {
                        entry = (LedgerEntry)entries2.nextElement();
                        entryString = new String(entry.getEntry());
                        Assert.assertTrue((String)("Expected entry String: " + "foobar" + entryId + " actual entry String: " + entryString), (boolean)entryString.equals("foobar" + entryId));
                        ++entryId;
                    }
                }
                finally {
                    if (rlh != null) {
                        rlh.close();
                    }
                }
            }
            finally {
                bkReader.close();
            }
            writeLh.close();
        }
    }

    @Test
    public void testReadWriteWithV2WireProtocol() throws Exception {
        ClientConfiguration conf = new ClientConfiguration().setUseV2WireProtocol(true);
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        int numEntries = 100;
        byte[] data = "foobar".getBytes();
        try (BookKeeper bkc = new BookKeeper(conf);){
            long ledgerId;
            try (LedgerHandle lh = bkc.createLedger(this.digestType, "testPasswd".getBytes());){
                ledgerId = lh.getId();
                for (int i = 0; i < numEntries; ++i) {
                    lh.addEntry(data);
                }
            }
            lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
            try {
                Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
                Enumeration readEntries = lh.readEntries(0L, (long)(numEntries - 1));
                while (readEntries.hasMoreElements()) {
                    LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry.getEntry());
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
            try (LedgerHandle lh2 = bkc.createLedger(this.digestType, "testPasswd".getBytes());){
                ledgerId = lh2.getId();
                lh2.addEntry(data);
                LedgerHandle lh2Fence = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
                if (lh2Fence != null) {
                    lh2Fence.close();
                }
                try {
                    lh2.addEntry(data);
                    Assert.fail((String)"ledger should be fenced");
                }
                catch (BKException.BKLedgerFencedException bKLedgerFencedException) {
                    // empty catch block
                }
            }
        }
    }

    @Test
    public void testBatchReadFailBackToSingleRead1() throws Exception {
        long ledgerId;
        LedgerHandle lh;
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        int numEntries = 100;
        byte[] data = "foobar".getBytes();
        try (BookKeeper bkc = new BookKeeper(conf);){
            lh = bkc.createLedger(2, 2, 2, this.digestType, "testPasswd".getBytes());
            try {
                ledgerId = lh.getId();
                for (int i = 0; i < numEntries; ++i) {
                    lh.addEntry(data);
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
            lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
            try {
                Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
                try {
                    lh.batchReadEntries(0L, numEntries, 0x500000L);
                    Assert.fail((String)"Should throw UnsupportedOperationException.");
                }
                catch (UnsupportedOperationException e) {
                    Assert.assertEquals((Object)"Unsupported batch read entry operation for v3 protocol.", (Object)e.getMessage());
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
        }
        bkc = new BookKeeper(conf);
        try {
            lh = bkc.createLedger(3, 2, 2, this.digestType, "testPasswd".getBytes());
            try {
                ledgerId = lh.getId();
                for (int i = 0; i < numEntries; ++i) {
                    lh.addEntry(data);
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
            lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
            try {
                Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
                Enumeration readEntries = lh.batchReadEntries(0L, numEntries, 0x500000L);
                while (readEntries.hasMoreElements()) {
                    LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry.getEntry());
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
        }
        finally {
            bkc.close();
        }
    }

    @Test
    public void testBatchReadFailBackToSingleRead2() throws Exception {
        long ledgerId;
        LedgerHandle lh;
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        int numEntries = 100;
        byte[] data = "foobar".getBytes();
        try (BookKeeper bkc = new BookKeeper(conf);){
            lh = bkc.createLedger(2, 2, 2, this.digestType, "testPasswd".getBytes());
            try {
                ledgerId = lh.getId();
                for (int i = 0; i < numEntries; ++i) {
                    lh.addEntry(data);
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
            lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
            try {
                Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
                try {
                    lh.batchReadEntries(0L, numEntries, 0x500000L);
                    Assert.fail((String)"Should throw UnsupportedOperationException.");
                }
                catch (UnsupportedOperationException e) {
                    Assert.assertEquals((Object)"Unsupported batch read entry operation for v3 protocol.", (Object)e.getMessage());
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
        }
        conf.setBatchReadEnabled(false);
        bkc = new BookKeeper(conf);
        try {
            lh = bkc.createLedger(2, 2, 2, this.digestType, "testPasswd".getBytes());
            try {
                ledgerId = lh.getId();
                for (int i = 0; i < numEntries; ++i) {
                    lh.addEntry(data);
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
            lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
            try {
                Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
                Enumeration readEntries = lh.batchReadEntries(0L, numEntries, 0x500000L);
                while (readEntries.hasMoreElements()) {
                    LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry.getEntry());
                }
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
        }
        finally {
            bkc.close();
        }
    }

    @Test
    public void testBatchReadWithV2Protocol() throws Exception {
        ClientConfiguration conf = new ClientConfiguration().setUseV2WireProtocol(true);
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        int numEntries = 100;
        byte[] data = "foobar".getBytes();
        try (BookKeeper bkc = new BookKeeper(conf);){
            long ledgerId;
            try (LedgerHandle lh = bkc.createLedger(2, 2, 2, this.digestType, "testPasswd".getBytes());){
                ledgerId = lh.getId();
                for (int i = 0; i < numEntries; ++i) {
                    lh.addEntry(data);
                }
            }
            lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());
            try {
                LedgerEntry entry;
                LedgerEntry entry2;
                Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
                int entries = 0;
                Enumeration readEntries = lh.batchReadEntries(0L, numEntries, 0x500000L);
                while (readEntries.hasMoreElements()) {
                    entry2 = (LedgerEntry)readEntries.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry2.getEntry());
                    ++entries;
                }
                Assert.assertEquals((long)numEntries, (long)entries);
                entries = 0;
                readEntries = lh.batchReadEntries(0L, 0, 0x500000L);
                while (readEntries.hasMoreElements()) {
                    entry2 = (LedgerEntry)readEntries.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry2.getEntry());
                    ++entries;
                }
                Assert.assertEquals((long)numEntries, (long)entries);
                long entrySize = 40 + data.length;
                int headerSize = 36;
                entries = 0;
                int expectEntriesNum = 5;
                Enumeration readEntries2 = lh.batchReadEntries(0L, 0, (long)expectEntriesNum * entrySize + (long)headerSize + (long)(expectEntriesNum * 4));
                while (readEntries2.hasMoreElements()) {
                    entry = (LedgerEntry)readEntries2.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry.getEntry());
                    ++entries;
                }
                Assert.assertEquals((long)expectEntriesNum, (long)entries);
                entries = 0;
                readEntries2 = lh.batchReadEntries(0L, 20, (long)expectEntriesNum * entrySize + (long)headerSize + (long)(expectEntriesNum * 4));
                while (readEntries2.hasMoreElements()) {
                    entry = (LedgerEntry)readEntries2.nextElement();
                    Assert.assertArrayEquals((byte[])data, (byte[])entry.getEntry());
                    ++entries;
                }
                Assert.assertEquals((long)expectEntriesNum, (long)entries);
            }
            finally {
                if (lh != null) {
                    lh.close();
                }
            }
        }
    }

    @Test
    public void testReadEntryReleaseByteBufs() throws Exception {
        long ledgerId;
        ClientConfiguration confWriter = new ClientConfiguration();
        confWriter.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        int numEntries = 10;
        byte[] data = "foobar".getBytes();
        try (BookKeeper bkc = new BookKeeper(confWriter);
             LedgerHandle lh = bkc.createLedger(this.digestType, "testPasswd".getBytes());){
            ledgerId = lh.getId();
            for (int i = 0; i < numEntries; ++i) {
                lh.addEntry(data);
            }
        }
        ClientConfiguration confReader1 = (ClientConfiguration)new ClientConfiguration().setUseV2WireProtocol(true).setNettyUsePooledBuffers(true).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkc = new BookKeeper(confReader1);
             LedgerHandle lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());){
            Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
            Enumeration readEntries = lh.readEntries(0L, (long)(numEntries - 1));
            while (readEntries.hasMoreElements()) {
                LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                try {
                    entry.data.release();
                }
                catch (IllegalReferenceCountException ok) {
                    Assert.fail((String)"ByteBuf already released");
                }
            }
        }
        ClientConfiguration confReader2 = new ClientConfiguration().setUseV2WireProtocol(true).setNettyUsePooledBuffers(false);
        confReader2.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkc = new BookKeeper(confReader2);
             LedgerHandle lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());){
            Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
            Enumeration readEntries = lh.readEntries(0L, (long)(numEntries - 1));
            while (readEntries.hasMoreElements()) {
                LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                try {
                    entry.data.release();
                }
                catch (IllegalReferenceCountException e) {
                    Assert.fail((String)"ByteBuf already released");
                }
            }
        }
        ClientConfiguration confReader3 = (ClientConfiguration)new ClientConfiguration().setUseV2WireProtocol(false).setNettyUsePooledBuffers(false).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkc = new BookKeeper(confReader3);
             LedgerHandle lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());){
            Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
            Enumeration readEntries = lh.readEntries(0L, (long)(numEntries - 1));
            while (readEntries.hasMoreElements()) {
                LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                Assert.assertTrue((String)("Can't release entry " + entry.getEntryId() + ": ref = " + entry.data.refCnt()), (boolean)entry.data.release());
                try {
                    Assert.assertFalse((boolean)entry.data.release());
                    Assert.fail((String)"ByteBuf already released");
                }
                catch (IllegalReferenceCountException illegalReferenceCountException) {}
            }
        }
        ClientConfiguration confReader4 = (ClientConfiguration)new ClientConfiguration().setUseV2WireProtocol(false).setNettyUsePooledBuffers(true).setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkc = new BookKeeper(confReader4);
             LedgerHandle lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());){
            Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
            Enumeration readEntries = lh.readEntries(0L, (long)(numEntries - 1));
            while (readEntries.hasMoreElements()) {
                LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                Assert.assertTrue((String)("Can't release entry " + entry.getEntryId() + ": ref = " + entry.data.refCnt()), (boolean)entry.data.release());
                try {
                    Assert.assertFalse((boolean)entry.data.release());
                    Assert.fail((String)"ByteBuf already released");
                }
                catch (IllegalReferenceCountException illegalReferenceCountException) {}
            }
        }
        ClientConfiguration confReader5 = new ClientConfiguration();
        confReader5.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        try (BookKeeper bkc = new BookKeeper(confReader5);
             LedgerHandle lh = bkc.openLedger(ledgerId, this.digestType, "testPasswd".getBytes());){
            Assert.assertEquals((long)(numEntries - 1), (long)lh.readLastConfirmed());
            Enumeration readEntries = lh.readEntries(0L, (long)(numEntries - 1));
            while (readEntries.hasMoreElements()) {
                LedgerEntry entry = (LedgerEntry)readEntries.nextElement();
                entry.getEntry();
                try {
                    entry.getEntry();
                    Assert.fail((String)"entry data accessed twice");
                }
                catch (IllegalStateException illegalStateException) {
                    // empty catch block
                }
                try {
                    entry.getEntryInputStream();
                    Assert.fail((String)"entry data accessed twice");
                }
                catch (IllegalStateException illegalStateException) {}
            }
        }
    }

    @Test
    public void testDoubleRead() throws Exception {
        LedgerHandle lh = this.bkc.createLedger(this.digestType, "".getBytes());
        lh.addEntry("test".getBytes());
        int n = 10;
        final CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; ++i) {
            lh.asyncReadEntries(0L, 0L, new AsyncCallback.ReadCallback(){

                public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                    if (rc == 0) {
                        latch.countDown();
                    } else {
                        Assert.fail((String)"Read fail");
                    }
                }
            }, null);
        }
        latch.await();
    }

    @Test
    public void testDoubleReadWithV2Protocol() throws Exception {
        ClientConfiguration conf = new ClientConfiguration((AbstractConfiguration)this.baseClientConf);
        conf.setUseV2WireProtocol(true);
        BookKeeperTestClient bkc = new BookKeeperTestClient(conf);
        LedgerHandle lh = bkc.createLedger(this.digestType, "".getBytes());
        lh.addEntry("test".getBytes());
        int n = 10;
        final CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; ++i) {
            lh.asyncReadEntries(0L, 0L, new AsyncCallback.ReadCallback(){

                public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                    if (rc == 0) {
                        latch.countDown();
                    } else {
                        Assert.fail((String)"Read fail");
                    }
                }
            }, null);
        }
        latch.await();
        bkc.close();
    }

    @Test(expected=BKException.BKIllegalOpException.class)
    public void testCannotUseWriteFlagsOnV2Protocol() throws Exception {
        ClientConfiguration conf = new ClientConfiguration((AbstractConfiguration)this.baseClientConf);
        conf.setUseV2WireProtocol(true);
        try (BookKeeperTestClient bkc = new BookKeeperTestClient(conf);
             WriteHandle wh = (WriteHandle)FutureUtils.result((CompletableFuture)bkc.newCreateLedgerOp().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).withPassword("".getBytes()).withWriteFlags(new WriteFlag[]{WriteFlag.DEFERRED_SYNC}).execute());){
            FutureUtils.result((CompletableFuture)wh.appendAsync("test".getBytes()));
        }
    }

    @Test(expected=BKException.BKIllegalOpException.class)
    public void testCannotUseForceOnV2Protocol() throws Exception {
        ClientConfiguration conf = new ClientConfiguration((AbstractConfiguration)this.baseClientConf);
        conf.setUseV2WireProtocol(true);
        try (BookKeeperTestClient bkc = new BookKeeperTestClient(conf);
             WriteHandle wh = (WriteHandle)FutureUtils.result((CompletableFuture)bkc.newCreateLedgerOp().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).withPassword("".getBytes()).withWriteFlags(WriteFlag.NONE).execute());){
            FutureUtils.result((CompletableFuture)wh.appendAsync("".getBytes()));
            FutureUtils.result((CompletableFuture)wh.force());
        }
    }

    @Test
    public void testZKConnectionLossForLedgerCreation() throws Exception {
        int zkSessionTimeOut = 10000;
        AtomicLong ledgerIdToInjectFailure = new AtomicLong(-1L);
        ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, (StatsLogger)NullStatsLogger.INSTANCE);
        MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(this.zkUtil.getZooKeeperConnectString(), zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure);
        zkFaultInjectionWrapper.waitForConnection();
        Assert.assertEquals((String)"zkFaultInjectionWrapper should be in connected state", (Object)ZooKeeper.States.CONNECTED, (Object)zkFaultInjectionWrapper.getState());
        BookKeeper bk = new BookKeeper(this.baseClientConf, (ZooKeeper)zkFaultInjectionWrapper);
        long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();
        long ledgerId = 567L;
        LedgerHandle lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, BookKeeper.DigestType.CRC32, "".getBytes(), null);
        lh.close();
        zooKeeperWatcherBase.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, ""));
        zkFaultInjectionWrapper.waitForConnection();
        for (int i = 0; i < 10 && zkFaultInjectionWrapper.getState() != ZooKeeper.States.CONNECTED; ++i) {
            Thread.sleep(200L);
        }
        Assert.assertEquals((String)"zkFaultInjectionWrapper should be in connected state", (Object)ZooKeeper.States.CONNECTED, (Object)zkFaultInjectionWrapper.getState());
        Assert.assertNotEquals((String)"Session Id of old and new ZK instance should be different", (long)oldZkInstanceSessionId, (long)zkFaultInjectionWrapper.getSessionId());
        ledgerIdToInjectFailure.set(++ledgerId);
        lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, BookKeeper.DigestType.CRC32, "".getBytes(), null);
        lh.close();
        Assert.assertEquals((String)"injectZnodeCreationNoNodeFailure should have been reset it to INVALID_LEDGERID", (long)-1L, (long)ledgerIdToInjectFailure.get());
        lh = bk.openLedger(ledgerId, BookKeeper.DigestType.CRC32, "".getBytes());
        lh.close();
        lh = bk.createLedgerAdv(++ledgerId, 1, 1, 1, BookKeeper.DigestType.CRC32, "".getBytes(), null);
        lh.close();
        bk.close();
    }

    @Test
    public void testLedgerDeletionIdempotency() throws Exception {
        BookKeeper bk = new BookKeeper(this.baseClientConf);
        long ledgerId = 789L;
        LedgerHandle lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, BookKeeper.DigestType.CRC32, "".getBytes(), null);
        lh.close();
        bk.deleteLedger(ledgerId);
        bk.deleteLedger(ledgerId);
        bk.close();
    }

    @Test
    public void testEnforceMinNumFaultDomainsForWrite() throws Exception {
        byte[] data = "foobar".getBytes();
        byte[] password = "testPasswd".getBytes();
        this.startNewBookie();
        this.startNewBookie();
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        conf.setEnsemblePlacementPolicy(MockRackawareEnsemblePlacementPolicy.class);
        conf.setAddEntryTimeout(2);
        conf.setAddEntryQuorumTimeout(4);
        conf.setEnforceMinNumFaultDomainsForWrite(true);
        TestStatsProvider statsProvider = new TestStatsProvider();
        BookKeeperTestClient bk = new BookKeeperTestClient(conf, statsProvider);
        StatsLogger statsLogger = bk.getStatsLogger();
        int ensembleSize = 3;
        int writeQuorumSize = 3;
        int ackQuorumSize = 2;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MockRackawareEnsemblePlacementPolicy currPlacementPolicy = (MockRackawareEnsemblePlacementPolicy)bk.getPlacementPolicy();
        currPlacementPolicy.setConditionFirstInvocationLatch(countDownLatch);
        currPlacementPolicy.setWriteQuorumSizeToUseForTesting(writeQuorumSize);
        try (LedgerHandle lh = bk.createLedger(ensembleSize, writeQuorumSize, ackQuorumSize, this.digestType, password);){
            CountDownLatch sleepLatchCase1 = new CountDownLatch(1);
            CountDownLatch sleepLatchCase2 = new CountDownLatch(1);
            LOG.info("Putting all non ensemble bookies to sleep.");
            for (BookieId addr : this.bookieAddresses()) {
                try {
                    if (lh.getCurrentEnsemble().contains(addr)) continue;
                    this.sleepBookie(addr, sleepLatchCase2);
                }
                catch (UnknownHostException unknownHostException) {}
            }
            Thread writeToLedger = new Thread(() -> {
                try {
                    LOG.info("Initiating write for entry");
                    long entryId = lh.addEntry(data);
                    LOG.info("Wrote entry with entryId = {}", (Object)entryId);
                }
                catch (InterruptedException | BKException throwable) {
                    // empty catch block
                }
            });
            BookieId bookieToSleep = (BookieId)lh.getCurrentEnsemble().get(0);
            LOG.info("Putting picked bookie to sleep");
            this.sleepBookie(bookieToSleep, sleepLatchCase1);
            Assert.assertEquals((long)statsLogger.getCounter("WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS").get(), (long)0L);
            writeToLedger.start();
            countDownLatch.await(conf.getAddEntryTimeout(), TimeUnit.SECONDS);
            Assert.assertEquals((String)"Write succeeded but should not have", (long)-1L, (long)lh.lastAddConfirmed);
            sleepLatchCase1.countDown();
            writeToLedger.join(conf.getAddEntryTimeout() * 1000);
            Assert.assertEquals((String)"Write did not succeed but should have", (long)0L, (long)lh.lastAddConfirmed);
            Assert.assertEquals((long)statsLogger.getCounter("WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS").get(), (long)1L);
            Thread writeToLedger2 = new Thread(() -> {
                try {
                    LOG.info("Initiating write for entry");
                    long entryId = lh.addEntry(data);
                    LOG.info("Wrote entry with entryId = {}", (Object)entryId);
                }
                catch (InterruptedException | BKException throwable) {
                    // empty catch block
                }
            });
            bookieToSleep = (BookieId)lh.getCurrentEnsemble().get(1);
            LOG.info("Putting picked bookie to sleep");
            this.sleepBookie(bookieToSleep, sleepLatchCase2);
            writeToLedger2.start();
            writeToLedger2.join((conf.getAddEntryQuorumTimeout() + 2) * 1000);
            Assert.assertEquals((String)"Write succeeded but should not have", (long)0L, (long)lh.lastAddConfirmed);
            sleepLatchCase2.countDown();
            Assert.assertEquals((long)statsLogger.getCounter("WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS").get(), (long)2L);
            Assert.assertEquals((long)statsLogger.getCounter("WRITE_TIME_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS").get(), (long)1L);
        }
    }

    @Test
    public void testBookieAddressResolverPassedToDNSToSwitchMapping() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        StaticDNSResolver tested = new StaticDNSResolver();
        try (BookKeeper bkc = BookKeeper.forConfig((ClientConfiguration)conf).dnsResolver((DNSToSwitchMapping)tested).build();){
            bkc.createLedger(this.digestType, "testPasswd".getBytes()).close();
            Assert.assertSame((Object)bkc.getBookieAddressResolver(), (Object)tested.getBookieAddressResolver());
        }
    }

    public static class MockRackawareEnsemblePlacementPolicy
    extends RackawareEnsemblePlacementPolicy {
        private int writeQuorumSizeToUseForTesting;
        private CountDownLatch conditionFirstInvocationLatch;

        void setWriteQuorumSizeToUseForTesting(int writeQuorumSizeToUseForTesting) {
            this.writeQuorumSizeToUseForTesting = writeQuorumSizeToUseForTesting;
        }

        void setConditionFirstInvocationLatch(CountDownLatch conditionFirstInvocationLatch) {
            this.conditionFirstInvocationLatch = conditionFirstInvocationLatch;
        }

        public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBookies, int writeQuorumSize, int ackQuorumSize) {
            this.conditionFirstInvocationLatch.countDown();
            return ackedBookies.size() == this.writeQuorumSizeToUseForTesting;
        }
    }

    class MockZooKeeperClient
    extends ZooKeeperClient {
        private final String connectString;
        private final int sessionTimeoutMs;
        private final ZooKeeperWatcherBase watcherManager;
        private final AtomicLong ledgerIdToInjectFailure;

        MockZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcher, AtomicLong ledgerIdToInjectFailure) throws IOException {
            super(connectString, sessionTimeoutMs, watcher, (RetryPolicy)new BoundExponentialBackoffRetryPolicy((long)sessionTimeoutMs, (long)sessionTimeoutMs, Integer.MAX_VALUE), (RetryPolicy)new BoundExponentialBackoffRetryPolicy((long)sessionTimeoutMs, (long)sessionTimeoutMs, 3), (StatsLogger)NullStatsLogger.INSTANCE, 1, 0.0, false);
            this.connectString = connectString;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.watcherManager = watcher;
            this.ledgerIdToInjectFailure = ledgerIdToInjectFailure;
        }

        protected ZooKeeper createZooKeeper() throws IOException {
            return new MockZooKeeper(this.connectString, this.sessionTimeoutMs, (Watcher)this.watcherManager, false);
        }

        class MockZooKeeper
        extends ZooKeeper {
            public MockZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
                super(connectString, sessionTimeout, watcher, canBeReadOnly);
            }

            public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, final AsyncCallback.StringCallback cb, Object ctx) {
                AsyncCallback.StringCallback injectedCallback = new AsyncCallback.StringCallback(){

                    public void processResult(int rc, String path, Object ctx, String name) {
                        if (path.contains(MockZooKeeperClient.this.ledgerIdToInjectFailure.toString())) {
                            MockZooKeeperClient.this.ledgerIdToInjectFailure.set(-1L);
                            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, name);
                        } else {
                            cb.processResult(rc, path, ctx, name);
                        }
                    }
                };
                super.create(path, data, acl, createMode, injectedCallback, ctx);
            }
        }
    }
}

