/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hudi.org.apache.commons.io.IOUtils;
import org.apache.hudi.org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hudi.org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hudi.org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocatorException;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.BucketCacheStats;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.BucketProtoUtils;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.ByteBufferIOEngine;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.CacheFullException;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.CachedEntryQueue;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.ExclusiveMemoryMmapIOEngine;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.IOEngine;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.PersistentIOEngine;
import org.apache.hudi.org.apache.hadoop.hbase.io.hfile.bucket.SharedMemoryMmapIOEngine;
import org.apache.hudi.org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hudi.org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hudi.org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class BucketCache
implements BlockCache,
HeapSize {
    private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

    // Configuration keys for the tunable factors; the constructor reads each one with conf.getFloat.
    static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
    static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
    static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
    static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
    static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
    static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

    // Default values for the factors above (the constructor falls back to these same values).
    static final float DEFAULT_SINGLE_FACTOR = 0.25f;
    static final float DEFAULT_MULTI_FACTOR = 0.5f;
    static final float DEFAULT_MEMORY_FACTOR = 0.25f;
    static final float DEFAULT_MIN_FACTOR = 0.85f;
    private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.1f;
    private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
    // Multiplier applied in freeSpace() to the number of completely full bucket sizes
    // to decide how many entire buckets freeEntireBuckets() should try to release.
    private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
    // Period, in seconds, at which the constructor schedules the statistics task.
    private static final int statThreadPeriod = 300;
    // NOTE(review): not referenced in the visible code; presumably defaults used by callers.
    static final int DEFAULT_WRITER_THREADS = 3;
    static final int DEFAULT_WRITER_QUEUE_ITEMS = 64;

    // Storage engine selected from the ioEngineName prefix (see getIOEngineFromName).
    final transient IOEngine ioEngine;
    // Blocks that have been queued for writing but are not yet in backingMap.
    final transient RAMCache ramCache;
    // Entries that have been written through the IOEngine, keyed by block cache key.
    transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
    // Set to true by the constructor; cache/get/evict operations are no-ops while it is false.
    private volatile boolean cacheEnabled;
    // One queue per writer thread; a block is routed to a queue by its key's hash code.
    final transient ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList();
    final transient WriterThread[] writerThreads;
    // True while freeSpace() is running.
    private volatile boolean freeInProgress = false;
    // Taken with tryLock() in freeSpace() so only one eviction pass runs at a time.
    private final transient Lock freeSpaceLock = new ReentrantLock();
    // Bytes occupied by cached entries; reduced by the entry length in freeBucketEntry().
    private final LongAdder realCacheSize = new LongAdder();
    // Sum of heapSize() of the blocks accepted for caching, reduced on removal from the RAM cache.
    private final LongAdder heapSize = new LongAdder();
    // Number of blocks accepted for caching and not yet evicted.
    private final LongAdder blockNumber = new LongAdder();
    // Monotonic counter handed to entries on creation and on each access.
    private final AtomicLong accessCount = new AtomicLong();
    // Milliseconds to wait when offering to a full writer queue, or when a free is already running.
    private static final int DEFAULT_CACHE_WAIT_TIME = 50;
    // When true, cacheBlock() waits for room in the writer queue instead of failing immediately.
    boolean wait_when_cache = false;
    private final BucketCacheStats cacheStats = new BucketCacheStats();
    // Location used to restore cache state at startup when the IOEngine is persistent; may be null.
    private final String persistencePath;
    private final long cacheCapacity;
    private final long blockSize;
    private final int ioErrorsTolerationDuration;
    public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60000;
    // Reset to -1 after a successful read or write.
    // NOTE(review): presumably set when an IO error is first seen; the setter is outside this view.
    private volatile long ioErrorStartTime = -1L;
    // Per-offset read/write locks guarding reads and evictions of a bucket entry.
    final transient IdReadWriteLock<Long> offsetLock = new IdReadWriteLock(IdReadWriteLock.ReferenceType.SOFT);
    // Cached keys ordered by HFile name, then by offset within the file.
    private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<BlockCacheKey>((a, b) -> {
        int nameComparison = a.getHfileName().compareTo(b.getHfileName());
        if (nameComparison != 0) {
            return nameComparison;
        }
        return Long.compare(a.getOffset(), b.getOffset());
    });
    // Single daemon thread that runs the periodic statistics task.
    private final transient ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
    private transient BucketAllocator bucketAllocator;
    // Factor values read from configuration and validated by sanityCheckConfigs().
    private float acceptableFactor;
    private float minFactor;
    private float extraFreeFactor;
    private float singleFactor;
    private float multiFactor;
    private float memoryFactor;
    // Configuration key and default for the persistence-file integrity check algorithm.
    private static final String FILE_VERIFY_ALGORITHM = "hbase.bucketcache.persistent.file.integrity.check.algorithm";
    private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
    private String algorithm;
    // Timestamp of the last logged allocation failure; 0 means none has been logged yet.
    private long allocFailLogPrevTs;
    // Minimum interval, in milliseconds, between two allocation-failure warnings.
    private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000;

    /**
     * Creates a bucket cache with the default IO error toleration duration
     * ({@link #DEFAULT_ERROR_TOLERATION_DURATION}) and a configuration obtained from
     * {@link HBaseConfiguration#create()}.
     *
     * @throws IOException if the full constructor throws it
     */
    public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
            int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
        this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
            persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
    }

    /**
     * Creates the cache, optionally restores persisted state, and starts the writer threads
     * and the periodic statistics task.
     *
     * @param ioEngineName engine selector; its prefix decides which {@link IOEngine} is built
     * @param capacity capacity handed to the IO engine and to the bucket allocator
     * @param blockSize block size; {@code capacity / blockSize} must stay below
     *        {@link Integer#MAX_VALUE}
     * @param bucketSizes bucket sizes handed to the bucket allocator
     * @param writerThreadNum number of writer threads (and writer queues) to create
     * @param writerQLen capacity of each writer queue
     * @param persistencePath where to restore state from when the engine is persistent; may be
     *        {@code null}
     * @param ioErrorsTolerationDuration stored for later IO error handling
     * @param conf source of the factor settings and the file verification algorithm
     * @throws IOException if the IO engine cannot be created
     * @throws IllegalArgumentException if the capacity is too large, the engine name is not
     *         understood, or a configured factor is invalid
     */
    public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
            int writerThreadNum, int writerQLen, String persistencePath,
            int ioErrorsTolerationDuration, Configuration conf) throws IOException {
        this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
        this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
        this.writerThreads = new WriterThread[writerThreadNum];

        // The backing map is pre-sized with an int, so the block count has to fit in one.
        long blockNumCapacity = capacity / blockSize;
        if (blockNumCapacity >= Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
        }

        acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
        minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
        extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
        singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
        multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
        memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
        sanityCheckConfigs();
        LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor);

        this.cacheCapacity = capacity;
        this.persistencePath = persistencePath;
        this.blockSize = blockSize;
        this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
        this.allocFailLogPrevTs = 0L;
        this.bucketAllocator = new BucketAllocator(capacity, bucketSizes);

        // One bounded queue per writer thread.
        for (int q = 0; q < writerThreads.length; q++) {
            writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
        }
        assert (writerQueues.size() == writerThreads.length);

        this.ramCache = new RAMCache();
        this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

        // A failed restore is logged and otherwise ignored: the cache simply starts empty.
        if (ioEngine.isPersistent() && persistencePath != null) {
            try {
                retrieveFromFile(bucketSizes);
            } catch (IOException ioex) {
                LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
            }
        }

        final String threadName = Thread.currentThread().getName();
        this.cacheEnabled = true;
        for (int t = 0; t < writerThreads.length; t++) {
            WriterThread writer = new WriterThread(writerQueues.get(t));
            writerThreads[t] = writer;
            writer.setName(threadName + "-BucketCacheWriter-" + t);
            writer.setDaemon(true);
        }
        startWriterThreads();

        scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
        LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + persistencePath + ", bucketAllocator=" + bucketAllocator.getClass().getName());
    }

    /**
     * Validates the factor settings read from configuration.
     *
     * <p>Each of the accept, min, single, multi and memory factors must lie in [0.0, 1.0]; the min
     * factor may not exceed the accept factor; the extra-free factor may not be negative (0.0 is
     * accepted even though the message says "greater than"); and the single, multi and memory
     * factors must sum to exactly 1.0.
     *
     * @throws IllegalArgumentException if any check fails
     */
    private void sanityCheckConfigs() {
        final String rangeSuffix = " must be between 0.0 and 1.0";
        Preconditions.checkArgument(acceptableFactor >= 0.0f && acceptableFactor <= 1.0f,
            ACCEPT_FACTOR_CONFIG_NAME + rangeSuffix);
        Preconditions.checkArgument(minFactor >= 0.0f && minFactor <= 1.0f,
            MIN_FACTOR_CONFIG_NAME + rangeSuffix);
        Preconditions.checkArgument(minFactor <= acceptableFactor,
            MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
        Preconditions.checkArgument(extraFreeFactor >= 0.0f,
            EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
        Preconditions.checkArgument(singleFactor >= 0.0f && singleFactor <= 1.0f,
            SINGLE_FACTOR_CONFIG_NAME + rangeSuffix);
        Preconditions.checkArgument(multiFactor >= 0.0f && multiFactor <= 1.0f,
            MULTI_FACTOR_CONFIG_NAME + rangeSuffix);
        Preconditions.checkArgument(memoryFactor >= 0.0f && memoryFactor <= 1.0f,
            MEMORY_FACTOR_CONFIG_NAME + rangeSuffix);
        Preconditions.checkArgument(singleFactor + multiFactor + memoryFactor == 1.0f,
            SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
                + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
    }

    /**
     * Starts every thread in {@code writerThreads}. Called from the constructor once all
     * writer threads have been created.
     */
    protected void startWriterThreads() {
        for (int i = 0; i < writerThreads.length; i++) {
            writerThreads[i].start();
        }
    }

    /** @return the current value of the {@code cacheEnabled} flag */
    boolean isCacheEnabled() {
        return cacheEnabled;
    }

    /** @return the capacity this cache was constructed with */
    @Override
    public long getMaxSize() {
        return cacheCapacity;
    }

    /** @return the string form of the IO engine backing this cache */
    public String getIoEngine() {
        return ioEngine.toString();
    }

    /**
     * Builds the IO engine selected by the prefix of {@code ioEngineName}.
     *
     * <ul>
     *   <li>{@code file:} / {@code files:} - {@link FileIOEngine} over the comma-separated paths
     *       after the first colon</li>
     *   <li>{@code offheap} - {@link ByteBufferIOEngine}</li>
     *   <li>{@code mmap:} - {@link ExclusiveMemoryMmapIOEngine} on the path after the prefix</li>
     *   <li>{@code pmem:} - {@link SharedMemoryMmapIOEngine} on the path after the prefix</li>
     * </ul>
     *
     * @param ioEngineName engine selector
     * @param capacity capacity passed to the engine
     * @param persistencePath a non-null value makes the file engine persistent
     * @return the newly created engine
     * @throws IOException if the engine constructor throws it
     * @throws IllegalArgumentException if no known prefix matches
     */
    private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) throws IOException {
        if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
            int firstColon = ioEngineName.indexOf(":");
            String[] filePaths = ioEngineName.substring(firstColon + 1).split(",");
            boolean persistent = persistencePath != null;
            return new FileIOEngine(capacity, persistent, filePaths);
        } else if (ioEngineName.startsWith("offheap")) {
            return new ByteBufferIOEngine(capacity);
        } else if (ioEngineName.startsWith("mmap:")) {
            String mmapPath = ioEngineName.substring(5);
            return new ExclusiveMemoryMmapIOEngine(mmapPath, capacity);
        } else if (ioEngineName.startsWith("pmem:")) {
            String pmemPath = ioEngineName.substring(5);
            return new SharedMemoryMmapIOEngine(pmemPath, capacity);
        } else {
            throw new IllegalArgumentException("Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
        }
    }

    /**
     * Caches a block without the in-memory flag.
     *
     * @param cacheKey key of the block
     * @param buf block to cache
     */
    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
        final boolean inMemory = false;
        cacheBlock(cacheKey, buf, inMemory);
    }

    /**
     * Caches a block, waiting for writer-queue room only when {@code wait_when_cache} is set.
     *
     * @param cacheKey key of the block
     * @param cachedItem block to cache
     * @param inMemory in-memory flag forwarded to the queued entry
     */
    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
        boolean wait = wait_when_cache;
        cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait);
    }

    /**
     * Caches a block unless the cache is disabled or the key is already present and should not
     * be replaced.
     *
     * <p>A key already in the backing map or RAM cache is re-cached only when
     * {@link #shouldReplaceExistingCacheBlock} agrees and the existing backing-map entry, if any,
     * is not RPC-referenced.
     *
     * @param cacheKey key of the block
     * @param cachedItem block to cache
     * @param inMemory in-memory flag forwarded to the queued entry
     * @param wait whether to wait for room in the writer queue
     */
    public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) {
        if (!cacheEnabled) {
            return;
        }
        boolean alreadyPresent = backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey);
        if (alreadyPresent) {
            if (!shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
                return;
            }
            BucketEntry existing = backingMap.get(cacheKey);
            if (existing != null && existing.isRpcRef()) {
                // Still referenced by an RPC; leave the current entry alone.
                return;
            }
        }
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
    }

    /**
     * Delegates to {@link BlockCacheUtil#shouldReplaceExistingCacheBlock} to decide whether an
     * already cached block should be replaced by {@code newBlock}.
     *
     * @param cacheKey key of the block already cached
     * @param newBlock candidate replacement
     * @return the decision made by {@link BlockCacheUtil}
     */
    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
        boolean replace = BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
        return replace;
    }

    /**
     * Places the block in the RAM cache and hands it to a writer queue.
     *
     * <p>Nothing happens when the cache is disabled or the key is already in the RAM cache. The
     * writer queue is chosen from the key's hash code. If the queue does not accept the entry
     * (including when the timed offer is interrupted), the entry is removed from the RAM cache
     * again and a failed insert is recorded; otherwise the block counters are updated.
     *
     * @param cacheKey key of the block
     * @param cachedItem block to cache
     * @param inMemory in-memory flag stored on the queued entry
     * @param wait whether to wait up to {@code DEFAULT_CACHE_WAIT_TIME} ms for queue room
     */
    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) {
        if (!cacheEnabled) {
            return;
        }
        LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
        RAMQueueEntry entry = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
        if (ramCache.putIfAbsent(cacheKey, entry) != null) {
            // Another caller has already queued this key.
            return;
        }

        int queueIndex = (cacheKey.hashCode() & Integer.MAX_VALUE) % writerQueues.size();
        BlockingQueue<RAMQueueEntry> queue = writerQueues.get(queueIndex);

        boolean queued;
        if (!wait) {
            queued = queue.offer(entry);
        } else {
            try {
                queued = queue.offer(entry, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                queued = false;
            }
        }

        if (queued) {
            blockNumber.increment();
            heapSize.add(cachedItem.heapSize());
            blocksByHFile.add(cacheKey);
        } else {
            ramCache.remove(cacheKey);
            cacheStats.failInsert();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks a block up, first in the RAM cache and then in the backing map.
     *
     * <p>A backing-map hit is read through the IO engine under the read lock for the entry's
     * offset, and only if the entry is still the one mapped to {@code key} once the lock is held.
     * When the engine uses shared memory the returned block is retained before it is handed out.
     * An {@link IOException} during the read is logged, reported to the IO error check, and
     * treated as a miss.
     *
     * @param key key of the block to fetch
     * @param caching forwarded to the hit/miss statistics
     * @param repeat when {@code true}, a miss is not counted
     * @param updateCacheMetrics whether hit/miss statistics should be updated at all
     * @return the cached block, or {@code null} when the cache is disabled or the block is not
     *         available
     */
    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics) {
        if (!this.cacheEnabled) {
            return null;
        }
        RAMQueueEntry re = this.ramCache.get(key);
        if (re != null) {
            if (updateCacheMetrics) {
                this.cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            }
            re.access(this.accessCount.incrementAndGet());
            return re.getData();
        }
        BucketEntry bucketEntry = this.backingMap.get(key);
        if (bucketEntry != null) {
            long start = System.nanoTime();
            ReentrantReadWriteLock lock = this.offsetLock.getLock(bucketEntry.offset());
            // Acquire before entering the try block: if lock() itself fails, the finally clause
            // must not try to release a lock that was never held.
            lock.readLock().lock();
            try {
                // Re-check under the lock; the entry may have been evicted or replaced meanwhile.
                if (bucketEntry.equals(this.backingMap.get(key))) {
                    Cacheable cachedBlock = this.ioEngine.read(bucketEntry);
                    if (this.ioEngine.usesSharedMemory()) {
                        cachedBlock.retain();
                    }
                    if (updateCacheMetrics) {
                        this.cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
                        this.cacheStats.ioHit(System.nanoTime() - start);
                    }
                    bucketEntry.access(this.accessCount.incrementAndGet());
                    // A successful read clears any pending IO error window.
                    if (this.ioErrorStartTime > 0L) {
                        this.ioErrorStartTime = -1L;
                    }
                    return cachedBlock;
                }
            } catch (IOException ioex) {
                LOG.error("Failed reading block " + key + " from bucket cache", ioex);
                this.checkIOErrorIsTolerated();
            } finally {
                lock.readLock().unlock();
            }
        }
        if (!repeat && updateCacheMetrics) {
            this.cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
        }
        return null;
    }

    /**
     * Records the eviction of an entry that has been removed from the backing map: marks the entry
     * as evicted, removes the key from {@code blocksByHFile}, optionally decrements the block
     * count, and updates the eviction statistics.
     *
     * @param cacheKey key of the evicted block
     * @param bucketEntry the evicted entry
     * @param decrementBlockNumber whether the block count should be decremented here
     */
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
        bucketEntry.markAsEvicted();
        blocksByHFile.remove(cacheKey);
        if (decrementBlockNumber) {
            blockNumber.decrement();
        }
        boolean primary = cacheKey.isPrimary();
        cacheStats.evicted(bucketEntry.getCachedTime(), primary);
    }

    /**
     * Returns the entry's block to the bucket allocator and subtracts its length from the real
     * cache size.
     *
     * @param bucketEntry entry whose space is being released
     */
    void freeBucketEntry(BucketEntry bucketEntry) {
        bucketAllocator.freeBlock(bucketEntry.offset());
        realCacheSize.add(-bucketEntry.getLength());
    }

    /**
     * Evicts the block stored under {@code cacheKey}, looking the entry up in the backing map.
     *
     * @param cacheKey key of the block to evict
     * @return {@code true} if something was evicted
     */
    @Override
    public boolean evictBlock(BlockCacheKey cacheKey) {
        final BucketEntry lookUpEntry = null;
        return doEvictBlock(cacheKey, lookUpEntry);
    }

    /**
     * Evicts a block from the RAM cache and, if present, from the backing map.
     *
     * <p>When no backing-map entry exists, the result only reflects whether the key was in the
     * RAM cache. Otherwise the entry is removed under its offset write lock, and only if it is
     * still the entry mapped to the key.
     *
     * @param cacheKey key of the block to evict
     * @param bucketEntry entry to evict, or {@code null} to look it up in the backing map
     * @return {@code true} if the block was evicted; always {@code false} while the cache is
     *         disabled
     */
    private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
        if (!cacheEnabled) {
            return false;
        }
        final boolean existedInRamCache = removeFromRamCache(cacheKey);
        final BucketEntry target = bucketEntry != null ? bucketEntry : backingMap.get(cacheKey);
        if (target == null) {
            if (existedInRamCache) {
                cacheStats.evicted(0L, cacheKey.isPrimary());
            }
            return existedInRamCache;
        }
        return target.withWriteLock(offsetLock, () -> {
            boolean removed = backingMap.remove(cacheKey, target);
            if (removed) {
                // The block count was already decremented if the key was in the RAM cache.
                blockEvicted(cacheKey, target, !existedInRamCache);
            }
            return removed;
        });
    }

    /**
     * Creates a recycler that calls {@link #freeBucketEntry} for the given entry when invoked.
     *
     * @param bucketEntry entry to free when the recycler runs
     * @return the recycler
     */
    private ByteBuffAllocator.Recycler createRecycler(BucketEntry bucketEntry) {
        return () -> {
            freeBucketEntry(bucketEntry);
        };
    }

    /**
     * Evicts the backing-map entry for the key unless it is RPC-referenced.
     *
     * @param blockCacheKey key of the block to evict
     * @return {@code true} if the block was evicted; {@code false} if there is no backing-map
     *         entry, the entry is RPC-referenced, or eviction did not happen
     */
    public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
        BucketEntry entry = backingMap.get(blockCacheKey);
        return entry != null && evictBucketEntryIfNoRpcReferenced(blockCacheKey, entry);
    }

    /**
     * Evicts the given entry unless it is RPC-referenced.
     *
     * @param blockCacheKey key of the block
     * @param bucketEntry entry to evict
     * @return {@code true} if the entry was evicted
     */
    boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
        if (bucketEntry.isRpcRef()) {
            return false;
        }
        return doEvictBlock(blockCacheKey, bucketEntry);
    }

    /**
     * Removes the key from the RAM cache. For a non-null removed entry the block count is
     * decremented and the entry's heap size is subtracted.
     *
     * @param cacheKey key to remove
     * @return the result reported by the RAM cache's remove call
     */
    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
        return ramCache.remove(cacheKey, removed -> {
            if (removed == null) {
                return;
            }
            blockNumber.decrement();
            heapSize.add(-removed.getData().heapSize());
        });
    }

    /**
     * Logs a one-line summary of allocator usage and cache statistics at INFO level, then resets
     * the statistics.
     */
    public void logStats() {
        long totalSize = bucketAllocator.getTotalSize();
        long usedSize = bucketAllocator.getUsedSize();
        long freeSize = totalSize - usedSize;
        long cacheSize = getRealCacheSize();

        StringBuilder line = new StringBuilder();
        line.append("failedBlockAdditions=").append(cacheStats.getFailedInserts());
        line.append(", totalSize=").append(StringUtils.byteDesc(totalSize));
        line.append(", freeSize=").append(StringUtils.byteDesc(freeSize));
        line.append(", usedSize=").append(StringUtils.byteDesc(usedSize));
        line.append(", cacheSize=").append(StringUtils.byteDesc(cacheSize));
        line.append(", accesses=").append(cacheStats.getRequestCount());
        line.append(", hits=").append(cacheStats.getHitCount());
        line.append(", IOhitsPerSecond=").append(cacheStats.getIOHitsPerSecond());
        line.append(", IOTimePerHit=").append(String.format("%.2f", cacheStats.getIOTimePerHit()));
        line.append(", hitRatio=");
        // The ratio is printed as a plain "0," when there have been no hits.
        if (cacheStats.getHitCount() == 0L) {
            line.append("0,");
        } else {
            line.append(StringUtils.formatPercent(cacheStats.getHitRatio(), 2)).append(", ");
        }
        line.append("cachingAccesses=").append(cacheStats.getRequestCachingCount());
        line.append(", cachingHits=").append(cacheStats.getHitCachingCount());
        line.append(", cachingHitsRatio=");
        if (cacheStats.getHitCachingCount() == 0L) {
            line.append("0,");
        } else {
            line.append(StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)).append(", ");
        }
        line.append("evictions=").append(cacheStats.getEvictionCount());
        line.append(", evicted=").append(cacheStats.getEvictedCount());
        line.append(", evictedPerRun=").append(cacheStats.evictedPerEviction());
        line.append(", allocationFailCount=").append(cacheStats.getAllocationFailCount());

        LOG.info(line.toString());
        cacheStats.reset();
    }

    /** @return the current sum of the {@code realCacheSize} adder */
    public long getRealCacheSize() {
        return realCacheSize.sum();
    }

    /**
     * @return the allocator's total size scaled by the acceptable factor, rounded down
     */
    public long acceptableSize() {
        float scaled = (float) bucketAllocator.getTotalSize() * acceptableFactor;
        return (long) Math.floor(scaled);
    }

    /**
     * @param partitionFactor share of the cache assigned to one priority group
     * @return the allocator's total size scaled by {@code partitionFactor} and by the min factor,
     *         rounded down
     */
    long getPartitionSize(float partitionFactor) {
        float scaled = (float) bucketAllocator.getTotalSize() * partitionFactor * minFactor;
        return (long) Math.floor(scaled);
    }

    /**
     * Counts the bucket sizes whose number of free items is below the free goal implied by
     * {@code minFactor}. The goal is {@code floor(totalCount * (1 - minFactor))} and never less
     * than one.
     *
     * @param minFactor fill ratio above which a bucket size counts as too full
     * @return number of bucket sizes that are fuller than the threshold
     */
    private int bucketSizesAboveThresholdCount(float minFactor) {
        int fullCount = 0;
        for (BucketAllocator.IndexStatistics stat : bucketAllocator.getIndexStatistics()) {
            long freeGoal = Math.max((long) Math.floor((float) stat.totalCount() * (1.0f - minFactor)), 1L);
            if (stat.freeCount() < freeGoal) {
                fullCount++;
            }
        }
        return fullCount;
    }

    private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
        if (completelyFreeBucketsNeeded != 0) {
            HashSet<Integer> inUseBuckets = new HashSet<Integer>();
            this.backingMap.forEach((? super K k, ? super V be) -> {
                if (be.isRpcRef()) {
                    inUseBuckets.add(this.bucketAllocator.getBucketIndex(be.offset()));
                }
            });
            Set<Integer> candidateBuckets = this.bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
            for (Map.Entry<BlockCacheKey, BucketEntry> entry : this.backingMap.entrySet()) {
                if (!candidateBuckets.contains(this.bucketAllocator.getBucketIndex(entry.getValue().offset()))) continue;
                this.evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Frees space by evicting entries until every bucket size is back under the min factor.
     *
     * <p>Only one pass runs at a time; a caller that cannot take {@code freeSpaceLock} returns
     * immediately. The pass:
     * <ol>
     *   <li>computes, per bucket size, how many bytes must be freed to reach the free goal;</li>
     *   <li>sorts all backing-map entries into single, multi and memory groups by priority;</li>
     *   <li>frees from the groups in order of increasing overflow, sharing the remaining target
     *       evenly between the groups still to be processed;</li>
     *   <li>if some bucket size is still over the threshold, runs a second round over all three
     *       groups using the target that includes the extra-free factor;</li>
     *   <li>finally asks {@link #freeEntireBuckets} to empty whole buckets.</li>
     * </ol>
     * Any {@link Throwable} is logged and swallowed. The eviction statistic is bumped and the
     * in-progress flag and lock are released on every path that acquired the lock.
     *
     * @param why reason for the pass, used only in the debug log
     */
    private void freeSpace(String why) {
        if (!freeSpaceLock.tryLock()) {
            return;
        }
        try {
            freeInProgress = true;
            long bytesToFreeWithoutExtra = 0L;
            // Only assemble the debug message when it is going to be logged.
            StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
            BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
            long[] bytesToFreeForBucket = new long[stats.length];
            for (int i = 0; i < stats.length; i++) {
                BucketAllocator.IndexStatistics stat = stats[i];
                long freeGoal = Math.max((long) Math.floor((float) stat.totalCount() * (1.0f - minFactor)), 1L);
                if (stat.freeCount() < freeGoal) {
                    bytesToFreeForBucket[i] = stat.itemSize() * (freeGoal - stat.freeCount());
                    bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
                    if (msgBuffer != null) {
                        msgBuffer.append("Free for bucketSize(" + stat.itemSize() + ")=" + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
                    }
                }
            }
            if (msgBuffer != null) {
                msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
            }
            if (bytesToFreeWithoutExtra <= 0L) {
                // Nothing to free; the finally block still runs.
                return;
            }

            long currentSize = bucketAllocator.getUsedSize();
            long totalSize = bucketAllocator.getTotalSize();
            if (LOG.isDebugEnabled() && msgBuffer != null) {
                LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
            }

            long bytesToFreeWithExtra = (long) Math.floor((float) bytesToFreeWithoutExtra * (1.0f + extraFreeFactor));

            // One group per block priority.
            BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
            BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
            BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
            for (Map.Entry<BlockCacheKey, BucketEntry> candidate : backingMap.entrySet()) {
                switch (candidate.getValue().getPriority()) {
                    case SINGLE:
                        bucketSingle.add(candidate);
                        break;
                    case MULTI:
                        bucketMulti.add(candidate);
                        break;
                    case MEMORY:
                        bucketMemory.add(candidate);
                        break;
                }
            }

            // First round: groups with the smallest overflow are processed first.
            PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
            bucketQueue.add(bucketSingle);
            bucketQueue.add(bucketMulti);
            bucketQueue.add(bucketMemory);
            int remainingBuckets = bucketQueue.size();
            long bytesFreed = 0L;
            for (BucketEntryGroup group = bucketQueue.poll(); group != null; group = bucketQueue.poll()) {
                long overflow = group.overflow();
                if (overflow > 0L) {
                    long share = (bytesToFreeWithoutExtra - bytesFreed) / (long) remainingBuckets;
                    bytesFreed += group.free(Math.min(overflow, share));
                }
                remainingBuckets--;
            }

            // Second round, regardless of overflow, if some bucket size is still too full.
            if (bucketSizesAboveThresholdCount(minFactor) > 0) {
                bucketQueue.clear();
                remainingBuckets = 3;
                bucketQueue.add(bucketSingle);
                bucketQueue.add(bucketMulti);
                bucketQueue.add(bucketMemory);
                for (BucketEntryGroup group = bucketQueue.poll(); group != null; group = bucketQueue.poll()) {
                    long share = (bytesToFreeWithExtra - bytesFreed) / (long) remainingBuckets;
                    bytesFreed += group.free(share);
                    remainingBuckets--;
                }
            }

            freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));

            if (LOG.isDebugEnabled()) {
                long single = bucketSingle.totalSize();
                long multi = bucketMulti.totalSize();
                long memory = bucketMemory.totalSize();
                LOG.debug("Bucket cache free space completed; freed=" + StringUtils.byteDesc(bytesFreed) + ", total=" + StringUtils.byteDesc(totalSize) + ", single=" + StringUtils.byteDesc(single) + ", multi=" + StringUtils.byteDesc(multi) + ", memory=" + StringUtils.byteDesc(memory));
            }
        } catch (Throwable t) {
            LOG.warn("Failed freeing space", t);
        } finally {
            cacheStats.evict();
            freeInProgress = false;
            freeSpaceLock.unlock();
        }
    }

    /**
     * Maps {@code key} to {@code bucketEntry} in the backing map. If this replaces a different
     * entry, the replaced entry is reported as evicted under its offset write lock, without
     * decrementing the block count.
     *
     * @param key key of the block
     * @param bucketEntry entry to store
     */
    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
        BucketEntry replaced = backingMap.put(key, bucketEntry);
        if (replaced == null || replaced == bucketEntry) {
            return;
        }
        replaced.withWriteLock(offsetLock, () -> {
            blockEvicted(key, replaced, false);
            return null;
        });
    }

    /**
     * Builds the warning text for a failed bucket allocation.
     *
     * <p>When the entry carries an {@link HFileBlock} whose file context yields both a table name
     * and a column family, the message names the table, column family and HFile. Otherwise it
     * falls back to the entry's key, or to an empty name when {@code re} is {@code null}.
     *
     * @param re entry whose allocation failed; may be {@code null}
     * @return the warning message
     */
    private String getAllocationFailWarningMessage(RAMQueueEntry re) {
        final String prefix = "Most recent failed allocation in " + ALLOCATION_FAIL_LOG_TIME_PERIOD + " milliseconds;";
        if (re == null) {
            return prefix + " HFile Name : ";
        }
        Cacheable data = re.getData();
        if (data instanceof HFileBlock) {
            HFileContext fileContext = ((HFileBlock) data).getHFileContext();
            String columnFamily = Bytes.toString(fileContext.getColumnFamily());
            String tableName = Bytes.toString(fileContext.getTableName());
            if (tableName != null && columnFamily != null) {
                return prefix + " Table Name = " + tableName + ", Column Family = " + columnFamily + ", HFile Name : " + fileContext.getHFileName();
            }
        }
        return prefix + " HFile Name : " + re.getKey();
    }

    /**
     * Writes a batch of RAM-queue entries through the IO engine and then
     * publishes the successfully written ones in the backing map.
     *
     * The method works in three phases: (1) write every entry, recording the
     * resulting BucketEntry per index; (2) sync the IO engine, rolling back all
     * allocations if the sync fails; (3) for each entry, register the bucket
     * entry in the backing map and drop the entry from the RAM cache. Finally,
     * if the allocator reports more used space than acceptableSize(), a
     * freeSpace pass is requested.
     *
     * @param entries entries previously drained from a writer queue
     * @throws InterruptedException if interrupted while sleeping to wait for an
     *         in-progress free-space pass
     */
    void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
        if (entries.isEmpty()) {
            return;
        }
        int size = entries.size();
        // bucketEntries[i] stays null for entries that were skipped or failed.
        BucketEntry[] bucketEntries = new BucketEntry[size];
        int index = 0;
        // Phase 1: write entries. index only advances on success or on a
        // non-retryable outcome, so retryable failures re-attempt the same entry.
        while (this.cacheEnabled && index < size) {
            RAMQueueEntry re2 = null;
            try {
                BucketEntry bucketEntry;
                re2 = entries.get(index);
                if (re2 == null) {
                    LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
                    ++index;
                    continue;
                }
                bucketEntries[index] = bucketEntry = re2.writeToCache(this.ioEngine, this.bucketAllocator, this.realCacheSize, this::createRecycler);
                // A successful write clears any recorded IO-error start time.
                if (this.ioErrorStartTime > 0L) {
                    this.ioErrorStartTime = -1L;
                }
                ++index;
            }
            catch (BucketAllocatorException fle) {
                // Not retryable: count it, log at most once per 60000 ms, skip the entry.
                long currTs = EnvironmentEdgeManager.currentTime();
                this.cacheStats.allocationFailed();
                if (this.allocFailLogPrevTs == 0L || currTs - this.allocFailLogPrevTs > 60000L) {
                    LOG.warn(this.getAllocationFailWarningMessage(re2), (Throwable)fle);
                    this.allocFailLogPrevTs = currTs;
                }
                bucketEntries[index] = null;
                ++index;
            }
            catch (CacheFullException cfe) {
                // Retry the same index: either free space ourselves, or wait
                // briefly for the free-space pass that is already running.
                if (!this.freeInProgress) {
                    this.freeSpace("Full!");
                    continue;
                }
                Thread.sleep(50L);
            }
            catch (IOException ioex) {
                // index is not advanced, so the same entry is retried while the
                // cache remains enabled; checkIOErrorIsTolerated may disable it.
                LOG.error("Failed writing to bucket cache", (Throwable)ioex);
                this.checkIOErrorIsTolerated();
            }
        }
        // Phase 2: sync. On failure, release every block allocated in phase 1
        // so none of them is published below.
        try {
            this.ioEngine.sync();
        }
        catch (IOException ioex) {
            LOG.error("Failed syncing IO engine", (Throwable)ioex);
            this.checkIOErrorIsTolerated();
            for (int i = 0; i < entries.size(); ++i) {
                if (bucketEntries[i] == null) continue;
                this.bucketAllocator.freeBlock(bucketEntries[i].offset());
                bucketEntries[i] = null;
            }
        }
        // Phase 3: publish written entries and remove all entries from the RAM cache.
        for (int i = 0; i < size; ++i) {
            boolean existed;
            // NOTE(review): unlike phase 1, a null element here is dereferenced
            // without a check — confirm that entries can never contain null.
            BlockCacheKey key = entries.get(i).getKey();
            if (bucketEntries[i] != null) {
                this.putIntoBackingMap(key, bucketEntries[i]);
            }
            // Removing from the RAM cache also subtracts the entry's heap size.
            if ((existed = this.ramCache.remove(key, re -> {
                if (re != null) {
                    this.heapSize.add(-1L * re.getData().heapSize());
                }
            })) || bucketEntries[i] == null) continue;
            // The key was no longer in the RAM cache although we just wrote it:
            // undo the publication of the bucket entry under its write lock.
            BucketEntry bucketEntry = bucketEntries[i];
            bucketEntry.withWriteLock(this.offsetLock, () -> {
                if (this.backingMap.remove(key, bucketEntry)) {
                    this.blockEvicted(key, bucketEntry, false);
                }
                return null;
            });
        }
        long used = this.bucketAllocator.getUsedSize();
        if (used > this.acceptableSize()) {
            this.freeSpace("Used=" + used + " > acceptable=" + this.acceptableSize());
        }
    }

    /**
     * Refills {@code receptacle} from the queue: blocks until at least one
     * entry is available, then drains whatever else is immediately queued.
     *
     * @param q          queue to take entries from
     * @param receptacle list that is cleared and then filled
     * @return the same {@code receptacle} instance
     * @throws InterruptedException if interrupted while waiting for the first entry
     */
    static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, List<RAMQueueEntry> receptacle) throws InterruptedException {
        receptacle.clear();
        // Blocking take guarantees a non-empty result.
        RAMQueueEntry first = q.take();
        receptacle.add(first);
        q.drainTo(receptacle);
        return receptacle;
    }

    /**
     * Writes the protobuf magic prefix followed by the delimited protobuf form
     * of this cache to {@code persistencePath}, overwriting any existing file.
     *
     * @throws IOException if the IO engine is not persistent or writing fails
     */
    @SuppressWarnings(value={"OBL_UNSATISFIED_OBLIGATION"}, justification="false positive, try-with-resources ensures close is called.")
    private void persistToFile() throws IOException {
        assert (!this.cacheEnabled);
        boolean persistent = this.ioEngine.isPersistent();
        if (!persistent) {
            throw new IOException("Attempt to persist non-persistent cache mappings!");
        }
        try (FileOutputStream out = new FileOutputStream(this.persistencePath, false)) {
            out.write(ProtobufMagic.PB_MAGIC);
            BucketProtoUtils.toPB(this).writeDelimitedTo(out);
        }
    }

    /**
     * Restores the backing map from the persistence file, if one exists, and
     * rebuilds the bucket allocator from it. The file is opened through
     * {@code deleteFileOnClose}, so it is removed once reading finishes.
     *
     * @param bucketSizes bucket sizes handed to the new BucketAllocator
     * @throws IOException if the file lacks the protobuf magic prefix or cannot be read
     */
    private void retrieveFromFile(int[] bucketSizes) throws IOException {
        File mappingFile = new File(this.persistencePath);
        if (!mappingFile.exists()) {
            return;
        }
        assert (!this.cacheEnabled);
        try (FileInputStream stream = this.deleteFileOnClose(mappingFile)) {
            byte[] magic = new byte[ProtobufMagic.lengthOfPBMagic()];
            IOUtils.readFully(stream, magic, 0, magic.length);
            if (!ProtobufMagic.isPBMagicPrefix(magic)) {
                throw new IOException("Persistence file does not start with protobuf magic number. " + this.persistencePath);
            }
            BucketCacheProtos.BucketCacheEntry persisted = BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(stream);
            this.parsePB(persisted);
            this.bucketAllocator = new BucketAllocator(this.cacheCapacity, bucketSizes, this.backingMap, this.realCacheSize);
            this.blockNumber.add(this.backingMap.size());
        }
    }

    /**
     * Opens {@code file} for reading with a stream that deletes the file when
     * it is closed. Once the delete has succeeded, further close calls do nothing.
     *
     * @param file the file to read and then delete
     * @return an input stream over the file
     * @throws IOException if the file cannot be opened
     */
    private FileInputStream deleteFileOnClose(File file) throws IOException {
        return (new FileInputStream(file){
            // Non-null until the file has been deleted by close().
            private File pendingDelete;

            private FileInputStream init(File target) {
                this.pendingDelete = target;
                return this;
            }

            @Override
            public void close() throws IOException {
                if (this.pendingDelete != null) {
                    super.close();
                    if (!this.pendingDelete.delete()) {
                        throw new IOException("Failed deleting persistence file " + this.pendingDelete.getAbsolutePath());
                    }
                    this.pendingDelete = null;
                }
            }
        }).init(file);
    }

    /**
     * Checks that persisted metadata matches this cache: same capacity, same
     * IO engine class name and same backing map class name.
     *
     * @param capacitySize persisted cache capacity
     * @param ioclass      persisted IO engine class name
     * @param mapclass     persisted backing map class name
     * @throws IOException on the first mismatch found
     */
    private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) throws IOException {
        if (capacitySize != this.cacheCapacity) {
            throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc((long)capacitySize) + ", expected: " + StringUtils.byteDesc((long)this.cacheCapacity));
        }
        String actualIoClass = this.ioEngine.getClass().getName();
        if (!actualIoClass.equals(ioclass)) {
            throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" + actualIoClass);
        }
        String actualMapClass = this.backingMap.getClass().getName();
        if (!actualMapClass.equals(mapclass)) {
            throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" + actualMapClass);
        }
    }

    /**
     * Applies a persisted cache description: verifies file integrity when a
     * checksum is present, validates capacity and class names, then replaces
     * the backing map with the one decoded from the proto.
     *
     * @param proto the persisted cache entry
     * @throws IOException if verification fails or decoding fails
     */
    private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
        if (!proto.hasChecksum()) {
            LOG.info("Persistent file is old format, it does not support verifying file integrity!");
        } else {
            PersistentIOEngine persistentEngine = (PersistentIOEngine)this.ioEngine;
            persistentEngine.verifyFileIntegrity(proto.getChecksum().toByteArray(), this.algorithm);
        }
        this.verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
        this.backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), this::createRecycler);
    }

    /**
     * Tracks how long IO errors have been occurring. The first error records
     * the current time; later errors disable the cache once the elapsed time
     * exceeds {@code ioErrorsTolerationDuration} milliseconds.
     */
    private void checkIOErrorIsTolerated() {
        long now = EnvironmentEdgeManager.currentTime();
        long firstErrorTs = this.ioErrorStartTime;
        if (firstErrorTs <= 0L) {
            // No error window open yet: start one.
            this.ioErrorStartTime = now;
            return;
        }
        if (this.cacheEnabled && now - firstErrorTs > (long)this.ioErrorsTolerationDuration) {
            LOG.error("IO errors duration time has exceeded " + this.ioErrorsTolerationDuration + "ms, disabling cache, please check your IOEngine");
            this.disableCache();
        }
    }

    /**
     * Turns the cache off: shuts down the IO engine and the scheduled pool,
     * interrupts every writer thread and clears the RAM cache. The backing map
     * is cleared too unless the engine is persistent and a persistence path is
     * configured. Does nothing when the cache is already disabled.
     */
    private void disableCache() {
        if (!this.cacheEnabled) {
            return;
        }
        this.cacheEnabled = false;
        this.ioEngine.shutdown();
        this.scheduleThreadPool.shutdown();
        for (WriterThread writer : this.writerThreads) {
            writer.interrupt();
        }
        this.ramCache.clear();
        boolean keepMappings = this.ioEngine.isPersistent() && this.persistencePath != null;
        if (!keepMappings) {
            this.backingMap.clear();
        }
    }

    /**
     * Waits for every writer thread to terminate, one after another.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void join() throws InterruptedException {
        for (WriterThread writer : this.writerThreads) {
            writer.join();
        }
    }

    /**
     * Disables the cache and, when the IO engine is persistent and a
     * persistence path is configured, waits for the writer threads and writes
     * the cache mappings to that path. Failures are logged, not thrown.
     * If the wait is interrupted, the interrupt status of the calling thread
     * is restored so callers can still observe it.
     */
    @Override
    public void shutdown() {
        this.disableCache();
        LOG.info("Shutdown bucket cache: IO persistent=" + this.ioEngine.isPersistent() + "; path to write=" + this.persistencePath);
        if (this.ioEngine.isPersistent() && this.persistencePath != null) {
            try {
                this.join();
                this.persistToFile();
            }
            catch (IOException ex) {
                LOG.error("Unable to persist data on exit: " + ex.toString(), (Throwable)ex);
            }
            catch (InterruptedException e) {
                LOG.warn("Failed to persist data on exit", (Throwable)e);
                // Catching InterruptedException clears the flag; put it back.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the statistics object held in this cache's {@code cacheStats} field
     */
    @Override
    public CacheStats getStats() {
        return cacheStats;
    }

    /**
     * @return the bucket allocator currently used by this cache
     */
    public BucketAllocator getAllocator() {
        return bucketAllocator;
    }

    /**
     * @return the current sum of the {@code heapSize} adder
     */
    @Override
    public long heapSize() {
        return heapSize.sum();
    }

    /**
     * @return the current sum of the {@code realCacheSize} adder
     */
    @Override
    public long size() {
        return realCacheSize.sum();
    }

    /**
     * @return the same value as {@link #size()}
     */
    @Override
    public long getCurrentDataSize() {
        return size();
    }

    /**
     * @return the free size reported by the bucket allocator
     */
    @Override
    public long getFreeSize() {
        return bucketAllocator.getFreeSize();
    }

    /**
     * @return the current sum of the {@code blockNumber} adder
     */
    @Override
    public long getBlockCount() {
        return blockNumber.sum();
    }

    /**
     * @return the same value as {@link #getBlockCount()}
     */
    @Override
    public long getDataBlockCount() {
        return getBlockCount();
    }

    /**
     * @return the used size reported by the bucket allocator
     */
    @Override
    public long getCurrentSize() {
        return bucketAllocator.getUsedSize();
    }

    /**
     * @return the value of the {@code algorithm} field, which parsePB passes to
     *         the persistent IO engine's file-integrity verification
     */
    protected String getAlgorithm() {
        return algorithm;
    }

    /**
     * Evicts every cached block that belongs to the given HFile.
     * The candidate keys are the inclusive range of {@code blocksByHFile}
     * between offsets {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE} for
     * that file name.
     *
     * @param hfileName name of the HFile whose blocks should be evicted
     * @return how many blocks {@code evictBlock} reported as evicted
     */
    @Override
    public int evictBlocksByHfileName(String hfileName) {
        BlockCacheKey lowest = new BlockCacheKey(hfileName, Long.MIN_VALUE);
        BlockCacheKey highest = new BlockCacheKey(hfileName, Long.MAX_VALUE);
        NavigableSet<BlockCacheKey> fileBlocks = this.blocksByHFile.subSet(lowest, true, highest, true);
        int evicted = 0;
        for (BlockCacheKey blockKey : fileBlocks) {
            if (this.evictBlock(blockKey)) {
                evicted++;
            }
        }
        return evicted;
    }

    /**
     * Stops the writer threads one at a time: each is disabled, interrupted
     * and joined before the next one is touched.
     *
     * @throws InterruptedException if interrupted while joining a writer
     */
    void stopWriterThreads() throws InterruptedException {
        for (int idx = 0; idx < this.writerThreads.length; ++idx) {
            WriterThread writer = this.writerThreads[idx];
            writer.disableWriter();
            writer.interrupt();
            writer.join();
        }
    }

    /**
     * Returns a read-only iterator that exposes each backing-map entry as a
     * {@code CachedBlock} view. The views read through to the underlying map
     * entry on every call; {@code remove()} is not supported.
     *
     * @return an iterator over views of the backing map's entries
     */
    @Override
    public Iterator<CachedBlock> iterator() {
        final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
        return new Iterator<CachedBlock>(){
            // Captured once per iterator and passed to BlockCacheUtil.toString.
            private final long now = System.nanoTime();

            @Override
            public boolean hasNext() {
                return i.hasNext();
            }

            @Override
            public CachedBlock next() {
                // Typed entry: avoids the raw Map.Entry and per-call casts.
                final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
                return new CachedBlock(){

                    @Override
                    public String toString() {
                        return BlockCacheUtil.toString(this, now);
                    }

                    @Override
                    public BlockPriority getBlockPriority() {
                        return e.getValue().getPriority();
                    }

                    @Override
                    public BlockType getBlockType() {
                        // This view always reports null as the block type.
                        return null;
                    }

                    @Override
                    public long getOffset() {
                        return e.getKey().getOffset();
                    }

                    @Override
                    public long getSize() {
                        return e.getValue().getLength();
                    }

                    @Override
                    public long getCachedTime() {
                        return e.getValue().getCachedTime();
                    }

                    @Override
                    public String getFilename() {
                        return e.getKey().getHfileName();
                    }

                    /**
                     * Orders by file name, then offset, then cached time
                     * (larger cached time first).
                     *
                     * @throws IllegalStateException if either cached time is negative
                     */
                    @Override
                    public int compareTo(CachedBlock other) {
                        int diff = this.getFilename().compareTo(other.getFilename());
                        if (diff != 0) {
                            return diff;
                        }
                        diff = Long.compare(this.getOffset(), other.getOffset());
                        if (diff != 0) {
                            return diff;
                        }
                        if (other.getCachedTime() < 0L || this.getCachedTime() < 0L) {
                            throw new IllegalStateException("" + this.getCachedTime() + ", " + other.getCachedTime());
                        }
                        return Long.compare(other.getCachedTime(), this.getCachedTime());
                    }

                    @Override
                    public int hashCode() {
                        return e.getKey().hashCode();
                    }

                    @Override
                    public boolean equals(Object obj) {
                        if (obj instanceof CachedBlock) {
                            CachedBlock cb = (CachedBlock)obj;
                            return this.compareTo(cb) == 0;
                        }
                        return false;
                    }
                };
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * @return always {@code null}; this implementation reports no nested block caches
     */
    @Override
    public BlockCache[] getBlockCaches() {
        return null;
    }

    /**
     * Computes the reference count of the entry for {@code cacheKey}, minus
     * one when the entry has not been marked as evicted.
     *
     * @param cacheKey key to look up in the backing map
     * @return the adjusted reference count, or 0 when the key is not in the backing map
     */
    public int getRpcRefCount(BlockCacheKey cacheKey) {
        BucketEntry entry = this.backingMap.get(cacheKey);
        if (entry == null) {
            return 0;
        }
        int totalRefs = entry.refCnt();
        // Presumably the one reference held by the cache itself — TODO confirm.
        int ownRef = entry.markedAsEvicted.get() ? 0 : 1;
        return totalRefs - ownRef;
    }

    /**
     * @return the configured {@code acceptableFactor}
     */
    float getAcceptableFactor() {
        return acceptableFactor;
    }

    /**
     * @return the configured {@code minFactor}
     */
    float getMinFactor() {
        return minFactor;
    }

    /**
     * @return the configured {@code extraFreeFactor}
     */
    float getExtraFreeFactor() {
        return extraFreeFactor;
    }

    /**
     * @return the configured {@code singleFactor}
     */
    float getSingleFactor() {
        return singleFactor;
    }

    /**
     * @return the configured {@code multiFactor}
     */
    float getMultiFactor() {
        return multiFactor;
    }

    /**
     * @return the configured {@code memoryFactor}
     */
    float getMemoryFactor() {
        return memoryFactor;
    }

    /**
     * Map of blocks waiting to be written, wrapping a ConcurrentHashMap and
     * pairing map operations with retain/release calls on the entry's data:
     * data is retained when an entry is inserted or looked up, and released
     * when an entry is removed or the map is cleared.
     */
    static class RAMCache {
        final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();

        RAMCache() {
        }

        /**
         * @return true when an entry is registered for {@code key}
         */
        public boolean containsKey(BlockCacheKey key) {
            return this.delegate.containsKey(key);
        }

        /**
         * Looks up the entry for {@code key}; when present, its data is
         * retained inside the map's compute function before it is returned.
         *
         * @return the entry, or null when the key is absent
         */
        public RAMQueueEntry get(BlockCacheKey key) {
            return this.delegate.computeIfPresent(key, (k, re) -> {
                re.getData().retain();
                return re;
            });
        }

        /**
         * Inserts {@code entry} unless the key is already mapped. The data is
         * retained only when the insertion actually happens.
         *
         * @return null when {@code entry} was inserted, otherwise the existing entry
         */
        public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
            // The mapping function only runs when the key was absent.
            AtomicBoolean absent = new AtomicBoolean(false);
            RAMQueueEntry re = this.delegate.computeIfAbsent(key, k -> {
                entry.getData().retain();
                absent.set(true);
                return entry;
            });
            return absent.get() ? null : re;
        }

        /**
         * Removes the entry for {@code key} without any extra action.
         *
         * @return true when an entry was removed
         */
        public boolean remove(BlockCacheKey key) {
            return this.remove(key, re -> {});
        }

        /**
         * Removes the entry for {@code key}, passes the removed entry to
         * {@code action}, then releases the entry's data.
         *
         * @param action invoked with the removed entry; receives null when the
         *               key was not mapped
         * @return true when an entry was removed
         */
        public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
            RAMQueueEntry previous = this.delegate.remove(key);
            action.accept(previous);
            if (previous != null) {
                previous.getData().release();
            }
            return previous != null;
        }

        /**
         * @return true when no entries are registered
         */
        public boolean isEmpty() {
            return this.delegate.isEmpty();
        }

        /**
         * Removes every entry, releasing each entry's data after it is removed.
         */
        public void clear() {
            Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = this.delegate.entrySet().iterator();
            while (it.hasNext()) {
                RAMQueueEntry re = it.next().getValue();
                it.remove();
                re.getData().release();
            }
        }
    }

    /**
     * A block waiting in RAM to be written through the IO engine, together
     * with its cache key, access counter and in-memory flag.
     */
    static class RAMQueueEntry {
        private final BlockCacheKey key;
        private final Cacheable data;
        private long accessCounter;
        private boolean inMemory;

        RAMQueueEntry(BlockCacheKey bck, Cacheable data2, long accessCounter, boolean inMemory) {
            this.key = bck;
            this.data = data2;
            this.accessCounter = accessCounter;
            this.inMemory = inMemory;
        }

        /** @return the block payload */
        public Cacheable getData() {
            return this.data;
        }

        /** @return the cache key of this entry */
        public BlockCacheKey getKey() {
            return this.key;
        }

        /** Overwrites the stored access counter with the given value. */
        public void access(long accessCounter) {
            this.accessCounter = accessCounter;
        }

        /**
         * @return the allocator of the payload when it is an HFileBlock,
         *         otherwise {@code ByteBuffAllocator.HEAP}
         */
        private ByteBuffAllocator getByteBuffAllocator() {
            return this.data instanceof HFileBlock
                ? ((HFileBlock)this.data).getByteBuffAllocator()
                : ByteBuffAllocator.HEAP;
        }

        /**
         * Allocates space for this entry's payload and writes it through the
         * IO engine. If anything fails after the allocation, the allocated
         * block is freed before the failure propagates.
         *
         * @param ioEngine       engine to write through
         * @param alloc          allocator providing the target offset
         * @param realCacheSize  adder increased by the serialized length on success
         * @param createRecycler factory passed to the new BucketEntry
         * @return the new BucketEntry, or null when the serialized length is 0
         * @throws IOException if allocation or writing fails
         */
        public BucketEntry writeToCache(IOEngine ioEngine, BucketAllocator alloc, LongAdder realCacheSize, Function<BucketEntry, ByteBuffAllocator.Recycler> createRecycler) throws IOException {
            final int serializedLen = this.data.getSerializedLength();
            if (serializedLen == 0) {
                return null;
            }
            final long offset = alloc.allocateBlock(serializedLen);
            BucketEntry written = null;
            boolean completed = false;
            try {
                written = new BucketEntry(offset, serializedLen, this.accessCounter, this.inMemory, createRecycler, this.getByteBuffAllocator());
                written.setDeserializerReference(this.data.getDeserializer());
                if (!(this.data instanceof HFileBlock)) {
                    ByteBuffer serialized = ByteBuffer.allocate(serializedLen);
                    this.data.serialize(serialized, true);
                    ioEngine.write(serialized, offset);
                } else {
                    HFileBlock block = (HFileBlock)this.data;
                    ByteBuff payload = block.getBufferReadOnly();
                    ByteBuffer meta = block.getMetaData();
                    ioEngine.write(payload, offset);
                    // Metadata occupies the tail of the allocated region.
                    ioEngine.write(meta, offset + (long)serializedLen - (long)meta.limit());
                }
                completed = true;
            }
            finally {
                if (!completed) {
                    alloc.freeBlock(offset);
                }
            }
            realCacheSize.add(serializedLen);
            return written;
        }
    }

    /**
     * A set of cached entries collected in a CachedEntryQueue, with a running
     * total of their lengths and a size budget for the group.
     */
    private class BucketEntryGroup {
        private CachedEntryQueue queue;
        private long totalSize = 0L;
        private long bucketSize;

        public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
            this.queue = new CachedEntryQueue(bytesToFree, blockSize);
            this.bucketSize = bucketSize;
            this.totalSize = 0L;
        }

        /** Adds the block's length to the group total and offers the block to the queue. */
        public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
            long length = (long)block.getValue().getLength();
            this.totalSize += length;
            this.queue.add(block);
        }

        /**
         * Polls entries from the queue and tries to evict each one, stopping
         * once at least {@code toFree} bytes were freed or the queue is empty.
         *
         * @param toFree number of bytes to aim for
         * @return total length of the entries that were actually evicted
         */
        public long free(long toFree) {
            long released = 0L;
            for (Map.Entry<BlockCacheKey, BucketEntry> candidate = this.queue.pollLast(); candidate != null; candidate = this.queue.pollLast()) {
                BlockCacheKey victimKey = candidate.getKey();
                BucketEntry victim = candidate.getValue();
                if (BucketCache.this.evictBucketEntryIfNoRpcReferenced(victimKey, victim)) {
                    released += (long)victim.getLength();
                }
                if (released >= toFree) {
                    break;
                }
            }
            return released;
        }

        /** @return how far the group total exceeds its budget (may be negative) */
        public long overflow() {
            return this.totalSize - this.bucketSize;
        }

        /** @return the sum of the lengths of all blocks added so far */
        public long totalSize() {
            return this.totalSize;
        }
    }

    /**
     * Thread that repeatedly takes batches of entries from its input queue and
     * hands them to BucketCache.doDrain. It runs while both the enclosing
     * cache and this writer are enabled.
     */
    class WriterThread
    extends Thread {
        private final BlockingQueue<RAMQueueEntry> inputQueue;
        // Cleared by disableWriter() to make the run loop exit.
        private volatile boolean writerEnabled;

        WriterThread(BlockingQueue<RAMQueueEntry> queue) {
            super("BucketCacheWriterThread");
            this.writerEnabled = true;
            this.inputQueue = queue;
        }

        /** Requests that the run loop stop; takes effect at the next loop check. */
        void disableWriter() {
            this.writerEnabled = false;
        }

        @Override
        public void run() {
            List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
            try {
                while (BucketCache.this.cacheEnabled && this.writerEnabled) {
                    try {
                        try {
                            // Blocks until at least one entry is queued.
                            entries = BucketCache.getRAMQueueEntries(this.inputQueue, entries);
                        }
                        catch (InterruptedException ie) {
                            // An interrupt only ends the loop when the cache or
                            // this writer was disabled; otherwise fall through
                            // and drain whatever the list currently holds.
                            if (!BucketCache.this.cacheEnabled || !this.writerEnabled) break;
                        }
                        BucketCache.this.doDrain(entries);
                    }
                    catch (Exception ioe) {
                        // Logged and ignored so the writer keeps running.
                        LOG.error("WriterThread encountered error", (Throwable)ioe);
                    }
                }
            }
            catch (Throwable t) {
                LOG.warn("Failed doing drain", t);
            }
            LOG.info(this.getName() + " exiting, cacheEnabled=" + BucketCache.this.cacheEnabled);
        }
    }

    /**
     * Daemon thread whose run method logs the statistics of one BucketCache
     * by calling its {@code logStats()} once.
     */
    private static class StatisticsThread
    extends Thread {
        private final BucketCache bucketCache;

        public StatisticsThread(BucketCache bucketCache) {
            super("BucketCacheStatsThread");
            this.bucketCache = bucketCache;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            BucketCache target = this.bucketCache;
            target.logStats();
        }
    }
}

