/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.bookie;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.EntryMemTable;
import org.apache.bookkeeper.bookie.EntryMemTableWithParallelFlusher;
import org.apache.bookkeeper.bookie.FileInfoBackingCache;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerCache;
import org.apache.bookkeeper.bookie.LedgerCacheImpl;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.SkipListFlusher;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LedgerCacheTest {
    private static final Logger LOG = LoggerFactory.getLogger(LedgerCacheTest.class);
    SnapshotMap<Long, Boolean> activeLedgers;
    LedgerCache ledgerCache;
    Thread flushThread;
    ServerConfiguration conf;
    File txnDir;
    File ledgerDir;
    private final List<File> tempDirs = new ArrayList<File>();
    private Bookie bookie;

    @Before
    public void setUp() throws Exception {
        this.txnDir = IOUtils.createTempDir((String)"ledgercache", (String)"txn");
        this.ledgerDir = IOUtils.createTempDir((String)"ledgercache", (String)"ledger");
        new File(this.ledgerDir, "current").mkdir();
        this.conf = TestBKConfiguration.newServerConfiguration();
        this.conf.setMetadataServiceUri(null);
        this.conf.setJournalDirName(this.txnDir.getPath());
        this.conf.setLedgerDirNames(new String[]{this.ledgerDir.getPath()});
        this.bookie = new Bookie(this.conf);
        this.activeLedgers = new SnapshotMap();
        this.ledgerCache = ((InterleavedLedgerStorage)this.bookie.ledgerStorage.getUnderlyingLedgerStorage()).ledgerCache;
    }

    @After
    public void tearDown() throws Exception {
        if (this.flushThread != null) {
            this.flushThread.interrupt();
            this.flushThread.join();
        }
        this.bookie.ledgerStorage.shutdown();
        FileUtils.deleteDirectory((File)this.txnDir);
        FileUtils.deleteDirectory((File)this.ledgerDir);
        for (File dir : this.tempDirs) {
            FileUtils.deleteDirectory((File)dir);
        }
    }

    File createTempDir(String prefix, String suffix) throws IOException {
        File dir = IOUtils.createTempDir((String)prefix, (String)suffix);
        this.tempDirs.add(dir);
        return dir;
    }

    private void newLedgerCache() throws IOException {
        if (this.ledgerCache != null) {
            this.ledgerCache.close();
        }
        this.ledgerCache = ((InterleavedLedgerStorage)this.bookie.ledgerStorage.getUnderlyingLedgerStorage()).ledgerCache = new LedgerCacheImpl(this.conf, this.activeLedgers, this.bookie.getIndexDirsManager());
        this.flushThread = new Thread(){

            @Override
            public void run() {
                while (true) {
                    try {
                        while (true) {
                            1.sleep(LedgerCacheTest.this.conf.getFlushInterval());
                            LedgerCacheTest.this.ledgerCache.flushLedger(true);
                        }
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    catch (Exception e) {
                        LOG.error("Exception in flush thread", (Throwable)e);
                        continue;
                    }
                    break;
                }
            }
        };
        this.flushThread.start();
    }

    @Test
    public void testAddEntryException() throws IOException {
        this.conf.setPageLimit(10);
        this.newLedgerCache();
        try {
            byte[] masterKey = "blah".getBytes();
            for (int i = 0; i < 100; ++i) {
                this.ledgerCache.setMasterKey((long)i, masterKey);
                this.ledgerCache.putEntryOffset((long)i, 0L, (long)(i * 8));
            }
        }
        catch (IOException e) {
            LOG.error("Got IOException.", (Throwable)e);
            Assert.fail((String)"Failed to add entry.");
        }
    }

    @Test
    public void testLedgerEviction() throws Exception {
        int numEntries = 10;
        this.conf.setOpenFileLimit(1).setPageLimit(2).setPageSize(8 * numEntries);
        this.newLedgerCache();
        try {
            int numLedgers = 3;
            byte[] masterKey = "blah".getBytes();
            for (int i = 1; i <= numLedgers; ++i) {
                this.ledgerCache.setMasterKey((long)i, masterKey);
                for (int j = 0; j < numEntries; ++j) {
                    this.ledgerCache.putEntryOffset((long)i, (long)j, (long)(i * numEntries + j));
                }
            }
        }
        catch (Exception e) {
            LOG.error("Got Exception.", (Throwable)e);
            Assert.fail((String)"Failed to add entry.");
        }
    }

    @Test
    public void testDeleteLedger() throws Exception {
        int numEntries = 10;
        this.conf.setOpenFileLimit(999).setPageLimit(2).setPageSize(8 * numEntries);
        this.newLedgerCache();
        try {
            int j;
            int i;
            int numLedgers = 2;
            byte[] masterKey = "blah".getBytes();
            for (i = 1; i <= numLedgers; ++i) {
                this.ledgerCache.setMasterKey((long)i, masterKey);
                for (j = 0; j < numEntries; ++j) {
                    this.ledgerCache.putEntryOffset((long)i, (long)j, (long)(i * numEntries + j));
                }
            }
            for (i = 1; i <= numLedgers; ++i) {
                this.ledgerCache.deleteLedger((long)i);
            }
            for (i = numLedgers + 1; i <= 2 * numLedgers; ++i) {
                this.ledgerCache.setMasterKey((long)i, masterKey);
                for (j = 0; j < numEntries; ++j) {
                    this.ledgerCache.putEntryOffset((long)i, (long)j, (long)(i * numEntries + j));
                }
            }
        }
        catch (Exception e) {
            LOG.error("Got Exception.", (Throwable)e);
            Assert.fail((String)"Failed to add entry.");
        }
    }

    @Test
    public void testPageEviction() throws Exception {
        int numLedgers = 10;
        byte[] masterKey = "blah".getBytes();
        this.conf.setOpenFileLimit(999999).setPageLimit(3);
        this.newLedgerCache();
        try {
            int i;
            for (i = 1; i <= numLedgers; ++i) {
                this.ledgerCache.setMasterKey((long)i, masterKey);
                this.ledgerCache.putEntryOffset((long)i, 0L, (long)(i * 8));
                this.ledgerCache.putEntryOffset((long)i, 1L, (long)(i * 8));
            }
            this.ledgerCache.flushLedger(true);
            this.ledgerCache.flushLedger(true);
            for (i = 1; i <= numLedgers / 2; ++i) {
                this.ledgerCache.deleteLedger((long)i);
            }
            this.newLedgerCache();
            for (i = 1; i <= numLedgers; ++i) {
                try {
                    this.ledgerCache.putEntryOffset((long)i, 1L, (long)(i * 8));
                    continue;
                }
                catch (Bookie.NoLedgerException nsle) {
                    if (i <= numLedgers / 2) continue;
                    LOG.error("Error put entry offset : ", (Throwable)nsle);
                    Assert.fail((String)"Should not reach here.");
                }
            }
        }
        catch (Exception e) {
            LOG.error("Got Exception.", (Throwable)e);
            Assert.fail((String)"Failed to add entry.");
        }
    }

    @Test
    public void testLedgerCacheFlushFailureOnDiskFull() throws Exception {
        File ledgerDir1 = this.createTempDir("bkTest", ".dir");
        File ledgerDir2 = this.createTempDir("bkTest", ".dir");
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setLedgerDirNames(new String[]{ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath()});
        Bookie bookie = new Bookie(conf);
        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage)bookie.ledgerStorage.getUnderlyingLedgerStorage();
        LedgerCacheImpl ledgerCache = (LedgerCacheImpl)ledgerStorage.ledgerCache;
        ledgerStorage.setMasterKey(1L, "key".getBytes());
        FileInfoBackingCache.CachedFileInfo fileInfo = ledgerCache.getIndexPersistenceManager().getFileInfo(Long.valueOf(1L), null);
        ledgerStorage.addEntry(this.generateEntry(1L, 1L));
        ledgerStorage.addEntry(this.generateEntry(1L, 2L));
        ledgerStorage.flush();
        ledgerStorage.addEntry(this.generateEntry(1L, 3L));
        bookie.getIndexDirsManager().addToFilledDirs(fileInfo.getLf().getParentFile().getParentFile().getParentFile());
        File before = fileInfo.getLf();
        ledgerStorage.flush();
        File after = fileInfo.getLf();
        Assert.assertFalse((String)"After flush index file should be changed", (boolean)before.equals(after));
        Assert.assertEquals((Object)this.generateEntry(1L, 1L), (Object)ledgerStorage.getEntry(1L, 1L));
        Assert.assertEquals((Object)this.generateEntry(1L, 2L), (Object)ledgerStorage.getEntry(1L, 2L));
        Assert.assertEquals((Object)this.generateEntry(1L, 3L), (Object)ledgerStorage.getEntry(1L, 3L));
    }

    @Test
    public void testIndexPageEvictionWriteOrder() throws Exception {
        int i;
        int numLedgers = 10;
        File journalDir = this.createTempDir("bookie", "journal");
        Bookie.checkDirectoryStructure((File)Bookie.getCurrentDirectory((File)journalDir));
        File ledgerDir = this.createTempDir("bookie", "ledger");
        Bookie.checkDirectoryStructure((File)Bookie.getCurrentDirectory((File)ledgerDir));
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setMetadataServiceUri(null);
        conf.setJournalDirName(journalDir.getPath()).setLedgerDirNames(new String[]{ledgerDir.getPath()}).setFlushInterval(1000).setPageLimit(1).setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
        Bookie b = new Bookie(conf);
        b.start();
        for (i = 1; i <= 10; ++i) {
            ByteBuf packet = this.generateEntry(i, 1L);
            b.addEntry(packet, false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        }
        conf = TestBKConfiguration.newServerConfiguration();
        conf.setMetadataServiceUri(null);
        conf.setJournalDirName(journalDir.getPath()).setLedgerDirNames(new String[]{ledgerDir.getPath()});
        b = new Bookie(conf);
        for (i = 1; i <= 10; ++i) {
            try {
                b.readEntry((long)i, 1L);
                continue;
            }
            catch (Bookie.NoLedgerException nle) {
                Assert.assertEquals((String)"No ledger should only happen for the last ledger", (long)i, (long)10L);
                continue;
            }
            catch (Bookie.NoEntryException nle) {
                continue;
            }
            catch (IOException ioe) {
                LOG.info("Shouldn't have received IOException", (Throwable)ioe);
                Assert.fail((String)"Shouldn't throw IOException, should say that entry is not found");
            }
        }
    }

    @Test
    public void testSyncThreadNPE() throws IOException {
        this.newLedgerCache();
        try {
            ((LedgerCacheImpl)this.ledgerCache).getIndexPageManager().getLedgerEntryPageFromCache(0L, 0L, true);
        }
        catch (Exception e) {
            LOG.error("Exception when trying to get a ledger entry page", (Throwable)e);
            Assert.fail((String)"Shouldn't have thrown an exception");
        }
    }

    @Test
    public void testPutEntryOffsetDeleteRace() throws Exception {
        this.newLedgerCache();
        final AtomicInteger rc = new AtomicInteger(0);
        final LinkedBlockingQueue putQ = new LinkedBlockingQueue(100);
        final LinkedBlockingQueue deleteQ = new LinkedBlockingQueue(100);
        final byte[] masterKey = "masterKey".getBytes();
        long numLedgers = 1000L;
        int numPutters = 10;
        int numDeleters = 10;
        AtomicBoolean running = new AtomicBoolean(true);
        Thread newLedgerThread = new Thread(){

            @Override
            public void run() {
                try {
                    int i;
                    for (long i2 = 0L; i2 < 1000L && rc.get() == 0; ++i2) {
                        LedgerCacheTest.this.ledgerCache.setMasterKey(i2, masterKey);
                        LedgerCacheTest.this.ledgerCache.putEntryOffset(i2, 1L, 0L);
                        deleteQ.put(i2);
                        putQ.put(i2);
                    }
                    for (i = 0; i < 10; ++i) {
                        putQ.put(-1L);
                    }
                    for (i = 0; i < 10; ++i) {
                        deleteQ.put(-1L);
                    }
                }
                catch (Throwable e) {
                    rc.set(-1);
                    LOG.error("Exception in new ledger thread", e);
                }
            }
        };
        newLedgerThread.start();
        Thread[] flushThreads = new Thread[10];
        for (int i = 0; i < 10; ++i) {
            Thread flushThread = new Thread(){

                @Override
                public void run() {
                    block4: while (true) {
                        try {
                            long id;
                            while ((id = ((Long)putQ.take()).longValue()) != -1L) {
                                LOG.info("Putting {}", (Object)id);
                                try {
                                    LedgerCacheTest.this.ledgerCache.putEntryOffset(id, 2L, 0L);
                                    LedgerCacheTest.this.ledgerCache.deleteLedger(id);
                                    continue block4;
                                }
                                catch (Bookie.NoLedgerException noLedgerException) {
                                }
                            }
                            break;
                        }
                        catch (Throwable e) {
                            rc.set(-1);
                            LOG.error("Exception in put thread", e);
                            break;
                        }
                    }
                }
            };
            flushThread.start();
            flushThreads[i] = flushThread;
        }
        Thread[] deleteThreads = new Thread[10];
        for (int i = 0; i < 10; ++i) {
            Thread deleteThread = new Thread(){

                @Override
                public void run() {
                    block4: while (true) {
                        try {
                            long id;
                            while ((id = ((Long)deleteQ.take()).longValue()) != -1L) {
                                LOG.info("Deleting {}", (Object)id);
                                try {
                                    LedgerCacheTest.this.ledgerCache.deleteLedger(id);
                                    continue block4;
                                }
                                catch (Bookie.NoLedgerException noLedgerException) {
                                }
                            }
                            break;
                        }
                        catch (Throwable e) {
                            rc.set(-1);
                            LOG.error("Exception in delete thread", e);
                            break;
                        }
                    }
                }
            };
            deleteThread.start();
            deleteThreads[i] = deleteThread;
        }
        newLedgerThread.join();
        for (Thread deleteThread : deleteThreads) {
            deleteThread.join();
        }
        running.set(false);
        for (Thread flushThread : flushThreads) {
            flushThread.join();
        }
        Assert.assertEquals((String)"Should have been no errors", (long)rc.get(), (long)0L);
        for (long i = 0L; i < 1000L; ++i) {
            boolean gotError = false;
            try {
                LOG.error("Checking {}", (Object)i);
                this.ledgerCache.getEntryOffset(i, 0L);
            }
            catch (Bookie.NoLedgerException e) {
                gotError = true;
            }
            if (gotError) continue;
            LOG.error("Ledger {} is still around", (Object)i);
            Assert.fail((String)("Found ledger " + i + ", which should have been removed"));
        }
    }

    @Test
    public void testFlushDeleteRace() throws Exception {
        this.newLedgerCache();
        final AtomicInteger rc = new AtomicInteger(0);
        final LinkedBlockingQueue ledgerQ = new LinkedBlockingQueue(100);
        final byte[] masterKey = "masterKey".getBytes();
        long numLedgers = 1000L;
        int numFlushers = 10;
        int numDeleters = 10;
        final AtomicBoolean running = new AtomicBoolean(true);
        Thread newLedgerThread = new Thread(){

            @Override
            public void run() {
                try {
                    for (long i = 0L; i < 1000L && rc.get() == 0; ++i) {
                        LedgerCacheTest.this.ledgerCache.setMasterKey(i, masterKey);
                        LedgerCacheTest.this.ledgerCache.putEntryOffset(i, 1L, 0L);
                        ledgerQ.put(i);
                    }
                    for (int i = 0; i < 10; ++i) {
                        ledgerQ.put(-1L);
                    }
                }
                catch (Throwable e) {
                    rc.set(-1);
                    LOG.error("Exception in new ledger thread", e);
                }
            }
        };
        newLedgerThread.start();
        Thread[] flushThreads = new Thread[10];
        for (int i = 0; i < 10; ++i) {
            Thread flushThread = new Thread(){

                @Override
                public void run() {
                    try {
                        while (running.get()) {
                            LedgerCacheTest.this.ledgerCache.flushLedger(true);
                        }
                    }
                    catch (Throwable e) {
                        rc.set(-1);
                        LOG.error("Exception in flush thread", e);
                    }
                    LOG.error("Shutting down flush thread");
                }
            };
            flushThread.start();
            flushThreads[i] = flushThread;
        }
        Thread[] deleteThreads = new Thread[10];
        for (int i = 0; i < 10; ++i) {
            Thread deleteThread = new Thread(){

                @Override
                public void run() {
                    try {
                        long id;
                        while ((id = ((Long)ledgerQ.take()).longValue()) != -1L) {
                            LOG.info("Deleting {}", (Object)id);
                            LedgerCacheTest.this.ledgerCache.deleteLedger(id);
                        }
                    }
                    catch (Throwable e) {
                        rc.set(-1);
                        LOG.error("Exception in delete thread", e);
                    }
                }
            };
            deleteThread.start();
            deleteThreads[i] = deleteThread;
        }
        newLedgerThread.join();
        for (Thread deleteThread : deleteThreads) {
            deleteThread.join();
        }
        running.set(false);
        for (Thread flushThread : flushThreads) {
            flushThread.join();
        }
        Assert.assertEquals((String)"Should have been no errors", (long)rc.get(), (long)0L);
        for (long i = 0L; i < 1000L; ++i) {
            boolean gotError = false;
            try {
                LOG.error("Checking {}", (Object)i);
                this.ledgerCache.getEntryOffset(i, 0L);
            }
            catch (Bookie.NoLedgerException e) {
                gotError = true;
            }
            if (gotError) continue;
            LOG.error("Ledger {} is still around", (Object)i);
            Assert.fail((String)("Found ledger " + i + ", which should have been removed"));
        }
    }

    @Test
    public void testEntryMemTableFlushFailure() throws Exception {
        File tmpDir = this.createTempDir("bkTest", ".dir");
        File curDir = Bookie.getCurrentDirectory((File)tmpDir);
        Bookie.checkDirectoryStructure((File)curDir);
        int gcWaitTime = 1000;
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setGcWaitTime((long)gcWaitTime);
        conf.setLedgerDirNames(new String[]{tmpDir.toString()});
        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
        Bookie bookie = new Bookie(conf);
        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage)bookie.ledgerStorage;
        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
        bookie.addEntry(this.generateEntry(1L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 2L));
        Assert.assertFalse((String)"Bookie is expected to be in ReadWrite mode", (boolean)bookie.isReadOnly());
        Assert.assertTrue((String)"EntryMemTable SnapShot is expected to be empty", (boolean)memTable.snapshot.isEmpty());
        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
        flushTestSortedLedgerStorage.setInjectFlushException(true, -1L);
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 2L));
        Assert.assertFalse((String)"EntryMemTable SnapShot is not expected to be empty", (boolean)memTable.snapshot.isEmpty());
        Assert.assertEquals((String)"Flusher called", (long)1L, (long)flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(false);
        flushTestSortedLedgerStorage.setInjectFlushException(false, -1L);
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 3L));
        Assert.assertTrue((String)"EntryMemTable SnapShot is expected to be empty, because of successful flush", (boolean)memTable.snapshot.isEmpty());
    }

    @Test
    public void testSortedLedgerFlushFailure() throws Exception {
        File tmpDir = this.createTempDir("bkTest", ".dir");
        File curDir = Bookie.getCurrentDirectory((File)tmpDir);
        Bookie.checkDirectoryStructure((File)curDir);
        int gcWaitTime = 1000;
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setGcWaitTime((long)gcWaitTime).setLedgerDirNames(new String[]{tmpDir.toString()}).setJournalDirName(tmpDir.toString()).setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
        Bookie bookie = new Bookie(conf);
        bookie.start();
        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage)bookie.ledgerStorage;
        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
        bookie.addEntry(this.generateEntry(1L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 2L));
        Assert.assertFalse((String)"Bookie is expected to be in ReadWrite mode", (boolean)bookie.isReadOnly());
        Assert.assertTrue((String)"EntryMemTable SnapShot is expected to be empty", (boolean)memTable.snapshot.isEmpty());
        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
        flushTestSortedLedgerStorage.setInjectFlushException(true, -1L);
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 2L));
        Assert.assertFalse((String)"EntryMemTable SnapShot is not expected to be empty", (boolean)memTable.snapshot.isEmpty());
        Assert.assertTrue((String)"Bookie is expected to be in Read mode", (boolean)bookie.isReadOnly());
        bookie.addEntry(this.generateEntry(1L, 3L), false, new BookkeeperInternalCallbacks.WriteCallback(){

            public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                LOG.info("fail write to bk");
                Assert.assertTrue((rc != 0 ? 1 : 0) != 0);
            }
        }, null, "passwd".getBytes());
        bookie.shutdown();
    }

    private ByteBuf generateEntry(long ledger, long entry) {
        byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
        ByteBuf bb = Unpooled.buffer((int)(16 + data.length));
        bb.writeLong(ledger);
        bb.writeLong(entry);
        bb.writeBytes(data);
        return bb;
    }

    @Test
    public void testEntryMemTableParallelFlush() throws Exception {
        int gcWaitTime = 1000;
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setGcWaitTime((long)gcWaitTime);
        conf.setLedgerDirNames(this.createAndGetLedgerDirs(1));
        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
        conf.setEntryLogPerLedgerEnabled(true);
        Bookie bookie = new Bookie(conf);
        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage)bookie.ledgerStorage;
        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
        bookie.addEntry(this.generateEntry(1L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        bookie.addEntry(this.generateEntry(2L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        bookie.addEntry(this.generateEntry(3L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 2L));
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(2L, 2L));
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(3L, 2L));
        Assert.assertTrue((String)"EntryMemTable SnapShot is expected to be empty", (boolean)memTable.snapshot.isEmpty());
        Assert.assertFalse((String)"EntryMemTable is not expected to be empty", (boolean)memTable.isEmpty());
        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 3L));
        Assert.assertTrue((String)"EntryMemTable SnapShot is expected to be empty", (boolean)memTable.snapshot.isEmpty());
        Assert.assertEquals((String)"Flusher called", (long)1L, (long)flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
    }

    @Test
    public void testEntryMemTableParallelFlushWithFlushException() throws Exception {
        int gcWaitTime = 1000;
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setGcWaitTime((long)gcWaitTime);
        conf.setLedgerDirNames(this.createAndGetLedgerDirs(1));
        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
        conf.setEntryLogPerLedgerEnabled(true);
        Bookie bookie = new Bookie(conf);
        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage)bookie.ledgerStorage;
        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
        bookie.addEntry(this.generateEntry(1L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        bookie.addEntry(this.generateEntry(2L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        bookie.addEntry(this.generateEntry(3L, 1L), false, (BookkeeperInternalCallbacks.WriteCallback)new Bookie.NopWriteCallback(), null, "passwd".getBytes());
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 4L));
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(2L, 4L));
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(3L, 4L));
        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
        flushTestSortedLedgerStorage.setInjectFlushException(true, 1L);
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 5L));
        Assert.assertFalse((String)"EntryMemTable SnapShot is not expected to be empty", (boolean)memTable.snapshot.isEmpty());
        Assert.assertEquals((String)"Flusher called", (long)1L, (long)flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
        flushTestSortedLedgerStorage.setInjectFlushException(false, -1L);
        flushTestSortedLedgerStorage.addEntry(this.generateEntry(1L, 5L));
        Assert.assertTrue((String)"EntryMemTable SnapShot is expected to be empty", (boolean)memTable.snapshot.isEmpty());
        Assert.assertEquals((String)"Flusher called", (long)2L, (long)flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
    }

    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
        String[] ledgerDirsPath = new String[numOfLedgerDirs];
        for (int i = 0; i < numOfLedgerDirs; ++i) {
            File ledgerDir = this.createTempDir("bkTest", ".dir");
            File curDir = Bookie.getCurrentDirectory((File)ledgerDir);
            Bookie.checkDirectoryStructure((File)curDir);
            ledgerDirsPath[i] = ledgerDir.getAbsolutePath();
        }
        return ledgerDirsPath;
    }

    static class FlushTestSortedLedgerStorage
    extends SortedLedgerStorage {
        final AtomicBoolean injectMemTableSizeLimitReached;
        final AtomicBoolean injectFlushException;
        final AtomicLong injectFlushExceptionForLedger;
        final AtomicInteger numOfTimesFlushSnapshotCalled = new AtomicInteger(0);
        static final long FORALLLEDGERS = -1L;

        public FlushTestSortedLedgerStorage() {
            this.injectMemTableSizeLimitReached = new AtomicBoolean();
            this.injectFlushException = new AtomicBoolean();
            this.injectFlushExceptionForLedger = new AtomicLong(-1L);
        }

        public void setInjectMemTableSizeLimitReached(boolean setValue) {
            this.injectMemTableSizeLimitReached.set(setValue);
        }

        public void setInjectFlushException(boolean setValue, long ledgerId) {
            this.injectFlushException.set(setValue);
            this.injectFlushExceptionForLedger.set(ledgerId);
        }

        public void incrementNumOfTimesFlushSnapshotCalled() {
            this.numOfTimesFlushSnapshotCalled.incrementAndGet();
        }

        public int getNumOfTimesFlushSnapshotCalled() {
            return this.numOfTimesFlushSnapshotCalled.get();
        }

        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger, ByteBufAllocator allocator) throws IOException {
            super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, stateManager, checkpointSource, checkpointer, statsLogger, allocator);
            this.memTable = this.memTable instanceof EntryMemTableWithParallelFlusher ? new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger){

                boolean isSizeLimitReached() {
                    return injectMemTableSizeLimitReached.get() || super.isSizeLimitReached();
                }

                long flushSnapshot(SkipListFlusher flusher, CheckpointSource.Checkpoint checkpoint) throws IOException {
                    this.incrementNumOfTimesFlushSnapshotCalled();
                    return super.flushSnapshot(flusher, checkpoint);
                }
            } : new EntryMemTable(conf, checkpointSource, statsLogger){

                boolean isSizeLimitReached() {
                    return injectMemTableSizeLimitReached.get() || super.isSizeLimitReached();
                }

                long flushSnapshot(SkipListFlusher flusher, CheckpointSource.Checkpoint checkpoint) throws IOException {
                    this.incrementNumOfTimesFlushSnapshotCalled();
                    return super.flushSnapshot(flusher, checkpoint);
                }
            };
        }

        public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException {
            if (this.injectFlushException.get() && (this.injectFlushExceptionForLedger.get() == -1L || this.injectFlushExceptionForLedger.get() == ledgerId)) {
                throw new IOException("Injected Exception");
            }
            super.process(ledgerId, entryId, buffer);
        }

        public void onSizeLimitReached(CheckpointSource.Checkpoint cp) throws IOException {
            LOG.info("Reached size {}", (Object)cp);
            try {
                LOG.info("Started flushing mem table.");
                this.memTable.flush((SkipListFlusher)this);
            }
            catch (IOException e) {
                this.getStateManager().doTransitionToReadOnlyMode();
                LOG.error("Exception thrown while flushing skip list cache.", (Throwable)e);
            }
        }
    }
}

