/*
 * Decompiled with CFR 0.152.
 */
package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.FDBRecordStoreProperties;
import com.apple.foundationdb.record.RecordCoreArgumentException;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCoreStorageException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorVisitor;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.cursors.BaseCursor;
import com.apple.foundationdb.record.cursors.CursorLimitManager;
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.provider.foundationdb.FDBRawRecord;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordVersion;
import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer;
import com.apple.foundationdb.record.provider.foundationdb.FDBStoredSizes;
import com.apple.foundationdb.record.provider.foundationdb.FDBTransactionContext;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.apple.foundationdb.tuple.Tuple;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@API(value=API.Status.INTERNAL)
public class SplitHelper {
    @Nonnull
    private static final Logger LOGGER = LoggerFactory.getLogger(SplitHelper.class);
    public static final int SPLIT_RECORD_SIZE = 100000;
    public static final long RECORD_VERSION = -1L;
    public static final long UNSPLIT_RECORD = 0L;
    public static final long START_SPLIT_RECORD = 1L;

    private SplitHelper() {
    }

    public static void saveWithSplit(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, @Nonnull byte[] serialized, @Nullable FDBRecordVersion version) {
        SplitHelper.saveWithSplit(context, subspace, key, serialized, version, true, false, false, null, null);
    }

