/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for the {@code count}, {@code reduce} and {@code aggregate} operations of a
 * {@link TimeWindowedKStream} built from a key-grouped stream of String keys and values
 * with {@code TimeWindows.of(500L)}.
 *
 * <p>Each data-driven test pipes the same four records through the topology (see
 * {@link #processData()}) and then checks either the records emitted downstream or the
 * contents of the materialized window store. The remaining tests check that passing
 * {@code null} for a required argument raises a {@link NullPointerException}.
 */
public class TimeWindowedKStreamImplTest {
    private static final String TOPIC = "input";
    private final StreamsBuilder builder = new StreamsBuilder();
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();
    private TimeWindowedKStream<String, String> windowedStream;

    /**
     * Builds the windowed stream under test: the {@code "input"} topic is read with String
     * serdes, grouped by key and windowed with {@code TimeWindows.of(500L)}.
     */
    @Before
    public void before() {
        final KStream<String, String> stream =
            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        windowedStream = stream
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(500L));
    }

    /** The latest count emitted for each windowed key must match the number of records seen. */
    @Test
    public void shouldCountWindowed() {
        final Map<Windowed<String>, Long> results = new HashMap<Windowed<String>, Long>();
        windowedStream.count().toStream().foreach(collectInto(results));

        processData();

        MatcherAssert.assertThat(results.get(windowedKey("1", 0L, 500L)), CoreMatchers.equalTo(2L));
        MatcherAssert.assertThat(results.get(windowedKey("2", 500L, 1000L)), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(results.get(windowedKey("1", 500L, 1000L)), CoreMatchers.equalTo(1L));
    }

    /** The latest reduced value emitted for each windowed key must be the values joined by the adder. */
    @Test
    public void shouldReduceWindowed() {
        final Map<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        windowedStream.reduce(MockReducer.STRING_ADDER).toStream().foreach(collectInto(results));

        processData();

        MatcherAssert.assertThat(results.get(windowedKey("1", 0L, 500L)), CoreMatchers.equalTo("1+2"));
        MatcherAssert.assertThat(results.get(windowedKey("2", 500L, 1000L)), CoreMatchers.equalTo("1"));
        MatcherAssert.assertThat(results.get(windowedKey("1", 500L, 1000L)), CoreMatchers.equalTo("3"));
    }

    /** The latest aggregate emitted for each windowed key must start from the initializer value. */
    @Test
    public void shouldAggregateWindowed() {
        final Map<Windowed<String>, String> results = new HashMap<Windowed<String>, String>();
        windowedStream
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
            .toStream()
            .foreach(collectInto(results));

        processData();

        MatcherAssert.assertThat(results.get(windowedKey("1", 0L, 500L)), CoreMatchers.equalTo("0+1+2"));
        MatcherAssert.assertThat(results.get(windowedKey("2", 500L, 1000L)), CoreMatchers.equalTo("0+1"));
        MatcherAssert.assertThat(results.get(windowedKey("1", 500L, 1000L)), CoreMatchers.equalTo("0+3"));
    }

    /** Counts must be readable from the window store registered as {@code "count-store"}. */
    @Test
    public void shouldMaterializeCount() {
        windowedStream.count(
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));

        processData();

        final List<KeyValue<Windowed<String>, Long>> data = fetchAll("count-store");
        MatcherAssert.assertThat(data, CoreMatchers.equalTo(Arrays.asList(
            KeyValue.pair(windowedKey("1", 0L, 500L), 2L),
            KeyValue.pair(windowedKey("1", 500L, 1000L), 1L),
            KeyValue.pair(windowedKey("2", 500L, 1000L), 1L))));
    }

    /** Reduced values must be readable from the window store registered as {@code "reduced"}. */
    @Test
    public void shouldMaterializeReduced() {
        windowedStream.reduce(
            MockReducer.STRING_ADDER,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        processData();

        final List<KeyValue<Windowed<String>, String>> data = fetchAll("reduced");
        MatcherAssert.assertThat(data, CoreMatchers.equalTo(Arrays.asList(
            KeyValue.pair(windowedKey("1", 0L, 500L), "1+2"),
            KeyValue.pair(windowedKey("1", 500L, 1000L), "3"),
            KeyValue.pair(windowedKey("2", 500L, 1000L), "1"))));
    }

    /** Aggregates must be readable from the window store registered as {@code "aggregated"}. */
    @Test
    public void shouldMaterializeAggregated() {
        windowedStream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        processData();

        final List<KeyValue<Windowed<String>, String>> data = fetchAll("aggregated");
        MatcherAssert.assertThat(data, CoreMatchers.equalTo(Arrays.asList(
            KeyValue.pair(windowedKey("1", 0L, 500L), "0+1+2"),
            KeyValue.pair(windowedKey("1", 500L, 1000L), "0+3"),
            KeyValue.pair(windowedKey("2", 500L, 1000L), "0+1"))));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        windowedStream.aggregate(MockInitializer.STRING_INIT, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        windowedStream.reduce(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        windowedStream.aggregate(
            null,
            MockAggregator.TOSTRING_ADDER,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        windowedStream.aggregate(
            MockInitializer.STRING_INIT,
            null,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        windowedStream.reduce(
            null,
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        windowedStream.reduce(MockReducer.STRING_ADDER, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        windowedStream.count(null);
    }

    /**
     * Builds the windowed key used both for map lookups and for expected store contents.
     *
     * @param key   the record key
     * @param start first argument passed to the {@link TimeWindow} constructor
     * @param end   second argument passed to the {@link TimeWindow} constructor
     * @return a {@link Windowed} wrapping {@code key} and a new {@code TimeWindow(start, end)}
     */
    private static Windowed<String> windowedKey(final String key, final long start, final long end) {
        return new Windowed<String>(key, new TimeWindow(start, end));
    }

    /**
     * Creates a {@link ForeachAction} that stores every record it receives in {@code sink},
     * so a later record for the same windowed key overwrites the earlier one.
     *
     * @param sink the map receiving each (windowed key, value) pair
     * @param <V>  the value type of the observed stream
     * @return the recording action
     */
    private static <V> ForeachAction<Windowed<String>, V> collectInto(final Map<Windowed<String>, V> sink) {
        return new ForeachAction<Windowed<String>, V>() {
            @Override
            public void apply(final Windowed<String> key, final V value) {
                sink.put(key, value);
            }
        };
    }

    /**
     * Looks up the named state store on the driver and lists the entries returned by
     * {@code fetch("1", "2", 0L, 1000L)}.
     *
     * @param storeName the name the store was materialized under
     * @param <V>       the value type the caller expects the store to hold
     * @return the fetched entries, in the order the store's iterator yields them
     */
    private <V> List<KeyValue<Windowed<String>, V>> fetchAll(final String storeName) {
        // The driver exposes stores without their key/value types; the caller picks V to
        // match the serdes it materialized the store with, so the cast cannot be checked.
        @SuppressWarnings("unchecked")
        final WindowStore<String, V> store =
            (WindowStore<String, V>) driver.allStateStores().get(storeName);
        return StreamsTestUtils.toList(store.fetch("1", "2", 0L, 1000L));
    }

    /**
     * Sets up the driver against the built topology and feeds it the fixture records:
     * key "1" with values "1" (time 10), "2" (time 15) and "3" (time 500), then key "2"
     * with value "1" (also time 500), followed by a state flush.
     */
    private void processData() {
        // NOTE(review): the 0L passed to setUp is presumably the cache size — confirm.
        driver.setUp(builder, TestUtils.tempDirectory(), 0L);
        driver.setTime(10L);
        driver.process(TOPIC, "1", "1");
        driver.setTime(15L);
        driver.process(TOPIC, "1", "2");
        driver.setTime(500L);
        driver.process(TOPIC, "1", "3");
        driver.process(TOPIC, "2", "1");
        driver.flushState();
    }
}

