/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.CachingWindowStore;
import org.apache.kafka.streams.state.internals.MeteredWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the {@link WindowStore} instances produced by {@link RocksDBWindowStoreSupplier}.
 *
 * <p>The tests cover which wrappers the supplier puts around the store depending on the
 * {@code logged} and {@code cached} flags, and the argument check on the number of segments.
 */
public class RocksDBWindowStoreSupplierTest {
    private static final String STORE_NAME = "name";

    /** Store under test; closed in {@link #close()} when a test assigned one. */
    private WindowStore<String, String> store;
    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024L, new MockStreamsMetrics(new Metrics()));
    /** Shared context used by the tests that do not need to observe sent records. */
    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new NoOpRecordCollector(), this.cache);

    /**
     * Releases the shared context and, if a test created one, the store under test.
     */
    @After
    public void close() {
        this.context.close();
        if (this.store != null) {
            this.store.close();
        }
    }

    /**
     * A store created with {@code logged = true} must send at least one record to the
     * record collector when a value is put.
     */
    @Test
    public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() {
        this.store = this.createStore(true, false, 3);
        Assert.assertFalse(this.putAndCaptureSentRecords().isEmpty());
    }

    /**
     * A store created with {@code logged = false} must not send anything to the record
     * collector when a value is put.
     */
    @Test
    public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() {
        this.store = this.createStore(false, false, 3);
        Assert.assertTrue(this.putAndCaptureSentRecords().isEmpty());
    }

    /**
     * A store created with {@code cached = true} must wrap a {@link CachingWindowStore},
     * and two puts must leave two entries in the context's cache.
     */
    @Test
    public void shouldBeCachedWindowStoreWhenCachingEnabled() {
        this.store = this.createStore(false, true, 3);
        this.store.init(this.context, this.store);
        this.context.setTime(1L);
        this.store.put("a", "b");
        this.store.put("b", "c");
        MatcherAssert.assertThat(((WrappedStateStore) this.store).wrappedStore(), CoreMatchers.is(IsInstanceOf.instanceOf(CachingWindowStore.class)));
        MatcherAssert.assertThat(this.context.getCache().size(), CoreMatchers.is(2L));
    }

    /**
     * Whatever combination of logging and caching is requested, the object handed back by
     * the supplier must be a {@link MeteredWindowStore}.
     */
    @Test
    public void shouldHaveMeteredStoreAsOuterMost() {
        MatcherAssert.assertThat(this.createStore(false, false, 2), IsInstanceOf.instanceOf(MeteredWindowStore.class));
        MatcherAssert.assertThat(this.createStore(false, true, 2), IsInstanceOf.instanceOf(MeteredWindowStore.class));
        MatcherAssert.assertThat(this.createStore(true, false, 2), IsInstanceOf.instanceOf(MeteredWindowStore.class));
    }

    /**
     * Initializing a cached store must register metrics on the context.
     */
    @Test
    public void shouldHaveMeteredStoreWhenCached() {
        this.store = this.createStore(false, true, 3);
        this.store.init(this.context, this.store);
        StreamsMetrics metrics = this.context.metrics();
        Assert.assertFalse(metrics.metrics().isEmpty());
    }

    /**
     * Initializing a logged store must register metrics on the context.
     */
    @Test
    public void shouldHaveMeteredStoreWhenLogged() {
        this.store = this.createStore(true, false, 3);
        this.store.init(this.context, this.store);
        StreamsMetrics metrics = this.context.metrics();
        Assert.assertFalse(metrics.metrics().isEmpty());
    }

    /**
     * Initializing a store that is neither logged nor cached must register metrics on the context.
     */
    @Test
    public void shouldHaveMeteredStoreWhenNotLoggedOrCached() {
        this.store = this.createStore(false, false, 3);
        this.store.init(this.context, this.store);
        StreamsMetrics metrics = this.context.metrics();
        Assert.assertFalse(metrics.metrics().isEmpty());
    }

    /**
     * Asking the supplier for a single segment is expected to be rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test(expected=IllegalArgumentException.class)
    public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() {
        this.createStore(true, true, 1);
    }

    /**
     * Builds a window store through {@link RocksDBWindowStoreSupplier} with String serdes and
     * an empty log config; only the three arguments below vary between tests.
     *
     * @param logged      value passed to the supplier's logging flag
     * @param cached      value passed to the supplier's caching flag
     * @param numSegments number of segments requested from the supplier
     * @return the store returned by the supplier's {@code get()}
     */
    private WindowStore<String, String> createStore(boolean logged, boolean cached, int numSegments) {
        // Explicit type witness keeps the call compilable without Java 8 target typing.
        return new RocksDBWindowStoreSupplier<String, String>(STORE_NAME, 10L, numSegments, false, Serdes.String(), Serdes.String(), 10L, logged, Collections.<String, String>emptyMap(), cached).get();
    }

    /**
     * Initializes {@link #store} against a dedicated context whose record collector remembers
     * every record it is asked to send, puts a single entry, and returns what was captured.
     *
     * <p>The dedicated context is closed before returning, mirroring what {@link #close()}
     * does for the shared one; the store itself is still closed by {@link #close()}.
     *
     * @return the records the store sent to the collector, possibly empty
     */
    private List<ProducerRecord<?, ?>> putAndCaptureSentRecords() {
        final RecordingCollector collector = new RecordingCollector();
        final MockProcessorContext recordingContext = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, this.cache);
        try {
            recordingContext.setTime(1L);
            this.store.init(recordingContext, this.store);
            this.store.put("a", "b");
        } finally {
            recordingContext.close();
        }
        return collector.records;
    }

    /**
     * Record collector that keeps each record passed to {@code send} instead of dropping it.
     */
    private static final class RecordingCollector extends NoOpRecordCollector {
        private final List<ProducerRecord<?, ?>> records = new ArrayList<ProducerRecord<?, ?>>();

        @Override
        public <K, V> void send(String topic, K key, V value, Integer partition, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            this.records.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    }
}

