/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for the {@link KGroupedStream} produced by {@code KStream#groupByKey}.
 *
 * <p>The class is split into three parts: argument-validation tests for the
 * {@code reduce}/{@code aggregate}/{@code count} overloads (null arguments and invalid store
 * names), functional tests that push records through a {@link KStreamTestDriver} and inspect the
 * emitted or materialized results, and the private helpers shared by those functional tests.
 */
public class KGroupedStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String INVALID_STORE_NAME = "~foo bar~";

    /** Starts every counting aggregate at zero. */
    private static final Initializer<Integer> ZERO_INITIALIZER = new Initializer<Integer>() {
        @Override
        public Integer apply() {
            return 0;
        }
    };

    /** Adds one to the aggregate for each record, ignoring the record's key and value. */
    private static final Aggregator<String, String, Integer> RECORD_COUNTER = new Aggregator<String, String, Integer>() {
        @Override
        public Integer apply(final String aggKey, final String value, final Integer aggregate) {
            return aggregate + 1;
        }
    };

    /** Merges two session aggregates by summing them. */
    private static final Merger<String, Integer> SUM_MERGER = new Merger<String, Integer>() {
        @Override
        public Integer apply(final String aggKey, final Integer aggOne, final Integer aggTwo) {
            return aggOne + aggTwo;
        }
    };

    /** Joins two values with a ':' separator. */
    private static final Reducer<String> COLON_JOINER = new Reducer<String>() {
        @Override
        public String apply(final String value1, final String value2) {
            return value1 + ":" + value2;
        }
    };

    /** Placeholder merger for the argument-validation tests; its result is never inspected. */
    private static final Merger<String, String> NULL_MERGER = new Merger<String, String>() {
        @Override
        public String apply(final String aggKey, final String aggOne, final String aggTwo) {
            return null;
        }
    };

    private final StreamsBuilder builder = new StreamsBuilder();
    private KGroupedStream<String, String> groupedStream;
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /** Builds a String/String stream on {@link #TOPIC} and groups it by key before every test. */
    @Before
    public void before() {
        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
    }

    // ------------------------------------------------------------------
    // Argument validation: reduce / count / aggregate (plain and time-windowed).
    // The explicit casts on null arguments select the intended overload.
    // ------------------------------------------------------------------

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerOnReduce() {
        groupedStream.reduce(null, "store");
    }

    @Test
    public void shouldAllowNullStoreNameOnReduce() {
        groupedStream.reduce(MockReducer.STRING_ADDER, (String)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnReduce() {
        groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnReduce() {
        groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier<KeyValueStore>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnCount() {
        groupedStream.count((StateStoreSupplier<KeyValueStore>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnWindowedCount() {
        groupedStream.count(TimeWindows.of(10L), (StateStoreSupplier<WindowStore>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullReducerWithWindowedReduce() {
        groupedStream.reduce(null, TimeWindows.of(10L), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsWithWindowedReduce() {
        groupedStream.reduce(MockReducer.STRING_ADDER, (Windows)null, "store");
    }

    @Test
    public void shouldAllowNullStoreNameWithWindowedReduce() {
        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10L), (String)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10L), INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnAggregate() {
        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
    }

    @Test
    public void shouldAllowNullStoreNameOnAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L), Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullAdderOnWindowedAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, null, TimeWindows.of(10L), Serdes.String(), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
    }

    @Test
    public void shouldAllowNullStoreNameOnWindowedAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L), Serdes.String(), null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L), Serdes.String(), INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L), (StateStoreSupplier<WindowStore>)null);
    }

    // ------------------------------------------------------------------
    // Functional tests: session-windowed aggregate / count / reduce.
    // ------------------------------------------------------------------

    /**
     * Feeds the shared session input and checks the per-session record counts collected in
     * {@code results}.
     *
     * @param results map the topology under test writes its windowed aggregates into
     */
    private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
        processCountableSessionInput();
        Assert.assertEquals(Integer.valueOf(2), results.get(sessionKey("1", 10L, 30L)));
        Assert.assertEquals(Integer.valueOf(1), results.get(sessionKey("2", 15L, 15L)));
        Assert.assertEquals(Integer.valueOf(3), results.get(sessionKey("1", 70L, 100L)));
    }

    @Test
    public void shouldAggregateSessionWindows() {
        final Map<Windowed<String>, Integer> results = new HashMap<Windowed<String>, Integer>();
        final KTable<Windowed<String>, Integer> table = groupedStream.aggregate(
            ZERO_INITIALIZER, RECORD_COUNTER, SUM_MERGER, SessionWindows.with(30L), Serdes.Integer(), "session-store");
        table.toStream().foreach(collectInto(results));
        doAggregateSessionWindows(results);
        // Expected value first so a failure message reads correctly.
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() {
        final Map<Windowed<String>, Integer> results = new HashMap<Windowed<String>, Integer>();
        final KTable<Windowed<String>, Integer> table = groupedStream.aggregate(
            ZERO_INITIALIZER, RECORD_COUNTER, SUM_MERGER, SessionWindows.with(30L), Serdes.Integer());
        table.toStream().foreach(collectInto(results));
        doAggregateSessionWindows(results);
    }

    /**
     * Feeds the shared session input and checks the per-session counts collected in
     * {@code results}.
     *
     * @param results map the topology under test writes its windowed counts into
     */
    private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
        processCountableSessionInput();
        Assert.assertEquals(Long.valueOf(2L), results.get(sessionKey("1", 10L, 30L)));
        Assert.assertEquals(Long.valueOf(1L), results.get(sessionKey("2", 15L, 15L)));
        Assert.assertEquals(Long.valueOf(3L), results.get(sessionKey("1", 70L, 100L)));
    }

    @Test
    public void shouldCountSessionWindows() {
        final Map<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        final KTable<Windowed<String>, Long> table = groupedStream.count(SessionWindows.with(30L), "session-store");
        table.toStream().foreach(collectInto(results));
        doCountSessionWindows(results);
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() {
        final Map<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        final KTable<Windowed<String>, Long> table = groupedStream.count(SessionWindows.with(30L));
        table.toStream().foreach(collectInto(results));
        doCountSessionWindows(results);
        Assert.assertNull(table.queryableStoreName());
    }

    /**
     * Feeds letter-valued session input and checks the ':'-joined reductions collected in
     * {@code results}.
     *
     * @param results map the topology under test writes its windowed reductions into
     */
    private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
        driver.setUp(builder, TestUtils.tempDirectory());
        processAt(10L, "1", "A");
        processAt(15L, "2", "Z");
        processAt(30L, "1", "B");
        processAt(70L, "1", "A");
        processAt(90L, "1", "B");
        processAt(100L, "1", "C");
        driver.flushState();
        Assert.assertEquals("A:B", results.get(sessionKey("1", 10L, 30L)));
        Assert.assertEquals("Z", results.get(sessionKey("2", 15L, 15L)));
        Assert.assertEquals("A:B:C", results.get(sessionKey("1", 70L, 100L)));
    }

    @Test
    public void shouldReduceSessionWindows() {
        final Map<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        final KTable<Windowed<String>, String> table = groupedStream.reduce(COLON_JOINER, SessionWindows.with(30L), "session-store");
        table.toStream().foreach(collectInto(results));
        doReduceSessionWindows(results);
        Assert.assertEquals("session-store", table.queryableStoreName());
    }

    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() {
        final Map<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        final KTable<Windowed<String>, String> table = groupedStream.reduce(COLON_JOINER, SessionWindows.with(30L));
        table.toStream().foreach(collectInto(results));
        doReduceSessionWindows(results);
        Assert.assertNull(table.queryableStoreName());
    }

    // ------------------------------------------------------------------
    // Argument validation: session-windowed reduce / aggregate / count.
    // ------------------------------------------------------------------

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
        groupedStream.reduce(null, SessionWindows.with(10L), "store");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
        groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows)null, "store");
    }

    @Test
    public void shouldAcceptNullStoreNameWhenReducingSessionWindows() {
        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10L), (String)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10L), INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10L), (StateStoreSupplier<SessionStore>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, NULL_MERGER, SessionWindows.with(10L), Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, null, NULL_MERGER, SessionWindows.with(10L), Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, SessionWindows.with(10L), Serdes.String(), "storeName");
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_MERGER, null, Serdes.String(), "storeName");
    }

    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_MERGER, SessionWindows.with(10L), Serdes.String(), (String)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_MERGER, SessionWindows.with(10L), Serdes.String(), INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, NULL_MERGER, SessionWindows.with(10L), Serdes.String(), (StateStoreSupplier<SessionStore>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() {
        groupedStream.count((SessionWindows)null, "store");
    }

    @Test
    public void shouldAcceptNullStoreNameWhenCountingSessionWindows() {
        groupedStream.count(SessionWindows.with(90L), (String)null);
    }

    @Test(expected=InvalidTopicException.class)
    public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() {
        groupedStream.count(SessionWindows.with(90L), INVALID_STORE_NAME);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() {
        groupedStream.count(SessionWindows.with(90L), (StateStoreSupplier<SessionStore>)null);
    }

    // ------------------------------------------------------------------
    // Materialized-based overloads.
    // ------------------------------------------------------------------

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
        groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>)null);
    }

    @Test
    public void shouldCountAndMaterializeResults() {
        // The explicit type witness is required: without it the chained withKeySerde(...) call
        // would be checked against Materialized<Object, Object, ...> and fail to compile.
        groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
            .withKeySerde(Serdes.String()));
        processData();
        // Unchecked: the driver exposes stores untyped; the key/value types are those asserted below.
        @SuppressWarnings("unchecked")
        final KeyValueStore<String, Long> count = (KeyValueStore<String, Long>)driver.allStateStores().get("count");
        MatcherAssert.assertThat(count.get("1"), CoreMatchers.equalTo(3L));
        MatcherAssert.assertThat(count.get("2"), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(count.get("3"), CoreMatchers.equalTo(2L));
    }

    @Test
    public void shouldReduceAndMaterializeResults() {
        groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()));
        processData();
        // Unchecked: the driver exposes stores untyped; the key/value types are those asserted below.
        @SuppressWarnings("unchecked")
        final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>)driver.allStateStores().get("reduce");
        MatcherAssert.assertThat(reduced.get("1"), CoreMatchers.equalTo("A+C+D"));
        MatcherAssert.assertThat(reduced.get("2"), CoreMatchers.equalTo("B"));
        MatcherAssert.assertThat(reduced.get("3"), CoreMatchers.equalTo("E+F"));
    }

    @Test
    public void shouldAggregateAndMaterializeResults() {
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()));
        processData();
        // Unchecked: the driver exposes stores untyped; the key/value types are those asserted below.
        @SuppressWarnings("unchecked")
        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>)driver.allStateStores().get("aggregate");
        MatcherAssert.assertThat(aggregate.get("1"), CoreMatchers.equalTo("0+A+C+D"));
        MatcherAssert.assertThat(aggregate.get("2"), CoreMatchers.equalTo("0+B"));
        MatcherAssert.assertThat(aggregate.get("3"), CoreMatchers.equalTo("0+E+F"));
    }

    @Test
    public void shouldAggregateWithDefaultSerdes() {
        final Map<String, String> results = new HashMap<String, String>();
        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
            .toStream()
            .foreach(collectInto(results));
        processData();
        MatcherAssert.assertThat(results.get("1"), CoreMatchers.equalTo("0+A+C+D"));
        MatcherAssert.assertThat(results.get("2"), CoreMatchers.equalTo("0+B"));
        MatcherAssert.assertThat(results.get("3"), CoreMatchers.equalTo("0+E+F"));
    }

    /**
     * Sets the driver up with String serdes and pushes the fixed input used by the materialized
     * and default-serde tests, all at time 0, then flushes state. The last record has a null value.
     */
    private void processData() {
        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0L);
        driver.setTime(0L);
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "2", "B");
        driver.process(TOPIC, "1", "C");
        driver.process(TOPIC, "1", "D");
        driver.process(TOPIC, "3", "E");
        driver.process(TOPIC, "3", "F");
        driver.process(TOPIC, "3", null);
        driver.flushState();
    }

    // ------------------------------------------------------------------
    // Functional tests: time-windowed count.
    // ------------------------------------------------------------------

    /**
     * Pushes three records at time 0 and four at time 500, then checks the exact sequence of
     * windowed counts appended to {@code results}.
     *
     * @param results list the topology under test appends every emitted update to
     */
    private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
        driver.setUp(builder, TestUtils.tempDirectory(), 0L);
        driver.setTime(0L);
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "2", "B");
        driver.process(TOPIC, "3", "C");
        driver.setTime(500L);
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "1", "A");
        driver.process(TOPIC, "2", "B");
        driver.process(TOPIC, "2", "B");
        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            KeyValue.pair(timeKey("1", 0L, 500L), 1L),
            KeyValue.pair(timeKey("2", 0L, 500L), 1L),
            KeyValue.pair(timeKey("3", 0L, 500L), 1L),
            KeyValue.pair(timeKey("1", 500L, 1000L), 1L),
            KeyValue.pair(timeKey("1", 500L, 1000L), 2L),
            KeyValue.pair(timeKey("2", 500L, 1000L), 1L),
            KeyValue.pair(timeKey("2", 500L, 1000L), 2L));
        MatcherAssert.assertThat(results, CoreMatchers.equalTo(expected));
    }

    @Test
    public void shouldCountWindowed() {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        groupedStream.count(TimeWindows.of(500L), "aggregate-by-key-windowed")
            .toStream()
            .foreach(appendTo(results));
        doCountWindowed(results);
    }

    @Test
    public void shouldCountWindowedWithInternalStoreName() {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<KeyValue<Windowed<String>, Long>>();
        groupedStream.count(TimeWindows.of(500L))
            .toStream()
            .foreach(appendTo(results));
        doCountWindowed(results);
    }

    // ------------------------------------------------------------------
    // Shared helpers.
    // ------------------------------------------------------------------

    /**
     * Sets the driver up and pushes the input shared by the session aggregate and count tests:
     * key "1" at times 10, 30, 70, 90 and 100, and key "2" at time 15; then flushes state.
     */
    private void processCountableSessionInput() {
        driver.setUp(builder, TestUtils.tempDirectory());
        processAt(10L, "1", "1");
        processAt(15L, "2", "2");
        processAt(30L, "1", "1");
        processAt(70L, "1", "1");
        processAt(90L, "1", "1");
        processAt(100L, "1", "1");
        driver.flushState();
    }

    /** Sets the driver time to {@code timestamp} and processes one record on {@link #TOPIC}. */
    private void processAt(final long timestamp, final String key, final String value) {
        driver.setTime(timestamp);
        driver.process(TOPIC, key, value);
    }

    /** Builds the windowed key for {@code key} in the session window [{@code start}, {@code end}]. */
    private static Windowed<String> sessionKey(final String key, final long start, final long end) {
        return new Windowed<String>(key, new SessionWindow(start, end));
    }

    /** Builds the windowed key for {@code key} in the time window from {@code start} to {@code end}. */
    private static Windowed<String> timeKey(final String key, final long start, final long end) {
        return new Windowed<String>(key, new TimeWindow(start, end));
    }

    /** Returns an action that stores each record in {@code sink}, overwriting earlier values per key. */
    private static <K, V> ForeachAction<K, V> collectInto(final Map<K, V> sink) {
        return new ForeachAction<K, V>() {
            @Override
            public void apply(final K key, final V value) {
                sink.put(key, value);
            }
        };
    }

    /** Returns an action that appends each record to {@code sink} as a key/value pair, in arrival order. */
    private static <K, V> ForeachAction<K, V> appendTo(final List<KeyValue<K, V>> sink) {
        return new ForeachAction<K, V>() {
            @Override
            public void apply(final K key, final V value) {
                sink.add(KeyValue.pair(key, value));
            }
        };
    }
}

