/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.ReadLastConfirmedOp;
import org.apache.bookkeeper.client.ReadOnlyLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelLedgerRecoveryTest
extends BookKeeperClusterTestCase {
    /** Logger used by the test methods and by the nested helper classes of this test. */
    static final Logger LOG = LoggerFactory.getLogger(ParallelLedgerRecoveryTest.class);
    /** Digest type (CRC32) that several tests pass when creating and opening ledgers. */
    final BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;

    /**
     * Creates the test case, forwarding the constant {@code 3} to the
     * {@link BookKeeperClusterTestCase} constructor.
     *
     * <p>NOTE(review): presumably the number of bookies in the test cluster — confirm
     * against the superclass.
     *
     * @throws Exception propagated from the superclass constructor
     */
    public ParallelLedgerRecoveryTest() throws Exception {
        super(3);
    }

    /**
     * Registers the test metadata drivers for the {@code "zk"} scheme, installs
     * {@link TestLedgerManagerFactory} on both the server and client base configurations,
     * sets the client read/add entry timeouts to 60000, and then lets the superclass
     * start the cluster.
     *
     * @param metadataServiceUri metadata service URI, forwarded unchanged to the superclass
     * @throws Exception propagated from driver registration or from the superclass
     */
    @Override
    protected void startBKCluster(String metadataServiceUri) throws Exception {
        // Swap in the drivers defined in this class before any client or bookie is created.
        MetadataDrivers.registerClientDriver("zk", TestMetadataClientDriver.class, true);
        MetadataDrivers.registerBookieDriver("zk", TestMetadataBookieDriver.class, true);

        baseConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
        baseClientConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);

        baseClientConf.setReadEntryTimeout(60000);
        baseClientConf.setAddEntryTimeout(60000);

        super.startBKCluster(metadataServiceUri);
    }

    /**
     * Runs the superclass tear-down and then, whether or not it threw, re-registers the
     * regular ZooKeeper metadata drivers for the {@code "zk"} scheme so the test drivers
     * installed by {@link #startBKCluster(String)} do not stay registered.
     *
     * @throws Exception propagated from the superclass tear-down or from driver registration
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            MetadataDrivers.registerClientDriver("zk", ZKMetadataClientDriver.class, true);
            MetadataDrivers.registerBookieDriver("zk", ZKMetadataBookieDriver.class, true);
        }
    }

    // The four tests below run rereadDuringRecovery with updateMetadata=false and
    // close=false, varying only parallelRead (true/false) and batchSize (1/3).

    /** Parallel recovery read enabled, batch size 1, no concurrent metadata update. */
    @Test
    public void testRecoverBeforeWriteMetadata1() throws Exception {
        this.rereadDuringRecovery(true, 1, false, false);
    }

    /** Parallel recovery read enabled, batch size 3, no concurrent metadata update. */
    @Test
    public void testRecoverBeforeWriteMetadata2() throws Exception {
        this.rereadDuringRecovery(true, 3, false, false);
    }

    /** Parallel recovery read disabled, batch size 1, no concurrent metadata update. */
    @Test
    public void testRecoverBeforeWriteMetadata3() throws Exception {
        this.rereadDuringRecovery(false, 1, false, false);
    }

    /** Parallel recovery read disabled, batch size 3, no concurrent metadata update. */
    @Test
    public void testRecoverBeforeWriteMetadata4() throws Exception {
        this.rereadDuringRecovery(false, 3, false, false);
    }

    // The four tests below run rereadDuringRecovery with updateMetadata=true and
    // close=false: while recovery's metadata write is held back, a second no-recovery
    // handle rewrites the ledger metadata. They vary parallelRead and batchSize only.

    /** Parallel recovery read enabled, batch size 1, metadata rewritten during recovery. */
    @Test
    public void testRereadDuringRecovery1() throws Exception {
        this.rereadDuringRecovery(true, 1, true, false);
    }

    /** Parallel recovery read enabled, batch size 3, metadata rewritten during recovery. */
    @Test
    public void testRereadDuringRecovery2() throws Exception {
        this.rereadDuringRecovery(true, 3, true, false);
    }

    /** Parallel recovery read disabled, batch size 1, metadata rewritten during recovery. */
    @Test
    public void testRereadDuringRecovery3() throws Exception {
        this.rereadDuringRecovery(false, 1, true, false);
    }

    /** Parallel recovery read disabled, batch size 3, metadata rewritten during recovery. */
    @Test
    public void testRereadDuringRecovery4() throws Exception {
        this.rereadDuringRecovery(false, 3, true, false);
    }

    // NOTE(review): the four tests below pass exactly the same arguments as
    // testRereadDuringRecovery1..4 (updateMetadata=true, close=false). No caller in this
    // file passes close=true, so the "open with recovery and close" branch of
    // rereadDuringRecovery is never exercised. Confirm whether the last argument was
    // meant to be true here before changing it.

    /** Parallel recovery read enabled, batch size 1, updateMetadata=true, close=false. */
    @Test
    public void testConcurrentRecovery1() throws Exception {
        this.rereadDuringRecovery(true, 1, true, false);
    }

    /** Parallel recovery read enabled, batch size 3, updateMetadata=true, close=false. */
    @Test
    public void testConcurrentRecovery2() throws Exception {
        this.rereadDuringRecovery(true, 3, true, false);
    }

    /** Parallel recovery read disabled, batch size 1, updateMetadata=true, close=false. */
    @Test
    public void testConcurrentRecovery3() throws Exception {
        this.rereadDuringRecovery(false, 1, true, false);
    }

    /** Parallel recovery read disabled, batch size 3, updateMetadata=true, close=false. */
    @Test
    public void testConcurrentRecovery4() throws Exception {
        this.rereadDuringRecovery(false, 3, true, false);
    }

    /**
     * Writes a ledger, starts recovery on a no-recovery handle while metadata writes are
     * held back by the {@link TestLedgerManager} latch, optionally modifies the ledger from
     * a second handle, then releases the latch and verifies that recovery succeeded and
     * that every entry can be read back in order.
     *
     * <p>The client created here is always closed, and the metadata latch always released,
     * even when an assertion fails part-way through.
     *
     * @param parallelRead value passed to {@code setEnableParallelRecoveryRead} on the client
     * @param batchSize value passed to {@code setRecoveryReadBatchSize} on the client
     * @param updateMetadata whether a second handle touches the ledger while recovery is paused
     * @param close when {@code updateMetadata} is set: {@code true} opens the ledger with
     *     recovery and closes it; {@code false} opens it without recovery and rewrites the
     *     metadata via {@code withInRecoveryState()}
     * @throws Exception on any client failure or interruption
     */
    private void rereadDuringRecovery(boolean parallelRead, int batchSize, boolean updateMetadata, boolean close) throws Exception {
        final byte[] passwd = "".getBytes(StandardCharsets.UTF_8);
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration((Configuration)this.baseClientConf);
        newConf.setEnableParallelRecoveryRead(parallelRead);
        newConf.setRecoveryReadBatchSize(batchSize);
        // Created before the client so the finally block can always release a metadata
        // write that is still parked on it.
        final CountDownLatch metadataLatch = new CountDownLatch(1);
        BookKeeper newBk = new BookKeeper(newConf);
        try {
            TestLedgerManager tlm = (TestLedgerManager)newBk.getUnderlyingLedgerManager();
            final LedgerHandle lh = newBk.createLedger(this.numBookies, 2, 2, this.digestType, passwd);
            CountDownLatch latch1 = new CountDownLatch(1);
            CountDownLatch latch2 = new CountDownLatch(1);
            this.sleepBookie((BookieSocketAddress)lh.getCurrentEnsemble().get(0), latch1);
            this.sleepBookie((BookieSocketAddress)lh.getCurrentEnsemble().get(1), latch2);
            int numEntries = this.numBookies * 3 + 1;
            final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
            final CountDownLatch addDone = new CountDownLatch(1);
            for (int i = 0; i < numEntries; ++i) {
                lh.asyncAddEntry(("" + i).getBytes(StandardCharsets.UTF_8), new AsyncCallback.AddCallback(){

                    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                        if (0 != rc) {
                            // Fail fast: wake the test so the pending-adds assertion reports the failure.
                            addDone.countDown();
                            return;
                        }
                        if (numPendingAdds.decrementAndGet() == 0) {
                            addDone.countDown();
                        }
                    }
                }, null);
            }
            latch1.countDown();
            latch2.countDown();
            Assert.assertTrue("timed out waiting for adds to complete", addDone.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals(0L, (long)numPendingAdds.get());
            LOG.info("Added {} entries to ledger {}.", (Object)numEntries, (Object)lh.getId());
            long ledgerLenth = lh.getLength();
            LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), this.digestType, passwd);
            Assert.assertEquals(-1L, recoverLh.getLastAddPushed());
            Assert.assertEquals(-1L, recoverLh.getLastAddConfirmed());
            Assert.assertEquals(0L, recoverLh.getLength());
            LOG.info("OpenLedgerNoRecovery {}.", (Object)lh.getId());
            // Metadata writes issued while the latch is installed wait until it is released.
            tlm.setLatch(metadataLatch);
            final CountDownLatch recoverLatch = new CountDownLatch(1);
            final AtomicBoolean success = new AtomicBoolean(false);
            ((ReadOnlyLedgerHandle)recoverLh).recover((BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Void>(){

                public void operationComplete(int rc, Void result) {
                    LOG.info("Recovering ledger {} completed : {}.", (Object)lh.getId(), (Object)rc);
                    success.set(0 == rc);
                    recoverLatch.countDown();
                }
            });
            tlm.setLatch(null);
            if (updateMetadata) {
                LedgerHandle newRecoverLh;
                if (close) {
                    LOG.info("OpenLedger {} to close.", (Object)lh.getId());
                    newRecoverLh = newBk.openLedger(lh.getId(), this.digestType, passwd);
                    newRecoverLh.close();
                } else {
                    LOG.info("OpenLedgerNoRecovery {} again.", (Object)lh.getId());
                    newRecoverLh = newBk.openLedgerNoRecovery(lh.getId(), this.digestType, passwd);
                    Assert.assertEquals(-1L, newRecoverLh.getLastAddPushed());
                    Assert.assertEquals(-1L, newRecoverLh.getLastAddConfirmed());
                    ClientUtil.transformMetadata(newBk.getClientCtx(), newRecoverLh.getId(), metadata -> LedgerMetadataBuilder.from((LedgerMetadata)metadata).withInRecoveryState().build());
                    newRecoverLh.close();
                    LOG.info("Updated ledger manager {}.", (Object)newRecoverLh.getLedgerMetadata());
                }
            }
            metadataLatch.countDown();
            LOG.info("Resume metadata update.");
            Assert.assertTrue("timed out waiting for recovery to complete", recoverLatch.await(20L, TimeUnit.SECONDS));
            Assert.assertTrue(success.get());
            Assert.assertEquals((long)(numEntries - 1), recoverLh.getLastAddPushed());
            Assert.assertEquals((long)(numEntries - 1), recoverLh.getLastAddConfirmed());
            Assert.assertEquals(ledgerLenth, recoverLh.getLength());
            Assert.assertTrue(recoverLh.getLedgerMetadata().isClosed());
            Enumeration<LedgerEntry> entries = recoverLh.readEntries(0L, (long)(numEntries - 1));
            int numReads = 0;
            while (entries.hasMoreElements()) {
                LedgerEntry entry = entries.nextElement();
                Assert.assertEquals((long)numReads, entry.getEntryId());
                Assert.assertEquals((long)numReads, (long)Integer.parseInt(new String(entry.getEntry(), StandardCharsets.UTF_8)));
                ++numReads;
            }
            Assert.assertEquals((long)numEntries, (long)numReads);
            recoverLh.close();
        } finally {
            // Idempotent on the success path; on failure it unblocks any held metadata write.
            metadataLatch.countDown();
            newBk.close();
        }
    }

    /**
     * Adds entries 0-9 through a normal handle, then writes entry 14 (carrying lac 8)
     * directly to the bookie so there is a gap after entry 9. Recovery from a second
     * client must close the ledger at lac 9 and invoke its callback exactly once.
     *
     * <p>The second client is closed in a {@code finally} block so it does not leak when
     * an assertion fails.
     *
     * @throws Exception on any client failure or interruption
     */
    @Test
    public void testRecoveryOnEntryGap() throws Exception {
        byte[] passwd = "recovery-on-entry-gap".getBytes(StandardCharsets.UTF_8);
        LedgerHandle lh = this.bkc.createLedger(1, 1, 1, BookKeeper.DigestType.CRC32, passwd);
        for (int i = 0; i < 10; ++i) {
            lh.addEntry(("recovery-on-entry-gap-" + i).getBytes(StandardCharsets.UTF_8));
        }
        // Entry id well past the last added entry (9), to leave a gap.
        long entryId = 14L;
        long lac = 8L;
        byte[] data = "recovery-on-entry-gap-gap".getBytes(StandardCharsets.UTF_8);
        ByteBufList toSend = lh.macManager.computeDigestAndPackageForSending(entryId, lac, lh.getLength() + 100L, Unpooled.wrappedBuffer(data, 0, data.length));
        final CountDownLatch addLatch = new CountDownLatch(1);
        final AtomicBoolean addSuccess = new AtomicBoolean(false);
        LOG.info("Add entry {} with lac = {}", (Object)entryId, (Object)lac);
        this.bkc.getBookieClient().addEntry((BookieSocketAddress)lh.getCurrentEnsemble().get(0), lh.getId(), lh.ledgerKey, entryId, toSend, new BookkeeperInternalCallbacks.WriteCallback(){

            public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                addSuccess.set(0 == rc);
                addLatch.countDown();
            }
        }, (Object)0, 0, false, WriteFlag.NONE);
        addLatch.await();
        Assert.assertTrue("add entry 14 should succeed", addSuccess.get());
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration((Configuration)this.baseClientConf);
        newConf.setEnableParallelRecoveryRead(true);
        newConf.setRecoveryReadBatchSize(10);
        BookKeeper newBk = new BookKeeper(newConf);
        try {
            final LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, passwd);
            Assert.assertEquals("wrong lac found", 8L, recoverLh.getLastAddConfirmed());
            final CountDownLatch recoverLatch = new CountDownLatch(1);
            final AtomicLong newLac = new AtomicLong(-1L);
            final AtomicBoolean isMetadataClosed = new AtomicBoolean(false);
            final AtomicInteger numSuccessCalls = new AtomicInteger(0);
            final AtomicInteger numFailureCalls = new AtomicInteger(0);
            ((ReadOnlyLedgerHandle)recoverLh).recover((BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Void>(){

                public void operationComplete(int rc, Void result) {
                    if (0 == rc) {
                        newLac.set(recoverLh.getLastAddConfirmed());
                        isMetadataClosed.set(recoverLh.getLedgerMetadata().isClosed());
                        numSuccessCalls.incrementAndGet();
                    } else {
                        numFailureCalls.incrementAndGet();
                    }
                    recoverLatch.countDown();
                }
            });
            recoverLatch.await();
            Assert.assertEquals("wrong lac found", 9L, newLac.get());
            Assert.assertTrue("metadata isn't closed after recovery", isMetadataClosed.get());
            // Give any spurious second callback time to arrive before counting invocations.
            Thread.sleep(5000L);
            Assert.assertEquals("recovery callback should be triggered only once", 1L, (long)numSuccessCalls.get());
            Assert.assertEquals("recovery callback should be triggered only once", 0L, (long)numFailureCalls.get());
        } finally {
            newBk.close();
        }
    }

    /**
     * Exercises recovery racing with the writer closing its handle.
     *
     * <p>The single bookie is replaced by a {@link DelayResponseBookie}. Entries 0-1 are
     * added normally, the acknowledgements for entries 2-4 are held back, and recovery is
     * started from a second client while the read of entry 3 and the recovery metadata
     * write are both stalled. The writer then closes its handle; the test expects both
     * the writer and the recovered handle to report lac 1, and a final
     * {@link ReadLastConfirmedOp} to return lac 1 as well.
     *
     * <p>All three clients are closed and both stall latches released in a {@code finally}
     * block so a failed assertion does not leave clients or blocked threads behind.
     *
     * @throws Exception on any client failure or interruption
     */
    @Test
    public void testRecoveryWhenClosingLedgerHandle() throws Exception {
        byte[] passwd = "recovery-when-closing-ledger-handle".getBytes(StandardCharsets.UTF_8);
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration((Configuration)this.baseClientConf);
        newConf.setEnableParallelRecoveryRead(true);
        newConf.setRecoveryReadBatchSize(1);
        newConf.setAddEntryTimeout(9999999);
        newConf.setReadEntryTimeout(9999999);
        // Declared up front so the finally block can release them on every exit path.
        final CountDownLatch readLatch = new CountDownLatch(1);
        final CountDownLatch metadataLatch = new CountDownLatch(1);
        BookKeeper newBk0 = null;
        BookKeeper newBk1 = null;
        BookKeeper readBk = null;
        try {
            newBk0 = new BookKeeper(newConf);
            LedgerHandle lh0 = newBk0.createLedger(1, 1, 1, this.digestType, passwd);
            newBk1 = new BookKeeper(newConf);
            final LedgerHandle lh1 = newBk1.openLedgerNoRecovery(lh0.getId(), this.digestType, passwd);
            TestLedgerManager tlm1 = (TestLedgerManager)newBk1.getUnderlyingLedgerManager();
            readBk = new BookKeeper(newConf);
            LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(), this.digestType, passwd);
            LOG.info("Create ledger {}", (Object)lh0.getId());
            // Replace the ledger's bookie with one whose responses can be delayed.
            BookieSocketAddress address = (BookieSocketAddress)lh0.getCurrentEnsemble().get(0);
            ServerConfiguration conf = this.killBookie(address);
            conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
            DelayResponseBookie fakeBookie = new DelayResponseBookie(conf);
            this.bs.add(this.startBookie(conf, fakeBookie));
            this.bsConfs.add(conf);
            lh0.addEntry("entry-0".getBytes(StandardCharsets.UTF_8));
            lh0.addEntry("entry-1".getBytes(StandardCharsets.UTF_8));
            long lac = readLh.readLastConfirmed();
            Assert.assertEquals(0L, lac);
            lac = lh1.readLastConfirmed();
            Assert.assertEquals(0L, lac);
            final CountDownLatch addLatch = new CountDownLatch(3);
            final AtomicInteger numAddFailures = new AtomicInteger(0);
            fakeBookie.delayAdd(true);
            for (int i = 2; i < 5; ++i) {
                lh0.asyncAddEntry(("entry-" + i).getBytes(StandardCharsets.UTF_8), new AsyncCallback.AddCallback(){

                    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                        if (0 != rc) {
                            numAddFailures.incrementAndGet();
                        }
                        addLatch.countDown();
                    }
                }, null);
            }
            // Wait until the bookie has queued the held-back acknowledgements for all three adds.
            while (fakeBookie.delayQueue.size() < 3) {
                Thread.sleep(100L);
            }
            lac = readLh.readLastConfirmed();
            Assert.assertEquals(1L, lac);
            lac = lh1.readLastConfirmed();
            Assert.assertEquals(1L, lac);
            fakeBookie.delayAdd(false);
            fakeBookie.delayRead(true, 3L, readLatch);
            tlm1.setLatch(metadataLatch);
            final CountDownLatch recoverLatch = new CountDownLatch(1);
            final AtomicBoolean recoverSuccess = new AtomicBoolean(false);
            ((ReadOnlyLedgerHandle)lh1).recover((BookkeeperInternalCallbacks.GenericCallback)new BookkeeperInternalCallbacks.GenericCallback<Void>(){

                public void operationComplete(int rc, Void result) {
                    LOG.info("Recovering ledger {} completed : {}", (Object)lh1.getId(), (Object)rc);
                    recoverSuccess.set(0 == rc);
                    recoverLatch.countDown();
                }
            });
            Thread.sleep(2000L);
            readLatch.countDown();
            lac = readLh.readLastConfirmed();
            Assert.assertEquals(1L, lac);
            lh0.close();
            Assert.assertEquals(1L, lh0.getLastAddConfirmed());
            metadataLatch.countDown();
            recoverLatch.await();
            Assert.assertTrue(recoverSuccess.get());
            Assert.assertEquals(1L, lh1.getLastAddConfirmed());
            // Sentinel values make it obvious if the callback never populated the holders.
            final AtomicLong lacHolder = new AtomicLong(-1234L);
            final AtomicInteger rcHolder = new AtomicInteger(-1234);
            final CountDownLatch doneLatch = new CountDownLatch(1);
            new ReadLastConfirmedOp(this.bkc.getBookieClient(), readLh.distributionSchedule, readLh.macManager, readLh.ledgerId, (List)readLh.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(), readLh.ledgerKey, new ReadLastConfirmedOp.LastConfirmedDataCallback(){

                public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                    rcHolder.set(rc);
                    // Guard against a missing payload: an exception thrown here would skip the
                    // countDown below and leave the test blocked on doneLatch forever.
                    if (null != data) {
                        lacHolder.set(data.getLastAddConfirmed());
                    }
                    doneLatch.countDown();
                }
            }).initiate();
            doneLatch.await();
            Assert.assertEquals(0L, (long)rcHolder.get());
            Assert.assertEquals(1L, lacHolder.get());
        } finally {
            // Both countDown calls are no-ops once the latch has already reached zero.
            readLatch.countDown();
            metadataLatch.countDown();
            try {
                if (null != newBk0) {
                    newBk0.close();
                }
            } finally {
                try {
                    if (null != newBk1) {
                        newBk1.close();
                    }
                } finally {
                    if (null != readBk) {
                        readBk.close();
                    }
                }
            }
        }
    }

    /**
     * Feeds synthetic per-bookie response codes into a {@link ReadLastConfirmedOp} via
     * {@link #readLACFromQuorum(LedgerHandle, int...)} and checks the code the operation
     * finally reports, first for a 3/3/2 ledger and then for a 2/2/2 ledger.
     *
     * <p>NOTE(review): the raw codes -8 and -7 are presumably
     * {@code BKException.Code.BookieHandleNotAvailableException} and
     * {@code BKException.Code.NoSuchLedgerExistsException} — confirm against BKException.
     *
     * @throws Exception on any client failure or interruption
     */
    @Test
    public void testRecoveryWithUnavailableBookie() throws Exception {
        byte[] passwd = "".getBytes(StandardCharsets.UTF_8);
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration((Configuration)this.baseClientConf);
        BookKeeper readBk = new BookKeeper(newConf);
        try {
            BookKeeper newBk0 = new BookKeeper(newConf);
            try {
                int ensembleSize = 3;
                int writeQuorumSize = 3;
                int ackQuormSize = 2;
                LedgerHandle lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, BookKeeper.DigestType.DUMMY, passwd);
                LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(), BookKeeper.DigestType.DUMMY, passwd);
                // Responses -8, 0, -7: the read is expected to succeed.
                int responseCode = this.readLACFromQuorum(readLh, -8, 0, -7);
                Assert.assertEquals(0, responseCode);
                // Responses -8, 0, -8: the read is expected to fail with -8.
                responseCode = this.readLACFromQuorum(readLh, -8, 0, -8);
                Assert.assertEquals(-8, responseCode);
                ensembleSize = 2;
                writeQuorumSize = 2;
                ackQuormSize = 2;
                lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, BookKeeper.DigestType.DUMMY, passwd);
                readLh = readBk.openLedgerNoRecovery(lh0.getId(), BookKeeper.DigestType.DUMMY, passwd);
                responseCode = this.readLACFromQuorum(readLh, -8, 0);
                Assert.assertEquals(0, responseCode);
                responseCode = this.readLACFromQuorum(readLh, -7, 0);
                Assert.assertEquals(0, responseCode);
                responseCode = this.readLACFromQuorum(readLh, -8, -8);
                Assert.assertEquals(-8, responseCode);
            } finally {
                newBk0.close();
            }
        } finally {
            readBk.close();
        }
    }

    /**
     * Builds a {@link ReadLastConfirmedOp} for the last ensemble of {@code ledger} and,
     * instead of contacting bookies, calls its {@code readEntryComplete} directly once per
     * supplied response code, using the argument's position as the callback context.
     * Blocks until the operation reports its final result.
     *
     * @param ledger handle whose distribution schedule, digest manager, id, metadata and
     *     ledger key are used to build the operation
     * @param bookieLACResponse response code to inject for each position, in order
     * @return the result code the operation passes to its completion callback
     * @throws Exception if interrupted while waiting for the operation to complete
     */
    private int readLACFromQuorum(LedgerHandle ledger, int ... bookieLACResponse) throws Exception {
        // 100 is a placeholder that is overwritten when the callback fires.
        final MutableInt responseCode = new MutableInt(100);
        final CountDownLatch responseLatch = new CountDownLatch(1);
        ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(this.bkc.getBookieClient(), ledger.getDistributionSchedule(), ledger.getDigestManager(), ledger.getId(), (List)ledger.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(), ledger.getLedgerKey(), new ReadLastConfirmedOp.LastConfirmedDataCallback(){

            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                LOG.info("response = {}", (Object)rc);
                responseCode.setValue(rc);
                responseLatch.countDown();
            }
        });
        // Fake 192-byte response payload: ledger id followed by a zero long, rest zero-filled.
        byte[] lac = new byte[192];
        ByteBuf data = Unpooled.wrappedBuffer(lac, 0, lac.length);
        int writerIndex = data.writerIndex();
        // Rewind to write the two header longs, then restore the writer index so the
        // whole buffer is readable again.
        data.resetWriterIndex();
        data.writeLong(ledger.getId());
        data.writeLong(0L);
        data.writerIndex(writerIndex);
        for (int i = 0; i < bookieLACResponse.length; ++i) {
            readLCOp.readEntryComplete(bookieLACResponse[i], 0L, 0L, data, (Object)i);
        }
        responseLatch.await();
        return responseCode.intValue();
    }

    /**
     * Bookie whose responses can be held back by the test: add acknowledgements can be
     * parked in a queue instead of being delivered, and the read of one chosen entry id
     * can be made to wait on a latch.
     */
    static class DelayResponseBookie extends Bookie {
        /** When set, add acknowledgements are queued rather than passed to the caller. */
        private final AtomicBoolean delayAddResponse = new AtomicBoolean(false);
        /** When set, reads of {@link #delayReadOnEntry} wait on {@link #delayReadLatch}. */
        private final AtomicBoolean delayReadResponse = new AtomicBoolean(false);
        /** Entry id whose read is stalled; -1234 is the initial placeholder value. */
        private final AtomicLong delayReadOnEntry = new AtomicLong(-1234L);
        /** Latch a stalled read waits on; {@code null} means no stalling. */
        private volatile CountDownLatch delayReadLatch = null;
        /** Acknowledgements captured while add responses are being delayed. */
        private final LinkedBlockingQueue<WriteCallbackEntry> delayQueue = new LinkedBlockingQueue<>();

        public DelayResponseBookie(ServerConfiguration conf) throws IOException, KeeperException, InterruptedException, BookieException {
            super(conf);
        }

        /**
         * Adds the entry through the superclass, wrapping {@code cb} so that its
         * completion is either forwarded immediately or queued, depending on the
         * delay flag at the time the write completes.
         */
        public void addEntry(ByteBuf entry, boolean ackBeforeSync, final BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException {
            BookkeeperInternalCallbacks.WriteCallback interceptor = new BookkeeperInternalCallbacks.WriteCallback(){

                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object cbCtx) {
                    if (!delayAddResponse.get()) {
                        cb.writeComplete(rc, ledgerId, entryId, addr, cbCtx);
                        return;
                    }
                    delayQueue.add(new WriteCallbackEntry(cb, rc, ledgerId, entryId, addr, cbCtx));
                }
            };
            super.addEntry(entry, ackBeforeSync, interceptor, ctx, masterKey);
        }

        /**
         * Reads the entry through the superclass, first waiting on the configured latch
         * when read delaying is enabled for exactly this entry id. An interrupt while
         * waiting restores the interrupt flag and the read then proceeds.
         */
        public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, Bookie.NoLedgerException {
            LOG.info("ReadEntry {} - {}", (Object)ledgerId, (Object)entryId);
            boolean stallThisRead = this.delayReadResponse.get() && this.delayReadOnEntry.get() == entryId;
            if (stallThisRead) {
                // Snapshot the volatile field once so the null check and await use the same latch.
                CountDownLatch gate = this.delayReadLatch;
                if (gate != null) {
                    try {
                        gate.await();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            return super.readEntry(ledgerId, entryId);
        }

        /** Turns queuing of add acknowledgements on or off. */
        void delayAdd(boolean delayed) {
            this.delayAddResponse.set(delayed);
        }

        /** Configures whether, for which entry id, and on which latch reads are stalled. */
        void delayRead(boolean delayed, long entryId, CountDownLatch delayReadLatch) {
            this.delayReadResponse.set(delayed);
            this.delayReadOnEntry.set(entryId);
            this.delayReadLatch = delayReadLatch;
        }

        /** Captured arguments of one write completion, replayable via {@link #callback()}. */
        static final class WriteCallbackEntry {
            private final BookkeeperInternalCallbacks.WriteCallback cb;
            private final int rc;
            private final long ledgerId;
            private final long entryId;
            private final BookieSocketAddress addr;
            private final Object ctx;

            WriteCallbackEntry(BookkeeperInternalCallbacks.WriteCallback cb, int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                this.cb = cb;
                this.rc = rc;
                this.ledgerId = ledgerId;
                this.entryId = entryId;
                this.addr = addr;
                this.ctx = ctx;
            }

            /** Delivers the captured completion to the original callback. */
            public void callback() {
                this.cb.writeComplete(this.rc, this.ledgerId, this.entryId, this.addr, this.ctx);
            }
        }
    }

    /**
     * Bookie-side metadata driver that hands out a {@link TestLedgerManagerFactory}
     * instead of the factory the ZooKeeper driver would normally create.
     */
    static class TestMetadataBookieDriver extends ZKMetadataBookieDriver {
        TestMetadataBookieDriver() {
        }

        /**
         * Returns the cached factory, creating and initializing a
         * {@link TestLedgerManagerFactory} on first use.
         *
         * @throws MetadataException wrapping any {@link IOException} raised during initialization
         */
        public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
            if (null != this.lmFactory) {
                return this.lmFactory;
            }
            try {
                this.lmFactory = new TestLedgerManagerFactory().initialize(this.conf, this.layoutManager, 1);
            } catch (IOException ioe) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, ioe);
            }
            return this.lmFactory;
        }
    }

    /**
     * Client-side metadata driver that hands out a {@link TestLedgerManagerFactory}
     * instead of the factory the ZooKeeper driver would normally create.
     */
    static class TestMetadataClientDriver extends ZKMetadataClientDriver {
        TestMetadataClientDriver() {
        }

        /**
         * Returns the cached factory, creating and initializing a
         * {@link TestLedgerManagerFactory} on first use.
         *
         * @throws MetadataException wrapping any {@link IOException} raised during initialization
         */
        public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
            if (null != this.lmFactory) {
                return this.lmFactory;
            }
            try {
                this.lmFactory = new TestLedgerManagerFactory().initialize(this.conf, this.layoutManager, 1);
            } catch (IOException ioe) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, ioe);
            }
            return this.lmFactory;
        }
    }

    /**
     * Ledger manager factory that wraps every manager produced by the hierarchical
     * factory in a {@link TestLedgerManager}.
     */
    static class TestLedgerManagerFactory extends HierarchicalLedgerManagerFactory {
        TestLedgerManagerFactory() {
        }

        /** Creates the superclass's ledger manager and returns it wrapped for the tests. */
        public LedgerManager newLedgerManager() {
            LedgerManager delegate = super.newLedgerManager();
            return new TestLedgerManager(delegate);
        }
    }

    /**
     * {@link LedgerManager} wrapper that forwards every call to a delegate, except that
     * metadata writes issued while a latch is installed are deferred to a single-threaded
     * executor and only performed once that latch is released.
     */
    static class TestLedgerManager
    implements LedgerManager {
        /** The real ledger manager all operations are forwarded to. */
        final LedgerManager lm;
        /** Latch sampled at the start of each metadata write; {@code null} disables delaying. */
        volatile CountDownLatch waitLatch = null;
        /** Single worker thread on which delayed metadata writes wait and then run. */
        final ExecutorService executorService;

        TestLedgerManager(LedgerManager lm) {
            this.lm = lm;
            this.executorService = Executors.newSingleThreadExecutor();
        }

        /** Installs (or, with {@code null}, removes) the latch that delays metadata writes. */
        void setLatch(CountDownLatch waitLatch) {
            this.waitLatch = waitLatch;
        }

        public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId, LedgerMetadata metadata) {
            return this.lm.createLedgerMetadata(ledgerId, metadata);
        }

        public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version version) {
            return this.lm.removeLedgerMetadata(ledgerId, version);
        }

        public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
            return this.lm.readLedgerMetadata(ledgerId);
        }

        public LedgerManager.LedgerRangeIterator getLedgerRanges(long zkOpTimeoutMs) {
            return this.lm.getLedgerRanges(zkOpTimeoutMs);
        }

        /**
         * Writes ledger metadata through the delegate. If a latch is installed at call
         * time, the write is handed to the executor, waits for that latch, and its outcome
         * is relayed through the returned future; otherwise the delegate's future is
         * returned directly.
         *
         * @return a future completed with the delegate's result or failure
         */
        public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata, final Version currentVersion) {
            // Sample once: a later setLatch(null) must not affect a write already captured here.
            final CountDownLatch cdl = this.waitLatch;
            if (null == cdl) {
                return this.lm.writeLedgerMetadata(ledgerId, metadata, currentVersion);
            }
            final CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<Versioned<LedgerMetadata>>();
            this.executorService.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        cdl.await();
                    }
                    catch (InterruptedException e) {
                        // Best effort: restore the flag and still perform the write.
                        Thread.currentThread().interrupt();
                        LOG.error("Interrupted on waiting latch : ", (Throwable)e);
                    }
                    try {
                        lm.writeLedgerMetadata(ledgerId, metadata, currentVersion).whenComplete((written, failure) -> {
                            if (failure != null) {
                                promise.completeExceptionally((Throwable)failure);
                            } else {
                                promise.complete(written);
                            }
                        });
                    }
                    catch (RuntimeException e) {
                        // A synchronous failure would otherwise vanish into the executor's
                        // Future and leave the caller's promise uncompleted forever.
                        promise.completeExceptionally(e);
                    }
                }
            });
            return promise;
        }

        public void registerLedgerMetadataListener(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener) {
            this.lm.registerLedgerMetadataListener(ledgerId, listener);
        }

        public void unregisterLedgerMetadataListener(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener) {
            this.lm.unregisterLedgerMetadataListener(ledgerId, listener);
        }

        public void asyncProcessLedgers(BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback finalCb, Object context, int successRc, int failureRc) {
            this.lm.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
        }

        /**
         * Closes the delegate and shuts the executor down; the executor is shut down even
         * if closing the delegate throws.
         *
         * @throws IOException propagated from the delegate's {@code close()}
         */
        public void close() throws IOException {
            try {
                this.lm.close();
            }
            finally {
                this.executorService.shutdown();
            }
        }
    }
}

