/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.CachingSessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest;
import org.apache.kafka.streams.state.internals.Segments;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedSessionStoreIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingSessionStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 600;
    private MockProcessorContext context;
    private RocksDBSegmentedBytesStore underlying;
    private CachingSessionStore<String, String> cachingStore;
    private ThreadCache cache;
    private static final Long DEFAULT_TIMESTAMP = 10L;
    private final Bytes keyA = Bytes.wrap("a".getBytes());
    private final Bytes keyAA = Bytes.wrap("aa".getBytes());
    private final Bytes keyB = Bytes.wrap("b".getBytes());

    @Before
    public void setUp() {
        SessionKeySchema schema = new SessionKeySchema();
        schema.init("topic");
        int retention = 60000;
        int numSegments = 3;
        this.underlying = new RocksDBSegmentedBytesStore("test", 60000L, 3, schema);
        RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<Bytes, byte[]>(this.underlying, Serdes.Bytes(), Serdes.ByteArray());
        this.cachingStore = new CachingSessionStore<String, String>(sessionStore, Serdes.String(), Serdes.String(), Segments.segmentInterval(60000L, 3));
        this.cache = new ThreadCache(new LogContext("testCache "), 600L, new MockStreamsMetrics(new Metrics()));
        this.context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector)null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, "topic"));
        this.cachingStore.init(this.context, this.cachingStore);
    }

    @After
    public void close() {
        this.context.close();
        this.cachingStore.close();
    }

    private Bytes bytesKey(String key) {
        return Bytes.wrap(key.getBytes());
    }

    @Test
    public void shouldPutFetchFromCache() {
        this.cachingStore.put(new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        this.cachingStore.put(new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        this.cachingStore.put(new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals((long)3L, (long)this.cache.size());
        KeyValueIterator<Windowed<Bytes>, byte[]> a = this.cachingStore.findSessions(this.keyA, 0L, 0L);
        KeyValueIterator<Windowed<Bytes>, byte[]> b = this.cachingStore.findSessions(this.keyB, 0L, 0L);
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)a.next(), new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1");
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)b.next(), new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L)), "1");
        Assert.assertFalse((boolean)a.hasNext());
        Assert.assertFalse((boolean)b.hasNext());
    }

    @Test
    public void shouldPutFetchAllKeysFromCache() {
        this.cachingStore.put(new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        this.cachingStore.put(new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        this.cachingStore.put(new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals((long)3L, (long)this.cache.size());
        KeyValueIterator<Windowed<Bytes>, byte[]> all = this.cachingStore.findSessions(this.keyA, this.keyB, 0L, 0L);
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)all.next(), new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1");
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)all.next(), new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L)), "1");
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)all.next(), new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L)), "1");
        Assert.assertFalse((boolean)all.hasNext());
    }

    @Test
    public void shouldPutFetchRangeFromCache() {
        this.cachingStore.put(new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        this.cachingStore.put(new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L)), "1".getBytes());
        this.cachingStore.put(new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L)), "1".getBytes());
        Assert.assertEquals((long)3L, (long)this.cache.size());
        KeyValueIterator<Windowed<Bytes>, byte[]> some = this.cachingStore.findSessions(this.keyAA, this.keyB, 0L, 0L);
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)some.next(), new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L)), "1");
        StreamsTestUtils.verifyWindowedKeyValue((KeyValue)some.next(), new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L)), "1");
        Assert.assertFalse((boolean)some.hasNext());
    }

    @Test
    public void shouldFetchAllSessionsWithSameRecordKey() {
        List expected = Arrays.asList(KeyValue.pair(new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1".getBytes()), KeyValue.pair(new Windowed<Bytes>(this.keyA, new SessionWindow(10L, 10L)), "2".getBytes()), KeyValue.pair(new Windowed<Bytes>(this.keyA, new SessionWindow(100L, 100L)), "3".getBytes()), KeyValue.pair(new Windowed<Bytes>(this.keyA, new SessionWindow(1000L, 1000L)), "4".getBytes()));
        for (KeyValue keyValue : expected) {
            this.cachingStore.put((Windowed<Bytes>)((Windowed)keyValue.key), (byte[])keyValue.value);
        }
        this.cachingStore.put(new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L)), "5".getBytes());
        List results = RocksDBSessionStoreTest.toList(this.cachingStore.fetch(this.keyA));
        StreamsTestUtils.verifyKeyValueList(expected, results);
    }

    @Test
    public void shouldFlushItemsToStoreOnEviction() {
        StateSerdes<Bytes, byte[]> serdes = new StateSerdes<Bytes, byte[]>("topic", Serdes.Bytes(), Serdes.ByteArray());
        List<KeyValue<Windowed<Bytes>, byte[]>> added = this.addSessionsUntilOverflow("a", "b", "c", "d");
        Assert.assertEquals((long)(added.size() - 1), (long)this.cache.size());
        WrappedSessionStoreIterator<Bytes, byte[]> iterator = WrappedSessionStoreIterator.bytesIterator(this.underlying.fetch((Bytes)((Windowed)added.get((int)0).key).key(), 0L, 0L), serdes);
        KeyValue next = (KeyValue)iterator.next();
        Assert.assertEquals(added.get((int)0).key, next.key);
        Assert.assertArrayEquals((byte[])((byte[])added.get((int)0).value), (byte[])((byte[])next.value));
    }

    @Test
    public void shouldQueryItemsInCacheAndStore() {
        List added = this.addSessionsUntilOverflow("a");
        KeyValueIterator<Windowed<Bytes>, byte[]> iterator = this.cachingStore.findSessions(Bytes.wrap("a".getBytes()), 0L, (long)(added.size() * 10));
        List actual = RocksDBSessionStoreTest.toList(iterator);
        StreamsTestUtils.verifyKeyValueList(added, actual);
    }

    @Test
    public void shouldRemove() {
        Windowed<Bytes> a = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        Windowed<Bytes> b = new Windowed<Bytes>(this.keyB, new SessionWindow(0L, 0L));
        this.cachingStore.put(a, "2".getBytes());
        this.cachingStore.put(b, "2".getBytes());
        this.cachingStore.flush();
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        KeyValueIterator<Windowed<Bytes>, byte[]> rangeIter = this.cachingStore.findSessions(this.keyA, 0L, 0L);
        Assert.assertFalse((boolean)rangeIter.hasNext());
    }

    @Test
    public void shouldFetchCorrectlyAcrossSegments() {
        Windowed<Bytes> a1 = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        Windowed<Bytes> a2 = new Windowed<Bytes>(this.keyA, new SessionWindow(60000L, 60000L));
        Windowed<Bytes> a3 = new Windowed<Bytes>(this.keyA, new SessionWindow(120000L, 120000L));
        this.cachingStore.put(a1, "1".getBytes());
        this.cachingStore.put(a2, "2".getBytes());
        this.cachingStore.put(a3, "3".getBytes());
        this.cachingStore.flush();
        KeyValueIterator<Windowed<Bytes>, byte[]> results = this.cachingStore.findSessions(this.keyA, 0L, 120000L);
        Assert.assertEquals(a1, ((KeyValue)results.next()).key);
        Assert.assertEquals(a2, ((KeyValue)results.next()).key);
        Assert.assertEquals(a3, ((KeyValue)results.next()).key);
        Assert.assertFalse((boolean)results.hasNext());
    }

    @Test
    public void shouldFetchRangeCorrectlyAcrossSegments() {
        Windowed<Bytes> a1 = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        Windowed<Bytes> aa1 = new Windowed<Bytes>(this.keyAA, new SessionWindow(0L, 0L));
        Windowed<Bytes> a2 = new Windowed<Bytes>(this.keyA, new SessionWindow(60000L, 60000L));
        Windowed<Bytes> a3 = new Windowed<Bytes>(this.keyA, new SessionWindow(120000L, 120000L));
        Windowed<Bytes> aa3 = new Windowed<Bytes>(this.keyAA, new SessionWindow(120000L, 120000L));
        this.cachingStore.put(a1, "1".getBytes());
        this.cachingStore.put(aa1, "1".getBytes());
        this.cachingStore.put(a2, "2".getBytes());
        this.cachingStore.put(a3, "3".getBytes());
        this.cachingStore.put(aa3, "3".getBytes());
        this.cachingStore.flush();
        KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = this.cachingStore.findSessions(this.keyA, this.keyAA, 0L, 120000L);
        Assert.assertEquals(a1, ((KeyValue)rangeResults.next()).key);
        Assert.assertEquals(aa1, ((KeyValue)rangeResults.next()).key);
        Assert.assertEquals(a2, ((KeyValue)rangeResults.next()).key);
        Assert.assertEquals(a3, ((KeyValue)rangeResults.next()).key);
        Assert.assertEquals(aa3, ((KeyValue)rangeResults.next()).key);
        Assert.assertFalse((boolean)rangeResults.hasNext());
    }

    @Test
    public void shouldForwardChangedValuesDuringFlush() {
        Windowed<Bytes> a = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        Windowed<String> aDeserialized = new Windowed<String>("a", new SessionWindow(0L, 0L));
        final ArrayList flushed = new ArrayList();
        this.cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>(){

            @Override
            public void apply(Windowed<String> key, String newValue, String oldValue) {
                flushed.add(KeyValue.pair(key, new Change<String>(newValue, oldValue)));
            }
        }, true);
        this.cachingStore.put(a, "1".getBytes());
        this.cachingStore.flush();
        this.cachingStore.put(a, "2".getBytes());
        this.cachingStore.flush();
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<Object>("1", null)), KeyValue.pair(aDeserialized, new Change<String>("2", "1")), KeyValue.pair(aDeserialized, new Change<String>(null, "2"))));
    }

    @Test
    public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
        Windowed<Bytes> a = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        Windowed<String> aDeserialized = new Windowed<String>("a", new SessionWindow(0L, 0L));
        final ArrayList flushed = new ArrayList();
        this.cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>(){

            @Override
            public void apply(Windowed<String> key, String newValue, String oldValue) {
                flushed.add(KeyValue.pair(key, new Change<String>(newValue, oldValue)));
            }
        }, false);
        this.cachingStore.put(a, "1".getBytes());
        this.cachingStore.flush();
        this.cachingStore.put(a, "2".getBytes());
        this.cachingStore.flush();
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<Object>("1", null)), KeyValue.pair(aDeserialized, new Change<Object>("2", null)), KeyValue.pair(aDeserialized, new Change<String>(null, "2"))));
    }

    @Test
    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
        Windowed<Bytes> a = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        Windowed<String> aDeserialized = new Windowed<String>("a", new SessionWindow(0L, 0L));
        final ArrayList flushed = new ArrayList();
        this.cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>(){

            @Override
            public void apply(Windowed<String> key, String newValue, String oldValue) {
                flushed.add(KeyValue.pair(key, new Change<String>(newValue, oldValue)));
            }
        }, false);
        this.cachingStore.put(a, "1".getBytes());
        this.cachingStore.flush();
        this.cachingStore.put(a, "2".getBytes());
        this.cachingStore.flush();
        Assert.assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<Object>("1", null)), KeyValue.pair(aDeserialized, new Change<Object>("2", null))));
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() {
        Windowed<Bytes> a1 = new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L));
        this.cachingStore.put(a1, "1".getBytes());
        Assert.assertEquals((long)1L, (long)this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals((long)0L, (long)this.cache.size());
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.fetch(this.keyA);
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.findSessions(this.keyA, 0L, Long.MAX_VALUE);
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.remove(new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)));
    }

    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowIfTryingToPutIntoClosedCachingStore() {
        this.cachingStore.close();
        this.cachingStore.put(new Windowed<Bytes>(this.keyA, new SessionWindow(0L, 0L)), "1".getBytes());
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
        this.cachingStore.findSessions((Bytes)null, 1L, 2L);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() {
        this.cachingStore.findSessions(null, this.keyA, 1L, 2L);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() {
        this.cachingStore.findSessions(this.keyA, null, 1L, 2L);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFetchNullFromKey() {
        this.cachingStore.fetch(null, this.keyA);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFetchNullToKey() {
        this.cachingStore.fetch(this.keyA, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        this.cachingStore.fetch((Bytes)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRemoveNullKey() {
        this.cachingStore.remove((Windowed<Bytes>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        this.cachingStore.put(null, "1".getBytes());
    }

    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(String ... sessionIds) {
        Random random = new Random();
        ArrayList<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<KeyValue<Windowed<Bytes>, byte[]>>();
        while (this.cache.size() == (long)results.size()) {
            String sessionId = sessionIds[random.nextInt(sessionIds.length)];
            this.addSingleSession(sessionId, results);
        }
        return results;
    }

    private void addSingleSession(String sessionId, List<KeyValue<Windowed<Bytes>, byte[]>> allSessions) {
        int timestamp = allSessions.size() * 10;
        Windowed<Bytes> key = new Windowed<Bytes>(Bytes.wrap(sessionId.getBytes()), new SessionWindow(timestamp, timestamp));
        byte[] value = "1".getBytes();
        this.cachingStore.put(key, value);
        allSessions.add(KeyValue.pair(key, value));
    }
}

