/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.MeteredWindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link MeteredWindowStore}.
 *
 * <p>The store under test wraps an EasyMock mock of the inner {@link WindowStore}. The
 * processor context handed to the store exposes a stub {@link StreamsMetrics} that does not
 * measure anything; it only remembers the <em>names</em> of the sensors for which a latency or
 * throughput recording was requested. Because the stub creates every sensor with the operation
 * name as its sensor name, the tests can assert on plain operation names such as {@code "put"}.
 */
public class MeteredWindowStoreTest {
    private MockProcessorContext context;

    // createNiceMock(Class) hands back a raw WindowStore; the unchecked conversion to the
    // parameterized type is harmless because the object is only ever used as a mock.
    @SuppressWarnings("unchecked")
    private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
    private final MeteredWindowStore<String, String> store =
        new MeteredWindowStore<String, String>(innerStoreMock, "scope", new MockTime(), Serdes.String(), Serdes.String());

    /** Names of the sensors passed to {@code recordLatency} during the current test. */
    private final Set<String> latencyRecorded = new HashSet<>();
    /** Names of the sensors passed to {@code recordThroughput} during the current test. */
    private final Set<String> throughputRecorded = new HashSet<>();

    /**
     * Builds the processor context used by every test and stubs the inner store's name.
     *
     * <p>The context's {@code metrics()} is overridden to return the recording stub described
     * in the class comment, so that recordings made by the store end up in
     * {@link #latencyRecorded} / {@link #throughputRecorded}.
     */
    @Before
    public void setUp() throws Exception {
        final Metrics metrics = new Metrics();
        final StreamsMetrics streamsMetrics = new StreamsMetrics() {

            @Override
            public Map<MetricName, ? extends Metric> metrics() {
                return Collections.unmodifiableMap(metrics.metrics());
            }

            @Override
            public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName,
                                                        Sensor.RecordingLevel recordLevel, String... tags) {
                // The sensor is named after the operation so tests can look it up by that name.
                return metrics.sensor(operationName);
            }

            @Override
            public void recordLatency(Sensor sensor, long startNs, long endNs) {
                latencyRecorded.add(sensor.name());
            }

            @Override
            public Sensor addThroughputSensor(String scopeName, String entityName, String operationName,
                                              Sensor.RecordingLevel recordLevel, String... tags) {
                return metrics.sensor(operationName);
            }

            @Override
            public void recordThroughput(Sensor sensor, long value) {
                throughputRecorded.add(sensor.name());
            }

            @Override
            public void removeSensor(Sensor sensor) {
                metrics.removeSensor(sensor.name());
            }

            @Override
            public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel) {
                return metrics.sensor(name);
            }

            @Override
            public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor... parents) {
                return metrics.sensor(name);
            }
        };

        context = new MockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            new NoOpRecordCollector(),
            new ThreadCache(new LogContext("testCache "), 0L, streamsMetrics)
        ) {

            @Override
            public StreamsMetrics metrics() {
                return streamsMetrics;
            }
        };

        // Recorded before replay(); anyTimes() keeps this stub from constraining verify().
        EasyMock.expect(innerStoreMock.name()).andReturn("store").anyTimes();
    }

    /** Closes the processor context created in {@link #setUp()}. */
    @After
    public void after() {
        context.close();
    }

    /**
     * Initializing the metered store must initialize the inner store with the same context and
     * root store, and must record a latency against the {@code "restore"} sensor.
     */
    @Test
    public void shouldRecordRestoreLatencyOnInit() throws Exception {
        innerStoreMock.init(context, store);
        EasyMock.expectLastCall();
        EasyMock.replay(innerStoreMock);

        store.init(context, store);

        Assert.assertTrue(latencyRecorded.contains("restore"));
        // Without this the expectation above is never checked, unlike in the sibling tests.
        EasyMock.verify(innerStoreMock);
    }

    /**
     * A put must reach the inner store with the serialized key and the context's timestamp, and
     * must record a latency against the {@code "put"} sensor.
     */
    @Test
    public void shouldRecordPutLatency() throws Exception {
        innerStoreMock.put(EasyMock.eq(keyBytes("a")), EasyMock.<byte[]>anyObject(), EasyMock.eq(context.timestamp()));
        EasyMock.expectLastCall();
        EasyMock.replay(innerStoreMock);

        store.init(context, store);
        store.put("a", "a");

        Assert.assertTrue(latencyRecorded.contains("put"));
        EasyMock.verify(innerStoreMock);
    }

    /**
     * A single-key fetch must be delegated to the inner store and must record a latency against
     * the {@code "fetch"} sensor once the returned iterator is closed.
     */
    @Test
    public void shouldRecordFetchLatency() throws Exception {
        EasyMock.expect(innerStoreMock.fetch(keyBytes("a"), 1L, 1L))
            .andReturn(KeyValueIterators.emptyWindowStoreIterator());
        EasyMock.replay(innerStoreMock);

        store.init(context, store);
        store.fetch("a", 1L, 1L).close();

        Assert.assertTrue(latencyRecorded.contains("fetch"));
        EasyMock.verify(innerStoreMock);
    }

    /**
     * A key-range fetch must be delegated to the inner store and must record a latency against
     * the {@code "fetch"} sensor once the returned iterator is closed.
     */
    @Test
    public void shouldRecordFetchRangeLatency() throws Exception {
        EasyMock.expect(innerStoreMock.fetch(keyBytes("a"), keyBytes("b"), 1L, 1L))
            .andReturn(KeyValueIterators.emptyIterator());
        EasyMock.replay(innerStoreMock);

        store.init(context, store);
        store.fetch("a", "b", 1L, 1L).close();

        Assert.assertTrue(latencyRecorded.contains("fetch"));
        EasyMock.verify(innerStoreMock);
    }

    /**
     * A flush must be delegated to the inner store and must record a latency against the
     * {@code "flush"} sensor.
     */
    @Test
    public void shouldRecordFlushLatency() throws Exception {
        innerStoreMock.flush();
        EasyMock.expectLastCall();
        EasyMock.replay(innerStoreMock);

        store.init(context, store);
        store.flush();

        Assert.assertTrue(latencyRecorded.contains("flush"));
        EasyMock.verify(innerStoreMock);
    }

    /** Closing the metered store must close the inner store. */
    @Test
    public void shouldCloseUnderlyingStore() throws Exception {
        innerStoreMock.close();
        EasyMock.expectLastCall();
        EasyMock.replay(innerStoreMock);

        store.init(context, store);
        store.close();

        EasyMock.verify(innerStoreMock);
    }

    /**
     * Returns the expected serialized form of a String key.
     *
     * <p>The charset is pinned to UTF-8 rather than the platform default so the expectation does
     * not vary with the JVM's locale settings; UTF-8 is the documented default encoding of
     * Kafka's String serializer, which the store under test is configured with.
     */
    private static Bytes keyBytes(final String key) {
        return Bytes.wrap(key.getBytes(StandardCharsets.UTF_8));
    }
}

