/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.kv;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.com.google.common.io.MoreFiles;
import org.apache.pulsar.shade.com.google.common.io.RecursiveDeleteOption;
import org.apache.pulsar.shade.com.google.common.primitives.SignedBytes;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.coder.Coder;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.kv.KV;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.kv.KVImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.kv.KVIterator;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.kv.KVMulti;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.kv.KVStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.Bytes;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointInfo;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.RocksCheckpointer;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.rocksdb.BlockBasedTableConfig;
import org.apache.pulsar.shade.org.rocksdb.ColumnFamilyDescriptor;
import org.apache.pulsar.shade.org.rocksdb.ColumnFamilyHandle;
import org.apache.pulsar.shade.org.rocksdb.ColumnFamilyOptions;
import org.apache.pulsar.shade.org.rocksdb.DBOptions;
import org.apache.pulsar.shade.org.rocksdb.FlushOptions;
import org.apache.pulsar.shade.org.rocksdb.LRUCache;
import org.apache.pulsar.shade.org.rocksdb.Options;
import org.apache.pulsar.shade.org.rocksdb.RocksDB;
import org.apache.pulsar.shade.org.rocksdb.RocksDBException;
import org.apache.pulsar.shade.org.rocksdb.RocksIterator;
import org.apache.pulsar.shade.org.rocksdb.TtlDB;
import org.apache.pulsar.shade.org.rocksdb.WriteBatch;
import org.apache.pulsar.shade.org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksdbKVStore<K, V>
implements KVStore<K, V> {
    private static final Logger log = LoggerFactory.getLogger(RocksdbKVStore.class);
    // Column family names. The data column family is DATA_TTL_CF when a non-zero TTL is
    // configured and DATA_CF otherwise (see openRocksdb(File, DBOptions, ColumnFamilyOptions)).
    private static final byte[] METADATA_CF = ".meta".getBytes(StandardCharsets.UTF_8);
    private static final byte[] DATA_CF = "default".getBytes(StandardCharsets.UTF_8);
    private static final byte[] DATA_TTL_CF = "default_ttl".getBytes(StandardCharsets.UTF_8);
    // Key, inside the metadata column family, under which the last revision is stored.
    private static final byte[] LAST_REVISION = ".lrev".getBytes(StandardCharsets.UTF_8);
    // Field updater used for all reads and writes of the volatile lastRevision field.
    private static final AtomicLongFieldUpdater<RocksdbKVStore> lastRevisionUpdater = AtomicLongFieldUpdater.newUpdater(RocksdbKVStore.class, "lastRevision");
    // Settings copied from the StateStoreSpec by init().
    protected String name;
    protected int ttlSeconds;
    protected Coder<K> keyCoder;
    protected Coder<V> valCoder;
    // Local directory and RocksDB handles, assigned by openRocksdb(StateStoreSpec).
    protected File dbDir;
    protected RocksDB db;
    protected ColumnFamilyHandle metaCfHandle;
    protected ColumnFamilyHandle dataCfHandle;
    // Iterators handed out by range() that have not been closed yet; close() closes them all.
    protected final Set<KVIterator<K, V>> kvIters;
    // RocksDB option objects created by openRocksdb(StateStoreSpec) and released by close().
    protected DBOptions dbOpts;
    protected ColumnFamilyOptions cfOpts;
    protected WriteOptions writeOpts;
    protected FlushOptions flushOpts;
    // Lifecycle flags consulted by checkStoreOpen().
    protected volatile boolean isInitialized = false;
    protected volatile boolean closed = false;
    // Highest revision recorded so far; -1 until a revision has been read or applied.
    protected volatile long lastRevision = -1L;
    // 8-byte encoding of the last revision, refreshed by setLastRevision(). This buffer is what
    // gets written to the metadata column family and what checkpoint() copies for the checkpointer.
    private final byte[] lastRevisionBytes = new byte[8];
    // Checkpointing collaborators; only populated by init() when the spec provides a checkpoint store.
    private CheckpointStore checkpointStore;
    private ScheduledExecutorService checkpointScheduler;
    private RocksCheckpointer checkpointer;
    // When true, the local store directory is deleted by init() before opening and by close().
    private boolean cleanupLocalStoreDirEnable;

    /**
     * Creates an uninitialized store. {@link #init(StateStoreSpec)} must be called before the
     * store can serve reads or writes.
     */
    public RocksdbKVStore() {
        // Synchronized wrapper: iterators register and deregister themselves from other call paths.
        this.kvIters = Collections.synchronizedSet(new HashSet<KVIterator<K, V>>());
    }

    /**
     * Verifies that the store can be used.
     *
     * @throws InvalidStateStoreException if the store has been closed or has not been initialized
     */
    protected void checkStoreOpen() {
        // The closed check comes first so a closed store always reports "already closed".
        if (this.closed) {
            throw new InvalidStateStoreException("State store " + this.name + " is already closed");
        }
        if (this.isInitialized) {
            return;
        }
        throw new InvalidStateStoreException("State Store " + this.name + " is not initialized yet");
    }

    /**
     * Returns the RocksDB handle currently held by this store.
     *
     * @return the value of the {@code db} field; {@code null} if no database has been opened yet
     */
    @VisibleForTesting
    public synchronized RocksDB getDb() {
        return this.db;
    }

    /**
     * Returns the store name that {@link #init(StateStoreSpec)} copied from the spec.
     *
     * @return the store name, or {@code null} before initialization
     */
    @Override
    public synchronized String name() {
        return this.name;
    }

    /**
     * Restores the local database from the checkpoint store and opens it.
     *
     * <p>Checkpoints are tried in the order returned by
     * {@link RocksCheckpointer#getCheckpoints}. The first one that restores and opens
     * successfully wins, and every other checkpoint is then removed from the local directory.
     *
     * @param spec the store spec; its checkpoint IO scheduler and checkpoint duration must be set
     * @throws StateStoreException if a restore times out, or if no checkpoint could be restored
     *         and opened (previously this case fell through silently and left {@code db} unset)
     */
    private void loadRocksdbFromCheckpointStore(StateStoreSpec spec) throws StateStoreException {
        Preconditions.checkNotNull(spec.getCheckpointIOScheduler(), "checkpoint io scheduler is not configured");
        Preconditions.checkNotNull(spec.getCheckpointDuration(), "checkpoint duration is not configured");
        String dbName = spec.getName();
        File localStorePath = spec.getLocalStateStoreDir();
        List<CheckpointInfo> checkpoints = RocksCheckpointer.getCheckpoints(dbName, spec.getCheckpointStore());
        StateStoreException lastFailure = null;
        for (CheckpointInfo cpi : checkpoints) {
            try {
                cpi.restore(dbName, localStorePath, spec.getCheckpointStore(), spec.getCheckpointRestoreIdleLimit());
                this.openRocksdb(spec);
                // This checkpoint is now live; drop the local copies of all the others.
                checkpoints.stream().filter(cp -> cp != cpi).forEach(cp -> cp.remove(localStorePath));
                return;
            }
            catch (TimeoutException e) {
                // A timeout is treated as fatal rather than falling back to another checkpoint.
                log.error("Timeout waiting for checkpoint restore: {}", (Object)cpi, (Object)e);
                throw new StateStoreException("Failed to restore checkpoint: " + cpi.getId(), e);
            }
            catch (StateStoreException e) {
                // Remember the failure and fall back to the next checkpoint.
                log.error("Failed to restore checkpoint: {}", (Object)cpi, (Object)e);
                lastFailure = e;
            }
        }
        // Nothing could be opened: fail loudly here instead of letting the caller hit a null database.
        throw new StateStoreException("Failed to restore state store " + dbName + " from any of " + checkpoints.size() + " checkpoint(s)", lastFailure);
    }

    /**
     * Schedules an asynchronous checkpoint of the store at its current last revision.
     *
     * <p>The revision bytes are copied while holding the store lock so the submitted task is
     * not affected by later revision updates. Checkpoint failures are logged, not propagated.
     */
    @Override
    public synchronized void checkpoint() {
        log.info("Checkpoint local state store {} at revision {}", (Object)this.name, (Object)this.getLastRevision());
        // Private copy of the 8-byte revision buffer for the background task.
        final byte[] revisionSnapshot = this.lastRevisionBytes.clone();
        Runnable checkpointTask = () -> {
            try {
                this.checkpointer.checkpointAtTxid(revisionSnapshot);
            }
            catch (StateStoreException e) {
                log.error("Failed to checkpoint state store {} at revision {}", new Object[]{this.name, Bytes.toLong(revisionSnapshot, 0), e});
            }
        };
        this.checkpointScheduler.submit(checkpointTask);
    }

    /**
     * Loads the persisted last revision from the metadata column family, if one was stored.
     *
     * <p>Both the numeric revision and its 8-byte buffer are updated, so a checkpoint taken
     * before the next write is tagged with the revision read here. Previously the buffer was
     * left untouched and {@link #checkpoint()} would hand out all-zero bytes.
     *
     * @throws StateStoreException if RocksDB fails to read the revision entry
     */
    private void readLastRevision() throws StateStoreException {
        byte[] revisionBytes;
        try {
            revisionBytes = this.db.get(this.metaCfHandle, LAST_REVISION);
        }
        catch (RocksDBException e) {
            throw new StateStoreException("Failed to read last revision from state store " + this.name(), e);
        }
        if (null == revisionBytes) {
            // No revision has been persisted yet; keep the default.
            return;
        }
        // setLastRevision keeps lastRevision and lastRevisionBytes in step.
        this.setLastRevision(Bytes.toLong(revisionBytes, 0));
    }

    /**
     * Returns the last revision recorded by this store.
     *
     * @return the current value of {@code lastRevision}; the field starts at -1
     */
    @Override
    public long getLastRevision() {
        return lastRevisionUpdater.get(this);
    }

    /**
     * Records a new last revision, both as a number and in the reusable 8-byte buffer.
     *
     * @param lastRevision the revision to record
     */
    private void setLastRevision(long lastRevision) {
        lastRevisionUpdater.set(this, lastRevision);
        // Keep the byte form in step; it is what gets written to RocksDB and passed to the checkpointer.
        Bytes.toBytes(lastRevision, this.lastRevisionBytes, 0);
    }

    /**
     * Advances the in-memory last revision when {@code revision} is non-negative and newer.
     *
     * @param revision the candidate revision; negative values are ignored
     */
    private void updateLastRevision(long revision) {
        boolean ignorable = revision < 0L || this.getLastRevision() >= revision;
        if (!ignorable) {
            this.setLastRevision(revision);
        }
    }

    /**
     * Advances the last revision and stages its persisted form in {@code batch}.
     *
     * <p>Nothing happens when {@code revision} is negative or not newer than the current one.
     *
     * @param batch    the write batch that receives the metadata update
     * @param revision the candidate revision
     * @throws StateStoreRuntimeException if the batch rejects the metadata update
     */
    protected void updateLastRevision(WriteBatch batch, long revision) {
        if (revision < 0L || this.getLastRevision() >= revision) {
            return;
        }
        this.setLastRevision(revision);
        try {
            batch.put(this.metaCfHandle, LAST_REVISION, this.lastRevisionBytes);
        }
        catch (RocksDBException e) {
            throw new StateStoreRuntimeException("Error while updating last revision " + revision + " from store " + this.name, e);
        }
    }

    /**
     * Initializes the store from {@code spec}.
     *
     * <p>Steps, in order: copy settings from the spec, optionally wipe the local directory,
     * open the database (restoring from the checkpoint store when one is configured), load the
     * persisted last revision, set up checkpointing, and finally mark the store initialized.
     *
     * @param spec the store specification; its local state store directory must be set
     * @throws StateStoreException if the database cannot be restored, opened or read
     */
    @Override
    public synchronized void init(StateStoreSpec spec) throws StateStoreException {
        Preconditions.checkNotNull(spec.getLocalStateStoreDir(), "local state store directory is not configured");

        // Settings copied out of the spec.
        this.name = spec.getName();
        this.cleanupLocalStoreDirEnable = spec.isLocalStorageCleanupEnable();
        this.ttlSeconds = spec.getTtlSeconds();
        this.keyCoder = spec.getKeyCoder();
        this.valCoder = spec.getValCoder();

        // Only deletes anything when local storage cleanup is enabled.
        this.cleanupLocalStoreDir(spec.getLocalStateStoreDir());

        this.checkpointStore = spec.getCheckpointStore();
        final boolean checkpointed = this.checkpointStore != null;
        if (checkpointed) {
            this.loadRocksdbFromCheckpointStore(spec);
        } else {
            this.openRocksdb(spec);
        }

        this.readLastRevision();

        if (checkpointed) {
            this.checkpointer = new RocksCheckpointer(this.name(), this.dbDir, this.db, this.checkpointStore, true, true, spec.isCheckpointChecksumEnable(), spec.isCheckpointChecksumCompatible());
            this.checkpointScheduler = spec.getCheckpointIOScheduler();
        }

        this.isInitialized = true;
    }

    /**
     * Builds the RocksDB option objects and opens the local database described by {@code spec}.
     *
     * <p>On success the {@code db}, {@code metaCfHandle} and {@code dataCfHandle} fields are
     * set. The option objects are stored in fields so that {@link #close()} can release them.
     *
     * @param spec the store specification providing the local state store directory
     * @throws StateStoreException if the database cannot be opened
     */
    protected void openRocksdb(StateStoreSpec spec) throws StateStoreException {
        // Table format: 64 MiB LRU block cache, 4 KiB blocks.
        BlockBasedTableConfig tableFormat = new BlockBasedTableConfig();
        LRUCache blockCache = new LRUCache(64L * 1024L * 1024L);
        tableFormat.setBlockCache(blockCache);
        tableFormat.setBlockSize(4L * 1024L);
        tableFormat.setChecksumType(RocksConstants.DEFAULT_CHECKSUM_TYPE);

        // Database-wide options.
        DBOptions dbOptions = new DBOptions();
        this.dbOpts = dbOptions;
        dbOptions.setCreateIfMissing(true);
        dbOptions.setErrorIfExists(false);
        dbOptions.setInfoLogLevel(RocksConstants.DEFAULT_LOG_LEVEL);
        dbOptions.setIncreaseParallelism(RocksConstants.DEFAULT_PARALLELISM);
        dbOptions.setCreateMissingColumnFamilies(true);

        // Column family options, shared by the metadata and data column families.
        ColumnFamilyOptions columnOptions = new ColumnFamilyOptions();
        this.cfOpts = columnOptions;
        columnOptions.setTableFormatConfig(tableFormat);
        columnOptions.setWriteBufferSize(32L * 1024L * 1024L);
        columnOptions.setCompressionType(RocksConstants.DEFAULT_COMPRESSION_TYPE);
        columnOptions.setCompactionStyle(RocksConstants.DEFAULT_COMPACTION_STYLE);
        columnOptions.setMaxWriteBufferNumber(3);

        // Writes skip the write-ahead log; flushes wait for completion.
        WriteOptions writeOptions = new WriteOptions();
        this.writeOpts = writeOptions;
        writeOptions.setDisableWAL(true);
        FlushOptions flushOptions = new FlushOptions();
        this.flushOpts = flushOptions;
        flushOptions.setWaitForFlush(true);

        this.dbDir = spec.getLocalStateStoreDir();
        Pair<RocksDB, List<ColumnFamilyHandle>> opened = this.openLocalDB(this.dbDir, this.dbOpts, this.cfOpts);
        List<ColumnFamilyHandle> handles = opened.getRight();
        this.db = opened.getLeft();
        // Handles come back in descriptor order: metadata first, data second.
        this.metaCfHandle = handles.get(0);
        this.dataCfHandle = handles.get(1);
    }

    /**
     * Opens the local database. This implementation simply delegates to
     * {@link #openRocksdb(File, DBOptions, ColumnFamilyOptions)}; being protected, it can be
     * overridden by subclasses.
     *
     * @param dir     the local state store directory
     * @param options the database options
     * @param cfOpts  the column family options
     * @return the opened database together with its column family handles
     * @throws StateStoreException if the database cannot be opened
     */
    protected Pair<RocksDB, List<ColumnFamilyHandle>> openLocalDB(File dir, DBOptions options, ColumnFamilyOptions cfOpts) throws StateStoreException {
        return this.openRocksdb(dir, options, cfOpts);
    }

    /**
     * Opens (creating if necessary) the RocksDB instance under {@code dir/current}.
     *
     * <p>When {@code current} does not exist yet, a fresh directory is created under
     * {@code dir/checkpoints/<uuid>} and {@code current} is made a symbolic link to it. When it
     * already exists, the column families on disk are checked against the TTL setting: a
     * database that has only the other mode's data column family is rejected.
     *
     * <p>Fixes over the previous version: column family names are compared by content
     * ({@code List<byte[]>.contains} compares array identity and never matched), the mismatch
     * message uses real {@link String#format} placeholders and readable names, and the I/O error
     * log no longer dereferences a possibly-null parent directory.
     *
     * @param dir     the local state store directory
     * @param options the database options
     * @param cfOpts  the column family options used for both column families
     * @return the opened database and its column family handles (metadata first, data second)
     * @throws StateStoreException on directory creation failure, RocksDB failure, or a TTL /
     *         non-TTL column family mismatch
     */
    protected Pair<RocksDB, List<ColumnFamilyHandle>> openRocksdb(File dir, DBOptions options, ColumnFamilyOptions cfOpts) throws StateStoreException {
        boolean haveTtl = this.ttlSeconds != 0;
        byte[] wantedCf = haveTtl ? DATA_TTL_CF : DATA_CF;
        byte[] otherCf = haveTtl ? DATA_CF : DATA_TTL_CF;
        ColumnFamilyDescriptor metaDesc = new ColumnFamilyDescriptor(METADATA_CF, cfOpts);
        ColumnFamilyDescriptor dataDesc = new ColumnFamilyDescriptor(wantedCf, cfOpts);
        try {
            Files.createDirectories(dir.toPath());
            File currentDir = new File(dir, "current");
            if (!currentDir.exists()) {
                // First open: create a fresh checkpoint directory and point "current" at it.
                Path checkpointPath = Paths.get(dir.getAbsolutePath(), "checkpoints", UUID.randomUUID().toString());
                Files.createDirectories(checkpointPath);
                Files.createSymbolicLink(Paths.get(currentDir.getAbsolutePath()), checkpointPath);
            } else {
                try (Options opts = new Options(options, cfOpts)) {
                    // Decode the names to Strings so they can be compared by content.
                    Set<String> existingCfs = new HashSet<>();
                    for (byte[] cfName : RocksDB.listColumnFamilies(opts, currentDir.getAbsolutePath())) {
                        existingCfs.add(new String(cfName, StandardCharsets.UTF_8));
                    }
                    String wanted = new String(wantedCf, StandardCharsets.UTF_8);
                    String other = new String(otherCf, StandardCharsets.UTF_8);
                    if (!existingCfs.contains(wanted) && existingCfs.contains(other)) {
                        throw new StateStoreException(String.format("%s: expected %s column family, found %s", currentDir.getAbsolutePath(), wanted, other));
                    }
                }
            }
            List<ColumnFamilyHandle> cfHandles = Lists.newArrayListWithExpectedSize(2);
            // In TTL mode the metadata column family gets TTL 0 and only the data column family uses ttlSeconds.
            RocksDB db = haveTtl ? TtlDB.open(options, currentDir.getAbsolutePath(), Lists.newArrayList(metaDesc, dataDesc), cfHandles, Lists.newArrayList(0, this.ttlSeconds), false) : RocksDB.open(options, currentDir.getAbsolutePath(), Lists.newArrayList(metaDesc, dataDesc), cfHandles);
            return Pair.of(db, cfHandles);
        }
        catch (IOException ioe) {
            log.error("Failed to create directories under {} for opening rocksdb", (Object)dir.getAbsolutePath(), (Object)ioe);
            throw new StateStoreException(ioe);
        }
        catch (RocksDBException dbe) {
            log.error("Failed to open rocksdb at dir {}", (Object)dir.getAbsolutePath(), (Object)dbe);
            throw new StateStoreException(dbe);
        }
    }

    /**
     * Flushes the database using the store's flush options. Does nothing when no database is open.
     *
     * @throws StateStoreException if RocksDB reports a failure while flushing
     */
    @Override
    public synchronized void flush() throws StateStoreException {
        if (this.db != null) {
            try {
                this.db.flush(this.flushOpts);
            }
            catch (RocksDBException e) {
                throw new StateStoreException("Exception on flushing rocksdb from store " + this.name, e);
            }
        }
    }

    /**
     * Closes the store and releases what it holds. Calling it again is a no-op.
     *
     * <p>Teardown order: checkpointer, outstanding iterators, the database and its column
     * family handles, the option objects, and finally the optional local directory cleanup.
     */
    @Override
    public synchronized void close() {
        if (!this.closed) {
            // Flip the flag first so checkStoreOpen() rejects new operations during teardown.
            this.closed = true;
            if (this.checkpointer != null) {
                this.checkpointer.close();
            }
            this.closeIters();
            this.closeLocalDB();
            RocksUtils.close(this.dbOpts);
            RocksUtils.close(this.writeOpts);
            RocksUtils.close(this.flushOpts);
            RocksUtils.close(this.cfOpts);
            this.cleanupLocalStoreDir(this.dbDir);
        }
    }

    /**
     * Recursively deletes {@code dbDir} when local storage cleanup is enabled.
     *
     * <p>A {@code null} directory is ignored. This happens when {@link #close()} runs after an
     * {@code init} that failed before the database directory was assigned, and used to throw a
     * {@link NullPointerException}. Deletion failures are logged, not propagated.
     *
     * @param dbDir the directory to delete; may be {@code null}
     */
    private void cleanupLocalStoreDir(File dbDir) {
        if (!this.cleanupLocalStoreDirEnable || null == dbDir || !dbDir.exists()) {
            return;
        }
        try {
            MoreFiles.deleteRecursively(dbDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
        }
        catch (IOException e) {
            log.error("Failed to cleanup localStoreDir", (Throwable)e);
        }
    }

    /**
     * Flushes the database (best effort), then closes the column family handles and the database.
     *
     * <p>A flush failure does not stop the close, but it is now logged instead of being
     * silently discarded, so unflushed data on shutdown is visible to operators.
     */
    protected void closeLocalDB() {
        try {
            this.flush();
        }
        catch (StateStoreException e) {
            // Best effort: the handles below must still be released.
            log.warn("Failed to flush state store {} before closing it", (Object)this.name, (Object)e);
        }
        RocksUtils.close(this.metaCfHandle);
        RocksUtils.close(this.dataCfHandle);
        RocksUtils.close(this.db);
    }

    /**
     * Closes every iterator that is still registered with this store.
     *
     * <p>The iterators are copied into a snapshot while holding the set's lock because each
     * iterator removes itself from {@code kvIters} when it is closed.
     */
    private void closeIters() {
        HashSet<KVIterator<K, V>> snapshot;
        synchronized (this.kvIters) {
            snapshot = new HashSet<KVIterator<K, V>>(this.kvIters);
        }
        for (KVIterator<K, V> openIterator : snapshot) {
            openIterator.close();
        }
    }

    /**
     * Looks up the value stored for {@code key}.
     *
     * @param key the key to look up; must not be {@code null}
     * @return the decoded value, or {@code null} when the key has no value
     * @throws InvalidStateStoreException if the store is closed or not initialized
     */
    @Override
    public synchronized V get(K key) {
        Preconditions.checkNotNull(key, "key cannot be null");
        this.checkStoreOpen();
        return this.getRaw(key, this.keyCoder.encode(key));
    }

    /**
     * Reads the raw bytes for an already-encoded key and decodes them.
     *
     * @param key      the original key, passed through to {@code getRawBytes}
     * @param keyBytes the encoded key
     * @return the decoded value, or {@code null} when no bytes are stored for the key
     */
    private V getRaw(K key, byte[] keyBytes) {
        byte[] stored = this.getRawBytes(key, keyBytes);
        return stored == null ? null : this.valCoder.decode(stored);
    }

    /**
     * Reads the stored bytes for {@code keyBytes} from the data column family.
     *
     * @param key      the original key; only used in the error message
     * @param keyBytes the encoded key to look up
     * @return whatever {@code db.get} returns for the key
     * @throws StateStoreRuntimeException if RocksDB fails to read the key
     */
    protected byte[] getRawBytes(K key, byte[] keyBytes) {
        try {
            return this.db.get(this.dataCfHandle, keyBytes);
        }
        catch (RocksDBException e) {
            throw new StateStoreRuntimeException("Error while getting value for key " + key + " from store " + this.name, e);
        }
    }

    /**
     * Opens an iterator over the data column family, starting at {@code from}.
     *
     * <p>The returned iterator is registered with the store so that {@link #close()} can close
     * it; callers should still close it themselves when done.
     *
     * <p>If positioning or wrapping the native iterator fails (for example the key coder
     * throws), the native iterator is now closed before the exception propagates instead of
     * being leaked.
     *
     * @param from the first key to return, or {@code null} to start at the first entry
     * @param to   the last key to return (inclusive), or {@code null} for no upper bound
     * @return an iterator over the selected entries
     * @throws InvalidStateStoreException if the store is closed or not initialized
     */
    @Override
    public synchronized KVIterator<K, V> range(K from, K to) {
        this.checkStoreOpen();
        RocksIterator rocksIter = this.db.newIterator(this.dataCfHandle);
        try {
            if (null == from) {
                rocksIter.seekToFirst();
            } else {
                byte[] fromBytes = this.keyCoder.encode(from);
                rocksIter.seek(fromBytes);
            }
            RocksdbKVIterator kvIter = null == to ? new RocksdbKVIterator(this.name, rocksIter, this.keyCoder, this.valCoder) : new RocksdbRangeIterator(this.name, rocksIter, this.keyCoder, this.valCoder, to);
            this.kvIters.add(kvIter);
            return kvIter;
        }
        catch (RuntimeException | Error e) {
            // Nobody else owns the native iterator yet, so release it here.
            rocksIter.close();
            throw e;
        }
    }

    /**
     * Stores {@code value} under {@code key} without a revision (delegates with revision -1).
     *
     * @param key   the key to write; must not be {@code null}
     * @param value the value to store; {@code null} deletes the key
     */
    @Override
    public synchronized void put(K key, V value) {
        this.put(key, value, -1L);
    }

    /**
     * Stores {@code value} under {@code key} and records {@code revision} as the last revision
     * when it is newer than the current one.
     *
     * @param key      the key to write; must not be {@code null}
     * @param value    the value to store; {@code null} deletes the key
     * @param revision the revision of this update; negative values are not recorded
     * @throws InvalidStateStoreException if the store is closed or not initialized
     */
    synchronized void put(K key, V value, long revision) {
        Preconditions.checkNotNull(key, "key cannot be null");
        this.checkStoreOpen();
        // The revision is advanced before the write so putRaw persists the updated bytes.
        this.updateLastRevision(revision);
        this.putRaw(key, this.keyCoder.encode(key), value, revision);
    }

    /**
     * Writes (or deletes) a single encoded key, together with the last-revision marker when a
     * positive revision is supplied, as one atomic batch.
     *
     * <p>The {@link WriteBatch} wraps a native handle and is now closed via try-with-resources;
     * previously it was never closed.
     *
     * @param key      the original key; only used in the error message
     * @param keyBytes the encoded key
     * @param value    the value to store, or {@code null} to delete the key
     * @param revision the revision of this update
     * @throws StateStoreRuntimeException if RocksDB rejects the batch
     */
    private void putRaw(K key, byte[] keyBytes, V value, long revision) {
        try (WriteBatch batch = new WriteBatch()) {
            // NOTE(review): updateLastRevision accepts revision >= 0 while this persists only for
            // revision > 0, so revision 0 is tracked in memory but not stored - confirm intent.
            if (revision > 0L) {
                batch.put(this.metaCfHandle, LAST_REVISION, this.lastRevisionBytes);
            }
            if (null == value) {
                batch.delete(this.dataCfHandle, keyBytes);
            } else {
                byte[] valBytes = this.valCoder.encode(value);
                batch.put(this.dataCfHandle, keyBytes, valBytes);
            }
            this.db.write(this.writeOpts, batch);
        }
        catch (RocksDBException e) {
            throw new StateStoreRuntimeException("Error while updating key " + key + " to value " + value + " from store " + this.name, e);
        }
    }

    /**
     * Stores {@code value} under {@code key} only when the key has no value yet, without a
     * revision (delegates with revision -1).
     *
     * <p>This overload is not itself {@code synchronized}; the overload it delegates to is.
     *
     * @param key   the key to write; must not be {@code null}
     * @param value the value to store when the key is absent
     * @return the existing value, or {@code null} when the key was absent
     */
    @Override
    public V putIfAbsent(K key, V value) {
        return this.putIfAbsent(key, value, -1L);
    }

    /**
     * Stores {@code value} under {@code key} only when the key currently has no value.
     *
     * <p>The last revision is advanced whether or not anything is written.
     *
     * @param key      the key to write; must not be {@code null}
     * @param value    the value to store when the key is absent; {@code null} writes nothing
     * @param revision the revision of this update
     * @return the existing value, or {@code null} when the key was absent
     * @throws InvalidStateStoreException if the store is closed or not initialized
     */
    synchronized V putIfAbsent(K key, V value, long revision) {
        Preconditions.checkNotNull(key, "key cannot be null");
        this.checkStoreOpen();
        this.updateLastRevision(revision);
        final byte[] encodedKey = this.keyCoder.encode(key);
        final V existing = this.getRaw(key, encodedKey);
        if (existing == null && value != null) {
            this.putRaw(key, encodedKey, value, revision);
        }
        return existing;
    }

    /**
     * Starts a multi-operation batch against this store.
     *
     * @return a new {@link KVMulti} whose operations are applied when it is executed
     * @throws InvalidStateStoreException if the store is closed or not initialized
     */
    @Override
    public synchronized KVMulti<K, V> multi() {
        this.checkStoreOpen();
        return new KVMultiImpl();
    }

    /**
     * Deletes {@code key} without a revision (delegates with revision -1).
     *
     * @param key the key to delete; must not be {@code null}
     * @return the value that was stored for the key, or {@code null} if there was none
     */
    @Override
    public synchronized V delete(K key) {
        return this.delete(key, -1L);
    }

    /**
     * Deletes {@code key} and records {@code revision} as the last revision when it is newer.
     *
     * @param key      the key to delete; must not be {@code null}
     * @param revision the revision of this update; negative values are not recorded
     * @return the value that was stored for the key before deletion, or {@code null}
     * @throws InvalidStateStoreException if the store is closed or not initialized
     */
    synchronized V delete(K key, long revision) {
        Preconditions.checkNotNull(key, "key cannot be null");
        this.checkStoreOpen();
        this.updateLastRevision(revision);
        final byte[] encodedKey = this.keyCoder.encode(key);
        // Read the previous value first so it can be returned to the caller.
        final V previous = this.getRaw(key, encodedKey);
        // A null value tells putRaw to issue a delete.
        this.putRaw(key, encodedKey, null, revision);
        return previous;
    }

    // Load the RocksDB native library once, when this class is initialized.
    static {
        RocksDB.loadLibrary();
    }

    /**
     * Iterator that stops after an inclusive end key.
     *
     * <p>The end-key check compares keys as unsigned bytes, matching the bytewise ordering
     * RocksDB uses by default (this store configures no custom comparator). The previous
     * signed comparison mis-ordered any key containing a byte {@code >= 0x80}, which could end
     * a range early or let it run past the end key.
     */
    class RocksdbRangeIterator
    extends RocksdbKVIterator {
        private final Comparator<byte[]> comparator;
        private final byte[] endKeyBytes;

        /**
         * @param name     the store name, used in error messages
         * @param iterator the positioned native iterator
         * @param keyCoder coder for keys; also used to encode {@code endKey}
         * @param valCoder coder for values
         * @param endKey   the last key to return (inclusive); must not be {@code null}
         */
        private RocksdbRangeIterator(String name, RocksIterator iterator, Coder<K> keyCoder, Coder<V> valCoder, K endKey) {
            super(name, iterator, keyCoder, valCoder);
            // Unsigned lexicographic order: compare the common prefix byte by byte, then by length.
            this.comparator = (left, right) -> {
                int common = Math.min(left.length, right.length);
                for (int i = 0; i < common; ++i) {
                    int diff = (left[i] & 0xFF) - (right[i] & 0xFF);
                    if (diff != 0) {
                        return diff;
                    }
                }
                return left.length - right.length;
            };
            Preconditions.checkNotNull(endKey, "End key cannot be null");
            this.endKeyBytes = keyCoder.encode(endKey);
        }

        /**
         * @return {@code true} while the native iterator is valid and its current key is not
         *         past the end key
         */
        @Override
        public boolean hasNext() {
            return super.hasNext() && this.comparator.compare(this.iterator.key(), this.endKeyBytes) <= 0;
        }
    }

    /**
     * {@link KVIterator} backed by a native {@link RocksIterator}, decoding keys and values
     * with the supplied coders. Closing it also deregisters it from the enclosing store.
     */
    class RocksdbKVIterator
    implements KVIterator<K, V> {
        final String name;
        final RocksIterator iterator;
        final Coder<K> keyCoder;
        final Coder<V> valCoder;
        private volatile boolean closed = false;

        /**
         * @param name     the store name, used in error messages
         * @param iterator the already-positioned native iterator
         * @param keyCoder coder used to decode keys
         * @param valCoder coder used to decode values
         */
        private RocksdbKVIterator(String name, RocksIterator iterator, Coder<K> keyCoder, Coder<V> valCoder) {
            this.name = name;
            this.iterator = iterator;
            this.keyCoder = keyCoder;
            this.valCoder = valCoder;
        }

        /**
         * Deregisters this iterator from the store, closes the native iterator and marks this
         * iterator closed.
         */
        @Override
        public void close() {
            RocksdbKVStore.this.kvIters.remove(this);
            this.iterator.close();
            this.closed = true;
        }

        /**
         * @return {@code true} while the native iterator points at a valid entry
         * @throws InvalidStateStoreException if this iterator has been closed
         */
        @Override
        public boolean hasNext() {
            if (this.closed) {
                throw new InvalidStateStoreException("Rocksdb state store " + this.name + " is already closed");
            }
            return this.iterator.isValid();
        }

        /**
         * Returns the current entry and advances the native iterator.
         *
         * @return the decoded key/value pair
         * @throws NoSuchElementException if there is no further entry
         */
        @Override
        public KV<K, V> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            // Decode before advancing: key() and value() refer to the current position.
            K currentKey = this.keyCoder.decode(this.iterator.key());
            V currentValue = this.valCoder.decode(this.iterator.value());
            KV<K, V> entry = new KVImpl<>(currentKey, currentValue);
            this.iterator.next();
            return entry;
        }
    }

    /**
     * {@link KVMulti} implementation that stages operations in a {@link WriteBatch} and applies
     * them to the data column family in a single write when {@link #execute()} is called.
     * Staging more operations after execution is rejected.
     */
    class KVMultiImpl
    implements KVMulti<K, V> {
        private final WriteBatch batch = new WriteBatch();
        private volatile boolean executed = false;

        KVMultiImpl() {
        }

        /** Rejects further staging once the batch has been executed. */
        private void checkExecuted() {
            if (this.executed) {
                throw new StateStoreRuntimeException("KVMulti#execute() has been called");
            }
        }

        /**
         * Stages a put; a {@code null} value stages a delete instead.
         *
         * @param key   the key to write; must not be {@code null}
         * @param value the value to store, or {@code null} to delete the key
         */
        @Override
        public void put(K key, V value) {
            Preconditions.checkNotNull(key, "key cannot be null");
            this.checkExecuted();
            final byte[] encodedKey = RocksdbKVStore.this.keyCoder.encode(key);
            if (value == null) {
                this.deleteRaw(encodedKey);
                return;
            }
            try {
                this.batch.put(RocksdbKVStore.this.dataCfHandle, encodedKey, RocksdbKVStore.this.valCoder.encode(value));
            }
            catch (RocksDBException e) {
                throw new StateStoreRuntimeException(e);
            }
        }

        /**
         * Stages a delete.
         *
         * @param key the key to delete; must not be {@code null}
         */
        @Override
        public void delete(K key) {
            Preconditions.checkNotNull(key, "key cannot be null");
            this.checkExecuted();
            this.deleteRaw(RocksdbKVStore.this.keyCoder.encode(key));
        }

        /** Stages a delete of an already-encoded key in the data column family. */
        private void deleteRaw(byte[] keyBytes) {
            try {
                this.batch.delete(RocksdbKVStore.this.dataCfHandle, keyBytes);
            }
            catch (RocksDBException e) {
                throw new StateStoreRuntimeException(e);
            }
        }

        /**
         * Stages a range delete, passing the encoded bounds to {@code WriteBatch.deleteRange}.
         *
         * @param from the start key; must not be {@code null}
         * @param to   the end key; must not be {@code null}
         */
        @Override
        public void deleteRange(K from, K to) {
            Preconditions.checkNotNull(from, "from key cannot be null");
            Preconditions.checkNotNull(to, "to key cannot be null");
            this.checkExecuted();
            final byte[] lowerBound = RocksdbKVStore.this.keyCoder.encode(from);
            final byte[] upperBound = RocksdbKVStore.this.keyCoder.encode(to);
            try {
                this.batch.deleteRange(RocksdbKVStore.this.dataCfHandle, lowerBound, upperBound);
            }
            catch (RocksDBException e) {
                throw new StateStoreRuntimeException(e);
            }
        }

        /**
         * Applies the staged operations in one write and releases the batch. A second call is a
         * no-op.
         *
         * @throws InvalidStateStoreException if the store is closed or not initialized
         * @throws StateStoreRuntimeException if RocksDB rejects the write
         */
        @Override
        public synchronized void execute() {
            if (!this.executed) {
                RocksdbKVStore.this.checkStoreOpen();
                this.executed = true;
                try {
                    RocksdbKVStore.this.getDb().write(RocksdbKVStore.this.writeOpts, this.batch);
                }
                catch (RocksDBException e) {
                    throw new StateStoreRuntimeException("Error while executing a multi operation from store " + RocksdbKVStore.this.name, e);
                }
                finally {
                    // The batch is single-use; release it whether or not the write succeeded.
                    RocksUtils.close(this.batch);
                }
            }
        }
    }
}

