/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.MeteredWindowStore;
import org.apache.kafka.streams.state.internals.SerdeThatDoesntHandleNull;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class MeteredWindowStoreTest {
    private static final String STORE_TYPE = "scope";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String THREAD_ID_TAG_KEY = "thread-id";
    private static final String STORE_NAME = "mocked-store";
    private static final String CHANGELOG_TOPIC = "changelog-topic";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap((byte[])"key".getBytes());
    private static final String VALUE = "value";
    private static final byte[] VALUE_BYTES = "value".getBytes();
    private static final int WINDOW_SIZE_MS = 10;
    private static final int RETENTION_PERIOD = 100;
    private static final long TIMESTAMP = 42L;
    private final String threadId = Thread.currentThread().getName();
    private InternalMockProcessorContext context;
    private final WindowStore<Bytes, byte[]> innerStoreMock = (WindowStore)Mockito.mock(WindowStore.class);
    private MeteredWindowStore<String, String> store = new MeteredWindowStore(this.innerStoreMock, 10L, "scope", (Time)new MockTime(), Serdes.String(), (Serde)new SerdeThatDoesntHandleNull());
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
    private Map<String, String> tags;

    public MeteredWindowStoreTest() {
        Mockito.when((Object)this.innerStoreMock.name()).thenReturn((Object)STORE_NAME);
    }

    @Before
    public void setUp() {
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)new MockTime());
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), streamsMetrics, new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig()), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0L, streamsMetrics), Time.SYSTEM);
        this.tags = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)THREAD_ID_TAG_KEY, (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)this.context.taskId().toString()), Utils.mkEntry((Object)"scope-state-id", (Object)STORE_NAME)});
    }

    @Test
    public void shouldDelegateDeprecatedInit() {
        MeteredWindowStore outer = new MeteredWindowStore(this.innerStoreMock, 10L, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new SerdeThatDoesntHandleNull());
        Mockito.when((Object)this.innerStoreMock.name()).thenReturn((Object)"store");
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).init((ProcessorContext)this.context, (StateStore)outer);
        outer.init((ProcessorContext)this.context, (StateStore)outer);
    }

    @Test
    public void shouldDelegateInit() {
        MeteredWindowStore outer = new MeteredWindowStore(this.innerStoreMock, 10L, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new SerdeThatDoesntHandleNull());
        Mockito.when((Object)this.innerStoreMock.name()).thenReturn((Object)"store");
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).init((StateStoreContext)this.context, (StateStore)outer);
        outer.init((StateStoreContext)this.context, (StateStore)outer);
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        this.context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
        this.doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic((String)this.context.applicationId(), (String)STORE_NAME, (String)this.context.taskId().topologyName());
        this.doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
    }

    private void doShouldPassChangelogTopicNameToStateStoreSerde(String topic) {
        Serde keySerde = (Serde)Mockito.mock(Serde.class);
        Serializer keySerializer = (Serializer)Mockito.mock(Serializer.class);
        Serde valueSerde = (Serde)Mockito.mock(Serde.class);
        Deserializer valueDeserializer = (Deserializer)Mockito.mock(Deserializer.class);
        Serializer valueSerializer = (Serializer)Mockito.mock(Serializer.class);
        Mockito.when((Object)keySerde.serializer()).thenReturn((Object)keySerializer);
        Mockito.when((Object)keySerializer.serialize(topic, (Object)KEY)).thenReturn((Object)KEY.getBytes());
        Mockito.when((Object)valueSerde.deserializer()).thenReturn((Object)valueDeserializer);
        Mockito.when((Object)valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn((Object)VALUE);
        Mockito.when((Object)valueSerde.serializer()).thenReturn((Object)valueSerializer);
        Mockito.when((Object)valueSerializer.serialize(topic, (Object)VALUE)).thenReturn((Object)VALUE_BYTES);
        Mockito.when((Object)this.innerStoreMock.fetch((Object)KEY_BYTES, 42L)).thenReturn((Object)VALUE_BYTES);
        this.store = new MeteredWindowStore(this.innerStoreMock, 10L, STORE_TYPE, (Time)new MockTime(), keySerde, valueSerde);
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.fetch((Object)KEY, 42L);
        this.store.put((Object)KEY, (Object)VALUE, 42L);
    }

    @Test
    public void testMetrics() {
        this.store.init((StateStoreContext)this.context, this.store);
        JmxReporter reporter = new JmxReporter();
        KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
        reporter.contextChange((MetricsContext)metricsContext);
        this.metrics.addReporter((MetricsReporter)reporter);
        Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", STORE_LEVEL_GROUP, THREAD_ID_TAG_KEY, this.threadId, this.context.taskId().toString(), STORE_TYPE, STORE_NAME)));
    }

    @Test
    public void shouldRecordRestoreLatencyOnInit() {
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).init((StateStoreContext)this.context, this.store);
        this.store.init((StateStoreContext)this.context, this.store);
        KafkaMetric metric = this.metric("restore-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldPutToInnerStoreAndRecordPutMetrics() {
        byte[] bytes = "a".getBytes();
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).put(ArgumentMatchers.eq((Object)Bytes.wrap((byte[])bytes)), ArgumentMatchers.any(), ArgumentMatchers.eq((long)this.context.timestamp()));
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.put((Object)"a", (Object)"a", this.context.timestamp());
        KafkaMetric metric = this.metric("put-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldFetchFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when((Object)this.innerStoreMock.fetch((Object)Bytes.wrap((byte[])"a".getBytes()), 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyWindowStoreIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.fetch((Object)"a", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        KafkaMetric metric = this.metric("fetch-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
        Mockito.when((Object)this.innerStoreMock.fetch((Object)Bytes.wrap((byte[])"a".getBytes()), 1L, 101L)).thenReturn((Object)KeyValueIterators.emptyWindowStoreIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.fetch((Object)"a", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L).plus(100L, ChronoUnit.MILLIS)).close();
    }

    @Test
    public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when((Object)this.innerStoreMock.fetch((Object)Bytes.wrap((byte[])"a".getBytes()), (Object)Bytes.wrap((byte[])"b".getBytes()), 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        Mockito.when((Object)this.innerStoreMock.fetch(null, (Object)Bytes.wrap((byte[])"b".getBytes()), 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        Mockito.when((Object)this.innerStoreMock.fetch((Object)Bytes.wrap((byte[])"a".getBytes()), null, 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        Mockito.when((Object)this.innerStoreMock.fetch(null, null, 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.fetch((Object)"a", (Object)"b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.fetch(null, (Object)"b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.fetch((Object)"a", null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.fetch(null, null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        KafkaMetric metric = this.metric("fetch-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when((Object)this.innerStoreMock.backwardFetch((Object)Bytes.wrap((byte[])"a".getBytes()), (Object)Bytes.wrap((byte[])"b".getBytes()), 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.backwardFetch((Object)"a", (Object)"b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        KafkaMetric metric = this.metric("fetch-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when((Object)this.innerStoreMock.backwardFetch((Object)Bytes.wrap((byte[])"a".getBytes()), (Object)Bytes.wrap((byte[])"b".getBytes()), 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        Mockito.when((Object)this.innerStoreMock.backwardFetch(null, (Object)Bytes.wrap((byte[])"b".getBytes()), 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        Mockito.when((Object)this.innerStoreMock.backwardFetch((Object)Bytes.wrap((byte[])"a".getBytes()), null, 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        Mockito.when((Object)this.innerStoreMock.backwardFetch(null, null, 1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.backwardFetch((Object)"a", (Object)"b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.backwardFetch(null, (Object)"b", Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.backwardFetch((Object)"a", null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        this.store.backwardFetch(null, null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        KafkaMetric metric = this.metric("fetch-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when((Object)this.innerStoreMock.fetchAll(1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.fetchAll(Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        KafkaMetric metric = this.metric("fetch-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() {
        Mockito.when((Object)this.innerStoreMock.backwardFetchAll(1L, 1L)).thenReturn((Object)KeyValueIterators.emptyIterator());
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.backwardFetchAll(Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L)).close();
        KafkaMetric metric = this.metric("fetch-rate");
        MatcherAssert.assertThat((Object)((Double)metric.metricValue()), (Matcher)Matchers.greaterThan((Comparable)Double.valueOf(0.0)));
    }

    @Test
    public void shouldRecordFlushLatency() {
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).flush();
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.flush();
        KafkaMetric metric = this.metric("flush-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
        Mockito.when((Object)this.innerStoreMock.fetch((Object)Bytes.wrap((byte[])"a".getBytes()), 0L)).thenReturn(null);
        this.store.init((StateStoreContext)this.context, this.store);
        Assert.assertNull((Object)this.store.fetch((Object)"a", 0L));
    }

    @Test
    public void shouldSetFlushListenerOnWrappedCachingStore() {
        CachedWindowStore cachedWindowStore = (CachedWindowStore)Mockito.mock(CachedWindowStore.class);
        Mockito.when((Object)cachedWindowStore.setFlushListener((CacheFlushListener)ArgumentMatchers.any(CacheFlushListener.class), ArgumentMatchers.eq((boolean)false))).thenReturn((Object)true);
        MeteredWindowStore metered = new MeteredWindowStore((WindowStore)cachedWindowStore, 10L, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new SerdeThatDoesntHandleNull());
        Assert.assertTrue((boolean)metered.setFlushListener(null, false));
    }

    @Test
    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
        Assert.assertFalse((boolean)this.store.setFlushListener(null, false));
    }

    @Test
    public void shouldCloseUnderlyingStore() {
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).close();
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.close();
    }

    @Test
    public void shouldRemoveMetricsOnClose() {
        ((WindowStore)Mockito.doNothing().when(this.innerStoreMock)).close();
        this.store.init((StateStoreContext)this.context, this.store);
        MatcherAssert.assertThat(this.storeMetrics(), (Matcher)Matchers.not((Matcher)Matchers.empty()));
        this.store.close();
        MatcherAssert.assertThat(this.storeMetrics(), (Matcher)Matchers.empty());
    }

    @Test
    public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
        ((WindowStore)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("Oops!")}).when(this.innerStoreMock)).close();
        this.store.init((StateStoreContext)this.context, this.store);
        MatcherAssert.assertThat(this.storeMetrics(), (Matcher)Matchers.not((Matcher)Matchers.empty()));
        Assert.assertThrows(RuntimeException.class, () -> this.store.close());
        MatcherAssert.assertThat(this.storeMetrics(), (Matcher)Matchers.empty());
    }

    @Test
    public void shouldThrowNullPointerOnPutIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.store.put(null, (Object)"a", 1L));
    }

    @Test
    public void shouldThrowNullPointerOnFetchIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.store.fetch(null, 0L, 1L));
    }

    @Test
    public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.store.backwardFetch(null, 0L, 1L));
    }

    private KafkaMetric metric(String name) {
        return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", this.tags));
    }

    private List<MetricName> storeMetrics() {
        return this.metrics.metrics().keySet().stream().filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(this.tags)).collect(Collectors.toList());
    }

    private static interface CachedWindowStore
    extends WindowStore<Bytes, byte[]>,
    CachedStateStore<byte[], byte[]> {
    }
}

