/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link Processor} produced by {@link KStreamSessionWindowAggregate}.
 *
 * <p>The aggregate under test counts records per key: the initializer yields {@code 0},
 * the aggregator adds one per record, and the session merger sums two aggregates.
 * Records forwarded by the processor are captured in {@link #results} through an
 * overridden {@code forward} on the {@link MockProcessorContext}.
 *
 * <p>Every {@link KeyValueIterator} obtained from the session store is closed by the
 * test that opened it, and a store that is replaced via {@link #initStore(boolean)}
 * is closed before its replacement is created.
 */
public class KStreamSessionWindowAggregateProcessorTest {
    /** Inactivity gap of the session windows under test. */
    private static final long GAP_MS = 300000L;
    /** Retention passed to both the session windows and the backing store. */
    private static final long RETENTION_MS = 3 * GAP_MS;
    private static final String STORE_NAME = "session-store";

    /** Starts every new session aggregate at zero. */
    private final Initializer<Long> initializer = new Initializer<Long>() {

        @Override
        public Long apply() {
            return 0L;
        }
    };
    /** Counts records: each record increments the aggregate by one. */
    private final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>() {

        @Override
        public Long apply(String aggKey, String value, Long aggregate) {
            return aggregate + 1L;
        }
    };
    /** Merges two session aggregates by summing their counts. */
    private final Merger<String, Long> sessionMerger = new Merger<String, Long>() {

        @Override
        public Long apply(String aggKey, Long aggOne, Long aggTwo) {
            return aggOne + aggTwo;
        }
    };
    private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator =
        new KStreamSessionWindowAggregate<String, String, Long>(
            SessionWindows.with(GAP_MS).until(RETENTION_MS),
            STORE_NAME,
            this.initializer,
            this.aggregator,
            this.sessionMerger);
    /** Records forwarded by the processor, in forwarding order. */
    private final List<KeyValue<?, ?>> results = new ArrayList<KeyValue<?, ?>>();
    private final Processor<String, String> processor = this.sessionAggregator.get();
    private SessionStore<String, Long> sessionStore;
    private MockProcessorContext context;

    /**
     * Creates a fresh processor context whose {@code forward} appends to {@link #results},
     * then creates a caching-enabled session store and initializes the processor.
     */
    @Before
    public void initializeStore() {
        File stateDir = TestUtils.tempDirectory();
        this.context = new MockProcessorContext(stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000L, new MockStreamsMetrics(new Metrics()))) {

            @Override
            public <K, V> void forward(K key, V value) {
                KStreamSessionWindowAggregateProcessorTest.this.results.add(KeyValue.pair(key, value));
            }
        };
        this.initStore(true);
        this.processor.init(this.context);
    }

    /**
     * (Re)creates the session store and initializes it against the current context.
     *
     * @param enableCaching value passed as the supplier's final (caching) argument
     */
    private void initStore(boolean enableCaching) {
        if (this.sessionStore != null) {
            // Some tests call this a second time; close the store from the first call
            // so it is not leaked when the field is overwritten below.
            this.sessionStore.close();
        }
        RocksDBSessionStoreSupplier<String, Long> supplier = new RocksDBSessionStoreSupplier<String, Long>(STORE_NAME, RETENTION_MS, Serdes.String(), Serdes.Long(), false, Collections.emptyMap(), enableCaching);
        this.sessionStore = supplier.get();
        this.sessionStore.init(this.context, this.sessionStore);
    }

    /** Closes the context and the current session store after each test. */
    @After
    public void closeStore() {
        this.context.close();
        this.sessionStore.close();
    }

    /**
     * Reports whether {@code findSessions(key, from, to)} yields at least one session,
     * closing the iterator before returning.
     */
    private boolean hasSession(String key, long from, long to) {
        try (KeyValueIterator<Windowed<String>, Long> sessions = this.sessionStore.findSessions(key, from, to)) {
            return sessions.hasNext();
        }
    }

    /**
     * Builds the expected forwarded record for a session: a {@link Windowed} key over
     * {@code [start, end]} paired with a {@link Change} whose first constructor argument
     * is {@code aggregate} and whose second argument is {@code null}.
     */
    private static KeyValue<Windowed<String>, Change<Long>> forwarded(String key, long start, long end, Long aggregate) {
        return KeyValue.pair(new Windowed<String>(key, new SessionWindow(start, end)), new Change<Long>(aggregate, null));
    }

    /** Two records for one key at times 0 and 500 must land in one session with count 2. */
    @Test
    public void shouldCreateSingleSessionWhenWithinGap() {
        this.context.setTime(0L);
        this.processor.process("john", "first");
        this.context.setTime(500L);
        this.processor.process("john", "second");

        try (KeyValueIterator<Windowed<String>, Long> values = this.sessionStore.findSessions("john", 0L, 2000L)) {
            Assert.assertTrue(values.hasNext());
            Assert.assertEquals(Long.valueOf(2L), values.next().value);
        }
    }

    /**
     * Two sessions separated by more than the gap must be merged into one session with
     * count 3 once a record arrives at a time between them.
     */
    @Test
    public void shouldMergeSessions() {
        final String sessionId = "mel";
        this.context.setTime(0L);
        this.processor.process(sessionId, "first");
        Assert.assertTrue(this.hasSession(sessionId, 0L, 0L));

        // Move time just beyond the gap: a second, separate session is expected.
        this.context.setTime(GAP_MS + 1);
        this.processor.process(sessionId, "second");
        Assert.assertTrue(this.hasSession(sessionId, GAP_MS + 1, GAP_MS + 1));
        // The first session must still be present.
        Assert.assertTrue(this.hasSession(sessionId, 0L, 0L));

        // Move time back to the midpoint between the two sessions.
        this.context.setTime(GAP_MS / 2);
        this.processor.process(sessionId, "third");

        try (KeyValueIterator<Windowed<String>, Long> iterator = this.sessionStore.findSessions(sessionId, 0L, GAP_MS + 1)) {
            KeyValue<Windowed<String>, Long> merged = iterator.next();
            Assert.assertEquals(Long.valueOf(3L), merged.value);
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /** Two records with the same key and timestamp must update a single session to count 2. */
    @Test
    public void shouldUpdateSessionIfTheSameTime() {
        this.context.setTime(0L);
        this.processor.process("mel", "first");
        this.processor.process("mel", "second");

        try (KeyValueIterator<Windowed<String>, Long> iterator = this.sessionStore.findSessions("mel", 0L, 0L)) {
            Assert.assertEquals(Long.valueOf(2L), iterator.next().value);
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Records for one key at three timestamps, each {@code GAP_MS + 1} after the previous,
     * must produce three separate forwarded sessions with counts 1, 2 and 3.
     */
    @Test
    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
        final String sessionId = "mel";
        long time = 0L;
        this.context.setTime(time);
        this.processor.process(sessionId, "first");

        time += GAP_MS + 1;
        this.context.setTime(time);
        this.processor.process(sessionId, "second");
        this.processor.process(sessionId, "second");

        time += GAP_MS + 1;
        this.context.setTime(time);
        this.processor.process(sessionId, "third");
        this.processor.process(sessionId, "third");
        this.processor.process(sessionId, "third");

        // The store is flushed before the forwarded records are inspected.
        this.sessionStore.flush();
        Assert.assertEquals(
            Arrays.asList(
                forwarded(sessionId, 0L, 0L, 1L),
                forwarded(sessionId, GAP_MS + 1, GAP_MS + 1, 2L),
                forwarded(sessionId, time, time, 3L)),
            this.results);
    }

    /**
     * After a second record extends a session from [0, 0] to [0, 100], only the merged
     * session must be returned by the store.
     */
    @Test
    public void shouldRemoveMergedSessionsFromStateStore() {
        this.context.setTime(0L);
        this.processor.process("a", "1");

        // Close the first iterator before processing further records.
        try (KeyValueIterator<Windowed<String>, Long> a1 = this.sessionStore.findSessions("a", 0L, 0L)) {
            Assert.assertEquals(KeyValue.pair(new Windowed<String>("a", new SessionWindow(0L, 0L)), 1L), a1.next());
        }

        this.context.setTime(100L);
        this.processor.process("a", "2");

        try (KeyValueIterator<Windowed<String>, Long> a2 = this.sessionStore.findSessions("a", 0L, 100L)) {
            Assert.assertEquals(KeyValue.pair(new Windowed<String>("a", new SessionWindow(0L, 100L)), 2L), a2.next());
            Assert.assertFalse(a2.hasNext());
        }
    }

    /**
     * Interleaves records for keys a-d at four timestamps and checks the exact sequence
     * of sessions forwarded after a flush.
     */
    @Test
    public void shouldHandleMultipleSessionsAndMerging() {
        final long beyondGap = GAP_MS + 1;
        final long beyondGapPlusHalf = GAP_MS + 1 + GAP_MS / 2;

        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.processor.process("b", "1");
        this.processor.process("c", "1");
        this.processor.process("d", "1");
        this.context.setTime(GAP_MS / 2);
        this.processor.process("d", "2");
        this.context.setTime(beyondGap);
        this.processor.process("a", "2");
        this.processor.process("b", "2");
        this.context.setTime(beyondGapPlusHalf);
        this.processor.process("a", "3");
        this.processor.process("c", "3");

        // The store is flushed before the forwarded records are inspected.
        this.sessionStore.flush();
        Assert.assertEquals(
            Arrays.asList(
                forwarded("a", 0L, 0L, 1L),
                forwarded("b", 0L, 0L, 1L),
                forwarded("c", 0L, 0L, 1L),
                forwarded("d", 0L, GAP_MS / 2, 2L),
                forwarded("b", beyondGap, beyondGap, 1L),
                forwarded("a", beyondGap, beyondGapPlusHalf, 2L),
                forwarded("c", beyondGapPlusHalf, beyondGapPlusHalf, 1L)),
            this.results);
    }

    /** The value getter from the aggregate's view must return the per-session counts. */
    @Test
    public void shouldGetAggregatedValuesFromValueGetter() {
        KTableValueGetter<Windowed<String>, Long> getter = this.sessionAggregator.view().get();
        getter.init(this.context);

        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.context.setTime(GAP_MS + 1);
        this.processor.process("a", "1");
        this.processor.process("a", "2");

        long t0 = getter.get(new Windowed<String>("a", new SessionWindow(0L, 0L)));
        long t1 = getter.get(new Windowed<String>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1)));
        Assert.assertEquals(1L, t0);
        Assert.assertEquals(2L, t1);
    }

    /**
     * With the store recreated with caching disabled, each new session must appear in
     * {@link #results} without an explicit flush.
     */
    @Test
    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
        this.initStore(false);
        this.processor.init(this.context);

        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.processor.process("b", "1");
        this.processor.process("c", "1");

        Assert.assertEquals(
            Arrays.asList(
                forwarded("a", 0L, 0L, 1L),
                forwarded("b", 0L, 0L, 1L),
                forwarded("c", 0L, 0L, 1L)),
            this.results);
    }

    /**
     * With the store recreated with caching disabled, extending session [0, 0] to [0, 5]
     * must forward the original session, then a record with a {@code null} aggregate for
     * the [0, 0] session, then the merged session, all without an explicit flush.
     */
    @Test
    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
        this.initStore(false);
        this.processor.init(this.context);

        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.context.setTime(5L);
        this.processor.process("a", "1");

        Assert.assertEquals(
            Arrays.asList(
                forwarded("a", 0L, 0L, 1L),
                forwarded("a", 0L, 0L, null),
                forwarded("a", 0L, 5L, 2L)),
            this.results);
    }
}

