/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static volatile int testNo = 0;
    private final MockTime mockTime;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String userSessionsStream;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private Initializer<Integer> initializer;
    private Aggregator<String, String, Integer> aggregator;
    private KStream<Integer, String> stream;

    public KStreamAggregationIntegrationTest() {
        this.mockTime = KStreamAggregationIntegrationTest.CLUSTER.time;
        this.userSessionsStream = "user-sessions";
    }

    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String applicationId = "kgrouped-stream-test-" + ++testNo;
        this.streamsConfiguration.put("application.id", applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        this.streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100);
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        KeyValueMapper mapper = MockMapper.selectValueMapper();
        this.stream = this.builder.stream(this.streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        this.groupedStream = this.stream.groupBy(mapper, Serialized.with(Serdes.String(), Serdes.String()));
        this.reducer = new Reducer<String>(){

            @Override
            public String apply(String value1, String value2) {
                return value1 + ":" + value2;
            }
        };
        this.initializer = new Initializer<Integer>(){

            @Override
            public Integer apply() {
                return 0;
            }
        };
        this.aggregator = new Aggregator<String, String, Integer>(){

            @Override
            public Integer apply(String aggKey, String value, Integer aggregate) {
                return aggregate + value.length();
            }
        };
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldReduce() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.reduce(this.reducer, "reduce-by-key").to(Serdes.String(), Serdes.String(), this.outputTopic);
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());
        List<KeyValue<String, String>> results = this.receiveMessages(new StringDeserializer(), new StringDeserializer(), 10);
        Collections.sort(results, new Comparator<KeyValue<String, String>>(){

            @Override
            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
                return KStreamAggregationIntegrationTest.compare(o1, o2);
            }
        });
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair("A", "A"), KeyValue.pair("A", "A:A"), KeyValue.pair("B", "B"), KeyValue.pair("B", "B:B"), KeyValue.pair("C", "C"), KeyValue.pair("C", "C:C"), KeyValue.pair("D", "D"), KeyValue.pair("D", "D:D"), KeyValue.pair("E", "E"), KeyValue.pair("E", "E:E"))));
    }

    private static <K extends Comparable, V extends Comparable> int compare(KeyValue<K, V> o1, KeyValue<K, V> o2) {
        int keyComparison = ((Comparable)o1.key).compareTo(o2.key);
        if (keyComparison == 0) {
            return ((Comparable)o1.value).compareTo(o2.value);
        }
        return keyComparison;
    }

    @Test
    public void shouldReduceWindowed() throws Exception {
        long firstBatchTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        this.produceMessages(firstBatchTimestamp);
        long secondBatchTimestamp = this.mockTime.milliseconds();
        this.produceMessages(secondBatchTimestamp);
        this.produceMessages(secondBatchTimestamp);
        this.groupedStream.windowedBy(TimeWindows.of(500L)).reduce(this.reducer).toStream(new KeyValueMapper<Windowed<String>, String, String>(){

            @Override
            public String apply(Windowed<String> windowedKey, String value) {
                return windowedKey.key() + "@" + windowedKey.window().start();
            }
        }).to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        this.startStreams();
        List<KeyValue<String, String>> windowedOutput = this.receiveMessages(new StringDeserializer(), new StringDeserializer(), 15);
        Comparator<KeyValue<String, String>> comparator = new Comparator<KeyValue<String, String>>(){

            @Override
            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
                return KStreamAggregationIntegrationTest.compare(o1, o2);
            }
        };
        Collections.sort(windowedOutput, comparator);
        long firstBatchWindow = firstBatchTimestamp / 500L * 500L;
        long secondBatchWindow = secondBatchTimestamp / 500L * 500L;
        MatcherAssert.assertThat(windowedOutput, (Matcher)Is.is(Arrays.asList(new KeyValue<String, String>("A@" + firstBatchWindow, "A"), new KeyValue<String, String>("A@" + secondBatchWindow, "A"), new KeyValue<String, String>("A@" + secondBatchWindow, "A:A"), new KeyValue<String, String>("B@" + firstBatchWindow, "B"), new KeyValue<String, String>("B@" + secondBatchWindow, "B"), new KeyValue<String, String>("B@" + secondBatchWindow, "B:B"), new KeyValue<String, String>("C@" + firstBatchWindow, "C"), new KeyValue<String, String>("C@" + secondBatchWindow, "C"), new KeyValue<String, String>("C@" + secondBatchWindow, "C:C"), new KeyValue<String, String>("D@" + firstBatchWindow, "D"), new KeyValue<String, String>("D@" + secondBatchWindow, "D"), new KeyValue<String, String>("D@" + secondBatchWindow, "D:D"), new KeyValue<String, String>("E@" + firstBatchWindow, "E"), new KeyValue<String, String>("E@" + secondBatchWindow, "E"), new KeyValue<String, String>("E@" + secondBatchWindow, "E:E"))));
    }

    @Test
    public void shouldAggregate() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.aggregate(this.initializer, this.aggregator, Serdes.Integer(), "aggregate-by-selected-key").to(Serdes.String(), Serdes.Integer(), this.outputTopic);
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());
        List<KeyValue<String, Integer>> results = this.receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 10);
        Collections.sort(results, new Comparator<KeyValue<String, Integer>>(){

            @Override
            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
                return KStreamAggregationIntegrationTest.compare(o1, o2);
            }
        });
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair("A", 1), KeyValue.pair("A", 2), KeyValue.pair("B", 1), KeyValue.pair("B", 2), KeyValue.pair("C", 1), KeyValue.pair("C", 2), KeyValue.pair("D", 1), KeyValue.pair("D", 2), KeyValue.pair("E", 1), KeyValue.pair("E", 2))));
    }

    @Test
    public void shouldAggregateWindowed() throws Exception {
        long firstTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        this.produceMessages(firstTimestamp);
        long secondTimestamp = this.mockTime.milliseconds();
        this.produceMessages(secondTimestamp);
        this.produceMessages(secondTimestamp);
        this.groupedStream.windowedBy(TimeWindows.of(500L)).aggregate(this.initializer, this.aggregator, Materialized.with(null, Serdes.Integer())).toStream(new KeyValueMapper<Windowed<String>, Integer, String>(){

            @Override
            public String apply(Windowed<String> windowedKey, Integer value) {
                return windowedKey.key() + "@" + windowedKey.window().start();
            }
        }).to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
        this.startStreams();
        List<KeyValue<String, Integer>> windowedMessages = this.receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 15);
        Comparator<KeyValue<String, Integer>> comparator = new Comparator<KeyValue<String, Integer>>(){

            @Override
            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
                return KStreamAggregationIntegrationTest.compare(o1, o2);
            }
        };
        Collections.sort(windowedMessages, comparator);
        long firstWindow = firstTimestamp / 500L * 500L;
        long secondWindow = secondTimestamp / 500L * 500L;
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(Arrays.asList(new KeyValue<String, Integer>("A@" + firstWindow, 1), new KeyValue<String, Integer>("A@" + secondWindow, 1), new KeyValue<String, Integer>("A@" + secondWindow, 2), new KeyValue<String, Integer>("B@" + firstWindow, 1), new KeyValue<String, Integer>("B@" + secondWindow, 1), new KeyValue<String, Integer>("B@" + secondWindow, 2), new KeyValue<String, Integer>("C@" + firstWindow, 1), new KeyValue<String, Integer>("C@" + secondWindow, 1), new KeyValue<String, Integer>("C@" + secondWindow, 2), new KeyValue<String, Integer>("D@" + firstWindow, 1), new KeyValue<String, Integer>("D@" + secondWindow, 1), new KeyValue<String, Integer>("D@" + secondWindow, 2), new KeyValue<String, Integer>("E@" + firstWindow, 1), new KeyValue<String, Integer>("E@" + secondWindow, 1), new KeyValue<String, Integer>("E@" + secondWindow, 2))));
    }

    private void shouldCountHelper() throws Exception {
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());
        List<KeyValue<String, Long>> results = this.receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
        Collections.sort(results, new Comparator<KeyValue<String, Long>>(){

            @Override
            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
                return KStreamAggregationIntegrationTest.compare(o1, o2);
            }
        });
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair("A", 1L), KeyValue.pair("A", 2L), KeyValue.pair("B", 1L), KeyValue.pair("B", 2L), KeyValue.pair("C", 1L), KeyValue.pair("C", 2L), KeyValue.pair("D", 1L), KeyValue.pair("D", 2L), KeyValue.pair("E", 1L), KeyValue.pair("E", 2L))));
    }

    @Test
    public void shouldCount() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.count("count-by-key").to(Serdes.String(), Serdes.Long(), this.outputTopic);
        this.shouldCountHelper();
    }

    @Test
    public void shouldCountWithInternalStore() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.count().to(Serdes.String(), Serdes.Long(), this.outputTopic);
        this.shouldCountHelper();
    }

    @Test
    public void shouldGroupByKey() throws Exception {
        long timestamp = this.mockTime.milliseconds();
        this.produceMessages(timestamp);
        this.produceMessages(timestamp);
        this.stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())).windowedBy(TimeWindows.of(500L)).count().toStream(new KeyValueMapper<Windowed<Integer>, Long, String>(){

            @Override
            public String apply(Windowed<Integer> windowedKey, Long value) {
                return windowedKey.key() + "@" + windowedKey.window().start();
            }
        }).to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
        this.startStreams();
        List<KeyValue<String, Long>> results = this.receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
        Collections.sort(results, new Comparator<KeyValue<String, Long>>(){

            @Override
            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
                return KStreamAggregationIntegrationTest.compare(o1, o2);
            }
        });
        long window = timestamp / 500L * 500L;
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair("1@" + window, 1L), KeyValue.pair("1@" + window, 2L), KeyValue.pair("2@" + window, 1L), KeyValue.pair("2@" + window, 2L), KeyValue.pair("3@" + window, 1L), KeyValue.pair("3@" + window, 2L), KeyValue.pair("4@" + window, 1L), KeyValue.pair("4@" + window, 2L), KeyValue.pair("5@" + window, 1L), KeyValue.pair("5@" + window, 2L))));
    }

    @Test
    public void shouldCountSessionWindows() throws Exception {
        long sessionGap = 300000L;
        long maintainMillis = 900000L;
        long t1 = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        List t1Messages = Arrays.asList(new KeyValue<String, String>("bob", "start"), new KeyValue<String, String>("penny", "start"), new KeyValue<String, String>("jo", "pause"), new KeyValue<String, String>("emily", "pause"));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t1);
        long t2 = t1 + 150000L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue<String, String>("emily", "resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t2);
        long t3 = t1 + 300000L + 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue<String, String>("bob", "pause"), new KeyValue<String, String>("penny", "stop")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t3);
        long t4 = t3 + 150000L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue<String, String>("bob", "resume"), new KeyValue<String, String>("jo", "resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t4);
        final HashMap results = new HashMap();
        final CountDownLatch latch = new CountDownLatch(11);
        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count(SessionWindows.with(300000L).until(900000L)).toStream().foreach(new ForeachAction<Windowed<String>, Long>(){

            @Override
            public void apply(Windowed<String> key, Long value) {
                results.put(key, value);
                latch.countDown();
            }
        });
        this.startStreams();
        latch.await(30L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(results.get(new Windowed<String>("bob", new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat(results.get(new Windowed<String>("penny", new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat(results.get(new Windowed<String>("jo", new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat(results.get(new Windowed<String>("jo", new SessionWindow(t4, t4))), (Matcher)CoreMatchers.equalTo((Object)1L));
        MatcherAssert.assertThat(results.get(new Windowed<String>("emily", new SessionWindow(t1, t2))), (Matcher)CoreMatchers.equalTo((Object)2L));
        MatcherAssert.assertThat(results.get(new Windowed<String>("bob", new SessionWindow(t3, t4))), (Matcher)CoreMatchers.equalTo((Object)2L));
        MatcherAssert.assertThat(results.get(new Windowed<String>("penny", new SessionWindow(t3, t3))), (Matcher)CoreMatchers.equalTo((Object)1L));
    }

    @Test
    public void shouldReduceSessionWindows() throws Exception {
        long sessionGap = 1000L;
        long maintainMillis = 3000L;
        long t1 = this.mockTime.milliseconds();
        List t1Messages = Arrays.asList(new KeyValue<String, String>("bob", "start"), new KeyValue<String, String>("penny", "start"), new KeyValue<String, String>("jo", "pause"), new KeyValue<String, String>("emily", "pause"));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t1);
        long t2 = t1 + 500L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue<String, String>("emily", "resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t2);
        long t3 = t1 + 1000L + 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue<String, String>("bob", "pause"), new KeyValue<String, String>("penny", "stop")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t3);
        long t4 = t3 + 500L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue<String, String>("bob", "resume"), new KeyValue<String, String>("jo", "resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t4);
        final HashMap results = new HashMap();
        final CountDownLatch latch = new CountDownLatch(11);
        String userSessionsStore = "UserSessionsStore";
        this.builder.stream(this.userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String())).reduce(new Reducer<String>(){

            @Override
            public String apply(String value1, String value2) {
                return value1 + ":" + value2;
            }
        }, SessionWindows.with(1000L).until(3000L), "UserSessionsStore").foreach(new ForeachAction<Windowed<String>, String>(){

            @Override
            public void apply(Windowed<String> key, String value) {
                results.put(key, value);
                latch.countDown();
            }
        });
        this.startStreams();
        latch.await(30L, TimeUnit.SECONDS);
        ReadOnlySessionStore sessionStore = this.kafkaStreams.store("UserSessionsStore", QueryableStoreTypes.sessionStore());
        MatcherAssert.assertThat(results.get(new Windowed<String>("bob", new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)"start"));
        MatcherAssert.assertThat(results.get(new Windowed<String>("penny", new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)"start"));
        MatcherAssert.assertThat(results.get(new Windowed<String>("jo", new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)"pause"));
        MatcherAssert.assertThat(results.get(new Windowed<String>("jo", new SessionWindow(t4, t4))), (Matcher)CoreMatchers.equalTo((Object)"resume"));
        MatcherAssert.assertThat(results.get(new Windowed<String>("emily", new SessionWindow(t1, t2))), (Matcher)CoreMatchers.equalTo((Object)"pause:resume"));
        MatcherAssert.assertThat(results.get(new Windowed<String>("bob", new SessionWindow(t3, t4))), (Matcher)CoreMatchers.equalTo((Object)"pause:resume"));
        MatcherAssert.assertThat(results.get(new Windowed<String>("penny", new SessionWindow(t3, t3))), (Matcher)CoreMatchers.equalTo((Object)"stop"));
        KeyValueIterator bob = sessionStore.fetch("bob");
        MatcherAssert.assertThat(bob.next(), (Matcher)CoreMatchers.equalTo(KeyValue.pair(new Windowed<String>("bob", new SessionWindow(t1, t1)), "start")));
        MatcherAssert.assertThat(bob.next(), (Matcher)CoreMatchers.equalTo(KeyValue.pair(new Windowed<String>("bob", new SessionWindow(t3, t4)), "pause:resume")));
        Assert.assertFalse((boolean)bob.hasNext());
    }

    private void produceMessages(long timestamp) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.streamOneInput, Arrays.asList(new KeyValue<Integer, String>(1, "A"), new KeyValue<Integer, String>(2, "B"), new KeyValue<Integer, String>(3, "C"), new KeyValue<Integer, String>(4, "D"), new KeyValue<Integer, String>(5, "E")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    private void createTopics() throws InterruptedException {
        this.streamOneInput = "stream-one-" + testNo;
        this.outputTopic = "output-" + testNo;
        this.userSessionsStream = this.userSessionsStream + "-" + testNo;
        CLUSTER.createTopic(this.streamOneInput, 3, 1);
        CLUSTER.createTopics(this.userSessionsStream, this.outputTopic);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private <K, V> List<KeyValue<K, V>> receiveMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int numMessages) throws InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "kgroupedstream-test-" + testNo);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, this.outputTopic, numMessages, 60000L);
    }
}

