/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.internals.Segments;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class RocksDBWindowStoreTest {
    private static final long DEFAULT_CACHE_SIZE_BYTES = 0x100000L;
    private final int numSegments = 3;
    private static final long WINDOW_SIZE = 3L;
    private final String windowName = "window";
    private final long segmentSize = 60000L;
    private final long retentionPeriod = 120000L;
    private final Segments segments = new Segments("window", 120000L, 3);
    private final StateSerdes<Integer, String> serdes = new StateSerdes<Integer, String>("", Serdes.Integer(), Serdes.String());
    private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<KeyValue<byte[], byte[]>>();
    private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), 0x100000L, new MockStreamsMetrics(new Metrics()));
    private final Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
    private final RecordCollector recordCollector = new RecordCollectorImpl(this.producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask "), new DefaultProductionExceptionHandler()){

        public <K1, V1> void send(String topic, K1 key, V1 value, Integer partition, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
            RocksDBWindowStoreTest.this.changeLog.add(new KeyValue<byte[], byte[]>(keySerializer.serialize(topic, key), valueSerializer.serialize(topic, value)));
        }
    };
    private final File baseDir = TestUtils.tempDirectory((String)"test");
    private final MockProcessorContext context = new MockProcessorContext(this.baseDir, Serdes.ByteArray(), Serdes.ByteArray(), this.recordCollector, this.cache);
    private WindowStore windowStore;

    private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, boolean enableCaching, boolean retainDuplicates) {
        RocksDBWindowStoreSupplier<Integer, String> supplier = new RocksDBWindowStoreSupplier<Integer, String>("window", 120000L, 3, retainDuplicates, Serdes.Integer(), Serdes.String(), 3L, true, Collections.emptyMap(), enableCaching);
        StateStore store = supplier.get();
        store.init(context, store);
        return store;
    }

    @After
    public void closeStore() {
        this.context.close();
        this.windowStore.close();
    }

    @Test
    public void shouldOnlyIterateOpenSegments() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long currentTime = 0L;
        this.context.setRecordContext(this.createRecordContext(currentTime));
        this.windowStore.put(1, "one");
        this.context.setRecordContext(this.createRecordContext(currentTime += 60000L));
        this.windowStore.put(1, "two");
        this.context.setRecordContext(this.createRecordContext(currentTime += 60000L));
        this.windowStore.put(1, "three");
        WindowStoreIterator iterator = this.windowStore.fetch(1, 0L, currentTime);
        this.context.setRecordContext(this.createRecordContext(currentTime += 60000L));
        this.windowStore.put(1, "four");
        Assert.assertEquals(new KeyValue<Long, String>(60000L, "two"), iterator.next());
        Assert.assertEquals(new KeyValue<Long, String>(120000L, "three"), iterator.next());
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    private ProcessorRecordContext createRecordContext(long time) {
        return new ProcessorRecordContext(time, 0L, 0, "topic");
    }

    @Test
    public void testPutAndFetch() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, startTime, this.context);
        Assert.assertEquals(Utils.mkList("zero"), this.toList(this.windowStore.fetch(0, startTime + 0L - 3L, startTime + 0L + 3L)));
        Assert.assertEquals(Utils.mkList("one"), this.toList(this.windowStore.fetch(1, startTime + 1L - 3L, startTime + 1L + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + 2L - 3L, startTime + 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + 3L - 3L, startTime + 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + 4L - 3L, startTime + 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + 5L - 3L, startTime + 5L + 3L)));
        this.putSecondBatch(this.windowStore, startTime, this.context);
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime - 2L - 3L, startTime - 2L + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime - 1L - 3L, startTime - 1L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1"), this.toList(this.windowStore.fetch(2, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2"), this.toList(this.windowStore.fetch(2, startTime + 1L - 3L, startTime + 1L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), this.toList(this.windowStore.fetch(2, startTime + 2L - 3L, startTime + 2L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), this.toList(this.windowStore.fetch(2, startTime + 3L - 3L, startTime + 3L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), this.toList(this.windowStore.fetch(2, startTime + 4L - 3L, startTime + 4L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 5L - 3L, startTime + 5L + 3L)));
        Assert.assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 6L - 3L, startTime + 6L + 3L)));
        Assert.assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 7L - 3L, startTime + 7L + 3L)));
        Assert.assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 8L - 3L, startTime + 8L + 3L)));
        Assert.assertEquals(Utils.mkList("two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 9L - 3L, startTime + 9L + 3L)));
        Assert.assertEquals(Utils.mkList("two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 10L - 3L, startTime + 10L + 3L)));
        Assert.assertEquals(Utils.mkList("two+6"), this.toList(this.windowStore.fetch(2, startTime + 11L - 3L, startTime + 11L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 12L - 3L, startTime + 12L + 3L)));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    @Test
    public void shouldGetAll() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, startTime, this.context);
        KeyValue<Windowed<Integer>, String> zero = RocksDBWindowStoreTest.windowedPair(0, "zero", startTime + 0L);
        KeyValue<Windowed<Integer>, String> one = RocksDBWindowStoreTest.windowedPair(1, "one", startTime + 1L);
        KeyValue<Windowed<Integer>, String> two = RocksDBWindowStoreTest.windowedPair(2, "two", startTime + 2L);
        KeyValue<Windowed<Integer>, String> four = RocksDBWindowStoreTest.windowedPair(4, "four", startTime + 4L);
        KeyValue<Windowed<Integer>, String> five = RocksDBWindowStoreTest.windowedPair(5, "five", startTime + 5L);
        Assert.assertEquals(Utils.mkList(zero, one, two, four, five), StreamsTestUtils.toList(this.windowStore.all()));
    }

    @Test
    public void shouldFetchAllInTimeRange() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, startTime, this.context);
        KeyValue<Windowed<Integer>, String> zero = RocksDBWindowStoreTest.windowedPair(0, "zero", startTime + 0L);
        KeyValue<Windowed<Integer>, String> one = RocksDBWindowStoreTest.windowedPair(1, "one", startTime + 1L);
        KeyValue<Windowed<Integer>, String> two = RocksDBWindowStoreTest.windowedPair(2, "two", startTime + 2L);
        KeyValue<Windowed<Integer>, String> four = RocksDBWindowStoreTest.windowedPair(4, "four", startTime + 4L);
        KeyValue<Windowed<Integer>, String> five = RocksDBWindowStoreTest.windowedPair(5, "five", startTime + 5L);
        Assert.assertEquals(Utils.mkList(one, two, four), StreamsTestUtils.toList(this.windowStore.fetchAll(startTime + 1L, startTime + 4L)));
        Assert.assertEquals(Utils.mkList(zero, one, two), StreamsTestUtils.toList(this.windowStore.fetchAll(startTime + 0L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(one, two, four, five), StreamsTestUtils.toList(this.windowStore.fetchAll(startTime + 1L, startTime + 5L)));
    }

    @Test
    public void testFetchRange() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, startTime, this.context);
        KeyValue<Windowed<Integer>, String> zero = RocksDBWindowStoreTest.windowedPair(0, "zero", startTime + 0L);
        KeyValue<Windowed<Integer>, String> one = RocksDBWindowStoreTest.windowedPair(1, "one", startTime + 1L);
        KeyValue<Windowed<Integer>, String> two = RocksDBWindowStoreTest.windowedPair(2, "two", startTime + 2L);
        KeyValue<Windowed<Integer>, String> four = RocksDBWindowStoreTest.windowedPair(4, "four", startTime + 4L);
        KeyValue<Windowed<Integer>, String> five = RocksDBWindowStoreTest.windowedPair(5, "five", startTime + 5L);
        Assert.assertEquals(Utils.mkList(zero, one), StreamsTestUtils.toList(this.windowStore.fetch(0, 1, startTime + 0L - 3L, startTime + 0L + 3L)));
        Assert.assertEquals(Utils.mkList(one), StreamsTestUtils.toList(this.windowStore.fetch(1, 1, startTime + 0L - 3L, startTime + 0L + 3L)));
        Assert.assertEquals(Utils.mkList(one, two), StreamsTestUtils.toList(this.windowStore.fetch(1, 3, startTime + 0L - 3L, startTime + 0L + 3L)));
        Assert.assertEquals(Utils.mkList(zero, one, two), StreamsTestUtils.toList(this.windowStore.fetch(0, 5, startTime + 0L - 3L, startTime + 0L + 3L)));
        Assert.assertEquals(Utils.mkList(zero, one, two, four, five), StreamsTestUtils.toList(this.windowStore.fetch(0, 5, startTime + 0L - 3L, startTime + 0L + 3L + 5L)));
        Assert.assertEquals(Utils.mkList(two, four, five), StreamsTestUtils.toList(this.windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + 3L + 5L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), StreamsTestUtils.toList(this.windowStore.fetch(4, 5, startTime + 2L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), StreamsTestUtils.toList(this.windowStore.fetch(0, 3, startTime + 3L, startTime + 3L + 5L)));
    }

    @Test
    public void testPutAndFetchBefore() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, startTime, this.context);
        Assert.assertEquals(Utils.mkList("zero"), this.toList(this.windowStore.fetch(0, startTime + 0L - 3L, startTime + 0L)));
        Assert.assertEquals(Utils.mkList("one"), this.toList(this.windowStore.fetch(1, startTime + 1L - 3L, startTime + 1L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + 2L - 3L, startTime + 2L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + 3L - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + 4L - 3L, startTime + 4L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + 5L - 3L, startTime + 5L)));
        this.putSecondBatch(this.windowStore, startTime, this.context);
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime - 1L - 3L, startTime - 1L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 0L - 3L, startTime + 0L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 1L - 3L, startTime + 1L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + 2L - 3L, startTime + 2L)));
        Assert.assertEquals(Utils.mkList("two", "two+1"), this.toList(this.windowStore.fetch(2, startTime + 3L - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2"), this.toList(this.windowStore.fetch(2, startTime + 4L - 3L, startTime + 4L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), this.toList(this.windowStore.fetch(2, startTime + 5L - 3L, startTime + 5L)));
        Assert.assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), this.toList(this.windowStore.fetch(2, startTime + 6L - 3L, startTime + 6L)));
        Assert.assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), this.toList(this.windowStore.fetch(2, startTime + 7L - 3L, startTime + 7L)));
        Assert.assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 8L - 3L, startTime + 8L)));
        Assert.assertEquals(Utils.mkList("two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 9L - 3L, startTime + 9L)));
        Assert.assertEquals(Utils.mkList("two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 10L - 3L, startTime + 10L)));
        Assert.assertEquals(Utils.mkList("two+6"), this.toList(this.windowStore.fetch(2, startTime + 11L - 3L, startTime + 11L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 12L - 3L, startTime + 12L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 13L - 3L, startTime + 13L)));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    @Test
    public void testPutAndFetchAfter() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, startTime, this.context);
        Assert.assertEquals(Utils.mkList("zero"), this.toList(this.windowStore.fetch(0, startTime + 0L, startTime + 0L + 3L)));
        Assert.assertEquals(Utils.mkList("one"), this.toList(this.windowStore.fetch(1, startTime + 1L, startTime + 1L + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + 2L, startTime + 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + 3L, startTime + 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + 4L, startTime + 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + 5L, startTime + 5L + 3L)));
        this.putSecondBatch(this.windowStore, startTime, this.context);
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime - 2L, startTime - 2L + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime - 1L, startTime - 1L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1"), this.toList(this.windowStore.fetch(2, startTime, startTime + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2"), this.toList(this.windowStore.fetch(2, startTime + 1L, startTime + 1L + 3L)));
        Assert.assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), this.toList(this.windowStore.fetch(2, startTime + 2L, startTime + 2L + 3L)));
        Assert.assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), this.toList(this.windowStore.fetch(2, startTime + 3L, startTime + 3L + 3L)));
        Assert.assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), this.toList(this.windowStore.fetch(2, startTime + 4L, startTime + 4L + 3L)));
        Assert.assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 5L, startTime + 5L + 3L)));
        Assert.assertEquals(Utils.mkList("two+4", "two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 6L, startTime + 6L + 3L)));
        Assert.assertEquals(Utils.mkList("two+5", "two+6"), this.toList(this.windowStore.fetch(2, startTime + 7L, startTime + 7L + 3L)));
        Assert.assertEquals(Utils.mkList("two+6"), this.toList(this.windowStore.fetch(2, startTime + 8L, startTime + 8L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 9L, startTime + 9L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 10L, startTime + 10L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 11L, startTime + 11L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + 12L, startTime + 12L + 3L)));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    @Test
    public void testPutSameKeyTimestamp() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        long startTime = 59996L;
        this.context.setRecordContext(this.createRecordContext(startTime));
        this.windowStore.put(0, "zero");
        Assert.assertEquals(Utils.mkList("zero"), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        this.windowStore.put(0, "zero");
        this.windowStore.put(0, "zero+");
        this.windowStore.put(0, "zero++");
        Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), this.toList(this.windowStore.fetch(0, startTime + 1L - 3L, startTime + 1L + 3L)));
        Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), this.toList(this.windowStore.fetch(0, startTime + 2L - 3L, startTime + 2L + 3L)));
        Assert.assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), this.toList(this.windowStore.fetch(0, startTime + 3L - 3L, startTime + 3L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(0, startTime + 4L - 3L, startTime + 4L + 3L)));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, startTime);
        Assert.assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
    }

    @Test
    public void testRolling() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        Segments segments = new Segments("window", 120000L, 3);
        long startTime = 120000L;
        long incr = 30000L;
        this.context.setRecordContext(this.createRecordContext(startTime));
        this.windowStore.put(0, "zero");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(2L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(startTime + incr));
        this.windowStore.put(1, "one");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(2L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 2L));
        this.windowStore.put(2, "two");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(2L), segments.segmentName(3L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 4L));
        this.windowStore.put(4, "four");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(2L), segments.segmentName(3L), segments.segmentName(4L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 5L));
        this.windowStore.put(5, "five");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(2L), segments.segmentName(3L), segments.segmentName(4L)), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Utils.mkList("zero"), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList("one"), this.toList(this.windowStore.fetch(1, startTime + incr - 3L, startTime + incr + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 6L));
        this.windowStore.put(6, "six");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(3L), segments.segmentName(4L), segments.segmentName(5L)), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(1, startTime + incr - 3L, startTime + incr + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
        Assert.assertEquals(Utils.mkList("six"), this.toList(this.windowStore.fetch(6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 7L));
        this.windowStore.put(7, "seven");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(3L), segments.segmentName(4L), segments.segmentName(5L)), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(1, startTime + incr - 3L, startTime + incr + 3L)));
        Assert.assertEquals(Utils.mkList("two"), this.toList(this.windowStore.fetch(2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
        Assert.assertEquals(Utils.mkList("six"), this.toList(this.windowStore.fetch(6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
        Assert.assertEquals(Utils.mkList("seven"), this.toList(this.windowStore.fetch(7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 8L));
        this.windowStore.put(8, "eight");
        Assert.assertEquals(Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), this.segmentDirs(this.baseDir));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(1, startTime + incr - 3L, startTime + incr + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
        Assert.assertEquals(Utils.mkList("six"), this.toList(this.windowStore.fetch(6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
        Assert.assertEquals(Utils.mkList("seven"), this.toList(this.windowStore.fetch(7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
        Assert.assertEquals(Utils.mkList("eight"), this.toList(this.windowStore.fetch(8, startTime + incr * 8L - 3L, startTime + incr * 8L + 3L)));
        this.windowStore.flush();
        Assert.assertEquals(Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)), this.segmentDirs(this.baseDir));
    }

    @Test
    public void testRestore() throws IOException {
        long startTime = 120000L;
        long incr = 30000L;
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.context.setRecordContext(this.createRecordContext(startTime));
        this.windowStore.put(0, "zero");
        this.context.setRecordContext(this.createRecordContext(startTime + incr));
        this.windowStore.put(1, "one");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 2L));
        this.windowStore.put(2, "two");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 3L));
        this.windowStore.put(3, "three");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 4L));
        this.windowStore.put(4, "four");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 5L));
        this.windowStore.put(5, "five");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 6L));
        this.windowStore.put(6, "six");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 7L));
        this.windowStore.put(7, "seven");
        this.context.setRecordContext(this.createRecordContext(startTime + incr * 8L));
        this.windowStore.put(8, "eight");
        this.windowStore.flush();
        this.windowStore.close();
        Utils.delete(this.baseDir);
        this.windowStore = this.createWindowStore(this.context, false, true);
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(1, startTime + incr - 3L, startTime + incr + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(8, startTime + incr * 8L - 3L, startTime + incr * 8L + 3L)));
        this.context.restore("window", this.changeLog);
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(0, startTime - 3L, startTime + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(1, startTime + incr - 3L, startTime + incr + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(2, startTime + incr * 2L - 3L, startTime + incr * 2L + 3L)));
        Assert.assertEquals(Utils.mkList(new Object[0]), this.toList(this.windowStore.fetch(3, startTime + incr * 3L - 3L, startTime + incr * 3L + 3L)));
        Assert.assertEquals(Utils.mkList("four"), this.toList(this.windowStore.fetch(4, startTime + incr * 4L - 3L, startTime + incr * 4L + 3L)));
        Assert.assertEquals(Utils.mkList("five"), this.toList(this.windowStore.fetch(5, startTime + incr * 5L - 3L, startTime + incr * 5L + 3L)));
        Assert.assertEquals(Utils.mkList("six"), this.toList(this.windowStore.fetch(6, startTime + incr * 6L - 3L, startTime + incr * 6L + 3L)));
        Assert.assertEquals(Utils.mkList("seven"), this.toList(this.windowStore.fetch(7, startTime + incr * 7L - 3L, startTime + incr * 7L + 3L)));
        Assert.assertEquals(Utils.mkList("eight"), this.toList(this.windowStore.fetch(8, startTime + incr * 8L - 3L, startTime + incr * 8L + 3L)));
        this.windowStore.flush();
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), this.segmentDirs(this.baseDir));
    }

    @Test
    public void testSegmentMaintenance() throws IOException {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.context.setTime(0L);
        this.context.setRecordContext(this.createRecordContext(0L));
        this.windowStore.put(0, "v");
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(0L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(59999L));
        this.windowStore.put(0, "v");
        this.windowStore.put(0, "v");
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(0L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(60000L));
        this.windowStore.put(0, "v");
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(0L), this.segments.segmentName(1L)), this.segmentDirs(this.baseDir));
        WindowStoreIterator iter = this.windowStore.fetch(0, 0L, 240000L);
        int fetchedCount = 0;
        while (iter.hasNext()) {
            iter.next();
            ++fetchedCount;
        }
        Assert.assertEquals((long)4L, (long)fetchedCount);
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(0L), this.segments.segmentName(1L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(180000L));
        this.windowStore.put(0, "v");
        iter = this.windowStore.fetch(0, 0L, 240000L);
        fetchedCount = 0;
        while (iter.hasNext()) {
            iter.next();
            ++fetchedCount;
        }
        Assert.assertEquals((long)2L, (long)fetchedCount);
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(1L), this.segments.segmentName(3L)), this.segmentDirs(this.baseDir));
        this.context.setRecordContext(this.createRecordContext(300000L));
        this.windowStore.put(0, "v");
        iter = this.windowStore.fetch(0, 240000L, 1000000L);
        fetchedCount = 0;
        while (iter.hasNext()) {
            iter.next();
            ++fetchedCount;
        }
        Assert.assertEquals((long)1L, (long)fetchedCount);
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(3L), this.segments.segmentName(5L)), this.segmentDirs(this.baseDir));
    }

    @Test
    public void testInitialLoading() throws IOException {
        File storeDir = new File(this.baseDir, "window");
        this.windowStore = this.createWindowStore(this.context, false, true);
        new File(storeDir, this.segments.segmentName(0L)).mkdir();
        new File(storeDir, this.segments.segmentName(1L)).mkdir();
        new File(storeDir, this.segments.segmentName(2L)).mkdir();
        new File(storeDir, this.segments.segmentName(3L)).mkdir();
        new File(storeDir, this.segments.segmentName(4L)).mkdir();
        new File(storeDir, this.segments.segmentName(5L)).mkdir();
        new File(storeDir, this.segments.segmentName(6L)).mkdir();
        this.windowStore.close();
        this.windowStore = this.createWindowStore(this.context, false, true);
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), this.segmentDirs(this.baseDir));
        try (WindowStoreIterator iter = this.windowStore.fetch(0, 0L, 1000000L);){
            while (iter.hasNext()) {
                iter.next();
            }
        }
        Assert.assertEquals(Utils.mkSet(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)), this.segmentDirs(this.baseDir));
    }

    @Test
    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.context.setRecordContext(this.createRecordContext(0L));
        this.windowStore.put(1, "one", 1L);
        this.windowStore.put(1, "two", 2L);
        this.windowStore.put(1, "three", 3L);
        WindowStoreIterator iterator = this.windowStore.fetch(1, 1L, 3L);
        Assert.assertTrue((boolean)iterator.hasNext());
        this.windowStore.close();
        try {
            iterator.hasNext();
            Assert.fail((String)"should have thrown InvalidStateStoreException on closed store");
        }
        catch (InvalidStateStoreException invalidStateStoreException) {
            // empty catch block
        }
        try {
            iterator.next();
            Assert.fail((String)"should have thrown InvalidStateStoreException on closed store");
        }
        catch (InvalidStateStoreException invalidStateStoreException) {
            // empty catch block
        }
    }

    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        long windowSize = 0x7A00000000000000L;
        long retentionPeriod = 0x7A00000000000000L;
        RocksDBWindowStoreSupplier<String, String> supplier = new RocksDBWindowStoreSupplier<String, String>("window", 0x7A00000000000000L, 2, true, Serdes.String(), Serdes.String(), 0x7A00000000000000L, true, Collections.emptyMap(), false);
        this.windowStore = supplier.get();
        this.windowStore.init(this.context, this.windowStore);
        this.windowStore.put("a", "0001", 0L);
        this.windowStore.put("aa", "0002", 0L);
        this.windowStore.put("a", "0003", 1L);
        this.windowStore.put("aa", "0004", 1L);
        this.windowStore.put("a", "0005", 0x79FFFFFFFFFFFFFFL);
        List<String> expected = Utils.mkList("0001", "0003", "0005");
        MatcherAssert.assertThat(this.toList(this.windowStore.fetch("a", 0L, Long.MAX_VALUE)), (Matcher)CoreMatchers.equalTo(expected));
        List list = StreamsTestUtils.toList(this.windowStore.fetch("a", "a", 0L, Long.MAX_VALUE));
        MatcherAssert.assertThat(list, (Matcher)CoreMatchers.equalTo(Utils.mkList(RocksDBWindowStoreTest.windowedPair("a", "0001", 0L, 0x7A00000000000000L), RocksDBWindowStoreTest.windowedPair("a", "0003", 1L, 0x7A00000000000000L), RocksDBWindowStoreTest.windowedPair("a", "0005", 0x79FFFFFFFFFFFFFFL, 0x7A00000000000000L))));
        list = StreamsTestUtils.toList(this.windowStore.fetch("aa", "aa", 0L, Long.MAX_VALUE));
        MatcherAssert.assertThat(list, (Matcher)CoreMatchers.equalTo(Utils.mkList(RocksDBWindowStoreTest.windowedPair("aa", "0002", 0L, 0x7A00000000000000L), RocksDBWindowStoreTest.windowedPair("aa", "0004", 1L, 0x7A00000000000000L))));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.windowStore.put(null, "anyValue");
    }

    @Test
    public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.windowStore.put(1, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnGetNullKey() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.windowStore.fetch(null, 1L, 2L);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.windowStore.fetch(null, 2, 1L, 2L);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
        this.windowStore = this.createWindowStore(this.context, false, true);
        this.windowStore.fetch(1, null, 1L, 2L);
    }

    @Test
    public void shouldFetchAndIterateOverExactBinaryKeys() {
        RocksDBWindowStoreSupplier<Bytes, String> supplier = new RocksDBWindowStoreSupplier<Bytes, String>("window", 60000L, 2, true, Serdes.Bytes(), Serdes.String(), 60000L, true, Collections.emptyMap(), false);
        this.windowStore = supplier.get();
        this.windowStore.init(this.context, this.windowStore);
        Bytes key1 = Bytes.wrap(new byte[]{0});
        Bytes key2 = Bytes.wrap(new byte[]{0, 0});
        Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
        this.windowStore.put(key1, "1", 0L);
        this.windowStore.put(key2, "2", 0L);
        this.windowStore.put(key3, "3", 0L);
        this.windowStore.put(key1, "4", 1L);
        this.windowStore.put(key2, "5", 1L);
        this.windowStore.put(key3, "6", 59999L);
        this.windowStore.put(key1, "7", 59999L);
        this.windowStore.put(key2, "8", 59999L);
        this.windowStore.put(key3, "9", 59999L);
        List<String> expectedKey1 = Utils.mkList("1", "4", "7");
        MatcherAssert.assertThat(this.toList(this.windowStore.fetch(key1, 0L, Long.MAX_VALUE)), (Matcher)CoreMatchers.equalTo(expectedKey1));
        List<String> expectedKey2 = Utils.mkList("2", "5", "8");
        MatcherAssert.assertThat(this.toList(this.windowStore.fetch(key2, 0L, Long.MAX_VALUE)), (Matcher)CoreMatchers.equalTo(expectedKey2));
        List<String> expectedKey3 = Utils.mkList("3", "6", "9");
        MatcherAssert.assertThat(this.toList(this.windowStore.fetch(key3, 0L, Long.MAX_VALUE)), (Matcher)CoreMatchers.equalTo(expectedKey3));
    }

    private void putFirstBatch(WindowStore<Integer, String> store, long startTime, MockProcessorContext context) {
        context.setRecordContext(this.createRecordContext(startTime));
        store.put(0, "zero");
        context.setRecordContext(this.createRecordContext(startTime + 1L));
        store.put(1, "one");
        context.setRecordContext(this.createRecordContext(startTime + 2L));
        store.put(2, "two");
        context.setRecordContext(this.createRecordContext(startTime + 4L));
        store.put(4, "four");
        context.setRecordContext(this.createRecordContext(startTime + 5L));
        store.put(5, "five");
    }

    private void putSecondBatch(WindowStore<Integer, String> store, long startTime, MockProcessorContext context) {
        context.setRecordContext(this.createRecordContext(startTime + 3L));
        store.put(2, "two+1");
        context.setRecordContext(this.createRecordContext(startTime + 4L));
        store.put(2, "two+2");
        context.setRecordContext(this.createRecordContext(startTime + 5L));
        store.put(2, "two+3");
        context.setRecordContext(this.createRecordContext(startTime + 6L));
        store.put(2, "two+4");
        context.setRecordContext(this.createRecordContext(startTime + 7L));
        store.put(2, "two+5");
        context.setRecordContext(this.createRecordContext(startTime + 8L));
        store.put(2, "two+6");
    }

    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
        ArrayList list = new ArrayList();
        while (iterator.hasNext()) {
            list.add(((KeyValue)iterator.next()).value);
        }
        return list;
    }

    private Set<String> segmentDirs(File baseDir) {
        File windowDir = new File(baseDir, "window");
        return new HashSet<String>(Arrays.asList(windowDir.list()));
    }

    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
        HashMap<Integer, Set<String>> entriesByKey = new HashMap<Integer, Set<String>>();
        for (KeyValue<byte[], byte[]> entry : changeLog) {
            long timestamp = WindowStoreUtils.timestampFromBinaryKey((byte[])entry.key);
            Integer key = WindowStoreUtils.keyFromBinaryKey((byte[])entry.key, this.serdes);
            String value = entry.value == null ? null : this.serdes.valueFrom((byte[])entry.value);
            Set<String> entries = entriesByKey.get(key);
            if (entries == null) {
                entries = new HashSet<String>();
                entriesByKey.put(key, entries);
            }
            entries.add(value + "@" + (timestamp - startTime));
        }
        return entriesByKey;
    }

    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
        return RocksDBWindowStoreTest.windowedPair(key, value, timestamp, 3L);
    }

    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
        return KeyValue.pair(new Windowed<K>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), value);
    }
}

