/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.WindowedChangelogTopicConfig;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Test;

public class InternalTopologyBuilderTest {
    // Builder under test; initialized inline, so every instance of this test class owns its own builder.
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    // String serde whose deserializer is passed as both the key and the value deserializer in several addSource tests.
    private final Serde<String> stringSerde = Serdes.String();

    /**
     * Verifies that topics registered with an explicit offset-reset policy are matched by the
     * builder's corresponding earliest/latest reset pattern.
     */
    @Test
    public void shouldAddSourceWithOffsetReset() {
        String earliestTopic = "earliestTopic";
        String latestTopic = "latestTopic";

        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);

        // Each topic must be matched by the reset pattern of the policy it was registered with.
        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
        Assert.assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
    }

    /**
     * Verifies that pattern-subscribed sources registered with an explicit offset-reset policy
     * contribute to the builder's corresponding earliest/latest reset pattern.
     */
    @Test
    public void shouldAddSourcePatternWithOffsetReset() {
        String earliestTopicPattern = "earliest.*Topic";
        String latestTopicPattern = "latest.*Topic";

        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
        builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, Pattern.compile(latestTopicPattern));

        // Concrete topic names that satisfy the registered patterns must match the reset patterns.
        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
        Assert.assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
    }

    /**
     * Verifies that a named-topic source added without a reset policy appears in the source
     * topic pattern while both reset patterns stay empty.
     */
    @Test
    public void shouldAddSourceWithoutOffsetReset() {
        String expectedRegex = Pattern.compile("test-topic").pattern();

        builder.addSource(null, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "test-topic");

        Assert.assertEquals(expectedRegex, builder.sourceTopicPattern().pattern());
        // No reset policy was given, so neither reset pattern may contain anything.
        Assert.assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
        Assert.assertEquals(builder.latestResetTopicsPattern().pattern(), "");
    }

    /**
     * Verifies that a pattern source added without a reset policy appears in the source
     * topic pattern while both reset patterns stay empty.
     */
    @Test
    public void shouldAddPatternSourceWithoutOffsetReset() {
        String expectedRegex = Pattern.compile("test-.*").pattern();

        builder.addSource(null, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));

        Assert.assertEquals(expectedRegex, builder.sourceTopicPattern().pattern());
        // No reset policy was given, so neither reset pattern may contain anything.
        Assert.assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
        Assert.assertEquals(builder.latestResetTopicsPattern().pattern(), "");
    }

    /**
     * Expects a {@link TopologyException} when a source with a reset policy is added with an
     * empty topic array.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
        String[] noTopics = new String[0];
        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), noTopics);
    }

    /**
     * Verifies that a second source reusing the name "source" is rejected with a
     * {@link TopologyException}, even when topic and reset policy differ.
     */
    @Test
    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
        builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
        try {
            builder.addSource(Topology.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
        } catch (TopologyException expected) {
            // The duplicate name was rejected, which is the behavior under test.
            return;
        }
        Assert.fail("Should throw TopologyException for duplicate source name");
    }

    /**
     * Verifies that adding two sources with the same name throws a {@link TopologyException}.
     */
    @Test
    public void testAddSourceWithSameName() {
        builder.addSource(null, "source", null, null, null, "topic-1");
        try {
            builder.addSource(null, "source", null, null, null, "topic-2");
        } catch (TopologyException expected) {
            // Name conflict detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with source name conflict");
    }

    /**
     * Verifies that two differently named sources cannot subscribe to the same topic.
     */
    @Test
    public void testAddSourceWithSameTopic() {
        builder.addSource(null, "source", null, null, null, "topic-1");
        try {
            builder.addSource(null, "source-2", null, null, null, "topic-1");
        } catch (TopologyException expected) {
            // Topic conflict detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with topic conflict");
    }

    /**
     * Verifies that adding two processors with the same name throws a {@link TopologyException}.
     */
    @Test
    public void testAddProcessorWithSameName() {
        builder.addSource(null, "source", null, null, null, "topic-1");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        try {
            builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        } catch (TopologyException expected) {
            // Name conflict detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with processor name conflict");
    }

    /**
     * Expects a {@link TopologyException} when a processor names a parent ("source") that was
     * never added to the builder.
     */
    @Test(expected=TopologyException.class)
    public void testAddProcessorWithWrongParent() {
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        builder.addProcessor("processor", supplier, "source");
    }

    /**
     * Expects a {@link TopologyException} when a processor lists itself as its own parent.
     */
    @Test(expected=TopologyException.class)
    public void testAddProcessorWithSelfParent() {
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        builder.addProcessor("processor", supplier, "processor");
    }

    /**
     * Verifies that adding two sinks with the same name throws a {@link TopologyException},
     * even when they write to different topics.
     */
    @Test
    public void testAddSinkWithSameName() {
        builder.addSource(null, "source", null, null, null, "topic-1");
        builder.addSink("sink", "topic-2", null, null, null, "source");
        try {
            builder.addSink("sink", "topic-3", null, null, null, "source");
        } catch (TopologyException expected) {
            // Name conflict detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with sink name conflict");
    }

    /**
     * Expects a {@link TopologyException} when a sink names a parent ("source") that was never
     * added to the builder.
     */
    @Test(expected=TopologyException.class)
    public void testAddSinkWithWrongParent() {
        String missingParent = "source";
        builder.addSink("sink", "topic-2", null, null, null, missingParent);
    }

    /**
     * Expects a {@link TopologyException} when a sink lists itself as its own parent.
     */
    @Test(expected=TopologyException.class)
    public void testAddSinkWithSelfParent() {
        String sinkName = "sink";
        builder.addSink(sinkName, "topic-2", null, null, null, sinkName);
    }

    /**
     * Verifies that a sink and its parent source end up in the same node group (group 0).
     */
    @Test
    public void testAddSinkConnectedWithParent() {
        builder.addSource(null, "source", null, null, null, "source-topic");
        builder.addSink("sink", "dest-topic", null, null, null, "source");

        Set<String> firstGroup = builder.nodeGroups().get(0);

        Assert.assertTrue(firstGroup.contains("sink"));
        Assert.assertTrue(firstGroup.contains("source"));
    }

    /**
     * Verifies that a sink with two parent sources is grouped together with both of them
     * in node group 0.
     */
    @Test
    public void testAddSinkConnectedWithMultipleParent() {
        builder.addSource(null, "source", null, null, null, "source-topic");
        builder.addSource(null, "sourceII", null, null, null, "source-topicII");
        builder.addSink("sink", "dest-topic", null, null, null, "source", "sourceII");

        Set<String> firstGroup = builder.nodeGroups().get(0);

        Assert.assertTrue(firstGroup.contains("sink"));
        Assert.assertTrue(firstGroup.contains("source"));
        Assert.assertTrue(firstGroup.contains("sourceII"));
    }

    /**
     * Verifies the combined source topic pattern when one of three source topics is also
     * registered as an internal topic: the expected pattern lists it as "X-topic-3", i.e.
     * prefixed with the application id "X".
     */
    @Test
    public void testSourceTopics() {
        builder.setApplicationId("X");
        builder.addSource(null, "source-1", null, null, null, "topic-1");
        builder.addSource(null, "source-2", null, null, null, "topic-2");
        builder.addSource(null, "source-3", null, null, null, "topic-3");
        builder.addInternalTopic("topic-3");

        String expectedRegex = Pattern.compile("X-topic-3|topic-1|topic-2").pattern();
        Assert.assertEquals(expectedRegex, builder.sourceTopicPattern().pattern());
    }

    /**
     * Verifies that a single pattern source yields a source topic pattern with the same regex.
     */
    @Test
    public void testPatternSourceTopic() {
        Pattern topicPattern = Pattern.compile("topic-\\d");

        builder.addSource(null, "source-1", null, null, null, topicPattern);

        Assert.assertEquals(topicPattern.pattern(), builder.sourceTopicPattern().pattern());
    }

    /**
     * Verifies that the regexes of two pattern sources are joined with '|' in the source
     * topic pattern.
     */
    @Test
    public void testAddMoreThanOnePatternSourceNode() {
        String expectedRegex = Pattern.compile("topics[A-Z]|.*-\\d").pattern();

        builder.addSource(null, "source-1", null, null, null, Pattern.compile("topics[A-Z]"));
        builder.addSource(null, "source-2", null, null, null, Pattern.compile(".*-\\d"));

        Assert.assertEquals(expectedRegex, builder.sourceTopicPattern().pattern());
    }

    /**
     * Verifies the source topic pattern when one source subscribes by topic names and another
     * by regex: the expected pattern is "topic-bar|topic-foo|.*-\\d".
     */
    @Test
    public void testSubscribeTopicNameAndPattern() {
        String expectedRegex = Pattern.compile("topic-bar|topic-foo|.*-\\d").pattern();

        builder.addSource(null, "source-1", null, null, null, "topic-foo", "topic-bar");
        builder.addSource(null, "source-2", null, null, null, Pattern.compile(".*-\\d"));

        Assert.assertEquals(expectedRegex, builder.sourceTopicPattern().pattern());
    }

    /**
     * Verifies that a pattern source is rejected when its regex matches a topic ("foo") that
     * another source already subscribes to by name.
     */
    @Test
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        builder.addSource(null, "source-1", null, null, null, "foo");
        try {
            builder.addSource(null, "source-2", null, null, null, Pattern.compile("f.*"));
        } catch (TopologyException expected) {
            // Name/pattern overlap detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with topic name/pattern conflict");
    }

    /**
     * Verifies that a named-topic source is rejected when its topic ("foo") is matched by the
     * regex of an already registered pattern source.
     */
    @Test
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        builder.addSource(null, "source-1", null, null, null, Pattern.compile("f.*"));
        try {
            builder.addSource(null, "source-2", null, null, null, "foo");
        } catch (TopologyException expected) {
            // Name/pattern overlap detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with topic name/pattern conflict");
    }

    /**
     * Expects a {@link TopologyException} when a state store is attached to a processor name
     * that was never added to the builder.
     */
    @Test(expected=TopologyException.class)
    public void testAddStateStoreWithNonExistingProcessor() {
        MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store", false);
        builder.addStateStore(storeSupplier, "no-such-processsor");
    }

    /**
     * Verifies that a state store cannot be attached to a source node.
     */
    @Test
    public void testAddStateStoreWithSource() {
        builder.addSource(null, "source-1", null, null, null, "topic-1");
        try {
            builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
        } catch (TopologyException expected) {
            // Attaching a store to a source was rejected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with store cannot be added to source");
    }

    /**
     * Verifies that a state store cannot be attached to a sink node.
     */
    @Test
    public void testAddStateStoreWithSink() {
        builder.addSink("sink-1", "topic-1", null, null, null, new String[0]);
        try {
            builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
        } catch (TopologyException expected) {
            // Attaching a store to a sink was rejected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with store cannot be added to sink");
    }

    /**
     * Verifies that registering two state stores with the same name throws a
     * {@link TopologyException}.
     */
    @Test
    public void testAddStateStoreWithDuplicates() {
        builder.addStateStore(new MockStateStoreSupplier("store", false), new String[0]);
        try {
            builder.addStateStore(new MockStateStoreSupplier("store", false), new String[0]);
        } catch (TopologyException expected) {
            // Store name conflict detected as required.
            return;
        }
        Assert.fail("Should throw TopologyException with store name conflict");
    }

    /**
     * Verifies that a state store registered without processors is absent from the built
     * topology until it is connected to a processor, after which exactly that store is present.
     */
    @Test
    public void testAddStateStore() {
        MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store-1", false);
        builder.addStateStore(storeSupplier, new String[0]);
        builder.setApplicationId("X");
        builder.addSource(null, "source-1", null, null, null, "topic-1");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");

        // Not yet connected to any processor: the built topology has no stores.
        Assert.assertEquals(0L, builder.build((Integer)null).stateStores().size());

        builder.connectProcessorAndStateStores("processor-1", "store-1");

        List<StateStore> builtStores = builder.build((Integer)null).stateStores();
        Assert.assertEquals(1L, builtStores.size());
        Assert.assertEquals(storeSupplier.name(), builtStores.get(0).name());
    }

    /**
     * Verifies topic grouping and copartition groups for a topology with five sources and three
     * processors: three topic groups are expected, and the internal topic "topic-1x" is
     * expected under the name "X-topic-1x".
     */
    @Test
    public void testTopicGroups() {
        builder.setApplicationId("X");
        builder.addInternalTopic("topic-1x");
        builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x");
        builder.addSource(null, "source-2", null, null, null, "topic-2");
        builder.addSource(null, "source-3", null, null, null, "topic-3");
        builder.addSource(null, "source-4", null, null, null, "topic-4");
        builder.addSource(null, "source-5", null, null, null, "topic-5");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
        builder.copartitionSources(Utils.mkList("source-1", "source-2"));
        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");

        Map<Integer, InternalTopologyBuilder.TopicsInfo> actualGroups = builder.topicGroups();

        Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedGroups = new HashMap<>();
        expectedGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
            Collections.emptySet(),
            Utils.mkSet("topic-1", "X-topic-1x", "topic-2"),
            Collections.emptyMap(),
            Collections.emptyMap()));
        expectedGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
            Collections.emptySet(),
            Utils.mkSet("topic-3", "topic-4"),
            Collections.emptyMap(),
            Collections.emptyMap()));
        expectedGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
            Collections.emptySet(),
            Utils.mkSet("topic-5"),
            Collections.emptyMap(),
            Collections.emptyMap()));

        Assert.assertEquals(3L, actualGroups.size());
        Assert.assertEquals(expectedGroups, actualGroups);

        // Only the explicitly copartitioned sources form a copartition group.
        Collection<Set<String>> actualCopartitionGroups = builder.copartitionGroups();
        Assert.assertEquals(Utils.mkSet(Utils.mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<Set<String>>(actualCopartitionGroups));
    }

    /**
     * Verifies that processors sharing a state store fall into the same topic group and that
     * each group carries the changelog topic of its store as an
     * {@link UnwindowedChangelogTopicConfig}.
     */
    @Test
    public void testTopicGroupsByStateStore() {
        builder.setApplicationId("X");
        builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x");
        builder.addSource(null, "source-2", null, null, null, "topic-2");
        builder.addSource(null, "source-3", null, null, null, "topic-3");
        builder.addSource(null, "source-4", null, null, null, "topic-4");
        builder.addSource(null, "source-5", null, null, null, "topic-5");

        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");

        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
        builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");

        // The third store is registered first and connected to its processor afterwards.
        builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
        builder.addStateStore(new MockStateStoreSupplier("store-3", false), new String[0]);
        builder.connectProcessorAndStateStores("processor-5", "store-3");

        Map<Integer, InternalTopologyBuilder.TopicsInfo> actualGroups = builder.topicGroups();

        String changelog1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
        String changelog2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
        String changelog3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");

        Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedGroups = new HashMap<>();
        expectedGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
            Collections.emptySet(),
            Utils.mkSet("topic-1", "topic-1x", "topic-2"),
            Collections.emptyMap(),
            Collections.singletonMap(changelog1, new UnwindowedChangelogTopicConfig(changelog1, Collections.emptyMap()))));
        expectedGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
            Collections.emptySet(),
            Utils.mkSet("topic-3", "topic-4"),
            Collections.emptyMap(),
            Collections.singletonMap(changelog2, new UnwindowedChangelogTopicConfig(changelog2, Collections.emptyMap()))));
        expectedGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
            Collections.emptySet(),
            Utils.mkSet("topic-5"),
            Collections.emptyMap(),
            Collections.singletonMap(changelog3, new UnwindowedChangelogTopicConfig(changelog3, Collections.emptyMap()))));

        Assert.assertEquals(3L, actualGroups.size());
        Assert.assertEquals(expectedGroups, actualGroups);
    }

    /**
     * Verifies that building the topology per topic group id (0, 1, 2) yields exactly the
     * processor nodes expected for that group.
     */
    @Test
    public void testBuild() {
        builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x");
        builder.addSource(null, "source-2", null, null, null, "topic-2");
        builder.addSource(null, "source-3", null, null, null, "topic-3");
        builder.addSource(null, "source-4", null, null, null, "topic-4");
        builder.addSource(null, "source-5", null, null, null, "topic-5");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
        builder.setApplicationId("X");

        ProcessorTopology firstGroup = builder.build(0);
        ProcessorTopology secondGroup = builder.build(1);
        ProcessorTopology thirdGroup = builder.build(2);

        Assert.assertEquals(Utils.mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(firstGroup.processors()));
        Assert.assertEquals(Utils.mkSet("source-3", "source-4", "processor-3"), nodeNames(secondGroup.processors()));
        Assert.assertEquals(Utils.mkSet("source-5"), nodeNames(thirdGroup.processors()));
    }

    /**
     * Expects a {@link NullPointerException} when a sink is added with a null name.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
        String[] noParents = new String[0];
        builder.addSink(null, "topic", null, null, null, noParents);
    }

    /**
     * Expects a {@link NullPointerException} when a sink is added with a null topic.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
        String[] noParents = new String[0];
        builder.addSink("name", null, null, null, null, noParents);
    }

    /**
     * Expects a {@link NullPointerException} when a processor is added with a null name.
     * The supplier itself is non-null; it merely returns a null processor.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
        ProcessorSupplier nullReturningSupplier = new ProcessorSupplier() {
            @Override
            public Processor get() {
                return null;
            }
        };
        builder.addProcessor(null, nullReturningSupplier, new String[0]);
    }

    /**
     * Expects a {@link NullPointerException} when a processor is added with a null supplier.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessorSupplier() throws Exception {
        String[] noParents = new String[0];
        builder.addProcessor("name", null, noParents);
    }

    /**
     * Expects a {@link NullPointerException} when a pattern source is added with a null name.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
        Pattern anyTopic = Pattern.compile(".*");
        builder.addSource(null, null, null, null, null, anyTopic);
    }

    /**
     * Expects a {@link NullPointerException} when connecting state stores to a null
     * processor name.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
        String storeName = "store";
        builder.connectProcessorAndStateStores(null, storeName);
    }

    /**
     * Expects a {@link NullPointerException} when the store-name array passed for connection
     * contains a null element.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws Exception {
        // An explicit one-element array, so the null is a store name rather than the array itself.
        String[] storeNames = new String[]{null};
        builder.connectProcessorAndStateStores("processor", storeNames);
    }

    /**
     * Expects a {@link NullPointerException} when a null internal topic is registered.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAddNullInternalTopic() throws Exception {
        builder.addInternalTopic(null);
    }

    /**
     * Expects a {@link NullPointerException} when the application id is set to null.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotSetApplicationIdToNull() throws Exception {
        builder.setApplicationId(null);
    }

    /**
     * Expects a {@link NullPointerException} when a null state store supplier is added.
     */
    @Test(expected=NullPointerException.class)
    public void shouldNotAddNullStateStoreSupplier() throws Exception {
        String[] noProcessors = new String[0];
        // The cast is kept from the original call so the null stays typed as a StateStoreSupplier.
        builder.addStateStore((StateStoreSupplier)null, noProcessors);
    }

    /**
     * Collects the names of the given processor nodes into a set.
     *
     * @param nodes the nodes whose names are wanted
     * @return a new set holding {@code name()} of every node in {@code nodes}
     */
    private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
        Set<String> names = new HashSet<String>();
        Iterator<ProcessorNode> it = nodes.iterator();
        while (it.hasNext()) {
            names.add(it.next().name());
        }
        return names;
    }

    /**
     * Verifies that a store attached to a processor is mapped to the topic of the source
     * feeding that processor.
     */
    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
        builder.addSource(null, "source", null, null, null, "topic");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");

        Map<String, List<String>> storeToTopics = builder.stateStoreNameToSourceTopics();

        Assert.assertEquals(1L, storeToTopics.size());
        Assert.assertEquals(Collections.singletonList("topic"), storeToTopics.get("store"));
    }

    /**
     * Verifies that a store attached to a processor is mapped to the topic of the source
     * feeding that processor.
     */
    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
        // NOTE(review): this body is identical to shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal;
        // confirm whether a differently configured supplier was intended here.
        builder.addSource(null, "source", null, null, null, "topic");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");

        Map<String, List<String>> storeToTopics = builder.stateStoreNameToSourceTopics();

        Assert.assertEquals(1L, storeToTopics.size());
        Assert.assertEquals(Collections.singletonList("topic"), storeToTopics.get("store"));
    }

    /**
     * Verifies that a store fed from an internal topic is mapped to that topic's name prefixed
     * with the application id ("appId-internal-topic").
     */
    @Test
    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
        builder.setApplicationId("appId");
        builder.addInternalTopic("internal-topic");
        builder.addSource(null, "source", null, null, null, "internal-topic");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");

        Map<String, List<String>> storeToTopics = builder.stateStoreNameToSourceTopics();

        Assert.assertEquals(1L, storeToTopics.size());
        Assert.assertEquals(Collections.singletonList("appId-internal-topic"), storeToTopics.get("store"));
    }

    /**
     * Verifies the changelog topic config created for a window store: it must be a
     * {@link WindowedChangelogTopicConfig} named "appId-store-changelog" whose properties are
     * exactly cleanup.policy "compact,delete" and retention.ms "40000".
     */
    @Test
    public void shouldAddInternalTopicConfigForWindowStores() throws Exception {
        builder.setApplicationId("appId");
        builder.addSource(null, "source", null, null, null, "topic");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000L, 3, false, null, null, 10000L, true, Collections.emptyMap(), false), "processor");

        InternalTopologyBuilder.TopicsInfo firstGroup = builder.topicGroups().values().iterator().next();
        InternalTopicConfig changelogConfig = firstGroup.stateChangelogTopics.get("appId-store-changelog");
        Map<String, String> props = changelogConfig.getProperties(Collections.emptyMap(), 10000L);

        Assert.assertEquals(2L, props.size());
        Assert.assertEquals("compact,delete", props.get("cleanup.policy"));
        Assert.assertEquals("40000", props.get("retention.ms"));
        Assert.assertEquals("appId-store-changelog", changelogConfig.name());
        Assert.assertTrue(changelogConfig instanceof WindowedChangelogTopicConfig);
    }

    /**
     * Verifies the changelog topic config created for a non-window store: it must be an
     * {@link UnwindowedChangelogTopicConfig} named "appId-store-changelog" whose only property
     * is cleanup.policy "compact".
     */
    @Test
    public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception {
        builder.setApplicationId("appId");
        builder.addSource(null, "source", null, null, null, "topic");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");

        InternalTopologyBuilder.TopicsInfo firstGroup = builder.topicGroups().values().iterator().next();
        InternalTopicConfig changelogConfig = firstGroup.stateChangelogTopics.get("appId-store-changelog");
        Map<String, String> props = changelogConfig.getProperties(Collections.emptyMap(), 10000L);

        Assert.assertEquals(1L, props.size());
        Assert.assertEquals("compact", props.get("cleanup.policy"));
        Assert.assertEquals("appId-store-changelog", changelogConfig.name());
        Assert.assertTrue(changelogConfig instanceof UnwindowedChangelogTopicConfig);
    }

    /**
     * Verifies the config created for an internal source topic: it must be a
     * {@link RepartitionTopicConfig} named "appId-foo" with four properties, one of which is
     * cleanup.policy "delete".
     */
    @Test
    public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception {
        builder.setApplicationId("appId");
        builder.addInternalTopic("foo");
        builder.addSource(null, "source", null, null, null, "foo");

        InternalTopologyBuilder.TopicsInfo firstGroup = builder.topicGroups().values().iterator().next();
        InternalTopicConfig repartitionConfig = firstGroup.repartitionSourceTopics.get("appId-foo");
        Map<String, String> props = repartitionConfig.getProperties(Collections.emptyMap(), 10000L);

        Assert.assertEquals(4L, props.size());
        Assert.assertEquals("delete", props.get("cleanup.policy"));
        Assert.assertEquals("appId-foo", repartitionConfig.name());
        Assert.assertTrue(repartitionConfig instanceof RepartitionTopicConfig);
    }

    /**
     * Verifies that constructing a {@link ProcessorTopologyTestDriver} fails with a
     * {@link StreamsException} caused by a {@link TopologyBuilderException} when the topology
     * contains a processor ("badGuy") that is not connected to the store "store", which is
     * attached only to "goodGuy".
     */
    @Test
    public void shouldThrowOnUnassignedStateStoreAccess() {
        String sourceNodeName = "source";
        String goodNodeName = "goodGuy";
        String badNodeName = "badGuy";

        Properties config = new Properties();
        config.put("bootstrap.servers", "host:1");
        config.put("application.id", "appId");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        StreamsConfig streamsConfig = new StreamsConfig(config);

        builder.addSource(null, sourceNodeName, null, null, null, "topic");
        builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
        builder.addStateStore(Stores.create("store").withStringKeys().withStringValues().inMemory().build(), goodNodeName);
        builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);

        try {
            new ProcessorTopologyTestDriver(streamsConfig, builder);
            Assert.fail("Should have throw StreamsException");
        } catch (StreamsException expected) {
            Throwable cause = expected.getCause();
            // instanceof is false for null, so no separate null check is needed.
            boolean isExpectedCause = cause instanceof TopologyBuilderException
                && cause.getMessage().equals("Invalid topology building: Processor badGuy has no access to StateStore store");
            if (!isExpectedCause) {
                // Keep the caught exception as the cause so the unexpected failure stays diagnosable.
                throw new RuntimeException("Did expect different exception. Did catch:", expected);
            }
        }
    }

    /**
     * Verifies that, after a subscription update delivers concrete topic names, each pattern
     * source's topic group contains the topics asserted for it: "topic-A" and "topic-B" in
     * group 1, "topic-3" in group 2, with the named topic "topic-foo" in group 0.
     *
     * @throws Exception if the reflective access to the subscription set fails
     */
    @Test
    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
        builder.addSource(null, "source-1", null, null, null, "topic-foo");
        builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]"));
        builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d"));

        InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates();

        // The set of updated topics is reached reflectively through its declared field.
        Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        updatedTopicsField.setAccessible(true);
        // Topic names are added as strings below; the reflective read itself cannot be type-checked.
        @SuppressWarnings("unchecked")
        Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
        updatedTopics.add("topic-B");
        updatedTopics.add("topic-3");
        updatedTopics.add("topic-A");

        builder.updateSubscriptions(subscriptionUpdates, null);
        builder.setApplicationId("test-id");

        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
        Assert.assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
        Assert.assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
        Assert.assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
        Assert.assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
    }

    /**
     * Adds a source on the literal topic {@code "topic"} with a {@code MockTimestampExtractor},
     * builds the topology, and asserts that the source looked up by that topic name exposes a
     * timestamp extractor of the same mock type.
     */
    @Test
    public void shouldAddTimestampExtractorPerSource() throws Exception {
        this.builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");

        final ProcessorTopology topology = this.builder.build((Integer) null);
        final TimestampExtractor extractor = topology.source("topic").getTimestampExtractor();

        Assert.assertThat(extractor, IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Adds a source subscribed through the regex {@code t.*} with a {@code MockTimestampExtractor},
     * builds the topology, and asserts that the source looked up by the pattern's string form
     * exposes a timestamp extractor of the same mock type.
     */
    @Test
    public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
        final Pattern topicPattern = Pattern.compile("t.*");
        this.builder.addSource(null, "source", new MockTimestampExtractor(), null, null, topicPattern);

        final ProcessorTopology topology = this.builder.build((Integer) null);
        final TimestampExtractor extractor = topology.source(topicPattern.pattern()).getTimestampExtractor();

        Assert.assertThat(extractor, IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Builds a topology of two sources, three processors and one sink that all end up in a single
     * subtopology, then walks that subtopology's {@code nodesInOrder()} iterator and asserts the
     * name and {@code size} of each node in the expected sequence:
     * source1 (6), source2 (4), processor2 (3), processor1 (2), processor3 (2), sink1 (1).
     */
    @Test
    public void shouldSortProcessorNodesCorrectly() throws Exception {
        this.builder.addSource(null, "source1", null, null, null, "topic1");
        this.builder.addSource(null, "source2", null, null, null, "topic2");
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), "source1", "source2");
        this.builder.addProcessor("processor3", new MockProcessorSupplier(), "processor2");
        this.builder.addSink("sink1", "topic2", null, null, null, "processor1", "processor3");

        Assert.assertEquals(1L, (long) this.builder.describe().subtopologies().size());
        final Iterator<TopologyDescription.Node> iterator = ((InternalTopologyBuilder.Subtopology) this.builder.describe().subtopologies().iterator().next()).nodesInOrder();

        assertNextNode(iterator, "source1", 6);
        assertNextNode(iterator, "source2", 4);
        assertNextNode(iterator, "processor2", 3);
        assertNextNode(iterator, "processor1", 2);
        assertNextNode(iterator, "processor3", 2);
        assertNextNode(iterator, "sink1", 1);
    }

    /**
     * Asserts that {@code iterator} has another element and that this element carries the given
     * name and size. Uses {@code assertEquals} so a failure reports both the expected and the
     * actual value instead of a bare "assertion failed".
     *
     * @param iterator     iterator positioned before the node to check; advanced by one element
     * @param expectedName name the next node must have
     * @param expectedSize value the next node's {@code size} field must have
     */
    private static void assertNextNode(final Iterator<TopologyDescription.Node> iterator, final String expectedName, final long expectedSize) {
        Assert.assertTrue("Expected another node named " + expectedName, iterator.hasNext());
        final InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next();
        Assert.assertEquals(expectedName, node.name);
        Assert.assertEquals("Unexpected size for node " + expectedName, expectedSize, (long) node.size);
    }

    /**
     * Wires a regex-subscribed source ({@code topic-\d+}) to a processor that owns the state store
     * {@code testStateStore}, applies a subscription update containing {@code topic-2},
     * {@code topic-3} and {@code topic-A}, and asserts that the store is mapped to exactly the two
     * topics matching the pattern ({@code topic-A} does not match and must be absent).
     *
     * @throws Exception if the reflective access to {@code updatedTopicSubscriptions} fails
     */
    @Test
    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
        this.builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+"));
        this.builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest");
        this.builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");

        // The set of updated topics is obtained reflectively from the SubscriptionUpdates instance
        // and filled in directly.
        final InternalTopologyBuilder.SubscriptionUpdates updates = new InternalTopologyBuilder.SubscriptionUpdates();
        final Field subscriptionsField = updates.getClass().getDeclaredField("updatedTopicSubscriptions");
        subscriptionsField.setAccessible(true);
        @SuppressWarnings("unchecked")
        final Set<String> subscribedTopics = (Set<String>) subscriptionsField.get(updates);
        Collections.addAll(subscribedTopics, "topic-2", "topic-3", "topic-A");

        this.builder.updateSubscriptions(updates, "test-thread");
        this.builder.setApplicationId("test-app");

        final List<String> storeTopics = this.builder.stateStoreNameToSourceTopics().get("testStateStore");
        Assert.assertTrue("Expected to contain two topics", storeTopics.size() == 2);
        Assert.assertTrue(storeTopics.contains("topic-2"));
        Assert.assertTrue(storeTopics.contains("topic-3"));
        Assert.assertFalse(storeTopics.contains("topic-A"));
    }

    /**
     * Expects {@code addGlobalStore} to throw a {@link TopologyException} when the same string is
     * passed in both name positions (per this test's naming, the source name and the processor
     * name). Both arguments are taken from one local so they cannot drift apart.
     */
    @Test(expected=TopologyException.class)
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        final String sameNameForSourceAndProcessor = "sameName";
        this.builder.addGlobalStore(new MockStateStoreSupplier("anyName", false, false), sameNameForSourceAndProcessor, null, null, null, "anyTopicName", sameNameForSourceAndProcessor, new MockProcessorSupplier());
    }

    /**
     * Supplies processors whose only action is to request the state store named
     * {@link #STORE_NAME} from the processor context during {@code init}; all other callbacks
     * are no-ops.
     */
    private static class LocalMockProcessorSupplier
    implements ProcessorSupplier {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        /**
         * @return a new processor that looks up {@link #STORE_NAME} when initialised
         */
        @Override
        public Processor get() {
            return new StoreLookupProcessor();
        }

        /**
         * Processor that touches the {@link #STORE_NAME} store in {@code init} and otherwise
         * does nothing.
         */
        private static final class StoreLookupProcessor implements Processor {

            @Override
            public void init(ProcessorContext context) {
                // The lookup result is intentionally discarded; only the access itself matters here.
                context.getStateStore(STORE_NAME);
            }

            @Override
            public void process(Object key, Object value) {
            }

            @Override
            public void punctuate(long timestamp) {
            }

            @Override
            public void close() {
            }
        }
    }
}

