/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.bookie.storage.ldb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.SortedMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollectorThread;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats;
import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageRocksDB;
import org.apache.bookkeeper.bookie.storage.ldb.LedgerMetadataIndex;
import org.apache.bookkeeper.bookie.storage.ldb.ReadCache;
import org.apache.bookkeeper.bookie.storage.ldb.TransientLedgerInfo;
import org.apache.bookkeeper.bookie.storage.ldb.WriteCache;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleDirectoryDbLedgerStorage
implements CompactableLedgerStorage {
    // Entry log used to persist entries during checkpoint() and to read them back in getEntry().
    private final EntryLogger entryLogger;
    // Index of per-ledger metadata (exists / fenced / master key) queried by the ledger accessors below.
    private final LedgerMetadataIndex ledgerIndex;
    // Index resolving (ledgerId, entryId) to a location that is handed to the entry logger.
    private final EntryLocationIndex entryLocationIndex;
    // Per-ledger transient state (LAC, explicit LAC, watchers) keyed by ledger id; stale items are purged periodically.
    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
    private final GarbageCollectorThread gcThread;
    // Cache receiving new entries in addEntry(); swapped with writeCacheBeingFlushed by swapWriteCache().
    protected volatile WriteCache writeCache;
    // Cache whose content is written to the entry log and location index by checkpoint().
    protected volatile WriteCache writeCacheBeingFlushed;
    // Cache populated on read misses and by the read-ahead logic.
    private final ReadCache readCache;
    // Write-locked while the two write caches are swapped; read / optimistic-read locked by cache users.
    private final StampedLock writeCacheRotationLock = new StampedLock();
    // Held for the whole duration of checkpoint().
    protected final ReentrantLock flushMutex = new ReentrantLock();
    // Set when a full write cache schedules a flush; reset when the caches are swapped.
    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
    // Set to true in swapWriteCache() and back to false when checkpoint() finishes.
    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
    // Runs the flush scheduled by triggerFlushAndAddEntry().
    private final ExecutorService executor = Executors.newSingleThreadExecutor((ThreadFactory)new DefaultThreadFactory("db-storage"));
    // Runs the periodic transient-ledger-info cleanup and the post-checkpoint index cleanup.
    private final ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("db-storage-cleanup"));
    // Listeners notified by deleteLedger().
    private final CopyOnWriteArrayList<LedgerStorage.LedgerDeletionListener> ledgerDeletionListeners = Lists.newCopyOnWriteArrayList();
    private final CheckpointSource checkpointSource;
    // Checkpoint recorded by the last successful checkpoint() call.
    // NOTE(review): read in checkpoint() before flushMutex is taken and not volatile -- confirm this is intended.
    private CheckpointSource.Checkpoint lastCheckpoint = CheckpointSource.Checkpoint.MIN;
    // Total write cache budget; each of the two write caches is created with half of it.
    private final long writeCacheMaxSize;
    private final long readCacheMaxSize;
    // Maximum number of entries cached by a single read-ahead pass.
    private final int readAheadCacheBatchSize;
    // Maximum time addEntry() keeps retrying on a full write cache before rejecting the write.
    private final long maxThrottleTimeNanos;
    private final StatsLogger stats;
    private final OpStatsLogger addEntryStats;
    private final OpStatsLogger readEntryStats;
    private final OpStatsLogger readCacheHitStats;
    private final OpStatsLogger readCacheMissStats;
    private final OpStatsLogger readAheadBatchCountStats;
    private final OpStatsLogger readAheadBatchSizeStats;
    private final OpStatsLogger flushStats;
    private final OpStatsLogger flushSizeStats;
    private final Counter throttledWriteRequests;
    private final Counter rejectedWriteRequests;
    // Configuration key for the read-ahead batch size.
    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
    // NOTE(review): the constructor passes the literal 100 instead of referencing this constant
    // (likely inlined by the compiler before decompilation).
    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
    private static final long DEFAULT_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10L);
    // NOTE(review): the logger is created for DbLedgerStorage, not for this class -- confirm that the
    // shared log category is intentional before changing it.
    private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);

    /**
     * Builds the storage on top of the single ledger directory exposed by {@code ledgerDirsManager}.
     *
     * <p>The write cache budget is split evenly between the active cache and the cache being flushed.
     * The constructor also opens both indexes, creates the entry logger and the garbage collector
     * thread, schedules the periodic cleanup of stale transient ledger info and registers the
     * cache gauges and operation stats.
     *
     * <p>{@code indexDirsManager}, {@code stateManager}, {@code checkpointer} and {@code gcExecutor}
     * are accepted but not used by this constructor.
     *
     * @param conf server configuration, also consulted for the read-ahead batch size and throttle time
     * @param ledgerManager ledger manager handed to the garbage collector thread
     * @param ledgerDirsManager must expose exactly one ledger directory
     * @param checkpointSource source of the checkpoints used by {@link #flush()} and checkpointing
     * @param statsLogger stats logger used for gauges, op stats and counters
     * @param writeCacheSize total write cache size
     * @param readCacheSize read cache size
     * @throws IOException declared by the constructor; presumably raised when an index or the entry
     *         logger cannot be opened
     * @throws IllegalArgumentException if more than one ledger directory is configured
     */
    public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
            LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
            CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
            ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
        Preconditions.checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                "Db implementation only allows for one storage dir");

        String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
        log.info("Creating single directory db ledger storage on {}", baseDir);

        // Two equally sized write caches: one accepts writes while the other is being flushed.
        this.writeCacheMaxSize = writeCacheSize;
        this.writeCache = new WriteCache(writeCacheSize / 2L);
        this.writeCacheBeingFlushed = new WriteCache(writeCacheSize / 2L);

        this.checkpointSource = checkpointSource;

        this.readCacheMaxSize = readCacheSize;
        this.readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, 100);

        long maxThrottleTimeMillis = conf.getLong("dbStorage_maxThrottleTimeMs", DEFAULT_MAX_THROTTLE_TIME_MILLIS);
        this.maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);

        this.readCache = new ReadCache(readCacheSize);
        this.stats = statsLogger;

        this.ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
        this.entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);

        int concurrencyLevel = Runtime.getRuntime().availableProcessors() * 2;
        this.transientLedgerInfoCache = new ConcurrentLongHashMap<>(16384, concurrencyLevel);
        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, 10L, 10L, TimeUnit.MINUTES);

        this.entryLogger = new EntryLogger(conf, ledgerDirsManager);
        this.gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);

        statsLogger.registerGauge("write-cache-size", new Gauge<Long>() {
            public Long getDefaultValue() {
                return 0L;
            }

            public Long getSample() {
                return writeCache.size() + writeCacheBeingFlushed.size();
            }
        });
        statsLogger.registerGauge("write-cache-count", new Gauge<Long>() {
            public Long getDefaultValue() {
                return 0L;
            }

            public Long getSample() {
                return writeCache.count() + writeCacheBeingFlushed.count();
            }
        });
        statsLogger.registerGauge("read-cache-size", new Gauge<Long>() {
            public Long getDefaultValue() {
                return 0L;
            }

            public Long getSample() {
                return readCache.size();
            }
        });
        statsLogger.registerGauge("read-cache-count", new Gauge<Long>() {
            public Long getDefaultValue() {
                return 0L;
            }

            public Long getSample() {
                return readCache.count();
            }
        });

        this.addEntryStats = statsLogger.getOpStatsLogger("add-entry");
        this.readEntryStats = statsLogger.getOpStatsLogger("read-entry");
        this.readCacheHitStats = statsLogger.getOpStatsLogger("read-cache-hits");
        this.readCacheMissStats = statsLogger.getOpStatsLogger("read-cache-misses");
        this.readAheadBatchCountStats = statsLogger.getOpStatsLogger("readahead-batch-count");
        this.readAheadBatchSizeStats = statsLogger.getOpStatsLogger("readahead-batch-size");
        this.flushStats = statsLogger.getOpStatsLogger("flush");
        this.flushSizeStats = statsLogger.getOpStatsLogger("flush-size");
        this.throttledWriteRequests = statsLogger.getCounter("throttled-write-requests");
        this.rejectedWriteRequests = statsLogger.getCounter("rejected-write-requests");
    }

    /**
     * Intentionally does nothing: all set-up work of this class is performed by the constructor.
     */
    @Override
    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
        // no-op
    }

    /**
     * Removes every transient ledger info that reports itself as stale, closing it before it is
     * dropped from the cache. Scheduled periodically on the cleanup executor by the constructor.
     */
    private void cleanupStaleTransientLedgerInfo() {
        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
            if (!ledgerInfo.isStale()) {
                return false;
            }
            ledgerInfo.close();
            return true;
        });
    }

    /**
     * Starts the garbage collector thread created by the constructor.
     */
    @Override
    public void start() {
        gcThread.start();
    }

    /**
     * Flushes pending writes and then releases the resources owned by this storage: garbage
     * collector thread, entry logger, cleanup executor, both indexes, the caches and the flush
     * executor.
     *
     * <p>An {@link IOException} raised by the flush or by any close call is logged and aborts the
     * remaining close steps (unchanged behaviour). Both executors are nevertheless always shut
     * down, so that their worker threads are not leaked when the sequence is cut short by an
     * {@link IOException} or an {@link InterruptedException}.
     *
     * @throws InterruptedException if interrupted while stopping the garbage collector thread or
     *         while waiting for the cleanup executor to terminate
     */
    @Override
    public void shutdown() throws InterruptedException {
        try {
            flush();

            gcThread.shutdown();
            entryLogger.shutdown();

            // Stop the cleanup tasks before closing the indexes they operate on.
            cleanupExecutor.shutdown();
            cleanupExecutor.awaitTermination(1L, TimeUnit.SECONDS);

            ledgerIndex.close();
            entryLocationIndex.close();

            writeCache.close();
            writeCacheBeingFlushed.close();
            readCache.close();
        } catch (IOException e) {
            log.error("Error closing db storage", e);
        } finally {
            // ExecutorService.shutdown() has no additional effect when repeated, so it is safe to
            // call it here even if the happy path already shut the cleanup executor down.
            cleanupExecutor.shutdown();
            executor.shutdown();
        }
    }

    /**
     * Tells whether the ledger is known to the metadata index and flagged as existing.
     *
     * @param ledgerId ledger to look up
     * @return the {@code exists} flag of the stored ledger data, or {@code false} when the index
     *         reports that there is no such ledger
     * @throws IOException if the metadata index lookup fails
     */
    @Override
    public boolean ledgerExists(long ledgerId) throws IOException {
        try {
            boolean exists = ledgerIndex.get(ledgerId).getExists();
            if (log.isDebugEnabled()) {
                log.debug("Ledger exists. ledger: {} : {}", ledgerId, exists);
            }
            return exists;
        } catch (Bookie.NoLedgerException nle) {
            // An unknown ledger is reported as "does not exist" rather than as an error.
            return false;
        }
    }

    /**
     * Returns the fenced flag stored in the metadata index for the ledger.
     *
     * @param ledgerId ledger to look up
     * @return {@code true} if the ledger is fenced
     * @throws IOException if the metadata index lookup fails
     */
    @Override
    public boolean isFenced(long ledgerId) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("isFenced. ledger: {}", ledgerId);
        }
        DbLedgerStorageDataFormats.LedgerData ledgerData = ledgerIndex.get(ledgerId);
        return ledgerData.getFenced();
    }

    /**
     * Marks the ledger as fenced in the metadata index.
     *
     * <p>When the fenced state actually changed and transient info is cached for the ledger, its
     * watchers are notified with {@link Long#MAX_VALUE}.
     *
     * @param ledgerId ledger to fence
     * @return the value returned by the metadata index, i.e. whether the state changed
     * @throws IOException if the metadata index update fails
     */
    @Override
    public boolean setFenced(long ledgerId) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Set fenced. ledger: {}", ledgerId);
        }
        boolean changed = ledgerIndex.setFenced(ledgerId);
        if (changed) {
            TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
            if (ledgerInfo != null) {
                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
            }
        }
        return changed;
    }

    /**
     * Stores the master key of the ledger in the metadata index.
     *
     * @param ledgerId ledger whose key is set
     * @param masterKey key bytes handed to the index
     * @throws IOException if the metadata index update fails
     */
    @Override
    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Set master key. ledger: {}", ledgerId);
        }
        ledgerIndex.setMasterKey(ledgerId, masterKey);
    }

    /**
     * Reads the master key of the ledger from the metadata index.
     *
     * @param ledgerId ledger to look up
     * @return a copy of the stored master key bytes
     * @throws IOException if the metadata index lookup fails
     * @throws BookieException declared by the interface; not thrown directly by this method
     */
    @Override
    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
        if (log.isDebugEnabled()) {
            log.debug("Read master key. ledger: {}", ledgerId);
        }
        DbLedgerStorageDataFormats.LedgerData ledgerData = ledgerIndex.get(ledgerId);
        return ledgerData.getMasterKey().toByteArray();
    }

    /**
     * Adds an entry to the write cache.
     *
     * <p>The ledger id, entry id and last-add-confirmed are read as three consecutive longs starting
     * at the buffer's reader index. The insert is first attempted under an optimistic read of the
     * rotation lock; if the caches were swapped in the meantime it is repeated under the read lock.
     * When the cache has no room the call falls back to {@link #triggerFlushAndAddEntry}.
     *
     * @param entry serialized entry; its reader index is not moved by this method
     * @return the entry id read from the buffer
     * @throws IOException if interrupted while waiting for cache space
     * @throws BookieException if the write is rejected because the cache stayed full
     */
    @Override
    public long addEntry(ByteBuf entry) throws IOException, BookieException {
        long startTime = MathUtils.nowInNano();

        int base = entry.readerIndex();
        long ledgerId = entry.getLong(base);
        long entryId = entry.getLong(base + 8);
        long lac = entry.getLong(base + 16);
        if (log.isDebugEnabled()) {
            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
        }

        long stamp = writeCacheRotationLock.tryOptimisticRead();
        boolean inserted = writeCache.put(ledgerId, entryId, entry);
        if (!writeCacheRotationLock.validate(stamp)) {
            // The caches were rotated while inserting: the entry may have landed in the cache that
            // is being flushed, so insert again while holding the read lock.
            stamp = writeCacheRotationLock.readLock();
            try {
                inserted = writeCache.put(ledgerId, entryId, entry);
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        if (!inserted) {
            triggerFlushAndAddEntry(ledgerId, entryId, entry);
        }

        updateCachedLacIfNeeded(ledgerId, lac);
        recordSuccessfulEvent(addEntryStats, startTime);
        return entryId;
    }

    /**
     * Slow path of {@link #addEntry}: schedules a flush if none is pending or running, then keeps
     * retrying the insert (sleeping 1 ms between attempts) until it succeeds or the maximum
     * throttle time has elapsed.
     *
     * @param ledgerId ledger of the entry
     * @param entryId id of the entry
     * @param entry serialized entry to insert
     * @throws IOException if the thread is interrupted while waiting; the interrupt flag is restored
     *         and the {@link InterruptedException} is attached as the cause
     * @throws BookieException {@code OperationRejectedException} when the cache is still full after
     *         the throttle time
     */
    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException, BookieException {
        // Only one caller wins the compareAndSet and schedules the flush.
        if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
            log.info("Write cache is full, triggering flush");
            executor.execute(() -> {
                try {
                    flush();
                } catch (IOException e) {
                    log.error("Error during flush", e);
                }
            });
        }

        throttledWriteRequests.inc();

        // Compare elapsed time rather than absolute nanoTime values: System.nanoTime() may overflow,
        // so "now < deadline" is not a reliable test.
        long waitStartNanos = System.nanoTime();
        while (System.nanoTime() - waitStartNanos < maxThrottleTimeNanos) {
            long stamp = writeCacheRotationLock.readLock();
            try {
                if (writeCache.put(ledgerId, entryId, entry)) {
                    return;
                }
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }

            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId, e);
            }
        }

        // Timed out without finding room in the write cache.
        rejectedWriteRequests.inc();
        throw new BookieException.OperationRejectedException();
    }

    /**
     * Reads an entry, looking in order at the active write cache, the write cache being flushed,
     * the read cache and finally the entry log.
     *
     * <p>An {@code entryId} of {@code -1} is served by {@link #getLastEntry(long)}. On a miss in
     * all caches the entry is read from the entry log, put in the read cache, and a read-ahead pass
     * is started from the location right after it.
     *
     * @param ledgerId ledger of the entry
     * @param entryId id of the entry, or {@code -1} for the last entry of the ledger
     * @return the entry buffer
     * @throws IOException on storage errors; {@code Bookie.NoEntryException} when the location index
     *         returns location {@code 0} or the entry cannot be found
     */
    @Override
    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
        long startTime = MathUtils.nowInNano();
        if (log.isDebugEnabled()) {
            log.debug("Get Entry: {}@{}", ledgerId, entryId);
        }

        if (entryId == -1L) {
            return getLastEntry(ledgerId);
        }

        // Grab a consistent pair of cache references; fall back to the read lock if a swap
        // happened while they were being read.
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        WriteCache activeCache = writeCache;
        WriteCache flushingCache = writeCacheBeingFlushed;
        if (!writeCacheRotationLock.validate(stamp)) {
            stamp = writeCacheRotationLock.readLock();
            try {
                activeCache = writeCache;
                flushingCache = writeCacheBeingFlushed;
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        ByteBuf cached = activeCache.get(ledgerId, entryId);
        if (cached == null) {
            cached = flushingCache.get(ledgerId, entryId);
        }
        if (cached == null) {
            cached = readCache.get(ledgerId, entryId);
        }
        if (cached != null) {
            recordSuccessfulEvent(readCacheHitStats, startTime);
            recordSuccessfulEvent(readEntryStats, startTime);
            return cached;
        }

        // Not cached anywhere: resolve the location and read from the entry log.
        long entryLocation;
        ByteBuf entry;
        try {
            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
            if (entryLocation == 0L) {
                throw new Bookie.NoEntryException(ledgerId, entryId);
            }
            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
        } catch (Bookie.NoEntryException e) {
            recordFailedEvent(readEntryStats, startTime);
            throw e;
        }

        readCache.put(ledgerId, entryId, entry);

        // The next entry starts after this one; the extra 4 bytes are presumably its size header.
        long nextEntryLocation = entryLocation + 4L + entry.readableBytes();
        fillReadAheadCache(ledgerId, entryId + 1L, nextEntryLocation);

        recordSuccessfulEvent(readCacheMissStats, startTime);
        recordSuccessfulEvent(readEntryStats, startTime);
        return entry;
    }

    /**
     * Best-effort read-ahead: starting at {@code firstEntryLocation}, reads consecutive entries
     * from the entry log and puts them in the read cache.
     *
     * <p>The pass stops when the configured batch size is reached, when the location moves to a
     * different entry log (upper 32 bits of the location change), when an entry of another ledger
     * is found, or when reading fails. Failures are only logged at debug level. The batch count and
     * size stats are recorded however the pass ends.
     *
     * @param orginalLedgerId ledger whose entries are being cached
     * @param firstEntryId id of the first entry expected at the start location (not used here)
     * @param firstEntryLocation entry log location at which the read-ahead starts
     */
    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) {
        int count = 0;
        long size = 0L;
        try {
            long firstEntryLogId = firstEntryLocation >> 32;
            long currentEntryLogId = firstEntryLogId;
            long currentEntryLocation = firstEntryLocation;

            while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1L, currentEntryLocation);
                try {
                    long currentEntryLedgerId = entry.getLong(0);
                    long currentEntryId = entry.getLong(8);

                    if (currentEntryLedgerId != orginalLedgerId) {
                        // Entry of a different ledger: stop reading ahead, but still record stats.
                        break;
                    }

                    readCache.put(orginalLedgerId, currentEntryId, entry);
                    count++;
                    size += entry.readableBytes();

                    currentEntryLocation += 4 + entry.readableBytes();
                    currentEntryLogId = currentEntryLocation >> 32;
                } finally {
                    entry.release();
                }
            }
        } catch (Exception e) {
            // Read-ahead is an optimization; a failure here must not fail the read that triggered it.
            if (log.isDebugEnabled()) {
                log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
            }
        } finally {
            readAheadBatchCountStats.registerSuccessfulValue(count);
            readAheadBatchSizeStats.registerSuccessfulValue(size);
        }
    }

    /**
     * Returns the last entry of a ledger.
     *
     * <p>Under the rotation read lock, the active write cache is consulted first and then the
     * cache being flushed. If neither holds an entry for the ledger, the last entry id is resolved
     * through the location index and the entry is read from the entry log.
     *
     * @param ledgerId ledger to look up
     * @return the last entry of the ledger
     * @throws IOException if the index lookup or the entry log read fails
     */
    public ByteBuf getLastEntry(long ledgerId) throws IOException {
        long startTime = MathUtils.nowInNano();

        long stamp = writeCacheRotationLock.readLock();
        try {
            ByteBuf cached = writeCache.getLastEntry(ledgerId);
            if (cached != null) {
                if (log.isDebugEnabled()) {
                    // Peek at the header for logging, then rewind the reader index.
                    long foundLedgerId = cached.readLong();
                    long entryId = cached.readLong();
                    cached.resetReaderIndex();
                    log.debug("Found last entry for ledger {} in write cache: {}@{}", ledgerId, foundLedgerId, entryId);
                }
                recordSuccessfulEvent(readCacheHitStats, startTime);
                recordSuccessfulEvent(readEntryStats, startTime);
                return cached;
            }

            cached = writeCacheBeingFlushed.getLastEntry(ledgerId);
            if (cached != null) {
                if (log.isDebugEnabled()) {
                    cached.readLong(); // skip the ledger id
                    long entryId = cached.readLong();
                    cached.resetReaderIndex();
                    log.debug("Found last entry for ledger {} in write cache being flushed: {}", ledgerId, entryId);
                }
                recordSuccessfulEvent(readCacheHitStats, startTime);
                recordSuccessfulEvent(readEntryStats, startTime);
                return cached;
            }
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }

        // Not in any write cache: go through the location index and the entry log.
        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
        if (log.isDebugEnabled()) {
            log.debug("Found last entry for ledger {} in db: {}", ledgerId, lastEntryId);
        }

        long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);

        recordSuccessfulEvent(readCacheMissStats, startTime);
        recordSuccessfulEvent(readEntryStats, startTime);
        return content;
    }

    /**
     * Tells whether the active write cache currently holds any entry, checked under the rotation
     * read lock.
     *
     * @return {@code true} if the active write cache is not empty
     */
    @VisibleForTesting
    boolean isFlushRequired() {
        long stamp = writeCacheRotationLock.readLock();
        try {
            return !writeCache.isEmpty();
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }
    }

    /**
     * Flushes the content of the write cache to the entry log and the location index.
     *
     * <p>The call returns immediately when the last completed checkpoint is already newer than
     * {@code checkpoint}. Otherwise, while holding the flush mutex, it swaps the write caches,
     * appends every cached entry to the entry log, flushes the entry log, the location batch and
     * the ledger index, schedules the removal of deleted ledgers from the indexes and finally
     * clears the flushed cache.
     *
     * <p>The location batch is closed even when the flush fails, and a failed flush is recorded in
     * the flush stats.
     *
     * @param checkpoint checkpoint up to which data must be persisted
     * @throws IOException if writing or flushing fails; runtime failures raised while iterating the
     *         cache are wrapped in an {@link IOException}
     */
    @Override
    public void checkpoint(CheckpointSource.Checkpoint checkpoint) throws IOException {
        CheckpointSource.Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
        if (lastCheckpoint.compareTo(checkpoint) > 0) {
            return;
        }

        long startTime = MathUtils.nowInNano();

        // Only one flush at a time.
        flushMutex.lock();
        try {
            // From here on new writes go to the other cache while this one is persisted.
            swapWriteCache();

            long sizeToFlush = writeCacheBeingFlushed.size();
            if (log.isDebugEnabled()) {
                log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(), sizeToFlush / 1024.0 / 1024.0);
            }

            KeyValueStorage.Batch batch = entryLocationIndex.newBatch();
            try {
                writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                    try {
                        long location = entryLogger.addEntry(ledgerId, entry, true);
                        entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
                    } catch (IOException e) {
                        // The callback cannot throw checked exceptions; unwrapped by the outer catch.
                        throw new RuntimeException(e);
                    }
                });

                // Entries must be durable in the entry log before their locations are published.
                entryLogger.flush();

                long batchFlushStarTime = System.nanoTime();
                batch.flush();
                if (log.isDebugEnabled()) {
                    log.debug("DB batch flushed time : {} s", (double) MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1L));
                }
            } finally {
                // Release the batch on both the success and the failure path.
                batch.close();
            }

            ledgerIndex.flush();

            cleanupExecutor.execute(() -> {
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing deleted ledgers from db indexes");
                    }
                    entryLocationIndex.removeOffsetFromDeletedLedgers();
                    ledgerIndex.removeDeletedLedgers();
                } catch (Throwable t) {
                    log.warn("Failed to cleanup db indexes", t);
                }
            });

            lastCheckpoint = thisCheckpoint;

            // Everything is persisted: the flushed cache can be reused.
            writeCacheBeingFlushed.clear();

            double flushTimeSeconds = (double) MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1L);
            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / flushTimeSeconds;
            if (log.isDebugEnabled()) {
                log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput);
            }

            recordSuccessfulEvent(flushStats, startTime);
            flushSizeStats.registerSuccessfulValue(sizeToFlush);
        } catch (IOException e) {
            recordFailedEvent(flushStats, startTime);
            throw e;
        } catch (RuntimeException e) {
            recordFailedEvent(flushStats, startTime);
            throw new IOException(e);
        } finally {
            try {
                isFlushOngoing.set(false);
            } finally {
                flushMutex.unlock();
            }
        }
    }

    /**
     * Exchanges the active write cache with the cache being flushed while holding the rotation
     * write lock, so that no reader or writer holding the read lock observes a half-done swap.
     *
     * <p>Also resets {@code hasFlushBeenTriggered} (a new flush may be requested for the fresh
     * cache) and marks a flush as ongoing before the lock is released. Called by
     * {@code checkpoint()}.
     */
    private void swapWriteCache() {
        long stamp = this.writeCacheRotationLock.writeLock();
        try {
            WriteCache tmp = this.writeCacheBeingFlushed;
            this.writeCacheBeingFlushed = this.writeCache;
            this.writeCache = tmp;
            this.hasFlushBeenTriggered.set(false);
        }
        finally {
            try {
                // Set while the write lock is still held, i.e. before any writer can see the new cache.
                this.isFlushOngoing.set(true);
            }
            finally {
                this.writeCacheRotationLock.unlockWrite(stamp);
            }
        }
    }

    /**
     * Takes a new checkpoint from the checkpoint source, persists the write cache up to it and
     * then reports the checkpoint as complete.
     *
     * @throws IOException if the checkpoint fails
     */
    @Override
    public void flush() throws IOException {
        CheckpointSource.Checkpoint newCheckpoint = checkpointSource.newCheckpoint();
        checkpoint(newCheckpoint);
        checkpointSource.checkpointComplete(newCheckpoint, true);
    }

    /**
     * Deletes a ledger: drops it from the active write cache (under the rotation read lock) and
     * from both indexes, notifies the registered deletion listeners and closes any cached
     * transient info for it.
     *
     * @param ledgerId ledger to delete
     * @throws IOException if an index update fails
     */
    @Override
    public void deleteLedger(long ledgerId) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Deleting ledger {}", ledgerId);
        }

        long stamp = writeCacheRotationLock.readLock();
        try {
            writeCache.deleteLedger(ledgerId);
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }

        entryLocationIndex.delete(ledgerId);
        ledgerIndex.delete(ledgerId);

        for (LedgerStorage.LedgerDeletionListener listener : ledgerDeletionListeners) {
            listener.ledgerDeleted(ledgerId);
        }

        TransientLedgerInfo removed = transientLedgerInfoCache.remove(ledgerId);
        if (removed != null) {
            removed.close();
        }
    }

    /**
     * Delegates to the metadata index to list the active ledgers for the given id bounds.
     *
     * @param firstLedgerId first bound passed to the index
     * @param lastLedgerId second bound passed to the index
     * @return the ledger ids reported by the metadata index
     * @throws IOException if the index lookup fails
     */
    @Override
    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
        Iterable<Long> activeLedgers = ledgerIndex.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
        return activeLedgers;
    }

    /**
     * Flushes the write cache and then applies the given location updates to the location index.
     *
     * @param locations new entry locations to record
     * @throws IOException if the flush or the index update fails
     */
    @Override
    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
        // Flush first; presumably so that locations still sitting in the write cache cannot later
        // overwrite the updated ones -- confirm against the compaction caller.
        flush();
        entryLocationIndex.updateLocations(locations);
    }

    /**
     * @return the entry logger created by the constructor
     */
    @Override
    public EntryLogger getEntryLogger() {
        return entryLogger;
    }

    /**
     * Returns the last-add-confirmed of a ledger.
     *
     * <p>The cached value is used when transient info exists for the ledger and holds a value other
     * than {@link Long#MIN_VALUE}. Otherwise the LAC is read from the last entry of the ledger
     * (the long following the first 16 bytes) and stored in the transient info.
     *
     * @param ledgerId ledger to look up
     * @return the last-add-confirmed entry id
     * @throws IOException if the last entry cannot be read
     */
    @Override
    public long getLastAddConfirmed(long ledgerId) throws IOException {
        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
        long cachedLac = (ledgerInfo == null) ? Long.MIN_VALUE : ledgerInfo.getLastAddConfirmed();
        if (cachedLac != Long.MIN_VALUE) {
            return cachedLac;
        }

        ByteBuf lastEntry = getEntry(ledgerId, -1L);
        try {
            // Skip the two leading longs (ledger id and entry id, as written by addEntry).
            lastEntry.skipBytes(16);
            long lacFromEntry = lastEntry.readLong();
            return getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lacFromEntry);
        } finally {
            lastEntry.release();
        }
    }

    /**
     * Delegates to the ledger's transient info (created on demand) to wait for a LAC update.
     *
     * @param ledgerId ledger to watch
     * @param previousLAC LAC value already known to the caller
     * @param watcher watcher handed to the transient ledger info
     * @return the value returned by the transient ledger info
     * @throws IOException declared by the interface
     */
    @Override
    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
        TransientLedgerInfo ledgerInfo = getOrAddLedgerInfo(ledgerId);
        return ledgerInfo.waitForLastAddConfirmedUpdate(previousLAC, watcher);
    }

    /**
     * Stores the explicit LAC in the ledger's transient info, creating the info if needed.
     *
     * @param ledgerId ledger to update
     * @param lac buffer holding the explicit LAC
     * @throws IOException declared by the interface
     */
    @Override
    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
        TransientLedgerInfo ledgerInfo = getOrAddLedgerInfo(ledgerId);
        ledgerInfo.setExplicitLac(lac);
    }

    /**
     * Returns the explicit LAC held by the ledger's transient info.
     *
     * @param ledgerId ledger to look up
     * @return the explicit LAC, or {@code null} when no transient info is cached for the ledger
     */
    @Override
    public ByteBuf getExplicitLac(long ledgerId) {
        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
        return (ledgerInfo == null) ? null : ledgerInfo.getExplicitLac();
    }

    /**
     * Returns the transient info cached for the ledger, creating and caching a new one if absent.
     *
     * <p>If another thread inserts an instance first, the instance created here is closed and the
     * one already in the cache is returned.
     *
     * @param ledgerId ledger to look up
     * @return the cached transient ledger info, never {@code null}
     */
    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
        TransientLedgerInfo existing = transientLedgerInfoCache.get(ledgerId);
        if (existing != null) {
            return existing;
        }

        TransientLedgerInfo created = new TransientLedgerInfo(ledgerId, ledgerIndex);
        TransientLedgerInfo winner = transientLedgerInfoCache.putIfAbsent(ledgerId, created);
        if (winner == null) {
            return created;
        }

        // Lost the race: discard our instance and use the one already in the cache.
        created.close();
        return winner;
    }

    /**
     * Pushes the LAC to the ledger's transient info, but only if such info is already cached.
     *
     * @param ledgerId ledger to update
     * @param lac last-add-confirmed value
     */
    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
        if (ledgerInfo == null) {
            return;
        }
        ledgerInfo.setLastAddConfirmed(lac);
    }

    /**
     * Intentionally does nothing in this implementation.
     */
    @Override
    public void flushEntriesLocationsIndex() throws IOException {
        // no-op
    }

    /**
     * Registers a ledger in the metadata index and writes all the given entry locations to the
     * location index in a single batch.
     *
     * <p>The batch is closed even when adding a location or flushing fails.
     *
     * @param ledgerId ledger to add
     * @param isFenced fenced flag stored for the ledger
     * @param masterKey master key stored for the ledger
     * @param entries maps of entry id to location
     * @return the number of entry locations added
     * @throws Exception if an index update fails; an {@link IOException} raised while adding a
     *         location surfaces wrapped in a {@link RuntimeException}
     */
    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey, Iterable<SortedMap<Long, Long>> entries) throws Exception {
        DbLedgerStorageDataFormats.LedgerData ledgerData = DbLedgerStorageDataFormats.LedgerData.newBuilder()
                .setExists(true)
                .setFenced(isFenced)
                .setMasterKey(ByteString.copyFrom(masterKey))
                .build();
        ledgerIndex.set(ledgerId, ledgerData);

        AtomicLong numberOfEntries = new AtomicLong();
        KeyValueStorage.Batch batch = entryLocationIndex.newBatch();
        try {
            entries.forEach(map -> map.forEach((entryId, location) -> {
                try {
                    entryLocationIndex.addLocation(batch, ledgerId, (long) entryId, (long) location);
                } catch (IOException e) {
                    // forEach callbacks cannot throw checked exceptions.
                    throw new RuntimeException(e);
                }
                numberOfEntries.incrementAndGet();
            }));
            batch.flush();
        } finally {
            // Release the batch on both the success and the failure path.
            batch.close();
        }
        return numberOfEntries.get();
    }

    /**
     * Adds a listener that {@code deleteLedger} notifies for every deleted ledger.
     *
     * @param listener listener to register
     */
    @Override
    public void registerLedgerDeletionListener(LedgerStorage.LedgerDeletionListener listener) {
        ledgerDeletionListeners.add(listener);
    }

    /**
     * @return the entry location index created by the constructor
     */
    public EntryLocationIndex getEntryLocationIndex() {
        return entryLocationIndex;
    }

    /**
     * Registers a successful event whose latency is the time elapsed since {@code startTimeNanos}.
     *
     * @param logger stats logger receiving the event
     * @param startTimeNanos start time of the operation, in nanoseconds
     */
    private void recordSuccessfulEvent(OpStatsLogger logger, long startTimeNanos) {
        long elapsedNanos = MathUtils.elapsedNanos(startTimeNanos);
        logger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Registers a failed event whose latency is the time elapsed since {@code startTimeNanos}.
     *
     * @param logger stats logger receiving the event
     * @param startTimeNanos start time of the operation, in nanoseconds
     */
    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
        long elapsedNanos = MathUtils.elapsedNanos(startTimeNanos);
        logger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * @return the combined size of the active write cache and the cache being flushed
     */
    long getWriteCacheSize() {
        long activeSize = writeCache.size();
        long flushingSize = writeCacheBeingFlushed.size();
        return activeSize + flushingSize;
    }

    /**
     * @return the combined entry count of the active write cache and the cache being flushed
     */
    long getWriteCacheCount() {
        long activeCount = writeCache.count();
        long flushingCount = writeCacheBeingFlushed.count();
        return activeCount + flushingCount;
    }

    /**
     * @return the size reported by the read cache
     */
    long getReadCacheSize() {
        return readCache.size();
    }

    /**
     * @return the entry count reported by the read cache
     */
    long getReadCacheCount() {
        return readCache.count();
    }

    /**
     * Callback receiving three long values per invocation. It is not referenced elsewhere in this
     * class.
     *
     * NOTE(review): the original parameter names were lost in decompilation (var1/var3/var5);
     * confirm their meaning against the callers before relying on a particular order.
     */
    public static interface LedgerLoggerProcessor {
        public void process(long var1, long var3, long var5);
    }
}