    public static void saveWithSplit(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, @Nonnull byte[] serialized, @Nullable FDBRecordVersion version, boolean splitLongRecords, boolean omitUnsplitSuffix, boolean clearBasedOnPreviousSizeInfo, @Nullable FDBStoredSizes previousSizeInfo, @Nullable SizeInfo sizeInfo) {
        if (omitUnsplitSuffix && version != null) {
            throw new RecordCoreArgumentException("Cannot include version in-line using old unsplit record format", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack())}).addLogInfo(new Object[]{LogMessageKeys.VERSION, version});
        }
        Transaction tr = context.ensureActive();
        if (serialized.length > 100000) {
            if (!splitLongRecords) {
                throw new RecordCoreException("Record is too long to be stored in a single value; consider split_long_records", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack())}).addLogInfo(new Object[]{LogMessageKeys.VALUE_SIZE, serialized.length});
            }
            SplitHelper.writeSplitRecord(context, subspace, key, serialized, clearBasedOnPreviousSizeInfo, previousSizeInfo, sizeInfo);
        } else {
            if (splitLongRecords || previousSizeInfo == null || previousSizeInfo.isVersionedInline()) {
                SplitHelper.clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
            }
            Tuple recordKey = splitLongRecords || !omitUnsplitSuffix ? key.add(0L) : key;
            byte[] keyBytes = subspace.pack(recordKey);
            tr.set(keyBytes, serialized);
            if (sizeInfo != null) {
                sizeInfo.set(keyBytes, serialized);
                sizeInfo.setSplit(false);
            }
        }
        SplitHelper.writeVersion(context, subspace, key, version, sizeInfo);
    }

    private static void writeSplitRecord(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, @Nonnull byte[] serialized, boolean clearBasedOnPreviousSizeInfo, @Nullable FDBStoredSizes previousSizeInfo, @Nullable SizeInfo sizeInfo) {
        Transaction tr = context.ensureActive();
        Subspace keySplitSubspace = subspace.subspace(key);
        SplitHelper.clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
        long index = 1L;
        int offset = 0;
        while (offset < serialized.length) {
            int nextOffset = offset + 100000;
            if (nextOffset > serialized.length) {
                nextOffset = serialized.length;
            }
            byte[] keyBytes = keySplitSubspace.pack(index);
            byte[] valueBytes = Arrays.copyOfRange(serialized, offset, nextOffset);
            tr.set(keyBytes, valueBytes);
            if (sizeInfo != null) {
                if (offset == 0) {
                    sizeInfo.set(keyBytes, valueBytes);
                    sizeInfo.setSplit(true);
                } else {
                    sizeInfo.add(keyBytes, valueBytes);
                }
            }
            ++index;
            offset = nextOffset;
        }
    }

    private static void writeVersion(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, @Nullable FDBRecordVersion version, @Nullable SizeInfo sizeInfo) {
        if (version == null) {
            if (sizeInfo != null) {
                sizeInfo.setVersionedInline(false);
            }
            return;
        }
        Transaction tr = context.ensureActive();
        byte[] keyBytes = subspace.pack(key.add(-1L));
        byte[] valueBytes = SplitHelper.packVersion(version);
        if (version.isComplete()) {
            tr.set(keyBytes, valueBytes);
        } else {
            context.addVersionMutation(MutationType.SET_VERSIONSTAMPED_VALUE, keyBytes, valueBytes);
            context.addToLocalVersionCache(keyBytes, version.getLocalVersion());
        }
        if (sizeInfo != null) {
            sizeInfo.setVersionedInline(true);
            sizeInfo.add(keyBytes, valueBytes);
            if (!version.isComplete()) {
                sizeInfo.valueSize -= 4;
            }
        }
    }

    public static void dryRunSaveWithSplitOnlySetSizeInfo(@Nonnull Subspace subspace, @Nonnull Tuple key, @Nonnull byte[] serialized, @Nullable FDBRecordVersion version, boolean splitLongRecords, boolean omitUnsplitSuffix, @Nullable SizeInfo sizeInfo) {
        if (serialized.length > 100000) {
            SplitHelper.dryRunWriteSplitRecordOnlySetSizeInfo(subspace, key, serialized, sizeInfo);
        } else {
            Tuple recordKey = splitLongRecords || !omitUnsplitSuffix ? key.add(0L) : key;
            byte[] keyBytes = subspace.pack(recordKey);
            if (sizeInfo != null) {
                sizeInfo.set(keyBytes, serialized);
                sizeInfo.setSplit(false);
            }
        }
        SplitHelper.dryRunWriteVersionSizeInfo(subspace, key, version, sizeInfo);
    }

    private static void dryRunWriteSplitRecordOnlySetSizeInfo(@Nonnull Subspace subspace, @Nonnull Tuple key, @Nonnull byte[] serialized, @Nullable SizeInfo sizeInfo) {
        Subspace keySplitSubspace = subspace.subspace(key);
        long index = 1L;
        int offset = 0;
        while (offset < serialized.length) {
            int nextOffset = offset + 100000;
            if (nextOffset > serialized.length) {
                nextOffset = serialized.length;
            }
            byte[] keyBytes = keySplitSubspace.pack(index);
            byte[] valueBytes = Arrays.copyOfRange(serialized, offset, nextOffset);
            if (sizeInfo != null) {
                if (offset == 0) {
                    sizeInfo.set(keyBytes, valueBytes);
                    sizeInfo.setSplit(true);
                } else {
                    sizeInfo.add(keyBytes, valueBytes);
                }
            }
            ++index;
            offset = nextOffset;
        }
    }

    private static void dryRunWriteVersionSizeInfo(@Nonnull Subspace subspace, @Nonnull Tuple key, @Nullable FDBRecordVersion version, @Nullable SizeInfo sizeInfo) {
        if (version == null) {
            if (sizeInfo != null) {
                sizeInfo.setVersionedInline(false);
            }
            return;
        }
        byte[] keyBytes = subspace.pack(key.add(-1L));
        byte[] valueBytes = SplitHelper.packVersion(version);
        if (sizeInfo != null) {
            sizeInfo.setVersionedInline(true);
            sizeInfo.add(keyBytes, valueBytes);
            if (!version.isComplete()) {
                sizeInfo.valueSize -= 4;
            }
        }
    }

    @Nonnull
    static byte[] packVersion(@Nonnull FDBRecordVersion version) {
        if (version.isComplete()) {
            return Tuple.from(version.toVersionstamp(false)).pack();
        }
        return Tuple.from(version.toVersionstamp(false)).packWithVersionstamp();
    }

    @Nullable
    static FDBRecordVersion unpackVersion(@Nullable byte[] packedVersion) {
        if (packedVersion != null) {
            return FDBRecordVersion.fromVersionstamp(Tuple.fromBytes(packedVersion).getVersionstamp(0), true);
        }
        return null;
    }

    private static void clearPreviousSplitRecord(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, boolean clearBasedOnPreviousSizeInfo, @Nullable FDBStoredSizes previousSizeInfo) {
        Transaction tr = context.ensureActive();
        Subspace keySplitSubspace = subspace.subspace(key);
        if (clearBasedOnPreviousSizeInfo) {
            if (previousSizeInfo != null) {
                if (Boolean.TRUE.equals(context.getPropertyStorage().getPropertyValue(FDBRecordStoreProperties.UNROLL_SINGLE_RECORD_DELETES))) {
                    Range keySplitSubspaceRange = keySplitSubspace.range();
                    tr.addWriteConflictRange(keySplitSubspaceRange.begin, keySplitSubspaceRange.end);
                    List<Long> offsets = SplitHelper.offsets(previousSizeInfo);
                    for (Long offset : offsets) {
                        tr.clear(keySplitSubspace.pack(offset));
                    }
                } else if (previousSizeInfo.isSplit() || previousSizeInfo.isVersionedInline()) {
                    tr.clear(keySplitSubspace.range());
                } else {
                    tr.clear(keySplitSubspace.pack(0L));
                }
            }
        } else {
            tr.clear(keySplitSubspace.range());
        }
        byte[] versionKey = keySplitSubspace.pack(-1L);
        context.getLocalVersion(versionKey).ifPresent(localVersion -> context.removeVersionMutation(versionKey));
    }

    public static CompletableFuture<FDBRawRecord> loadWithSplit(@Nonnull ReadTransaction tr, @Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, boolean splitLongRecords, boolean missingUnsplitRecordSuffix, @Nullable SizeInfo sizeInfo) {
        if (!splitLongRecords && missingUnsplitRecordSuffix) {
            return SplitHelper.loadUnsplitLegacy(tr, context, subspace, key, sizeInfo);
        }
        if (Boolean.TRUE.equals(context.getPropertyStorage().getPropertyValue(FDBRecordStoreProperties.LOAD_RECORDS_VIA_GETS))) {
            return SplitHelper.loadSplitViaGets(tr, context, subspace, key, sizeInfo);
        }
        long startTime = System.nanoTime();
        Subspace recordSubspace = subspace.subspace(key);
        Range recordRange = recordSubspace.range();
        AsyncIterable<KeyValue> rangeScan = SplitHelper.scanSingleRecord(tr, recordRange.begin, recordRange.end);
        Iterator rangeIter = rangeScan.iterator();
        context.instrument(FDBStoreTimer.DetailEvents.GET_RECORD_RANGE_RAW_FIRST_CHUNK, rangeIter.onHasNext(), startTime);
        return new SingleKeyUnsplitter(context, key, recordSubspace, (AsyncIterator<KeyValue>)rangeIter, sizeInfo).run(context.getExecutor());
    }

    private static CompletableFuture<FDBRawRecord> loadSplitViaGets(@Nonnull ReadTransaction tr, @Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, @Nullable SizeInfo sizeInfo) {
        SizeInfo storedSizes = sizeInfo == null ? new SizeInfo() : sizeInfo;
        storedSizes.reset();
        Subspace recordSubspace = subspace.subspace(key);
        byte[] versionKey = recordSubspace.pack(-1L);
        byte[] unsplitKey = recordSubspace.pack(0L);
        byte[] startSplitKey = recordSubspace.pack(1L);
        Range recordRange = recordSubspace.range();
        tr.addReadConflictRangeIfNotSnapshot(recordRange.begin, recordRange.end);
        CompletableFuture versionValueFuture = context.getLocalVersion(versionKey).map(FDBRecordVersion::incomplete).map(CompletableFuture::completedFuture).orElseGet(() -> tr.get(versionKey).thenApply(SplitHelper::unpackVersion));
        CompletableFuture<byte[]> unsplitValueFuture = tr.get(unsplitKey);
        CompletableFuture<byte[]> startSplitValueFuture = tr.get(startSplitKey);
        return ((CompletableFuture)versionValueFuture.thenCompose(version -> unsplitValueFuture.thenCombine(startSplitValueFuture, (unsplitValue, startSplitValue) -> {
            storedSizes.add(versionKey, (FDBRecordVersion)version);
            if (unsplitValue == null && startSplitValue == null) {
                if (version != null) {
                    throw new FoundSplitWithoutStartException(-1L, false).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(recordSubspace.pack())}).addLogInfo(new Object[]{LogMessageKeys.VERSION, version});
                }
                return CompletableFuture.completedFuture(null);
            }
            if (unsplitValue != null && startSplitValue != null) {
                throw new RecordCoreException("Unsplit value followed by split.", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(recordSubspace.pack())});
            }
            if (unsplitValue != null) {
                storedSizes.setSplit(false);
                storedSizes.add(unsplitKey, (byte[])unsplitValue);
                return CompletableFuture.completedFuture(new FDBRawRecord(key, (byte[])unsplitValue, (FDBRecordVersion)version, storedSizes));
            }
            storedSizes.setSplit(true);
            storedSizes.add(startSplitKey, (byte[])startSplitValue);
            AsyncIterable<KeyValue> iterable = SplitHelper.scanSingleRecord(tr, recordSubspace.pack(2L), recordRange.end);
            ArrayList<byte[]> values = new ArrayList<byte[]>();
            values.add((byte[])startSplitValue);
            AtomicLong lastSplit = new AtomicLong(1L);
            return AsyncUtil.forEach(iterable, keyValue -> {
                long expectedSplit;
                long splitPoint = recordSubspace.unpack(keyValue.getKey()).getLong(0);
                if (splitPoint != (expectedSplit = lastSplit.incrementAndGet())) {
                    throw new RecordCoreException("Split record segments out of order", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(keyValue.getKey())}).addLogInfo(new Object[]{LogMessageKeys.EXPECTED_INDEX, expectedSplit}).addLogInfo(new Object[]{LogMessageKeys.FOUND_INDEX, splitPoint}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.pack())});
                }
                storedSizes.add((KeyValue)keyValue);
                values.add(keyValue.getValue());
            }, context.getExecutor()).thenApply(vignore -> {
                byte[] rawRecord = ByteArrayUtil.join((byte[][])values.toArray((T[])new byte[0][0]));
                return new FDBRawRecord(key, rawRecord, (FDBRecordVersion)version, storedSizes);
            });
        }))).thenCompose(Function.identity());
    }

    private static AsyncIterable<KeyValue> scanSingleRecord(ReadTransaction tr, byte[] start, byte[] end) {
        return tr.getRange(start, end, 0, false, StreamingMode.WANT_ALL);
    }

    private static CompletableFuture<FDBRawRecord> loadUnsplitLegacy(@Nonnull ReadTransaction tr, @Nonnull FDBTransactionContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, @Nullable SizeInfo sizeInfo) {
        long startTime = System.nanoTime();
        byte[] keyBytes = subspace.pack(key);
        return tr.get(keyBytes).thenApply(valueBytes -> {
            if (context.getTimer() != null) {
                context.getTimer().recordSinceNanoTime(FDBStoreTimer.DetailEvents.GET_RECORD_RAW_VALUE, startTime);
            }
            if (valueBytes != null && sizeInfo != null) {
                sizeInfo.set(keyBytes, (byte[])valueBytes);
            }
            if (valueBytes != null) {
                if (sizeInfo != null) {
                    return new FDBRawRecord(key, (byte[])valueBytes, null, sizeInfo);
                }
                return new FDBRawRecord(key, (byte[])valueBytes, null, 1, keyBytes.length, ((byte[])valueBytes).length, false, false);
            }
            return null;
        });
    }

    public static CompletableFuture<Boolean> keyExists(@Nonnull ReadTransaction tr, @Nonnull FDBTransactionContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, boolean splitLongRecords, boolean missingUnsplitRecordSuffix) {
        if (!splitLongRecords && missingUnsplitRecordSuffix) {
            return SplitHelper.loadUnsplitLegacy(tr, context, subspace, key, null).thenApply(Objects::nonNull);
        }
        long startTime = System.nanoTime();
        byte[] keyBytes = subspace.pack(key);
        AsyncIterable<KeyValue> rangeScan = tr.getRange(Range.startsWith(keyBytes), 1);
        return context.instrument(FDBStoreTimer.DetailEvents.GET_RECORD_RANGE_RAW_FIRST_CHUNK, rangeScan.iterator().onHasNext(), startTime);
    }

    public static void deleteSplit(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull Tuple key, boolean splitLongRecords, boolean missingUnsplitRecordSuffix, boolean clearBasedOnPreviousSizeInfo, @Nullable FDBStoredSizes previousSizeInfo) {
        if (!splitLongRecords && missingUnsplitRecordSuffix) {
            context.ensureActive().clear(subspace.pack(key));
        } else {
            SplitHelper.clearPreviousSplitRecord(context, subspace, key, clearBasedOnPreviousSizeInfo, previousSizeInfo);
        }
    }

    public static Tuple unpackKey(@Nonnull Subspace subspace, @Nonnull KeyValue kv) {
        try {
            return subspace.unpack(kv.getKey());
        }
        catch (IllegalArgumentException e) {
            throw new RecordCoreArgumentException("unable to unpack key", e).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(subspace.getKey())});
        }
    }

    public static void validatePrimaryKeySuffixNumber(Tuple keyTuple) {
        long nextIndex;
        try {
            nextIndex = keyTuple.getLong(keyTuple.size() - 1);
        }
        catch (Exception e) {
            throw new RecordCoreStorageException("Invalid record split number: not a number", e);
        }
        if (nextIndex != -1L && nextIndex != 0L && nextIndex < 1L) {
            throw new RecordCoreStorageException("Invalid record split number: ").addLogInfo(new Object[]{LogMessageKeys.SPLIT_NEXT_INDEX, nextIndex});
        }
    }

    private static List<Long> offsets(FDBStoredSizes sizeInfo) {
        ArrayList<Long> offsetList = new ArrayList<Long>(sizeInfo.getKeyCount());
        if (sizeInfo.isVersionedInline()) {
            offsetList.add(-1L);
        }
        if (sizeInfo.isSplit()) {
            long current = 1L;
            while (offsetList.size() < sizeInfo.getKeyCount()) {
                offsetList.add(current);
                ++current;
            }
        } else {
            offsetList.add(0L);
        }
        return offsetList;
    }

    public static class SizeInfo
    implements FDBStoredSizes {
        private int keyCount;
        private int keySize;
        private int valueSize;
        private boolean split;
        private boolean versionedInline;

        @Override
        public int getKeyCount() {
            return this.keyCount;
        }

        public void setKeyCount(int keyCount) {
            this.keyCount = keyCount;
        }

        @Override
        public int getKeySize() {
            return this.keySize;
        }

        public void setKeySize(int keySize) {
            this.keySize = keySize;
        }

        @Override
        public int getValueSize() {
            return this.valueSize;
        }

        public void setValueSize(int valueSize) {
            this.valueSize = valueSize;
        }

        @Override
        public boolean isSplit() {
            return this.split;
        }

        public void setSplit(boolean split) {
            this.split = split;
        }

        @Override
        public boolean isVersionedInline() {
            return this.versionedInline;
        }

        public void setVersionedInline(boolean versionedInline) {
            this.versionedInline = versionedInline;
        }

        public void set(@Nonnull KeyValue keyValue) {
            this.set(keyValue.getKey(), keyValue.getValue());
        }

        public void set(@Nonnull byte[] keyBytes, @Nonnull byte[] valueBytes) {
            this.keyCount = 1;
            this.keySize = keyBytes.length;
            this.valueSize = valueBytes.length;
        }

        public void add(@Nonnull KeyValue keyValue) {
            this.add(keyValue.getKey(), keyValue.getValue());
        }

        public void add(@Nonnull byte[] keyBytes, @Nonnull byte[] valueBytes) {
            ++this.keyCount;
            this.keySize += keyBytes.length;
            this.valueSize += valueBytes.length;
        }

        public void add(@Nonnull byte[] keyBytes, @Nullable FDBRecordVersion version) {
            if (version != null) {
                ++this.keyCount;
                this.keySize += keyBytes.length;
                this.valueSize += 13;
                this.versionedInline = true;
            }
        }

        public void add(@Nonnull FDBStoredSizes sizes) {
            this.keyCount += sizes.getKeyCount();
            this.keySize += sizes.getKeySize();
            this.valueSize += sizes.getValueSize();
        }

        public void reset() {
            this.keyCount = 0;
            this.keySize = 0;
            this.valueSize = 0;
            this.split = false;
            this.versionedInline = false;
        }
    }

    public static class SingleKeyUnsplitter {
        @Nonnull
        private final FDBRecordContext context;
        @Nonnull
        private final Tuple key;
        @Nonnull
        private final Subspace keySplitSubspace;
        @Nonnull
        private final SizeInfo sizeInfo;
        @Nonnull
        private final AsyncIterator<KeyValue> iter;
        private long lastIndex;
        @Nullable
        private byte[] result;
        @Nullable
        private FDBRecordVersion version;

        public SingleKeyUnsplitter(@Nonnull FDBRecordContext context, @Nonnull Tuple key, @Nonnull Subspace keySplitSubspace, @Nonnull AsyncIterator<KeyValue> iter, @Nullable SizeInfo sizeInfo) {
            this.context = context;
            this.key = key;
            this.keySplitSubspace = keySplitSubspace;
            this.iter = iter;
            this.sizeInfo = sizeInfo == null ? new SizeInfo() : sizeInfo;
        }

        @Nonnull
        public CompletableFuture<FDBRawRecord> run(Executor executor) {
            this.sizeInfo.reset();
            byte[] versionKey = this.keySplitSubspace.pack(-1L);
            this.context.getLocalVersion(versionKey).ifPresent(localVersion -> {
                this.version = FDBRecordVersion.incomplete(localVersion);
                this.sizeInfo.add(versionKey, this.version);
            });
            return AsyncUtil.whileTrue(() -> this.iter.onHasNext().thenApply(hasNext -> {
                if (hasNext.booleanValue()) {
                    this.append(this.iter.next());
                }
                return hasNext;
            }), executor).thenApply(vignore -> {
                if (this.result != null) {
                    return new FDBRawRecord(this.key, this.result, this.version, this.sizeInfo);
                }
                if (this.version != null) {
                    throw new FoundSplitWithoutStartException(-1L, false).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.keySplitSubspace.pack())}).addLogInfo(new Object[]{LogMessageKeys.VERSION, this.version});
                }
                return null;
            });
        }

        protected void append(@Nonnull KeyValue kv) {
            Tuple subkey = SplitHelper.unpackKey(this.keySplitSubspace, kv);
            if (subkey.size() != 1) {
                throw new RecordCoreException("Expected only a single key extension for split record.", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.keySplitSubspace.pack())});
            }
            long index = subkey.getLong(0);
            if (index == 0L) {
                if (this.result != null) {
                    throw new RecordCoreException("More than one unsplit value.", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.keySplitSubspace.pack())});
                }
                this.result = kv.getValue();
                this.sizeInfo.add(kv);
                this.sizeInfo.setSplit(false);
            } else if (index == this.lastIndex + 1L || this.lastIndex == -1L && index == 1L) {
                if (index == 1L) {
                    if (this.result != null) {
                        throw new RecordCoreException("Unsplit value followed by split.", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.keySplitSubspace.pack())});
                    }
                    this.result = kv.getValue();
                    this.sizeInfo.add(kv);
                    this.sizeInfo.setSplit(true);
                } else {
                    this.result = ByteArrayUtil.join(this.result, kv.getValue());
                    this.sizeInfo.add(kv);
                }
                this.lastIndex = index;
            } else if (index == -1L) {
                this.version = SplitHelper.unpackVersion(kv.getValue());
                this.sizeInfo.setVersionedInline(true);
                this.sizeInfo.add(kv);
            } else {
                if (this.lastIndex >= 1L) {
                    throw new FoundSplitOutOfOrderException(this.lastIndex + 1L, index).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.keySplitSubspace.pack())});
                }
                throw new FoundSplitWithoutStartException(index, false).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.key}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.keySplitSubspace.pack())});
            }
        }
    }

    public static class FoundSplitWithoutStartException
    extends RecordCoreException {
        public FoundSplitWithoutStartException(long nextIndex, boolean reverse) {
            super("Found split record without start", new Object[0]);
            this.addLogInfo(new Object[]{LogMessageKeys.SPLIT_NEXT_INDEX, nextIndex});
            this.addLogInfo(new Object[]{LogMessageKeys.SPLIT_REVERSE, reverse});
        }
    }

    public static class FoundSplitOutOfOrderException
    extends RecordCoreStorageException {
        public FoundSplitOutOfOrderException(long expected, long found) {
            super("Split record segments out of order");
            this.addLogInfo(new Object[]{LogMessageKeys.SPLIT_EXPECTED, expected});
            this.addLogInfo(new Object[]{LogMessageKeys.SPLIT_FOUND, found});
        }
    }

    public static class KeyValueUnsplitter
    implements BaseCursor<FDBRawRecord> {
        @Nonnull
        private final FDBRecordContext context;
        @Nonnull
        private final RecordCursor<KeyValue> inner;
        private final boolean oldVersionFormat;
        @Nonnull
        private final SizeInfo sizeInfo;
        private final boolean reverse;
        @Nonnull
        private final Subspace subspace;
        @Nullable
        private KeyValue next;
        @Nullable
        private Tuple nextKey;
        @Nullable
        private Subspace nextSubspace;
        @Nullable
        private FDBRecordVersion nextVersion;
        @Nullable
        private byte[] nextPrefix;
        private long nextIndex;
        @Nullable
        private RecordCursor.NoNextReason innerNoNextReason;
        @Nullable
        private RecordCursorResult<KeyValue> pending;
        @Nullable
        private RecordCursorContinuation continuation;
        @Nonnull
        private final CursorLimitManager limitManager;
        private long readLastKeyNanos = 0L;
        @Nullable
        private RecordCursorResult<FDBRawRecord> nextResult;

        public KeyValueUnsplitter(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull RecordCursor<KeyValue> inner, boolean oldVersionFormat, @Nullable SizeInfo sizeInfo, @Nonnull ScanProperties scanProperties) {
            this(context, subspace, inner, oldVersionFormat, sizeInfo, scanProperties.isReverse(), new CursorLimitManager(scanProperties));
        }

        public KeyValueUnsplitter(@Nonnull FDBRecordContext context, @Nonnull Subspace subspace, @Nonnull RecordCursor<KeyValue> inner, boolean oldVersionFormat, @Nullable SizeInfo sizeInfo, boolean reverse, @Nonnull CursorLimitManager limitManager) {
            this.context = context;
            this.subspace = subspace;
            this.inner = inner;
            this.oldVersionFormat = oldVersionFormat;
            this.sizeInfo = sizeInfo == null ? new SizeInfo() : sizeInfo;
            this.reverse = reverse;
            this.limitManager = limitManager;
        }

        @Override
        @Nonnull
        public CompletableFuture<RecordCursorResult<FDBRawRecord>> onNext() {
            if (this.nextResult != null && !this.nextResult.hasNext()) {
                return CompletableFuture.completedFuture(this.nextResult);
            }
            if (this.limitManager.isStopped()) {
                RecordCursor.NoNextReason noNextReason = this.mergeNoNextReason();
                this.nextResult = noNextReason.isSourceExhausted() ? RecordCursorResult.exhausted() : RecordCursorResult.withoutNextValue(this.continuation, this.mergeNoNextReason());
                return CompletableFuture.completedFuture(this.nextResult);
            }
            return this.appendUntilNewKey().thenApply(vignore -> {
                if (this.nextVersion != null && this.next == null) {
                    throw new FoundSplitWithoutStartException(-1L, this.reverse).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.pack())}).addLogInfo(new Object[]{LogMessageKeys.VERSION, this.nextVersion});
                }
                if (!this.oldVersionFormat && this.nextKey != null) {
                    byte[] versionKey = this.subspace.subspace(this.nextKey).pack(-1L);
                    this.context.getLocalVersion(versionKey).ifPresent(localVersion -> {
                        this.nextVersion = FDBRecordVersion.incomplete(localVersion);
                        this.sizeInfo.setVersionedInline(true);
                        ++this.sizeInfo.keyCount;
                        this.sizeInfo.keySize += versionKey.length;
                        this.sizeInfo.valueSize += 13;
                    });
                }
                if (this.next == null) {
                    this.nextResult = RecordCursorResult.withoutNextValue(this.continuation, this.mergeNoNextReason());
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace(KeyValueLogMessage.of("unsplitter stopped", new Object[]{LogMessageKeys.NEXT_CONTINUATION, this.continuation == null ? "null" : ByteArrayUtil2.loggable(this.continuation.toBytes()), LogMessageKeys.NO_NEXT_REASON, this.nextResult.getNoNextReason(), LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.getKey())}));
                    }
                } else {
                    this.sizeInfo.setVersionedInline(this.nextVersion != null);
                    FDBRawRecord result = new FDBRawRecord(this.nextKey, this.next.getValue(), this.nextVersion, this.sizeInfo);
                    this.next = null;
                    this.nextKey = null;
                    this.nextVersion = null;
                    this.nextPrefix = null;
                    this.nextResult = RecordCursorResult.withNextValue(result, this.continuation);
                    if (LOGGER.isTraceEnabled()) {
                        KeyValueLogMessage msg = KeyValueLogMessage.build("unsplitter assembled new record", new Object[]{LogMessageKeys.NEXT_CONTINUATION, this.continuation == null ? "null" : ByteArrayUtil2.loggable(this.continuation.toBytes()), LogMessageKeys.KEY_TUPLE, result.getPrimaryKey(), LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.getKey())});
                        result.addSizeLogInfo(msg);
                        LOGGER.trace(msg.toString());
                    }
                }
                return this.nextResult;
            });
        }

        @Override
        @Nonnull
        public RecordCursorResult<FDBRawRecord> getNext() {
            return this.context.asyncToSync(FDBStoreTimer.Waits.WAIT_ADVANCE_CURSOR, this.onNext());
        }

        @Nonnull
        public RecordCursor.NoNextReason mergeNoNextReason() {
            if (this.innerNoNextReason == RecordCursor.NoNextReason.SOURCE_EXHAUSTED) {
                return this.innerNoNextReason;
            }
            return this.limitManager.getStoppedReason().orElse(this.innerNoNextReason);
        }

        @Override
        public void close() {
            this.inner.close();
        }

        @Override
        public boolean isClosed() {
            return this.inner.isClosed();
        }

        @Override
        @Nonnull
        public Executor getExecutor() {
            return this.inner.getExecutor();
        }

        @Override
        public boolean accept(@Nonnull RecordCursorVisitor visitor) {
            if (visitor.visitEnter(this)) {
                this.inner.accept(visitor);
            }
            return visitor.visitLeave(this);
        }

        private CompletableFuture<Void> appendUntilNewKey() {
            return AsyncUtil.whileTrue(() -> {
                if (this.pending != null) {
                    boolean complete = this.append(this.pending);
                    this.pending = null;
                    if (complete) {
                        return AsyncUtil.READY_FALSE;
                    }
                }
                return this.inner.onNext().thenApply(innerResult -> {
                    if (!innerResult.hasNext()) {
                        if (this.reverse && this.next != null && this.nextIndex != 1L && this.nextIndex != 0L && this.nextIndex != -1L) {
                            throw new FoundSplitWithoutStartException(this.nextIndex, true).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.pack())});
                        }
                        this.innerNoNextReason = innerResult.getNoNextReason();
                        if (this.next == null) {
                            this.continuation = innerResult.getContinuation();
                        }
                        if (LOGGER.isTraceEnabled()) {
                            LOGGER.trace(KeyValueLogMessage.of("unsplitter inner cursor stopped", new Object[]{LogMessageKeys.NEXT_CONTINUATION, this.continuation == null ? "null" : ByteArrayUtil2.loggable(this.continuation.toBytes()), LogMessageKeys.NO_NEXT_REASON, this.innerNoNextReason, LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.getKey())}));
                        }
                        return false;
                    }
                    this.innerNoNextReason = null;
                    this.limitManager.tryRecordScan();
                    boolean complete = this.append((RecordCursorResult<KeyValue>)innerResult);
                    return !complete;
                });
            }, this.inner.getExecutor());
        }

        protected boolean append(@Nonnull RecordCursorResult<KeyValue> resultWithKv) {
            KeyValue kv = resultWithKv.get();
            this.limitManager.reportScannedBytes(kv.getKey().length + kv.getValue().length);
            if (this.nextPrefix == null) {
                this.continuation = resultWithKv.getContinuation();
                return this.appendFirst(kv);
            }
            if (ByteArrayUtil.startsWith(kv.getKey(), this.nextPrefix)) {
                this.continuation = resultWithKv.getContinuation();
                return this.appendNext(kv);
            }
            if (this.reverse && this.nextIndex != 0L && this.nextIndex != 1L && this.nextIndex != -1L) {
                throw new FoundSplitWithoutStartException(this.nextIndex, true).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.pack())});
            }
            this.pending = resultWithKv;
            this.logEndFound();
            return true;
        }

        private boolean appendFirst(@Nonnull KeyValue kv) {
            boolean done;
            Tuple keyTuple = this.subspace.unpack(kv.getKey());
            this.nextKey = keyTuple.popBack();
            this.nextSubspace = this.subspace.subspace(this.nextKey);
            this.nextPrefix = this.nextSubspace.pack();
            this.next = new KeyValue(this.nextPrefix, kv.getValue());
            this.nextIndex = keyTuple.getLong(keyTuple.size() - 1);
            this.sizeInfo.set(kv);
            if (this.nextIndex == 0L) {
                this.sizeInfo.setSplit(false);
                done = !this.reverse;
            } else if (!this.reverse && this.nextIndex == -1L) {
                if (this.oldVersionFormat) {
                    throw new RecordCoreException("Found record version when old format specified", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, keyTuple});
                }
                this.sizeInfo.setVersionedInline(true);
                this.nextVersion = SplitHelper.unpackVersion(kv.getValue());
                this.next = null;
                done = false;
            } else if (this.reverse && this.nextIndex != -1L || this.nextIndex == 1L) {
                this.sizeInfo.setSplit(true);
                done = false;
            } else {
                throw new FoundSplitWithoutStartException(this.nextIndex, this.reverse).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, keyTuple});
            }
            this.logFirstKey(done);
            return done;
        }

        private boolean appendNext(@Nonnull KeyValue kv) {
            boolean done;
            long index = this.nextSubspace.unpack(kv.getKey()).getLong(0);
            this.sizeInfo.add(kv);
            if (!(this.reverse || this.nextIndex != -1L || index != 0L && index != 1L)) {
                this.next = new KeyValue(this.nextPrefix, kv.getValue());
                this.nextIndex = index;
                this.sizeInfo.setSplit(index == 1L);
                done = this.nextIndex == 0L;
            } else if (!this.reverse && index == this.nextIndex + 1L) {
                this.next = new KeyValue(this.nextPrefix, ByteArrayUtil.join(this.next.getValue(), kv.getValue()));
                this.nextIndex = index;
                done = false;
            } else if (this.reverse && index == -1L && (this.nextIndex == 1L || this.nextIndex == 0L)) {
                if (this.oldVersionFormat) {
                    throw new RecordCoreException("Found record version when old format specified", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.pack())});
                }
                this.nextVersion = SplitHelper.unpackVersion(kv.getValue());
                this.nextIndex = index;
                done = true;
            } else if (this.reverse && index == this.nextIndex - 1L && index != -1L) {
                this.next = new KeyValue(this.nextPrefix, ByteArrayUtil.join(kv.getValue(), this.next.getValue()));
                this.nextIndex = index;
                done = false;
            } else {
                long expectedIndex = this.nextIndex + (long)(this.reverse ? -1 : 1);
                if (this.reverse && expectedIndex == 1L || !this.reverse && this.nextIndex == -1L) {
                    throw new FoundSplitWithoutStartException(index, this.reverse).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.pack())});
                }
                throw new RecordCoreException("Split record segments out of order", new Object[0]).addLogInfo(new Object[]{LogMessageKeys.KEY, ByteArrayUtil2.loggable(kv.getKey())}).addLogInfo(new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey}).addLogInfo(new Object[]{LogMessageKeys.EXPECTED_INDEX, this.nextIndex + (long)(this.reverse ? -1 : 1)}).addLogInfo(new Object[]{LogMessageKeys.FOUND_INDEX, index}).addLogInfo(new Object[]{LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.pack())});
            }
            this.logNextKey(done);
            return done;
        }

        private void logFirstKey(boolean done) {
            this.logKey("found first key in new split record", done);
        }

        private void logNextKey(boolean done) {
            this.logKey("found next key in split record", done);
        }

        private void logEndFound() {
            this.logKey("end key found for split record", true);
        }

        private void logKey(@Nonnull String staticMessage, boolean done) {
            if (LOGGER.isTraceEnabled()) {
                KeyValueLogMessage msg = KeyValueLogMessage.build(staticMessage, new Object[]{LogMessageKeys.KEY_TUPLE, this.nextKey, LogMessageKeys.SPLIT_REVERSE, this.reverse, LogMessageKeys.SPLIT_NEXT_INDEX, this.nextIndex, LogMessageKeys.KNOWN_LAST_KEY, done, LogMessageKeys.SUBSPACE, ByteArrayUtil2.loggable(this.subspace.getKey())});
                this.sizeInfo.addSizeLogInfo(msg);
                long currentNanos = System.nanoTime();
                if (this.readLastKeyNanos != 0L) {
                    msg.addKeyAndValue((Object)LogMessageKeys.READ_LAST_KEY_MICROS, TimeUnit.NANOSECONDS.toMicros(currentNanos - this.readLastKeyNanos));
                }
                this.readLastKeyNanos = currentNanos;
                LOGGER.trace(msg.toString());
            }
        }
    }
}

