/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class StreamsBuilderTest {
    private final StreamsBuilder builder = new StreamsBuilder();
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    @Test(expected=TopologyException.class)
    public void testFrom() {
        this.builder.stream(Arrays.asList("topic-1", "topic-2"));
        this.builder.build().addSource("KSTREAM-SOURCE-0000000000", "topic-3");
    }

    @Test
    public void shouldAllowJoinUnmaterializedFilteredKTable() {
        KTable filteredKTable = this.builder.table("table-topic").filter(MockPredicate.allGoodPredicate());
        this.builder.stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), (Matcher)CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        MatcherAssert.assertThat((Object)topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldAllowJoinMaterializedFilteredKTable() {
        KTable filteredKTable = this.builder.table("table-topic").filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
        this.builder.stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)2));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), (Matcher)CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), (Matcher)CoreMatchers.equalTo(Collections.singleton("store")));
    }

    @Test
    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
        KTable mappedKTable = this.builder.table("table-topic").mapValues(MockMapper.noOpValueMapper());
        this.builder.stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), (Matcher)CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        MatcherAssert.assertThat((Object)topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldAllowJoinMaterializedMapValuedKTable() {
        KTable mappedKTable = this.builder.table("table-topic").mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
        this.builder.stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)2));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), (Matcher)CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"), (Matcher)CoreMatchers.equalTo(Collections.singleton("store")));
    }

    @Test
    public void shouldAllowJoinUnmaterializedJoinedKTable() {
        KTable table1 = this.builder.table("table-topic1");
        KTable table2 = this.builder.table("table-topic2");
        this.builder.stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)2));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), (Matcher)CoreMatchers.equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
        MatcherAssert.assertThat((Object)topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldAllowJoinMaterializedJoinedKTable() {
        KTable table1 = this.builder.table("table-topic1");
        KTable table2 = this.builder.table("table-topic2");
        this.builder.stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)3));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), (Matcher)CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"), (Matcher)CoreMatchers.equalTo(Collections.singleton("store")));
    }

    @Test
    public void shouldAllowJoinMaterializedSourceKTable() {
        KTable table = this.builder.table("table-topic");
        this.builder.stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        ProcessorTopology topology = this.builder.internalTopologyBuilder.build();
        MatcherAssert.assertThat((Object)topology.stateStores().size(), (Matcher)CoreMatchers.equalTo((Object)1));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), (Matcher)CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        MatcherAssert.assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"), (Matcher)CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
    }

    @Test
    public void shouldProcessingFromSinkTopic() {
        KStream source = this.builder.stream("topic-source");
        source.to("topic-sink");
        MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
        source.process(processorSupplier, new String[0]);
        this.driver.setUp(this.builder);
        this.driver.setTime(0L);
        this.driver.process("topic-source", "A", "aa");
        Assert.assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
    }

    @Test
    public void shouldProcessViaThroughTopic() {
        KStream source = this.builder.stream("topic-source");
        KStream through = source.through("topic-sink");
        MockProcessorSupplier sourceProcessorSupplier = new MockProcessorSupplier();
        MockProcessorSupplier throughProcessorSupplier = new MockProcessorSupplier();
        source.process(sourceProcessorSupplier, new String[0]);
        through.process(throughProcessorSupplier, new String[0]);
        this.driver.setUp(this.builder);
        this.driver.setTime(0L);
        this.driver.process("topic-source", "A", "aa");
        Assert.assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
        Assert.assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
    }

    @Test
    public void testMerge() {
        String topic1 = "topic-1";
        String topic2 = "topic-2";
        KStream source1 = this.builder.stream("topic-1");
        KStream source2 = this.builder.stream("topic-2");
        KStream merged = source1.merge(source2);
        MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
        merged.process(processorSupplier, new String[0]);
        this.driver.setUp(this.builder);
        this.driver.setTime(0L);
        this.driver.process("topic-1", "A", "aa");
        this.driver.process("topic-2", "B", "bb");
        this.driver.process("topic-2", "C", "cc");
        this.driver.process("topic-1", "D", "dd");
        Assert.assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
    }

    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
        final HashMap results = new HashMap();
        String topic = "topic";
        ForeachAction<Long, String> action = new ForeachAction<Long, String>(){

            @Override
            public void apply(Long key, String value) {
                results.put(key, value);
            }
        };
        this.builder.table("topic", Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String())).toStream().foreach(action);
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        this.driver.setTime(0L);
        this.driver.process("topic", 1L, "value1");
        this.driver.process("topic", 2L, "value2");
        this.driver.flushState();
        KeyValueStore store = (KeyValueStore)this.driver.allStateStores().get("store");
        MatcherAssert.assertThat(store.get(1L), (Matcher)CoreMatchers.equalTo((Object)"value1"));
        MatcherAssert.assertThat(store.get(2L), (Matcher)CoreMatchers.equalTo((Object)"value2"));
        MatcherAssert.assertThat(results.get(1L), (Matcher)CoreMatchers.equalTo((Object)"value1"));
        MatcherAssert.assertThat(results.get(2L), (Matcher)CoreMatchers.equalTo((Object)"value2"));
    }

    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
        String topic = "topic";
        this.builder.globalTable("topic", Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));
        this.driver.setUp(this.builder, TestUtils.tempDirectory());
        this.driver.setTime(0L);
        this.driver.process("topic", 1L, "value1");
        this.driver.process("topic", 2L, "value2");
        this.driver.flushState();
        KeyValueStore store = (KeyValueStore)this.driver.allStateStores().get("store");
        MatcherAssert.assertThat(store.get(1L), (Matcher)CoreMatchers.equalTo((Object)"value1"));
        MatcherAssert.assertThat(store.get(2L), (Matcher)CoreMatchers.equalTo((Object)"value2"));
    }

    @Test
    public void shouldUseDefaultNodeAndStoreNames() {
        String topic = "topic";
        this.builder.table("topic", Materialized.with(Serdes.Long(), Serdes.String()));
        Iterator<TopologyDescription.Subtopology> subtopologies = this.builder.build().describe().subtopologies().iterator();
        TopologyDescription.Subtopology subtopology = subtopologies.next();
        Iterator<TopologyDescription.Node> nodes = subtopology.nodes().iterator();
        TopologyDescription.Node node = nodes.next();
        MatcherAssert.assertThat((Object)node.name(), (Matcher)CoreMatchers.equalTo((Object)"KSTREAM-SOURCE-0000000001"));
        node = nodes.next();
        MatcherAssert.assertThat((Object)node.name(), (Matcher)CoreMatchers.equalTo((Object)"KTABLE-SOURCE-0000000002"));
        Iterator<String> stores = ((TopologyDescription.Processor)node).stores().iterator();
        MatcherAssert.assertThat((Object)stores.next(), (Matcher)CoreMatchers.equalTo((Object)"topic-STATE-STORE-0000000000"));
        Assert.assertFalse((boolean)nodes.hasNext());
        Assert.assertFalse((boolean)stores.hasNext());
        Assert.assertFalse((boolean)subtopologies.hasNext());
    }

    @Test(expected=TopologyException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
        this.builder.stream(Collections.emptyList());
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
        this.builder.stream(Arrays.asList(null, null));
    }

    public static InternalTopologyBuilder internalTopologyBuilder(StreamsBuilder builder) {
        return builder.internalTopologyBuilder;
    }

    public static Collection<Set<String>> getCopartitionedGroups(StreamsBuilder builder) {
        return builder.internalTopologyBuilder.copartitionGroups();
    }
}

