/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheKeyValueBytesStoreIterator;
import org.apache.kafka.streams.state.internals.PeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
    private final String namespace = "0.0-one";
    private final StateSerdes<byte[], byte[]> serdes = new StateSerdes<byte[], byte[]>("dummy", Serdes.ByteArray(), Serdes.ByteArray());
    private KeyValueStore<Bytes, byte[]> store;
    private ThreadCache cache;

    @Before
    public void setUp() throws Exception {
        this.store = new InMemoryKeyValueStore<Bytes, byte[]>("0.0-one", Serdes.Bytes(), Serdes.ByteArray());
        this.cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics()));
    }

    @Test
    public void shouldIterateOverRange() throws Exception {
        byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
        for (int i = 0; i < bytes.length; i += 2) {
            this.store.put(Bytes.wrap(bytes[i]), bytes[i]);
            this.cache.put("0.0-one", Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
        }
        Bytes from = Bytes.wrap(new byte[]{2});
        Bytes to = Bytes.wrap(new byte[]{9});
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>("store", this.store.range(from, to));
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.range("0.0-one", from, to);
        MergedSortedCacheKeyValueBytesStoreIterator iterator = new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, (KeyValueIterator<Bytes, byte[]>)storeIterator);
        byte[][] values = new byte[8][];
        int index = 0;
        int bytesIndex = 2;
        while (iterator.hasNext()) {
            byte[] value = (byte[])((KeyValue)iterator.next()).value;
            values[index++] = value;
            Assert.assertArrayEquals((byte[])bytes[bytesIndex++], (byte[])value);
        }
        iterator.close();
    }

    @Test
    public void shouldSkipLargerDeletedCacheValue() throws Exception {
        byte[][] bytes = new byte[][]{{0}, {1}};
        this.store.put(Bytes.wrap(bytes[0]), bytes[0]);
        this.cache.put("0.0-one", Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
        MergedSortedCacheKeyValueBytesStoreIterator iterator = this.createIterator();
        Assert.assertArrayEquals((byte[])bytes[0], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldSkipSmallerDeletedCachedValue() throws Exception {
        byte[][] bytes = new byte[][]{{0}, {1}};
        this.cache.put("0.0-one", Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
        this.store.put(Bytes.wrap(bytes[1]), bytes[1]);
        MergedSortedCacheKeyValueBytesStoreIterator iterator = this.createIterator();
        Assert.assertArrayEquals((byte[])bytes[1], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception {
        byte[][] bytes = new byte[][]{{0}};
        this.cache.put("0.0-one", Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
        this.store.put(Bytes.wrap(bytes[0]), bytes[0]);
        MergedSortedCacheKeyValueBytesStoreIterator iterator = this.createIterator();
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
        byte[][] bytes;
        for (byte[] aByte : bytes = new byte[][]{{0}, {1}, {2}}) {
            Bytes aBytes = Bytes.wrap(aByte);
            this.store.put(aBytes, aByte);
            this.cache.put("0.0-one", aBytes, new LRUCacheEntry(null));
        }
        Assert.assertFalse((boolean)this.createIterator().hasNext());
    }

    @Test
    public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
        byte[][] bytes;
        for (byte[] aByte : bytes = new byte[][]{{0}, {1}, {2}}) {
            this.cache.put("0.0-one", Bytes.wrap(aByte), new LRUCacheEntry(null));
        }
        Assert.assertFalse((boolean)this.createIterator().hasNext());
    }

    @Test
    public void shouldSkipAllDeletedFromCache() throws Exception {
        byte[][] bytes;
        for (byte[] aByte : bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}}) {
            Bytes aBytes = Bytes.wrap(aByte);
            this.store.put(aBytes, aByte);
            this.cache.put("0.0-one", aBytes, new LRUCacheEntry(aByte));
        }
        this.cache.put("0.0-one", Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
        this.cache.put("0.0-one", Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
        this.cache.put("0.0-one", Bytes.wrap(bytes[3]), new LRUCacheEntry(null));
        this.cache.put("0.0-one", Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
        this.cache.put("0.0-one", Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
        MergedSortedCacheKeyValueBytesStoreIterator iterator = this.createIterator();
        Assert.assertArrayEquals((byte[])bytes[0], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertArrayEquals((byte[])bytes[4], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertArrayEquals((byte[])bytes[5], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertArrayEquals((byte[])bytes[6], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertArrayEquals((byte[])bytes[7], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertArrayEquals((byte[])bytes[9], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertArrayEquals((byte[])bytes[10], (byte[])((Bytes)((KeyValue)iterator.next()).key).get());
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldPeekNextKey() throws Exception {
        InMemoryKeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<Bytes, byte[]>("one", Serdes.Bytes(), Serdes.ByteArray());
        ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics()));
        byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
        for (int i = 0; i < bytes.length - 1; i += 2) {
            kv.put(Bytes.wrap(bytes[i]), bytes[i]);
            cache.put("0.0-one", Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
        }
        Bytes from = Bytes.wrap(new byte[]{2});
        Bytes to = Bytes.wrap(new byte[]{9});
        KeyValueIterator<Bytes, byte[]> storeIterator = kv.range(from, to);
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range("0.0-one", from, to);
        MergedSortedCacheKeyValueBytesStoreIterator iterator = new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, storeIterator);
        byte[][] values = new byte[8][];
        int index = 0;
        int bytesIndex = 2;
        while (iterator.hasNext()) {
            byte[] keys = ((Bytes)iterator.peekNextKey()).get();
            values[index++] = keys;
            Assert.assertArrayEquals((byte[])bytes[bytesIndex++], (byte[])keys);
            iterator.next();
        }
        iterator.close();
    }

    private MergedSortedCacheKeyValueBytesStoreIterator createIterator() {
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.all("0.0-one");
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>("store", this.store.all());
        return new MergedSortedCacheKeyValueBytesStoreIterator((PeekingKeyValueIterator<Bytes, LRUCacheEntry>)cacheIterator, (KeyValueIterator<Bytes, byte[]>)storeIterator);
    }
}

