/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor;

import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Test;

public class TopologyBuilderTest {
    /**
     * Registers one named-topic source per offset-reset policy and checks that
     * each topic is matched by the builder's pattern for that policy.
     */
    @Test
    public void shouldAddSourceWithOffsetReset() {
        final TopologyBuilder builder = new TopologyBuilder();
        final String earliestTopic = "earliestTopic";
        final String latestTopic = "latestTopic";

        // Use the named topics for both registration and matching so the two
        // sides of the check cannot drift apart.
        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", earliestTopic);
        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", latestTopic);

        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
        Assert.assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
    }

    /**
     * Registers one pattern source per offset-reset policy and checks that a
     * topic name matching each regex is matched by the builder's pattern for
     * that policy.
     */
    @Test
    public void shouldAddSourcePatternWithOffsetReset() {
        final TopologyBuilder builder = new TopologyBuilder();
        final String earliestTopicPattern = "earliest.*Topic";
        final String latestTopicPattern = "latest.*Topic";

        // Compile from the named regex strings rather than repeating the literals.
        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", Pattern.compile(earliestTopicPattern));
        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", Pattern.compile(latestTopicPattern));

        Assert.assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
        Assert.assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
    }

    /**
     * Adds a named-topic source without an offset-reset policy and checks that
     * the topic appears in the overall source pattern while both reset-specific
     * patterns stay empty.
     */
    @Test
    public void shouldAddSourceWithoutOffsetReset() {
        final TopologyBuilder builder = new TopologyBuilder();
        final Serde<String> stringSerde = Serdes.String();
        final String topic = "test-topic";
        final Pattern expectedPattern = Pattern.compile(topic);

        builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), topic);

        // JUnit's assertEquals takes (expected, actual); keeping that order makes
        // failure messages read correctly.
        Assert.assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
        Assert.assertEquals("", builder.earliestResetTopicsPattern().pattern());
        Assert.assertEquals("", builder.latestResetTopicsPattern().pattern());
    }

    /**
     * Adds a pattern source without an offset-reset policy and checks that the
     * regex appears in the overall source pattern while both reset-specific
     * patterns stay empty.
     */
    @Test
    public void shouldAddPatternSourceWithoutOffsetReset() {
        final TopologyBuilder builder = new TopologyBuilder();
        final Serde<String> stringSerde = Serdes.String();
        final Pattern expectedPattern = Pattern.compile("test-.*");

        // Register the same Pattern instance that the assertion compares against.
        builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), expectedPattern);

        // JUnit's assertEquals takes (expected, actual); keeping that order makes
        // failure messages read correctly.
        Assert.assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
        Assert.assertEquals("", builder.earliestResetTopicsPattern().pattern());
        Assert.assertEquals("", builder.latestResetTopicsPattern().pattern());
    }

    /**
     * Expects a {@link TopologyBuilderException} when a source with an
     * offset-reset policy is added with an empty topic array.
     */
    @Test
    public void shouldNotAllowOffsetResetSourceWithoutTopics() {
        final TopologyBuilder topology = new TopologyBuilder();
        final Serde<String> serde = Serdes.String();
        try {
            topology.addSource(
                TopologyBuilder.AutoOffsetReset.EARLIEST,
                "source",
                null,
                serde.deserializer(),
                serde.deserializer(),
                new String[0]);
            Assert.fail("Should throw TopologyBuilderException with no topics");
        } catch (final TopologyBuilderException expected) {
            // The exception is the outcome under test; nothing more to verify.
        }
    }

    /**
     * Expects a {@link TopologyBuilderException} when a second source reuses
     * the name of an already registered source, even with a different reset
     * policy and topic.
     */
    @Test
    public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
        final TopologyBuilder topology = new TopologyBuilder();
        final Serde<String> serde = Serdes.String();

        // The first registration must succeed, so it stays outside the try block.
        topology.addSource(
            TopologyBuilder.AutoOffsetReset.EARLIEST,
            "source",
            null,
            serde.deserializer(),
            serde.deserializer(),
            "topic-1");
        try {
            topology.addSource(
                TopologyBuilder.AutoOffsetReset.LATEST,
                "source",
                null,
                serde.deserializer(),
                serde.deserializer(),
                "topic-2");
            Assert.fail("Should throw TopologyBuilderException for duplicate source name");
        } catch (final TopologyBuilderException expected) {
            // The exception is the outcome under test; nothing more to verify.
        }
    }

    /** Adding two sources under the same node name must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddSourceWithSameName() {
        final String duplicateName = "source";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(duplicateName, "topic-1");
        topology.addSource(duplicateName, "topic-2");
    }

    /** Adding two differently named sources for the same topic must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddSourceWithSameTopic() {
        final String sharedTopic = "topic-1";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source", sharedTopic);
        topology.addSource("source-2", sharedTopic);
    }

    /** Adding two processors under the same node name must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddProcessorWithSameName() {
        final String parent = "source";
        final String duplicateName = "processor";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(parent, "topic-1");
        topology.addProcessor(duplicateName, new MockProcessorSupplier(), parent);
        topology.addProcessor(duplicateName, new MockProcessorSupplier(), parent);
    }

    /** A processor whose parent node was never added must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddProcessorWithWrongParent() {
        final TopologyBuilder topology = new TopologyBuilder();
        // "source" has not been registered on this builder.
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
    }

    /** A processor that names itself as its parent must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddProcessorWithSelfParent() {
        final String selfName = "processor";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addProcessor(selfName, new MockProcessorSupplier(), selfName);
    }

    /** Adding two sinks under the same node name must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddSinkWithSameName() {
        final String parent = "source";
        final String duplicateName = "sink";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(parent, "topic-1");
        topology.addSink(duplicateName, "topic-2", parent);
        topology.addSink(duplicateName, "topic-3", parent);
    }

    /** A sink whose parent node was never added must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddSinkWithWrongParent() {
        final TopologyBuilder topology = new TopologyBuilder();
        // "source" has not been registered on this builder.
        topology.addSink("sink", "topic-2", "source");
    }

    /** A sink that names itself as its parent must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddSinkWithSelfParent() {
        final String selfName = "sink";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSink(selfName, "topic-2", selfName);
    }

    /**
     * Checks that a sink and its single parent source are placed in node
     * group 0 together.
     */
    @Test
    public void testAddSinkConnectedWithParent() {
        final String sourceName = "source";
        final String sinkName = "sink";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(sourceName, "source-topic");
        topology.addSink(sinkName, "dest-topic", sourceName);

        final Set<String> firstGroup = topology.nodeGroups().get(0);

        Assert.assertTrue(firstGroup.contains(sinkName));
        Assert.assertTrue(firstGroup.contains(sourceName));
    }

    /**
     * Checks that a sink with two parent sources ends up in node group 0
     * together with both parents.
     */
    @Test
    public void testAddSinkConnectedWithMultipleParent() {
        final String firstSource = "source";
        final String secondSource = "sourceII";
        final String sinkName = "sink";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(firstSource, "source-topic");
        topology.addSource(secondSource, "source-topicII");
        topology.addSink(sinkName, "dest-topic", firstSource, secondSource);

        final Set<String> firstGroup = topology.nodeGroups().get(0);

        Assert.assertTrue(firstGroup.contains(sinkName));
        Assert.assertTrue(firstGroup.contains(firstSource));
        Assert.assertTrue(firstGroup.contains(secondSource));
    }

    /**
     * Checks the combined source-topic pattern for three named sources, one of
     * whose topics is also registered as an internal topic. The expected
     * pattern lists that topic with the "X-" application-id prefix.
     */
    @Test
    public void testSourceTopics() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("X");
        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2");
        topology.addSource("source-3", "topic-3");
        topology.addInternalTopic("topic-3");

        final String expected = Pattern.compile("X-topic-3|topic-1|topic-2").pattern();
        final String actual = topology.sourceTopicPattern().pattern();

        Assert.assertEquals(expected, actual);
    }

    /**
     * Checks that a single pattern source yields a source-topic pattern equal
     * to the regex that was registered.
     */
    @Test
    public void testPatternSourceTopic() {
        final Pattern sourcePattern = Pattern.compile("topic-\\d");
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source-1", sourcePattern);

        final String actual = topology.sourceTopicPattern().pattern();

        Assert.assertEquals(sourcePattern.pattern(), actual);
    }

    /**
     * Checks that two pattern sources are combined into one source-topic
     * pattern joined with '|', in the order expected by the assertion.
     */
    @Test
    public void testAddMoreThanOnePatternSourceNode() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source-1", Pattern.compile("topics[A-Z]"));
        topology.addSource("source-2", Pattern.compile(".*-\\d"));

        final String expected = Pattern.compile("topics[A-Z]|.*-\\d").pattern();
        final String actual = topology.sourceTopicPattern().pattern();

        Assert.assertEquals(expected, actual);
    }

    /**
     * Checks the combined source-topic pattern when one source subscribes to
     * named topics and another subscribes by regex.
     */
    @Test
    public void testSubscribeTopicNameAndPattern() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source-1", "topic-foo", "topic-bar");
        topology.addSource("source-2", Pattern.compile(".*-\\d"));

        final String expected = Pattern.compile("topic-bar|topic-foo|.*-\\d").pattern();
        final String actual = topology.sourceTopicPattern().pattern();

        Assert.assertEquals(expected, actual);
    }

    /**
     * A pattern source whose regex matches an already registered named topic
     * must be rejected.
     */
    @Test(expected = TopologyBuilderException.class)
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source-1", "foo");
        // "f.*" matches the topic "foo" registered above.
        topology.addSource("source-2", Pattern.compile("f.*"));
    }

    /**
     * A named-topic source whose topic is matched by an already registered
     * pattern source must be rejected.
     */
    @Test(expected = TopologyBuilderException.class)
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source-1", Pattern.compile("f.*"));
        // "foo" is matched by the pattern "f.*" registered above.
        topology.addSource("source-2", "foo");
    }

    /** Attaching a state store to a processor name that was never added must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithNonExistingProcessor() {
        final TopologyBuilder topology = new TopologyBuilder();
        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store", false);
        topology.addStateStore(storeSupplier, "no-such-processsor");
    }

    /** Attaching a state store to a source node must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithSource() {
        final String sourceName = "source-1";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(sourceName, "topic-1");
        topology.addStateStore(new MockStateStoreSupplier("store", false), sourceName);
    }

    /**
     * Expects a {@link TopologyBuilderException} from this sequence: a sink is
     * added with no parent names, then a state store is attached to that sink.
     */
    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithSink() {
        final String sinkName = "sink-1";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSink(sinkName, "topic-1", new String[0]);
        topology.addStateStore(new MockStateStoreSupplier("store", false), sinkName);
    }

    /** Registering two state stores under the same store name must be rejected. */
    @Test(expected = TopologyBuilderException.class)
    public void testAddStateStoreWithDuplicates() {
        final String duplicateStore = "store";
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addStateStore(new MockStateStoreSupplier(duplicateStore, false), new String[0]);
        topology.addStateStore(new MockStateStoreSupplier(duplicateStore, false), new String[0]);
    }

    /**
     * Checks that a state store registered without processors is absent from
     * the built topology until it is connected to a processor, after which
     * exactly that store is present.
     */
    @Test
    public void testAddStateStore() {
        final TopologyBuilder topology = new TopologyBuilder();
        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store-1", false);
        topology.addStateStore(storeSupplier, new String[0]);
        topology.setApplicationId("X");
        topology.addSource("source-1", "topic-1");
        topology.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");

        // Not yet connected to any processor: the built topology has no stores.
        Assert.assertEquals(0, topology.build(null).stateStores().size());

        topology.connectProcessorAndStateStores("processor-1", "store-1");

        final List<StateStore> stores = topology.build(null).stateStores();
        Assert.assertEquals(1, stores.size());
        Assert.assertEquals(storeSupplier.name(), stores.get(0).name());
    }

    /**
     * Builds five sources and three processors, then checks the resulting
     * topic groups and copartition groups. The expected values list the
     * internal topic "topic-1x" with the "X-" application-id prefix.
     */
    @Test
    public void testTopicGroups() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("X");
        topology.addInternalTopic("topic-1x");
        topology.addSource("source-1", "topic-1", "topic-1x");
        topology.addSource("source-2", "topic-2");
        topology.addSource("source-3", "topic-3");
        topology.addSource("source-4", "topic-4");
        topology.addSource("source-5", "topic-5");
        topology.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        topology.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
        topology.copartitionSources(Utils.mkList("source-1", "source-2"));
        topology.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");

        final Map<Integer, TopologyBuilder.TopicsInfo> actualGroups = topology.topicGroups();

        final HashMap<Integer, TopologyBuilder.TopicsInfo> expectedGroups = new HashMap<Integer, TopologyBuilder.TopicsInfo>();
        expectedGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
        expectedGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap()));
        expectedGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap()));

        Assert.assertEquals(3, actualGroups.size());
        Assert.assertEquals(expectedGroups, actualGroups);

        // Only the explicitly copartitioned sources contribute a copartition group.
        final Collection<Set<String>> actualCopartitionGroups = topology.copartitionGroups();
        Assert.assertEquals(Utils.mkSet(Utils.mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<Set<String>>(actualCopartitionGroups));
    }

    /**
     * Checks topic grouping when processors are linked only through shared
     * state stores. Each expected group carries the changelog topic of its
     * store.
     */
    @Test
    public void testTopicGroupsByStateStore() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("X");
        topology.addSource("source-1", "topic-1", "topic-1x");
        topology.addSource("source-2", "topic-2");
        topology.addSource("source-3", "topic-3");
        topology.addSource("source-4", "topic-4");
        topology.addSource("source-5", "topic-5");

        // store-1 is shared by processor-1 and processor-2.
        topology.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        topology.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
        topology.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");

        // store-2 is shared by processor-3 and processor-4.
        topology.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
        topology.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
        topology.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");

        // store-3 is registered first and connected to processor-5 afterwards.
        topology.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
        final MockStateStoreSupplier lateBoundSupplier = new MockStateStoreSupplier("store-3", false);
        topology.addStateStore(lateBoundSupplier, new String[0]);
        topology.connectProcessorAndStateStores("processor-5", "store-3");

        final Map<Integer, TopologyBuilder.TopicsInfo> actualGroups = topology.topicGroups();

        final HashMap<Integer, TopologyBuilder.TopicsInfo> expectedGroups = new HashMap<Integer, TopologyBuilder.TopicsInfo>();
        final String changelog1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
        final String changelog2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
        final String changelog3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
        expectedGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptyMap(), Collections.singletonMap(changelog1, new UnwindowedChangelogTopicConfig(changelog1, Collections.emptyMap()))));
        expectedGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.singletonMap(changelog2, new UnwindowedChangelogTopicConfig(changelog2, Collections.emptyMap()))));
        expectedGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.emptySet(), Utils.mkSet("topic-5"), Collections.emptyMap(), Collections.singletonMap(changelog3, new UnwindowedChangelogTopicConfig(changelog3, Collections.emptyMap()))));

        Assert.assertEquals(3, actualGroups.size());
        Assert.assertEquals(expectedGroups, actualGroups);
    }

    /**
     * Builds the topology for topic groups 0, 1 and 2 and checks the node
     * names contained in each.
     */
    @Test
    public void testBuild() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source-1", "topic-1", "topic-1x");
        topology.addSource("source-2", "topic-2");
        topology.addSource("source-3", "topic-3");
        topology.addSource("source-4", "topic-4");
        topology.addSource("source-5", "topic-5");
        topology.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
        topology.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
        topology.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
        topology.setApplicationId("X");

        final ProcessorTopology group0 = topology.build(0);
        final ProcessorTopology group1 = topology.build(1);
        final ProcessorTopology group2 = topology.build(2);

        Assert.assertEquals(Utils.mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(group0.processors()));
        Assert.assertEquals(Utils.mkSet("source-3", "source-4", "processor-3"), nodeNames(group1.processors()));
        Assert.assertEquals(Utils.mkSet("source-5"), nodeNames(group2.processors()));
    }

    /** Adding a sink with a null node name must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSink() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSink(null, "topic", new String[0]);
    }

    /** Adding a sink with a null topic must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicWhenAddingSink() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSink("name", null, new String[0]);
    }

    /** Adding a processor with a null node name must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        // Minimal non-null supplier, so that only the null name can trigger the exception.
        final ProcessorSupplier stubSupplier = new ProcessorSupplier() {
            @Override
            public Processor get() {
                return null;
            }
        };
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addProcessor(null, stubSupplier, new String[0]);
    }

    /** Adding a processor with a null supplier must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessorSupplier() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addProcessor("name", null, new String[0]);
    }

    /** Adding a pattern source with a null node name must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullNameWhenAddingSource() {
        final Pattern anyTopic = Pattern.compile(".*");
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(null, anyTopic);
    }

    /**
     * Connecting state stores with a null processor name must throw
     * {@link NullPointerException}.
     */
    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.connectProcessorAndStateStores(null, "store");
    }

    /** Registering a null internal topic must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAddNullInternalTopic() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addInternalTopic(null);
    }

    /** Setting a null application id must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotSetApplicationIdToNull() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId(null);
    }

    /** Adding a null state store supplier must throw {@link NullPointerException}. */
    @Test(expected = NullPointerException.class)
    public void shouldNotAddNullStateStoreSupplier() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addStateStore(null, new String[0]);
    }

    /**
     * Collects the names of the given processor nodes into a set.
     *
     * @param nodes the nodes whose names are wanted
     * @return a new set holding {@code name()} of every node in {@code nodes}
     */
    private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
        final Set<String> collected = new HashSet<String>();
        for (final ProcessorNode current : nodes) {
            final String currentName = current.name();
            collected.add(currentName);
        }
        return collected;
    }

    /**
     * Checks that a store attached to a processor is mapped to the topic of
     * that processor's source.
     */
    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source", "topic");
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        topology.addStateStore(new MockStateStoreSupplier("store", false), "processor");

        final Map<String, List<String>> storeToTopics = topology.stateStoreNameToSourceTopics();

        Assert.assertEquals(1, storeToTopics.size());
        Assert.assertEquals(Collections.singletonList("topic"), storeToTopics.get("store"));
    }

    /**
     * Checks that a store attached to a processor is mapped to the topic of
     * that processor's source.
     */
    @Test
    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
        // NOTE(review): this setup is identical to the "...IsInternal" variant
        // of this test; confirm whether the supplier was meant to differ here.
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("source", "topic");
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        topology.addStateStore(new MockStateStoreSupplier("store", false), "processor");

        final Map<String, List<String>> storeToTopics = topology.stateStoreNameToSourceTopics();

        Assert.assertEquals(1, storeToTopics.size());
        Assert.assertEquals(Collections.singletonList("topic"), storeToTopics.get("store"));
    }

    /**
     * Checks that when a store's source reads from an internal topic, the
     * store is mapped to that topic under its "appId-" prefixed name.
     */
    @Test
    public void shouldCorrectlyMapStateStoreToInternalTopics() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("appId");
        topology.addInternalTopic("internal-topic");
        topology.addSource("source", "internal-topic");
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        topology.addStateStore(new MockStateStoreSupplier("store", false), "processor");

        final Map<String, List<String>> storeToTopics = topology.stateStoreNameToSourceTopics();

        Assert.assertEquals(1, storeToTopics.size());
        Assert.assertEquals(Collections.singletonList("appId-internal-topic"), storeToTopics.get("store"));
    }

    /**
     * Checks the changelog topic config produced for a RocksDB window store:
     * two properties, "retention.ms" equal to "40000", and the topic name
     * "appId-store-changelog".
     */
    @Test
    public void shouldAddInternalTopicConfigForWindowStores() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("appId");
        topology.addSource("source", "topic");
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        topology.addStateStore(new RocksDBWindowStoreSupplier("store", 30000L, 3, false, null, null, 10000L, true, Collections.emptyMap(), false), "processor");

        final Map<Integer, TopologyBuilder.TopicsInfo> groups = topology.topicGroups();
        final TopologyBuilder.TopicsInfo firstGroup = groups.values().iterator().next();
        final InternalTopicConfig changelogConfig = firstGroup.stateChangelogTopics.get("appId-store-changelog");
        final Map<String, String> topicProperties = changelogConfig.getProperties(Collections.emptyMap(), 10000L);

        Assert.assertEquals(2, topicProperties.size());
        Assert.assertEquals("40000", topicProperties.get("retention.ms"));
        Assert.assertEquals("appId-store-changelog", changelogConfig.name());
    }

    /**
     * Checks the changelog topic config produced for a mock (non-window)
     * store: a single property and the topic name "appId-store-changelog".
     */
    @Test
    public void shouldAddInternalTopicConfigForNonWindowStores() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("appId");
        topology.addSource("source", "topic");
        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
        topology.addStateStore(new MockStateStoreSupplier("store", true), "processor");

        final Map<Integer, TopologyBuilder.TopicsInfo> groups = topology.topicGroups();
        final TopologyBuilder.TopicsInfo firstGroup = groups.values().iterator().next();
        final InternalTopicConfig changelogConfig = firstGroup.stateChangelogTopics.get("appId-store-changelog");
        final Map<String, String> topicProperties = changelogConfig.getProperties(Collections.emptyMap(), 10000L);

        Assert.assertEquals(1, topicProperties.size());
        Assert.assertEquals("appId-store-changelog", changelogConfig.name());
    }

    /**
     * Checks the config produced for an internal topic used as a source: four
     * properties and the topic name "appId-foo".
     */
    @Test
    public void shouldAddInternalTopicConfigForRepartitionTopics() {
        final TopologyBuilder topology = new TopologyBuilder();
        topology.setApplicationId("appId");
        topology.addInternalTopic("foo");
        topology.addSource("source", "foo");

        final Map<Integer, TopologyBuilder.TopicsInfo> groups = topology.topicGroups();
        final TopologyBuilder.TopicsInfo firstGroup = groups.values().iterator().next();
        final InternalTopicConfig repartitionConfig = firstGroup.repartitionSourceTopics.get("appId-foo");
        final Map<String, String> topicProperties = repartitionConfig.getProperties(Collections.emptyMap(), 10000L);

        Assert.assertEquals(4, topicProperties.size());
        Assert.assertEquals("appId-foo", repartitionConfig.name());
    }

    /**
     * Drives a topology in which one processor ("goodGuy") is connected to the
     * state store and another ("badGuy") is not, while both request the store
     * in {@code init}. The test passes only if the driver fails with a
     * {@link StreamsException} whose cause is a
     * {@link TopologyBuilderException} carrying the expected message.
     *
     * @throws Exception declared on the signature; the body itself throws only
     *         unchecked exceptions
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
        final String sourceNodeName = "source";
        final String goodNodeName = "goodGuy";
        final String badNodeName = "badGuy";

        final Properties config = new Properties();
        config.put("bootstrap.servers", "host:1");
        config.put("application.id", "appId");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        final StreamsConfig streamsConfig = new StreamsConfig(config);

        try {
            final TopologyBuilder builder = new TopologyBuilder();
            // Only goodGuy is attached to the store; badGuy shares the same source
            // but has no store connection.
            builder.addSource(sourceNodeName, "topic")
                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
                .addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName)
                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
            driver.process("topic", null, null);
        } catch (final StreamsException e) {
            final Throwable cause = e.getCause();
            final String expectedMessage = "Invalid topology building: Processor " + badNodeName
                + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME;
            // instanceof is already false for null; comparing from the expected
            // side also tolerates a cause with a null message.
            if (cause instanceof TopologyBuilderException && expectedMessage.equals(cause.getMessage())) {
                throw (TopologyBuilderException) cause;
            }
            throw new RuntimeException("Did expect different exception. Did catch:", e);
        }
    }

    /**
     * Registers one named source and two pattern sources, injects three
     * updated topic subscriptions via reflection, and checks which topic group
     * each topic lands in.
     *
     * @throws Exception if the reflective access to the private
     *         {@code updatedTopicSubscriptions} field fails
     */
    @Test
    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", "topic-foo");
        builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
        builder.addSource("source-3", Pattern.compile("topic-\\d"));

        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        // The subscription set is not reachable through an accessor here, so the
        // test fills it through the declared field.
        final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        updatedTopicsField.setAccessible(true);
        // Only String topic names are added below.
        @SuppressWarnings("unchecked")
        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
        updatedTopics.add("topic-B");
        updatedTopics.add("topic-3");
        updatedTopics.add("topic-A");

        builder.updateSubscriptions(subscriptionUpdates, null);
        builder.setApplicationId("test-id");

        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
        Assert.assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
        Assert.assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
        Assert.assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
        Assert.assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
    }

    /**
     * Checks that a timestamp extractor passed with a named-topic source is
     * the one reported by the built source node for that topic.
     */
    @Test
    public void shouldAddTimestampExtractorPerSource() {
        final TimestampExtractor extractor = new MockTimestampExtractor();
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(extractor, "source", "topic");

        final ProcessorTopology built = topology.build(null);

        Assert.assertThat(built.source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Same check as the plain per-source extractor test, using the overload
     * that also takes an offset-reset argument (passed as null here).
     */
    @Test
    public void shouldAddTimestampExtractorWithOffsetResetPerSource() {
        final TimestampExtractor extractor = new MockTimestampExtractor();
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(null, extractor, "source", "topic");

        final ProcessorTopology built = topology.build(null);

        Assert.assertThat(built.source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Checks that a timestamp extractor passed with a pattern source is the
     * one reported by the source node looked up under the pattern string.
     */
    @Test
    public void shouldAddTimestampExtractorWithPatternPerSource() {
        final Pattern topicPattern = Pattern.compile("t.*");
        final TimestampExtractor extractor = new MockTimestampExtractor();
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(extractor, "source", topicPattern);

        final ProcessorTopology built = topology.build(null);

        Assert.assertThat(built.source(topicPattern.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Pattern-source extractor check using the overload that also takes an
     * offset-reset argument (passed as null here).
     */
    @Test
    public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() {
        final Pattern topicPattern = Pattern.compile("t.*");
        final TimestampExtractor extractor = new MockTimestampExtractor();
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(null, extractor, "source", topicPattern);

        final ProcessorTopology built = topology.build(null);

        Assert.assertThat(built.source(topicPattern.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Named-topic extractor check using the six-argument overload, with the
     * first, fourth and fifth arguments passed as null.
     */
    @Test
    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() {
        final TimestampExtractor extractor = new MockTimestampExtractor();
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(null, "source", extractor, null, null, "topic");

        final ProcessorTopology built = topology.build(null);

        Assert.assertThat(built.source("topic").getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Pattern-source extractor check using the six-argument overload, with the
     * first, fourth and fifth arguments passed as null.
     */
    @Test
    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() {
        final Pattern topicPattern = Pattern.compile("t.*");
        final TimestampExtractor extractor = new MockTimestampExtractor();
        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource(null, "source", extractor, null, null, topicPattern);

        final ProcessorTopology built = topology.build(null);

        Assert.assertThat(built.source(topicPattern.pattern()).getTimestampExtractor(), IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Attaches a state store to a processor fed by a pattern source, injects
     * three updated topic subscriptions via reflection, and checks that only
     * the two topics matching the source regex are mapped to the store.
     *
     * @throws Exception if the reflective access to the private
     *         {@code updatedTopicSubscriptions} field fails
     */
    @Test
    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
        final TopologyBuilder topologyBuilder = new TopologyBuilder()
            .addSource("ingest", Pattern.compile("topic-\\d+"))
            .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
            .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");

        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
        // The subscription set is not reachable through an accessor here, so the
        // test fills it through the declared field.
        final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
        updatedTopicsField.setAccessible(true);
        // Only String topic names are added below.
        @SuppressWarnings("unchecked")
        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
        updatedTopics.add("topic-2");
        updatedTopics.add("topic-3");
        updatedTopics.add("topic-A"); // does not match "topic-\d+"

        topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
        topologyBuilder.setApplicationId("test-app");

        final Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
        final List<String> topics = stateStoreAndTopics.get("testStateStore");

        // assertEquals reports the actual size on failure, unlike assertTrue(size == 2).
        Assert.assertEquals("Expected to contain two topics", 2, topics.size());
        Assert.assertTrue(topics.contains("topic-2"));
        Assert.assertTrue(topics.contains("topic-3"));
        Assert.assertFalse(topics.contains("topic-A"));
    }

    /**
     * Adding a global store whose source node name equals its processor node
     * name must be rejected.
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        final String sameNameForSourceAndProcessor = "sameName";

        // The call is expected to throw, so its return value is not kept.
        new TopologyBuilder().addGlobalStore(
            new MockStateStoreSupplier("anyName", false, false),
            sameNameForSourceAndProcessor,
            null,
            null,
            "anyTopicName",
            sameNameForSourceAndProcessor,
            new MockProcessorSupplier());
    }

    /**
     * Supplies processors that request the state store named
     * {@link #STORE_NAME} from their context during {@code init} and otherwise
     * do nothing.
     */
    private static class LocalMockProcessorSupplier
    implements ProcessorSupplier {
        /** Name of the state store each supplied processor asks for in {@code init}. */
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        /**
         * Creates a new processor on every call.
         *
         * @return a processor whose only action is the store lookup in {@code init}
         */
        @Override
        public Processor get() {
            return new Processor(){

                @Override
                public void init(ProcessorContext context) {
                    // The lookup is the whole point; the returned store is not used.
                    context.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                @Override
                public void process(Object key, Object value) {
                }

                @Override
                public void punctuate(long timestamp) {
                }

                @Override
                public void close() {
                }
            };
        }
    }
}

