/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@Timeout(value=600L)
@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class TopologyTest {
    // Mock store builder; individual tests stub name() (and build()) on it as needed.
    @Mock
    private StoreBuilder<MockKeyValueStore> storeBuilder;
    // Mock key-value store builder used by the global-store test.
    @Mock
    private KeyValueStoreBuilder<?, ?> globalStoreBuilder;
    // Topology under test; created together with each TopologyTest instance.
    private final Topology topology = new Topology();
    // Description the tests populate and compare against topology.describe().
    private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
    // Assigned in setUp() before each test.
    private StreamsConfig streamsConfig;

    /**
     * Builds the {@link StreamsConfig} stored in {@code streamsConfig} before each test runs.
     */
    @BeforeEach
    public void setUp() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put("application.id", "applicationId");
        settings.put("bootstrap.servers", "localhost:9092");
        // String serdes are registered as the defaults for both keys and values.
        settings.put("default.key.serde", Serdes.StringSerde.class);
        settings.put("default.value.serde", Serdes.StringSerde.class);
        streamsConfig = new StreamsConfig(settings);
    }

    /**
     * Adding a source with a {@code null} node name and a topic list must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
        final String[] topics = {"topic"};
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSource((String) null, topics));
    }

    /**
     * Adding a source with a {@code null} node name and a topic pattern must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
        final Pattern anyTopic = Pattern.compile(".*");
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSource(null, anyTopic));
    }

    /**
     * Adding a source whose topic array is {@code null} must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullTopicsWhenAddingSourceWithTopic() {
        final String[] missingTopics = null;
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSource("source", missingTopics));
    }

    /**
     * Adding a source whose topic pattern is {@code null} must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
        final Pattern missingPattern = null;
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSource("source", missingPattern));
    }

    /**
     * Adding a source with an empty topic array must throw a {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowZeroTopicsWhenAddingSource() {
        final String[] noTopics = new String[0];
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSource("source", noTopics));
    }

    /**
     * Adding a processor with a {@code null} node name must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullNameWhenAddingProcessor() {
        final String[] noParents = new String[0];
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addProcessor(null, () -> new MockApiProcessorSupplier().get(), noParents));
    }

    /**
     * Adding a processor with a {@code null} supplier must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
        final String[] noParents = new String[0];
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addProcessor("name", (ProcessorSupplier) null, noParents));
    }

    /**
     * Adding a sink with a {@code null} node name must throw a {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullNameWhenAddingSink() {
        final String[] noParents = new String[0];
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSink(null, "topic", noParents));
    }

    /**
     * Adding a sink with a {@code null} topic name must throw a {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullTopicWhenAddingSink() {
        final String missingTopic = null;
        final String[] noParents = new String[0];
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSink("name", missingTopic, noParents));
    }

    /**
     * Adding a sink with a {@code null} {@link TopicNameExtractor} must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
        final String[] noParents = new String[0];
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSink("name", (TopicNameExtractor) null, noParents));
    }

    /**
     * Connecting state stores to a {@code null} processor name must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
        final String[] storeNames = {"store"};
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.connectProcessorAndStateStores(null, storeNames));
    }

    /**
     * Connecting a processor to a {@code null} store-name array must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
        final String[] missingStoreNames = null;
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.connectProcessorAndStateStores("processor", missingStoreNames));
    }

    /**
     * Connecting a processor to an empty store-name array must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
        final String[] noStoreNames = new String[0];
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.connectProcessorAndStateStores("processor", noStoreNames));
    }

    /**
     * Adding a {@code null} state store builder must throw a {@link NullPointerException}.
     */
    @Test
    public void shouldNotAddNullStateStoreSupplier() {
        final String[] noProcessors = new String[0];
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addStateStore(null, noProcessors));
    }

    /**
     * Registering a second source under an already used node name must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddSourcesWithSameName() {
        topology.addSource("source", new String[]{"topic-1"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSource("source", new String[]{"topic-2"}),
            "Should throw TopologyException for duplicate source name");
    }

    /**
     * Registering a second source for a topic that another source already consumes must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddTopicTwice() {
        topology.addSource("source", new String[]{"topic-1"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSource("source-2", new String[]{"topic-1"}),
            "Should throw TopologyException for already used topic");
    }

    /**
     * Adding a pattern source whose pattern matches a topic that is already registered by name
     * must throw a {@link TopologyException}.
     */
    @Test
    public void testPatternMatchesAlreadyProvidedTopicSource() {
        topology.addSource("source-1", new String[]{"foo"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSource("source-2", Pattern.compile("f.*")),
            "Should have thrown TopologyException for overlapping pattern with already registered topic");
    }

    /**
     * Adding a named-topic source whose topic is matched by an already registered pattern source
     * must throw a {@link TopologyException}.
     */
    @Test
    public void testNamedTopicMatchesAlreadyProvidedPattern() {
        topology.addSource("source-1", Pattern.compile("f.*"));
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSource("source-2", new String[]{"foo"}),
            "Should have thrown TopologyException for overlapping topic with already registered pattern");
    }

    /**
     * Adding a second processor under an already used node name must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddProcessorWithSameName() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"}),
            "Should throw TopologyException for duplicate processor name");
    }

    /**
     * Adding a processor with an empty parent list must throw a {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddProcessorWithEmptyParents() {
        topology.addSource("source", new String[]{"topic-1"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[0]),
            "Should throw TopologyException for processor without at least one parent node");
    }

    /**
     * Adding a processor whose parent list contains a {@code null} name must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowToAddProcessorWithNullParents() {
        topology.addSource("source", new String[]{"topic-1"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{null}),
            "Should throw NullPointerException for processor when null parent names are provided");
    }

    /**
     * Adding a processor whose parent name was never registered must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldFailOnUnknownSource() {
        final String[] unknownParents = {"source"};
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addProcessor("processor", new MockApiProcessorSupplier(), unknownParents));
    }

    /**
     * Adding a processor that names itself as its parent must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldFailIfNodeIsItsOwnParent() {
        final String nodeName = "processor";
        final String[] selfAsParent = {"processor"};
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addProcessor(nodeName, new MockApiProcessorSupplier(), selfAsParent));
    }

    /**
     * Adding a second sink under an already used node name must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddSinkWithSameName() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addSink("sink", "topic-2", new String[]{"source"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSink("sink", "topic-3", new String[]{"source"}),
            "Should throw TopologyException for duplicate sink name");
    }

    /**
     * Adding a sink with an empty parent list must throw a {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddSinkWithEmptyParents() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSink("sink", "topic-2", new String[0]),
            "Should throw TopologyException for sink without at least one parent node");
    }

    /**
     * Adding a sink whose parent list contains a {@code null} name must throw a
     * {@link NullPointerException}.
     */
    @Test
    public void shouldNotAllowToAddSinkWithNullParents() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            NullPointerException.class,
            () -> topology.addSink("sink", "topic-2", new String[]{null}),
            "Should throw NullPointerException for sink when null parent names are provided");
    }

    /**
     * Adding a sink whose parent name was never registered must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldFailWithUnknownParent() {
        final String[] unknownParents = {"source"};
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSink("sink", "topic-2", unknownParents));
    }

    /**
     * Adding a sink that names itself as its parent must throw a {@link TopologyException}.
     */
    @Test
    public void shouldFailIfSinkIsItsOwnParent() {
        final String[] selfAsParent = {"sink"};
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSink("sink", "topic-2", selfAsParent));
    }

    /**
     * Adding a sink whose parent is another sink must throw a {@link TopologyException}.
     */
    @Test
    public void shouldFailIfSinkIsParent() {
        topology.addSource("source", new String[]{"topic-1"});
        topology.addSink("sink-1", "topic-2", new String[]{"source"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addSink("sink-2", "topic-3", new String[]{"sink-1"}),
            "Should throw TopologyException for using sink as parent");
    }

    /**
     * Attaching a state store to a processor name that was never registered must throw a
     * {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        final String[] unknownProcessors = {"no-such-processor"};
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addStateStore(storeBuilder, unknownProcessors));
    }

    /**
     * Attaching a state store to a source node must throw a {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        topology.addSource("source-1", new String[]{"topic-1"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addStateStore(storeBuilder, new String[]{"source-1"}),
            "Should have thrown TopologyException for adding store to source node");
    }

    /**
     * Attaching a state store to a sink node must throw a {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddStateStoreToSink() {
        mockStoreBuilder();
        topology.addSource("source-1", new String[]{"topic-1"});
        topology.addSink("sink-1", "topic-1", new String[]{"source-1"});
        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addStateStore(storeBuilder, new String[]{"sink-1"}),
            "Should have thrown TopologyException for adding store to sink node");
    }

    /**
     * Stubs the shared {@code storeBuilder} mock so that {@code name()} returns {@code "store"}.
     */
    private void mockStoreBuilder() {
        Mockito.when(storeBuilder.name()).thenReturn("store");
    }

    /**
     * Adding a second, distinct {@link StoreBuilder} that reports the same store name as one
     * already added must throw a {@link TopologyException}.
     */
    @Test
    public void shouldNotAllowToAddStoreWithSameNameAndDifferentInstance() {
        mockStoreBuilder();
        topology.addStateStore(storeBuilder, new String[0]);

        // A different builder instance that reports the same name as the one already added.
        final StoreBuilder<?> otherStoreBuilder = Mockito.mock(StoreBuilder.class);
        Mockito.when(otherStoreBuilder.name()).thenReturn("store");

        // assertThrows replaces the try/fail/empty-catch idiom, matching the other tests in this class.
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addStateStore(otherStoreBuilder, new String[0]),
            "Should have thrown TopologyException for same store name with different StoreBuilder");
    }

    /**
     * Two processors whose suppliers are built from the same {@code storeBuilder} instance can
     * both be added; the test passes when no exception is thrown.
     */
    @Test
    public void shouldAllowToShareStoreUsingSameStoreBuilder() {
        mockStoreBuilder();
        topology
            .addSource("source", new String[]{"topic-1"})
            .addProcessor("processor-1", new MockProcessorSupplierProvidingStore(storeBuilder), new String[]{"source"})
            .addProcessor("processor-2", new MockProcessorSupplierProvidingStore(storeBuilder), new String[]{"source"});
    }

    /**
     * Builds a topology in which the store is connected only to {@code goodGuy}, then expects
     * constructing a {@link TopologyTestDriver} to fail with a {@link StreamsException} naming
     * {@code badGuy} as the processor that could not be initialized.
     */
    @Test
    public void shouldThrowOnUnassignedStateStoreAccess() {
        final String sourceNodeName = "source";
        final String goodNodeName = "goodGuy";
        final String badNodeName = "badGuy";

        mockStoreBuilder();
        Mockito.when(storeBuilder.build()).thenReturn(new MockKeyValueStore("store", false));
        topology
            .addSource(sourceNodeName, new String[]{"topic"})
            .addProcessor(goodNodeName, (ProcessorSupplier) new LocalMockProcessorSupplier(), new String[]{sourceNodeName})
            .addStateStore(storeBuilder, new String[]{goodNodeName})
            .addProcessor(badNodeName, (ProcessorSupplier) new LocalMockProcessorSupplier(), new String[]{sourceNodeName});

        final Properties config = new Properties();
        config.put("default.key.serde", Serdes.ByteArraySerde.class);
        config.put("default.value.serde", Serdes.ByteArraySerde.class);

        // close() only runs if construction unexpectedly succeeds, so a failing test does not leak the driver.
        final StreamsException e = Assertions.assertThrows(
            StreamsException.class,
            () -> new TopologyTestDriver(topology, config).close(),
            "Should have thrown StreamsException");

        final String expectedMessage =
            "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
        MatcherAssert.assertThat(e.toString(), CoreMatchers.equalTo(expectedMessage));
    }

    /**
     * Adding a global store whose source name equals its processor name must throw a
     * {@link TopologyException}.
     */
    @Deprecated
    @Test
    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
        // The same name is passed for both the source node and the processor node.
        final String sharedName = "sameName";
        Mockito.when(globalStoreBuilder.name()).thenReturn("anyName");
        Assertions.assertThrows(
            TopologyException.class,
            () -> topology.addGlobalStore(globalStoreBuilder, sharedName, null, null, "anyTopicName", sharedName, new MockProcessorSupplier()));
    }

    /**
     * The description of a topology with no nodes equals the untouched expected description.
     */
    @Test
    public void shouldDescribeEmptyTopology() {
        MatcherAssert.assertThat(
            topology.describe(),
            CoreMatchers.equalTo(expectedDescription));
    }

    /**
     * A sink description built from a {@link TopicNameExtractor} reports {@code null} from
     * {@code topic()}.
     */
    @Test
    public void sinkShouldReturnNullTopicWithDynamicRouting() {
        final TopicNameExtractor extractor = (key, value, record) -> record.topic() + "-" + key;
        final InternalTopologyBuilder.Sink sinkNode = new InternalTopologyBuilder.Sink("sink", extractor);
        MatcherAssert.assertThat(sinkNode.topic(), CoreMatchers.equalTo(null));
    }

    /**
     * A sink description built from a {@link TopicNameExtractor} returns that same extractor from
     * {@code topicNameExtractor()}.
     */
    @Test
    public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
        final TopicNameExtractor extractor = (key, value, record) -> record.topic() + "-" + key;
        final InternalTopologyBuilder.Sink sinkNode = new InternalTopologyBuilder.Sink("sink", extractor);
        MatcherAssert.assertThat(
            sinkNode.topicNameExtractor(),
            CoreMatchers.equalTo(extractor));
    }

    /**
     * A single source over one topic is expected to be described as one subtopology (id 0);
     * both the description and its hash code are compared with the expected description.
     */
    @Test
    public void singleSourceShouldHaveSingleSubtopology() {
        final TopologyDescription.Source sourceNode = addSource("source", "topic");
        final TopologyDescription.Subtopology subtopology =
            new InternalTopologyBuilder.SubtopologyDescription(0, Collections.singleton(sourceNode));
        expectedDescription.addSubtopology(subtopology);

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * A single source over several topics is expected to be described as one subtopology (id 0);
     * both the description and its hash code are compared with the expected description.
     */
    @Test
    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
        final TopologyDescription.Source sourceNode = addSource("source", "topic1", "topic2", "topic3");
        final TopologyDescription.Subtopology subtopology =
            new InternalTopologyBuilder.SubtopologyDescription(0, Collections.singleton(sourceNode));
        expectedDescription.addSubtopology(subtopology);

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * A single pattern-based source is expected to be described as one subtopology (id 0);
     * both the description and its hash code are compared with the expected description.
     */
    @Test
    public void singleSourcePatternShouldHaveSingleSubtopology() {
        final Pattern topicPattern = Pattern.compile("topic[0-9]");
        final TopologyDescription.Source sourceNode = addSource("source", topicPattern);
        final TopologyDescription.Subtopology subtopology =
            new InternalTopologyBuilder.SubtopologyDescription(0, Collections.singleton(sourceNode));
        expectedDescription.addSubtopology(subtopology);

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * Three unconnected sources are expected to be described as three subtopologies with ids
     * 0, 1 and 2; both the description and its hash code are compared with the expected one.
     */
    @Test
    public void multipleSourcesShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source firstSource = addSource("source1", "topic1");
        final TopologyDescription.Subtopology firstSubtopology =
            new InternalTopologyBuilder.SubtopologyDescription(0, Collections.singleton(firstSource));
        expectedDescription.addSubtopology(firstSubtopology);

        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
        final TopologyDescription.Subtopology secondSubtopology =
            new InternalTopologyBuilder.SubtopologyDescription(1, Collections.singleton(secondSource));
        expectedDescription.addSubtopology(secondSubtopology);

        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
        final TopologyDescription.Subtopology thirdSubtopology =
            new InternalTopologyBuilder.SubtopologyDescription(2, Collections.singleton(thirdSource));
        expectedDescription.addSubtopology(thirdSubtopology);

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * A source feeding one processor is expected to be described as a single subtopology (id 0)
     * containing both nodes; the description and its hash code are compared with the expected one.
     */
    @Test
    public void sourceAndProcessorShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
        final TopologyDescription.Processor expectedProcessorNode =
            addProcessor("processor", new TopologyDescription.Node[]{expectedSourceNode});

        // Element type is TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSourceNode);
        allNodes.add(expectedProcessorNode);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * A source feeding one processor with a single new store is expected to be described as a
     * single subtopology (id 0); the description and its hash code are compared with the expected one.
     */
    @Test
    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
        final String[] store = new String[]{"store"};
        final TopologyDescription.Processor expectedProcessorNode =
            addProcessorWithNewStore("processor", store, new TopologyDescription.Node[]{expectedSourceNode});

        // Element type is TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSourceNode);
        allNodes.add(expectedProcessorNode);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * A source feeding one processor with two new stores is expected to be described as a
     * single subtopology (id 0); the description and its hash code are compared with the expected one.
     */
    @Test
    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
        final String[] stores = new String[]{"store1", "store2"};
        final TopologyDescription.Processor expectedProcessorNode =
            addProcessorWithNewStore("processor", stores, new TopologyDescription.Node[]{expectedSourceNode});

        // Element type is TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSourceNode);
        allNodes.add(expectedProcessorNode);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * One source feeding two processors is expected to be described as a single subtopology
     * (id 0) with all three nodes; the description and its hash code are compared with the expected one.
     */
    @Test
    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
        final TopologyDescription.Processor expectedProcessorNode1 =
            addProcessor("processor1", new TopologyDescription.Node[]{expectedSourceNode});
        final TopologyDescription.Processor expectedProcessorNode2 =
            addProcessor("processor2", new TopologyDescription.Node[]{expectedSourceNode});

        // Element type is TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSourceNode);
        allNodes.add(expectedProcessorNode1);
        allNodes.add(expectedProcessorNode2);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * A named-topic source and a pattern source feeding the same processor are expected to be
     * described as a single subtopology (id 0); the description and its hash code are compared
     * with the expected one.
     */
    @Test
    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
        final TopologyDescription.Processor expectedProcessorNode =
            addProcessor("processor", new TopologyDescription.Node[]{expectedSourceNode1, expectedSourceNode2});

        // Element type is TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSourceNode1);
        allNodes.add(expectedSourceNode2);
        allNodes.add(expectedProcessorNode);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * Three independent source-to-processor chains are expected to be described as three
     * subtopologies with ids 0, 1 and 2; the description and its hash code are compared with
     * the expected one.
     */
    @Test
    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
        final TopologyDescription.Processor expectedProcessorNode1 =
            addProcessor("processor1", new TopologyDescription.Node[]{expectedSourceNode1});
        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
        final TopologyDescription.Processor expectedProcessorNode2 =
            addProcessor("processor2", new TopologyDescription.Node[]{expectedSourceNode2});
        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
        final TopologyDescription.Processor expectedProcessorNode3 =
            addProcessor("processor3", new TopologyDescription.Node[]{expectedSourceNode3});

        // Node sets use element type TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
        allNodes1.add(expectedSourceNode1);
        allNodes1.add(expectedProcessorNode1);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes1));

        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
        allNodes2.add(expectedSourceNode2);
        allNodes2.add(expectedProcessorNode2);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(1, allNodes2));

        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
        allNodes3.add(expectedSourceNode3);
        allNodes3.add(expectedProcessorNode3);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(2, allNodes3));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * Three independent source-to-sink chains are expected to be described as three
     * subtopologies with ids 0, 1 and 2; the description and its hash code are compared with
     * the expected one.
     */
    @Test
    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
        final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
        final TopologyDescription.Sink expectedSinkNode1 =
            addSink("sink1", "sinkTopic1", new TopologyDescription.Node[]{expectedSourceNode1});
        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
        final TopologyDescription.Sink expectedSinkNode2 =
            addSink("sink2", "sinkTopic2", new TopologyDescription.Node[]{expectedSourceNode2});
        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
        final TopologyDescription.Sink expectedSinkNode3 =
            addSink("sink3", "sinkTopic3", new TopologyDescription.Node[]{expectedSourceNode3});

        // Node sets use element type TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
        allNodes1.add(expectedSourceNode1);
        allNodes1.add(expectedSinkNode1);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes1));

        final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
        allNodes2.add(expectedSourceNode2);
        allNodes2.add(expectedSinkNode2);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(1, allNodes2));

        final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
        allNodes3.add(expectedSourceNode3);
        allNodes3.add(expectedSinkNode3);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(2, allNodes3));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * Three source-to-processor chains that all write to one shared sink are expected to be
     * described as a single subtopology (id 0) containing all seven nodes; the description and
     * its hash code are compared with the expected one.
     */
    @Test
    public void processorsWithSameSinkShouldHaveSameSubtopology() {
        final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
        final TopologyDescription.Processor expectedProcessorNode1 =
            addProcessor("processor1", new TopologyDescription.Node[]{expectedSourceNode1});
        final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
        final TopologyDescription.Processor expectedProcessorNode2 =
            addProcessor("processor2", new TopologyDescription.Node[]{expectedSourceNode2});
        final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
        final TopologyDescription.Processor expectedProcessorNode3 =
            addProcessor("processor3", new TopologyDescription.Node[]{expectedSourceNode3});

        // The shared sink has all three processors as parents.
        final TopologyDescription.Sink expectedSinkNode = addSink(
            "sink",
            "sinkTopic",
            new TopologyDescription.Node[]{expectedProcessorNode1, expectedProcessorNode2, expectedProcessorNode3});

        // Element type is TopologyDescription.Node rather than Object so only description nodes can be added.
        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSourceNode1);
        allNodes.add(expectedProcessorNode1);
        allNodes.add(expectedSourceNode2);
        allNodes.add(expectedProcessorNode2);
        allNodes.add(expectedSourceNode3);
        allNodes.add(expectedProcessorNode3);
        allNodes.add(expectedSinkNode);
        expectedDescription.addSubtopology(new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat(topology.describe(), CoreMatchers.equalTo(expectedDescription));
        MatcherAssert.assertThat(topology.describe().hashCode(), CoreMatchers.equalTo(expectedDescription.hashCode()));
    }

    /**
     * Registers two processors that each create their own store and a third
     * processor that attaches to both existing stores, then expects all six
     * nodes to be described as one sub-topology (id 0).
     */
    @Test
    public void processorsWithSharedStateShouldHaveSameSubtopology() {
        String[] firstStore = new String[]{"store1"};
        String[] secondStore = new String[]{"store2"};
        String[] sharedStores = new String[]{firstStore[0], secondStore[0]};

        TopologyDescription.Source firstSource = this.addSource("source", "topic");
        TopologyDescription.Processor firstProcessor = this.addProcessorWithNewStore("processor1", firstStore, new TopologyDescription.Node[]{firstSource});
        TopologyDescription.Source secondSource = this.addSource("source2", "topic2");
        TopologyDescription.Processor secondProcessor = this.addProcessorWithNewStore("processor2", secondStore, new TopologyDescription.Node[]{secondSource});
        TopologyDescription.Source thirdSource = this.addSource("source3", "topic3");
        // The third processor reuses both stores created above.
        TopologyDescription.Processor thirdProcessor = this.addProcessorWithExistingStore("processor3", sharedStores, new TopologyDescription.Node[]{thirdSource});

        Set<TopologyDescription.Node> subtopologyNodes = new HashSet<>(Arrays.<TopologyDescription.Node>asList(
                firstSource, firstProcessor,
                secondSource, secondProcessor,
                thirdSource, thirdProcessor));
        this.expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.SubtopologyDescription(0, subtopologyNodes));

        TopologyDescription actual = this.topology.describe();
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription));
        int actualHash = this.topology.describe().hashCode();
        MatcherAssert.assertThat((Object)actualHash, (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription.hashCode()));
    }

    /**
     * Adds a single global store (id 0) to both the topology and the expected
     * description, then compares the two by equality and by hash code.
     */
    @Test
    public void shouldDescribeGlobalStoreTopology() {
        this.addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0);

        TopologyDescription actual = this.topology.describe();
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription));

        int actualHash = this.topology.describe().hashCode();
        MatcherAssert.assertThat((Object)actualHash, (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription.hashCode()));
    }

    /**
     * Adds two global stores (ids 0 and 1) to both the topology and the
     * expected description, then compares the two by equality and by hash code.
     */
    @Test
    public void shouldDescribeMultipleGlobalStoreTopology() {
        this.addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0);
        this.addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1);

        TopologyDescription actual = this.topology.describe();
        MatcherAssert.assertThat((Object)actual, (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription));

        int actualHash = this.topology.describe().hashCode();
        MatcherAssert.assertThat((Object)actualHash, (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription.hashCode()));
    }

    /**
     * Inner stream-stream join without any store naming: the description is
     * expected to list the generated KSTREAM-JOINTHIS / KSTREAM-JOINOTHER stores.
     */
    @Test
    public void streamStreamJoinTopologyWithDefaultStoresNames() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.of((Duration)Duration.ofMillis(100L));
        StreamJoined joined = StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String());
        left.join(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
            + "      --> KSTREAM-JOINTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
            + "      --> KSTREAM-JOINOTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Inner stream-stream join with store name "custom-name": the description
     * is expected to list the "custom-name-this-join-store" and
     * "custom-name-other-join-store" stores.
     */
    @Test
    public void streamStreamJoinTopologyWithCustomStoresNames() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.of((Duration)Duration.ofMillis(100L));
        StreamJoined joined = StreamJoined
                .with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String())
                .withStoreName("custom-name");
        left.join(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [custom-name-this-join-store])\n"
            + "      --> KSTREAM-JOINTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-other-join-store])\n"
            + "      --> KSTREAM-JOINOTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [custom-name-this-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [custom-name-other-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Inner stream-stream join where both window stores come from explicit
     * in-memory suppliers: the description is expected to list the supplier
     * names "in-memory-join-store" and "in-memory-join-store-other".
     */
    @Test
    public void streamStreamJoinTopologyWithCustomStoresSuppliers() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.of((Duration)Duration.ofMillis(100L));
        // Both suppliers use retention = window size + grace, and window size = join window size.
        Duration retention = Duration.ofMillis(window.size() + window.gracePeriodMs());
        Duration windowSize = Duration.ofMillis(window.size());
        WindowBytesStoreSupplier thisSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store", (Duration)retention, (Duration)windowSize, (boolean)true);
        WindowBytesStoreSupplier otherSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store-other", (Duration)retention, (Duration)windowSize, (boolean)true);

        StreamJoined joined = StreamJoined
                .with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String())
                .withThisStoreSupplier(thisSupplier)
                .withOtherStoreSupplier(otherSupplier);
        left.join(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n"
            + "      --> KSTREAM-JOINTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n"
            + "      --> KSTREAM-JOINOTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [in-memory-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Left stream-stream join without any store naming: besides the two
     * generated window stores, the description is expected to list a
     * KSTREAM-OUTERSHARED store on both join processors.
     */
    @Test
    public void streamStreamLeftJoinTopologyWithDefaultStoresNames() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(100L));
        StreamJoined joined = StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String());
        left.leftJoin(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
            + "      --> KSTREAM-JOINTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n"
            + "      --> KSTREAM-OUTEROTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Left stream-stream join with store name "custom-name": the description
     * is expected to list the "custom-name-"-prefixed this / outer-other /
     * left-shared join stores.
     */
    @Test
    public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(100L));
        StreamJoined joined = StreamJoined
                .with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String())
                .withStoreName("custom-name");
        left.leftJoin(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [custom-name-this-join-store])\n"
            + "      --> KSTREAM-JOINTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-outer-other-join-store])\n"
            + "      --> KSTREAM-OUTEROTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [custom-name-outer-other-join-store, custom-name-left-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-this-join-store, custom-name-left-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Left stream-stream join where both window stores come from explicit
     * in-memory suppliers: the description is expected to list the supplier
     * names plus an "in-memory-join-store-left-shared-join-store" store.
     */
    @Test
    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(100L));
        // Both suppliers use retention = window size + grace, and window size = join window size.
        Duration retention = Duration.ofMillis(window.size() + window.gracePeriodMs());
        Duration windowSize = Duration.ofMillis(window.size());
        WindowBytesStoreSupplier thisSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store", (Duration)retention, (Duration)windowSize, (boolean)true);
        WindowBytesStoreSupplier otherSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store-other", (Duration)retention, (Duration)windowSize, (boolean)true);

        StreamJoined joined = StreamJoined
                .with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String())
                .withThisStoreSupplier(thisSupplier)
                .withOtherStoreSupplier(otherSupplier);
        left.leftJoin(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n"
            + "      --> KSTREAM-JOINTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n"
            + "      --> KSTREAM-OUTEROTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-left-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store, in-memory-join-store-left-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Outer stream-stream join without any store naming: the description is
     * expected to list the generated KSTREAM-OUTERTHIS / KSTREAM-OUTEROTHER
     * window stores and a KSTREAM-OUTERSHARED store on both join processors.
     */
    @Test
    public void streamStreamOuterJoinTopologyWithDefaultStoresNames() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(100L));
        StreamJoined joined = StreamJoined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String());
        left.outerJoin(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-OUTERTHIS-0000000004-store])\n"
            + "      --> KSTREAM-OUTERTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-OUTEROTHER-0000000005-store])\n"
            + "      --> KSTREAM-OUTEROTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [KSTREAM-OUTERTHIS-0000000004-store, KSTREAM-OUTERSHARED-0000000004-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [KSTREAM-OUTEROTHER-0000000005-store, KSTREAM-OUTERSHARED-0000000004-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-OUTERTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Outer stream-stream join with store name "custom-name": the description
     * is expected to list the "custom-name-"-prefixed outer-this /
     * outer-other / outer-shared join stores.
     */
    @Test
    public void streamStreamOuterJoinTopologyWithCustomStoresNames() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(100L));
        StreamJoined joined = StreamJoined
                .with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String())
                .withStoreName("custom-name");
        left.outerJoin(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [custom-name-outer-this-join-store])\n"
            + "      --> KSTREAM-OUTERTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [custom-name-outer-other-join-store])\n"
            + "      --> KSTREAM-OUTEROTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [custom-name-outer-this-join-store, custom-name-outer-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [custom-name-outer-other-join-store, custom-name-outer-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-OUTERTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Outer stream-stream join where both window stores come from explicit
     * in-memory suppliers: the description is expected to list the supplier
     * names plus an "in-memory-join-store-outer-shared-join-store" store.
     */
    @Test
    public void streamStreamOuterJoinTopologyWithCustomStoresSuppliers() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream left = streamsBuilder.stream("input-topic1");
        KStream right = streamsBuilder.stream("input-topic2");

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(100L));
        // Both suppliers use retention = window size + grace, and window size = join window size.
        Duration retention = Duration.ofMillis(window.size() + window.gracePeriodMs());
        Duration windowSize = Duration.ofMillis(window.size());
        WindowBytesStoreSupplier thisSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store", (Duration)retention, (Duration)windowSize, (boolean)true);
        WindowBytesStoreSupplier otherSupplier = Stores.inMemoryWindowStore((String)"in-memory-join-store-other", (Duration)retention, (Duration)windowSize, (boolean)true);

        StreamJoined joined = StreamJoined
                .with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String())
                .withThisStoreSupplier(thisSupplier)
                .withOtherStoreSupplier(otherSupplier);
        left.outerJoin(right, MockValueJoiner.TOSTRING_JOINER, window, joined);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n"
            + "      --> KSTREAM-WINDOWED-0000000002\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n"
            + "      --> KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n"
            + "      --> KSTREAM-OUTERTHIS-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n"
            + "      --> KSTREAM-OUTEROTHER-0000000005\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KSTREAM-OUTEROTHER-0000000005 (stores: [in-memory-join-store-outer-shared-join-store, in-memory-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000003\n"
            + "    Processor: KSTREAM-OUTERTHIS-0000000004 (stores: [in-memory-join-store-other, in-memory-join-store-outer-shared-join-store])\n"
            + "      --> KSTREAM-MERGE-0000000006\n"
            + "      <-- KSTREAM-WINDOWED-0000000002\n"
            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-OUTERTHIS-0000000004, KSTREAM-OUTEROTHER-0000000005\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * Routes a stream through an anonymous {@link TopicNameExtractor} and
     * expects the sink line of the description to show the extractor's
     * {@code toString()} text after "extractor class: ".
     */
    @Test
    public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        TopicNameExtractor<Object, Object> dynamicRouter = new TopicNameExtractor<Object, Object>(){

            @Override
            public String toString() {
                return "anonymous topic name extractor. topic is [recordContext.topic()]-[key]";
            }

            @Override
            public String extract(Object key, Object value, RecordContext recordContext) {
                return recordContext.topic() + "-" + key;
            }
        };
        streamsBuilder.stream("input-topic").to((TopicNameExtractor)dynamicRouter);

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-SINK-0000000001\n"
            + "    Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous topic name extractor. topic is [recordContext.topic()]-[key])\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";

        TopologyDescription actual = streamsBuilder.build().describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());
    }

    /**
     * {@code groupByKey().count()} with no arguments: checks the described
     * node/store names and that the built topology reports a persistent
     * local store.
     */
    @Test
    public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .count();
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * {@code count(...)} materialized as "count-store" with the IN_MEMORY
     * store type: checks the described node/store names and that the built
     * topology reports no persistent local store.
     */
    @Test
    public void kGroupedStreamNamedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .count(Materialized.as((String)"count-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * {@code count(...)} with an unnamed Materialized (serdes only) and the
     * ROCKS_DB store type: checks the described node/store names and that the
     * built topology reports a persistent local store.
     */
    @Test
    public void kGroupedStreamAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .count(Materialized.with(null, (Serde)Serdes.Long()).withStoreType((DslStoreSuppliers)Materialized.StoreType.ROCKS_DB));
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000003\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * {@code count(...)} with a Materialized created only from the IN_MEMORY
     * store type: checks the described node/store names and that the built
     * topology reports no persistent local store.
     */
    @Test
    public void kGroupedStreamAnonymousStoreTypedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .count(Materialized.as((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000003\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Zero-argument {@code count()} on a builder created from
     * {@code overrideDefaultStore("in_memory")}: checks the described
     * node/store names under the "my-topology" header and that the built
     * topology reports no persistent local store.
     */
    @Test
    public void kGroupedStreamZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .count();
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Zero-argument {@code count()} over 1 ms time windows: checks the
     * described node/store names and that the built topology reports a
     * persistent local store.
     */
    @Test
    public void timeWindowZeroArgCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(1L)))
                .count();
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * {@code count(...)} over 1 ms time windows, materialized as
     * "count-store" with the IN_MEMORY store type: checks the described
     * node/store names and that the built topology reports no persistent
     * local store.
     */
    @Test
    public void timeWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(1L)))
                .count(Materialized.as((String)"count-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * {@code count(...)} over 1 ms time windows with an unnamed Materialized
     * (serdes only) and the ROCKS_DB store type: checks the described
     * node/store names and that the built topology reports a persistent
     * local store.
     */
    @Test
    public void timeWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(1L)))
                .count(Materialized.with(null, (Serde)Serdes.Long()).withStoreType((DslStoreSuppliers)Materialized.StoreType.ROCKS_DB));
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000003\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * {@code count(...)} over 1 ms time windows with a Materialized created
     * only from the IN_MEMORY store type: checks the described node/store
     * names and that the built topology reports no persistent local store.
     */
    @Test
    public void timeWindowAnonymousStoreTypeMaterializedCountShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(1L)))
                .count(Materialized.as((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000003\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Zero-argument {@code count()} over 1 ms time windows on a builder
     * created from {@code overrideDefaultStore("in_memory")}: checks the
     * described node/store names under the "my-topology" header and that the
     * built topology reports no persistent local store.
     */
    @Test
    public void timeWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic")
                .groupByKey()
                .windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(1L)))
                .count();
        Topology built = streamsBuilder.build();

        String expectedText =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n\n";
        TopologyDescription actual = built.describe();
        Assertions.assertEquals((Object)expectedText, (Object)actual.toString());

        // The streams config is applied before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Counts a sliding-windowed stream with the zero-arg {@code count()} on a default builder,
     * then checks the printed topology and that a persistent local store is reported.
     *
     * <p>The windowing uses {@link SlidingWindows} so that the test exercises what its name
     * states; the expected description is unchanged.
     */
    @Test
    public void slidingWindowZeroArgCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder builder = new StreamsBuilder();
        // Fix: this previously windowed with TimeWindows, so the sliding-window path was never covered.
        builder.stream("input-topic").groupByKey().windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1L))).count();
        final Topology topology = builder.build();
        final TopologyDescription describe = topology.describe();
        Assertions.assertEquals((Object)expected, (Object)describe.toString());
        // The streams config is installed before the processor topology is built.
        topology.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        MatcherAssert.assertThat((Object)topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Counts a sliding-windowed stream into the named store "count-store" with the IN_MEMORY
     * store type, then checks the printed topology and that no persistent local store is reported.
     *
     * <p>The windowing uses {@link SlidingWindows} so that the test exercises what its name
     * states; the expected description is unchanged.
     */
    @Test
    public void slidingWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder builder = new StreamsBuilder();
        // Fix: this previously windowed with TimeWindows, so the sliding-window path was never covered.
        // The raw Materialized cast keeps the argument assignable now that the receiver is no longer raw.
        builder.stream("input-topic").groupByKey().windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1L))).count((Materialized)Materialized.as((String)"count-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology topology = builder.build();
        final TopologyDescription describe = topology.describe();
        Assertions.assertEquals((Object)expected, (Object)describe.toString());
        // The streams config is installed before the processor topology is built.
        topology.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        MatcherAssert.assertThat((Object)topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Counts a sliding-windowed stream with the zero-arg {@code count()} on a builder created from
     * {@code overrideDefaultStore("in_memory")}, then checks the printed topology (named
     * "my-topology") and that no persistent local store is reported.
     */
    @Test
    public void slidingWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        final String expected =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1L))).count();
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Aggregates a cogrouped, time-windowed stream without a Materialized argument on a default
     * builder, then checks the printed topology and that a persistent local store is reported.
     */
    @Test
    public void timeWindowedCogroupedZeroArgCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000002 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "");
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Aggregates a cogrouped, time-windowed stream into the named store "aggregate-store" with the
     * IN_MEMORY store type, then checks the printed topology and that no persistent local store
     * is reported.
     */
    @Test
    public void timeWindowedCogroupedNamedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000001 (stores: [aggregate-store])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000002\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000002 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000001\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "", Materialized.as((String)"aggregate-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Aggregates a cogrouped, time-windowed stream without a Materialized argument on a builder
     * created from {@code overrideDefaultStore("in_memory")}, then checks the printed topology
     * (named "my-topology") and that no persistent local store is reported.
     */
    @Test
    public void timeWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        final String expected =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000002 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "");
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Aggregates a cogrouped, sliding-windowed stream without a Materialized argument on a default
     * builder, then checks the printed topology and that a persistent local store is reported.
     */
    @Test
    public void slidingWindowedCogroupedZeroArgCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000002 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "");
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Aggregates a cogrouped, sliding-windowed stream into the named store "aggregate-store" with
     * the IN_MEMORY store type, then checks the printed topology and that no persistent local
     * store is reported.
     */
    @Test
    public void slidingWindowedCogroupedNamedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000001 (stores: [aggregate-store])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000002\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000002 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000001\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "", Materialized.as((String)"aggregate-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Aggregates a cogrouped, sliding-windowed stream without a Materialized argument on a builder
     * created from {@code overrideDefaultStore("in_memory")}, then checks the printed topology
     * (named "my-topology") and that no persistent local store is reported.
     */
    @Test
    public void slidingWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        final String expected =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000002 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "");
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Aggregates a cogrouped, session-windowed stream (initializer plus session merger, no
     * Materialized argument) on a default builder, then checks the printed topology and that a
     * persistent local store is reported.
     */
    @Test
    public void sessionWindowedCogroupedZeroArgCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000002 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "", (aggKey, aggOne, aggTwo) -> "");
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Aggregates a cogrouped, session-windowed stream into the named store "aggregate-store" with
     * the IN_MEMORY store type, then checks the printed topology and that no persistent local
     * store is reported.
     */
    @Test
    public void sessionWindowedCogroupedNamedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000001 (stores: [aggregate-store])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000002\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000002 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000001\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "", (aggKey, aggOne, aggTwo) -> "", Materialized.as((String)"aggregate-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Aggregates a cogrouped, session-windowed stream (no Materialized argument) on a builder
     * created from {@code overrideDefaultStore("in_memory")}, then checks the printed topology
     * (named "my-topology") and that no persistent local store is reported.
     */
    @Test
    public void sessionWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        final String expected =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: COGROUPKSTREAM-AGGREGATE-0000000002 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> COGROUPKSTREAM-MERGE-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "    Processor: COGROUPKSTREAM-MERGE-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- COGROUPKSTREAM-AGGREGATE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic").groupByKey().cogroup((key, value, aggregate) -> value).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(1L))).aggregate(() -> "", (aggKey, aggOne, aggTwo) -> "");
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Counts a session-windowed stream with the zero-arg {@code count()} on a default builder,
     * then checks the printed topology and that a persistent local store is reported.
     */
    @Test
    public void sessionWindowZeroArgCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with((Duration)Duration.ofMillis(1L))).count();
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Counts a session-windowed stream into the named store "count-store" with the IN_MEMORY
     * store type, then checks the printed topology and that no persistent local store is reported.
     */
    @Test
    public void sessionWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000001\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with((Duration)Duration.ofMillis(1L))).count(Materialized.as((String)"count-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Counts a session-windowed stream with an unnamed Materialized (null key serde, Long value
     * serde, ROCKS_DB store type), then checks the printed topology and that a persistent local
     * store is reported.
     */
    @Test
    public void sessionWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000003\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with((Duration)Duration.ofMillis(1L))).count(Materialized.with(null, (Serde)Serdes.Long()).withStoreType((DslStoreSuppliers)Materialized.StoreType.ROCKS_DB));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Counts a session-windowed stream materialized anonymously with the IN_MEMORY store type,
     * then checks the printed topology and that no persistent local store is reported.
     */
    @Test
    public void sessionWindowAnonymousStoreTypedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000003\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with((Duration)Duration.ofMillis(1L))).count(Materialized.as((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Counts a session-windowed stream with the zero-arg {@code count()} on a builder created from
     * {@code overrideDefaultStore("in_memory")}, then checks the printed topology (named
     * "my-topology") and that no persistent local store is reported.
     */
    @Test
    public void sessionWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
        final String expected =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n"
            + "      --> KSTREAM-AGGREGATE-0000000002\n"
            + "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000000\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.stream("input-topic").groupByKey().windowedBy(SessionWindows.with((Duration)Duration.ofMillis(1L))).count();
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final boolean persistent = built.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore();
        MatcherAssert.assertThat((Object)persistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Groups a table and counts it with the zero-arg {@code count()} on a default builder, then
     * checks the printed two-sub-topology description and that both state stores of the built
     * processor topology are persistent.
     */
    @Test
    public void tableZeroArgCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n"
            + "      --> KTABLE-SELECT-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000005\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n"
            + "      <-- KTABLE-SELECT-0000000003\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n"
            + "      --> KTABLE-AGGREGATE-0000000007\n"
            + "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000006\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((key, value) -> null).count();
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final ProcessorTopology processors = built.internalTopologyBuilder.setApplicationId("test").buildTopology();
        final int storeCount = processors.stateStores().size();
        MatcherAssert.assertThat((Object)storeCount, (Matcher)CoreMatchers.is((Object)2));
        // Each store is looked up only after the size check above has passed.
        final boolean firstPersistent = ((StateStore)processors.stateStores().get(0)).persistent();
        MatcherAssert.assertThat((Object)firstPersistent, (Matcher)CoreMatchers.is((Object)true));
        final boolean secondPersistent = ((StateStore)processors.stateStores().get(1)).persistent();
        MatcherAssert.assertThat((Object)secondPersistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Groups a table and counts it into the named store "count-store" with the IN_MEMORY store
     * type, then checks the printed two-sub-topology description and that the first state store
     * is persistent while the second is not.
     */
    @Test
    public void tableNamedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n"
            + "      --> KTABLE-SELECT-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000004\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "    Sink: KSTREAM-SINK-0000000004 (topic: count-store-repartition)\n"
            + "      <-- KTABLE-SELECT-0000000003\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000005 (topics: [count-store-repartition])\n"
            + "      --> KTABLE-AGGREGATE-0000000006\n"
            + "    Processor: KTABLE-AGGREGATE-0000000006 (stores: [count-store])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000005\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((key, value) -> null).count(Materialized.as((String)"count-store").withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final ProcessorTopology processors = built.internalTopologyBuilder.setApplicationId("test").buildTopology();
        final int storeCount = processors.stateStores().size();
        MatcherAssert.assertThat((Object)storeCount, (Matcher)CoreMatchers.is((Object)2));
        // Each store is looked up only after the size check above has passed.
        final boolean firstPersistent = ((StateStore)processors.stateStores().get(0)).persistent();
        MatcherAssert.assertThat((Object)firstPersistent, (Matcher)CoreMatchers.is((Object)true));
        final boolean secondPersistent = ((StateStore)processors.stateStores().get(1)).persistent();
        MatcherAssert.assertThat((Object)secondPersistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Groups a table and counts it with {@code Materialized.as(ROCKS_DB)} on a builder created
     * from {@code overrideDefaultStore("in_memory")}, then checks the printed description (named
     * "my-topology") and that the first state store is not persistent while the second is.
     *
     * NOTE(review): despite "Named" in the method name, the Materialized here carries no store name.
     */
    @Test
    public void tableNamedMaterializedCountWithTopologyConfigShouldPreserveTopologyStructure() {
        final String expected =
            "Topology: my-topology:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n"
            + "      --> KTABLE-SELECT-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000005\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n"
            + "      <-- KTABLE-SELECT-0000000003\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n"
            + "      --> KTABLE-AGGREGATE-0000000007\n"
            + "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000006\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder(this.overrideDefaultStore("in_memory"));
        streamsBuilder.table("input-topic").groupBy((key, value) -> null).count(Materialized.as((DslStoreSuppliers)Materialized.StoreType.ROCKS_DB));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final ProcessorTopology processors = built.internalTopologyBuilder.setApplicationId("test").buildTopology();
        final int storeCount = processors.stateStores().size();
        MatcherAssert.assertThat((Object)storeCount, (Matcher)CoreMatchers.is((Object)2));
        // Each store is looked up only after the size check above has passed.
        final boolean firstPersistent = ((StateStore)processors.stateStores().get(0)).persistent();
        MatcherAssert.assertThat((Object)firstPersistent, (Matcher)CoreMatchers.is((Object)false));
        final boolean secondPersistent = ((StateStore)processors.stateStores().get(1)).persistent();
        MatcherAssert.assertThat((Object)secondPersistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Groups a table and counts it with an unnamed Materialized (null key serde, Long value serde,
     * ROCKS_DB store type), then checks the printed two-sub-topology description and that both
     * state stores of the built processor topology are persistent.
     */
    @Test
    public void tableAnonymousMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n"
            + "      --> KTABLE-SELECT-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000005\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n"
            + "      <-- KTABLE-SELECT-0000000003\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n"
            + "      --> KTABLE-AGGREGATE-0000000007\n"
            + "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000006\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((key, value) -> null).count(Materialized.with(null, (Serde)Serdes.Long()).withStoreType((DslStoreSuppliers)Materialized.StoreType.ROCKS_DB));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final ProcessorTopology processors = built.internalTopologyBuilder.setApplicationId("test").buildTopology();
        final int storeCount = processors.stateStores().size();
        MatcherAssert.assertThat((Object)storeCount, (Matcher)CoreMatchers.is((Object)2));
        // Each store is looked up only after the size check above has passed.
        final boolean firstPersistent = ((StateStore)processors.stateStores().get(0)).persistent();
        MatcherAssert.assertThat((Object)firstPersistent, (Matcher)CoreMatchers.is((Object)true));
        final boolean secondPersistent = ((StateStore)processors.stateStores().get(1)).persistent();
        MatcherAssert.assertThat((Object)secondPersistent, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Groups a table and counts it with an unnamed IN_MEMORY Materialized, then checks the
     * printed two-sub-topology description and that the first state store is persistent while
     * the second is not.
     */
    @Test
    public void tableAnonymousStoreTypedMaterializedCountShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n"
            + "      --> KTABLE-SELECT-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n"
            + "      --> KSTREAM-SINK-0000000005\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n"
            + "      <-- KTABLE-SELECT-0000000003\n"
            + "\n"
            + "  Sub-topology: 1\n"
            + "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n"
            + "      --> KTABLE-AGGREGATE-0000000007\n"
            + "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n"
            + "      --> none\n"
            + "      <-- KSTREAM-SOURCE-0000000006\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("input-topic").groupBy((key, value) -> null).count(Materialized.as((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final Topology built = streamsBuilder.build();
        final String actual = built.describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
        // The streams config is installed before the processor topology is built.
        built.internalTopologyBuilder.setStreamsConfig(this.streamsConfig);
        final ProcessorTopology processors = built.internalTopologyBuilder.setApplicationId("test").buildTopology();
        final int storeCount = processors.stateStores().size();
        MatcherAssert.assertThat((Object)storeCount, (Matcher)CoreMatchers.is((Object)2));
        // Each store is looked up only after the size check above has passed.
        final boolean firstPersistent = ((StateStore)processors.stateStores().get(0)).persistent();
        MatcherAssert.assertThat((Object)firstPersistent, (Matcher)CoreMatchers.is((Object)true));
        final boolean secondPersistent = ((StateStore)processors.stateStores().get(1)).persistent();
        MatcherAssert.assertThat((Object)secondPersistent, (Matcher)CoreMatchers.is((Object)false));
    }

    /**
     * Applies {@code mapValues} without a Materialized argument to a table and checks the
     * printed topology, in which neither processor lists a store.
     */
    @Test
    public void kTableNonMaterializedMapValuesShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n"
            + "      --> KTABLE-MAPVALUES-0000000003\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KTable inputTable = streamsBuilder.table("input-topic");
        inputTable.mapValues((readOnlyKey, value) -> null);
        final String actual = streamsBuilder.build().describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
    }

    /**
     * Applies {@code mapValues} with an unnamed IN_MEMORY Materialized (null serdes) to a table
     * and checks the printed topology, in which neither processor lists a store.
     */
    @Test
    public void kTableAnonymousMaterializedMapValuesShouldPreserveTopologyStructure() {
        final String expected =
            "Topologies:\n"
            + "   Sub-topology: 0\n"
            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n"
            + "      --> KTABLE-SOURCE-0000000002\n"
            + "    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n"
            + "      --> KTABLE-MAPVALUES-0000000004\n"
            + "      <-- KSTREAM-SOURCE-0000000001\n"
            + "    Processor: KTABLE-MAPVALUES-0000000004 (stores: [])\n"
            + "      --> none\n"
            + "      <-- KTABLE-SOURCE-0000000002\n"
            + "\n";
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KTable inputTable = streamsBuilder.table("input-topic");
        inputTable.mapValues((readOnlyKey, value) -> null, Materialized.with(null, null).withStoreType((DslStoreSuppliers)Materialized.StoreType.IN_MEMORY));
        final String actual = streamsBuilder.build().describe().toString();
        Assertions.assertEquals((Object)expected, (Object)actual);
    }

    /**
     * Applies {@code mapValues} with a {@code Materialized} named {@code store-name} to a table
     * read from {@code input-topic} and checks the textual topology description. The expected
     * text attaches {@code store-name} to the map-values processor.
     */
    @Test
    public void kTableNamedMaterializedMapValuesShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable inputTable = streamsBuilder.table("input-topic");
        inputTable.mapValues((k, v) -> null, Materialized.as((String)"store-name").withKeySerde(null).withValueSerde(null));
        String actual = streamsBuilder.build().describe().toString();
        String expected = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-MAPVALUES-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-MAPVALUES-0000000003 (stores: [store-name])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n";
        Assertions.assertEquals((Object)expected, (Object)actual);
    }

    /**
     * Applies {@code filter} without a {@code Materialized} argument to a table read from
     * {@code input-topic} and checks the textual topology description: a single sub-topology
     * whose source and filter processors both list no stores.
     */
    @Test
    public void kTableNonMaterializedFilterShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable inputTable = streamsBuilder.table("input-topic");
        inputTable.filter((k, v) -> false);
        String actual = streamsBuilder.build().describe().toString();
        String expected = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-FILTER-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-FILTER-0000000003 (stores: [])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n";
        Assertions.assertEquals((Object)expected, (Object)actual);
    }

    /**
     * Applies {@code filter} with an unnamed {@code Materialized} to a table read from
     * {@code input-topic} and checks the textual topology description. The expected text
     * lists no stores on either processor and numbers the filter node {@code 0000000004}.
     */
    @Test
    public void kTableAnonymousMaterializedFilterShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable inputTable = streamsBuilder.table("input-topic");
        inputTable.filter((k, v) -> false, Materialized.with(null, null));
        String actual = streamsBuilder.build().describe().toString();
        String expected = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-FILTER-0000000004\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-FILTER-0000000004 (stores: [])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n";
        Assertions.assertEquals((Object)expected, (Object)actual);
    }

    /**
     * Applies {@code filter} with a {@code Materialized} named {@code store-name} to a table
     * read from {@code input-topic} and checks the textual topology description. The expected
     * text attaches {@code store-name} to the filter processor.
     */
    @Test
    public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable inputTable = streamsBuilder.table("input-topic");
        inputTable.filter((k, v) -> false, Materialized.as((String)"store-name"));
        String actual = streamsBuilder.build().describe().toString();
        String expected = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n      --> KTABLE-SOURCE-0000000002\n    Processor: KTABLE-SOURCE-0000000002 (stores: [])\n      --> KTABLE-FILTER-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KTABLE-FILTER-0000000003 (stores: [store-name])\n      --> none\n      <-- KTABLE-SOURCE-0000000002\n\n";
        Assertions.assertEquals((Object)expected, (Object)actual);
    }

    /**
     * Builds the same static-topic topology twice and checks that the two descriptions are
     * equal and that their hash codes are equal as well.
     */
    @Test
    public void topologyWithStaticTopicNameExtractorShouldRespectEqualHashcodeContract() {
        Topology first = topologyWithStaticTopicName();
        Topology second = topologyWithStaticTopicName();

        // Equality of the descriptions themselves.
        MatcherAssert.assertThat((Object)first.describe(), (Matcher)CoreMatchers.equalTo((Object)second.describe()));

        // Equal descriptions must also hash identically; each side is described afresh.
        int firstHash = first.describe().hashCode();
        int secondHash = second.describe().hashCode();
        MatcherAssert.assertThat((Object)firstHash, (Matcher)CoreMatchers.equalTo((Object)secondHash));
    }

    /**
     * Builds a topology that streams from {@code from-topic-name} straight into the fixed
     * topic {@code to-topic-name}.
     *
     * @return the built topology
     */
    private Topology topologyWithStaticTopicName() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<Object, Object> input = streamsBuilder.stream("from-topic-name");
        input.to("to-topic-name");
        return streamsBuilder.build();
    }

    /**
     * Registers a topic-based source on the topology under test and returns the description
     * node the test expects for it.
     *
     * @param sourceName  name of the source node
     * @param sourceTopic topics the source reads from
     * @return an expected source node carrying {@code sourceName} and the set of given topics,
     *         with no pattern
     */
    private TopologyDescription.Source addSource(String sourceName, String ... sourceTopic) {
        this.topology.addSource(null, sourceName, null, null, null, sourceTopic);
        // The expected node stores the topics as a set; no joined topic string is needed.
        return new InternalTopologyBuilder.Source(sourceName, new HashSet<String>(Arrays.asList(sourceTopic)), null);
    }

    /**
     * Registers a pattern-based source on the topology under test and returns the description
     * node the test expects for it.
     *
     * @param sourceName    name of the source node
     * @param sourcePattern topic pattern the source subscribes to
     * @return an expected source node carrying {@code sourceName} and {@code sourcePattern},
     *         with no explicit topic set
     */
    private TopologyDescription.Source addSource(String sourceName, Pattern sourcePattern) {
        topology.addSource(null, sourceName, null, null, null, sourcePattern);
        TopologyDescription.Source expectedSource = new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
        return expectedSource;
    }

    /**
     * Adds a processor with no state stores below the given parents.
     *
     * @param processorName name of the processor node
     * @param parents       expected description nodes of the processor's parents
     * @return the expected description node for the new processor
     */
    private TopologyDescription.Processor addProcessor(String processorName, TopologyDescription.Node ... parents) {
        String[] noStores = new String[0];
        // Same path as the "new store" variant, just with an empty store list.
        return addProcessorWithStore(processorName, noStores, true, parents);
    }

    /**
     * Adds a processor below the given parents and registers a new store for every entry of
     * {@code storeNames}.
     *
     * @param processorName name of the processor node
     * @param storeNames    names of the stores to create and attach
     * @param parents       expected description nodes of the processor's parents
     * @return the expected description node for the new processor
     */
    private TopologyDescription.Processor addProcessorWithNewStore(String processorName, String[] storeNames, TopologyDescription.Node ... parents) {
        boolean createStores = true;
        return addProcessorWithStore(processorName, storeNames, createStores, parents);
    }

    /**
     * Adds a processor below the given parents and connects it to stores that are already
     * part of the topology.
     *
     * @param processorName name of the processor node
     * @param storeNames    names of the existing stores to connect
     * @param parents       expected description nodes of the processor's parents
     * @return the expected description node for the new processor
     */
    private TopologyDescription.Processor addProcessorWithExistingStore(String processorName, String[] storeNames, TopologyDescription.Node ... parents) {
        boolean createStores = false;
        return addProcessorWithStore(processorName, storeNames, createStores, parents);
    }

    /**
     * Adds a processor to the topology under test, attaches its stores, and builds the
     * matching expected description node wired to its parents.
     *
     * @param processorName name of the processor node
     * @param storeNames    names of the stores used by the processor
     * @param newStores     {@code true} to register a mocked store builder per name,
     *                      {@code false} to connect the processor to already-added stores
     * @param parents       expected description nodes of the processor's parents
     * @return the expected description node for the new processor
     */
    private TopologyDescription.Processor addProcessorWithStore(String processorName, String[] storeNames, boolean newStores, TopologyDescription.Node ... parents) {
        String[] parentNames = Arrays.stream(parents)
            .map(TopologyDescription.Node::name)
            .toArray(String[]::new);
        topology.addProcessor(processorName, new MockApiProcessorSupplier(), parentNames);

        if (!newStores) {
            topology.connectProcessorAndStateStores(processorName, storeNames);
        } else {
            for (String storeName : storeNames) {
                // Only the builder's name is stubbed on the mock.
                StoreBuilder mockedBuilder = (StoreBuilder)Mockito.mock(StoreBuilder.class);
                Mockito.when((Object)mockedBuilder.name()).thenReturn((Object)storeName);
                topology.addStateStore(mockedBuilder, new String[]{processorName});
            }
        }

        // Mirror the real topology in the expected description: link the node to each parent.
        InternalTopologyBuilder.Processor expectedProcessor = new InternalTopologyBuilder.Processor(processorName, new HashSet<String>(Arrays.asList(storeNames)));
        for (TopologyDescription.Node parentNode : parents) {
            ((InternalTopologyBuilder.AbstractNode)parentNode).addSuccessor((TopologyDescription.Node)expectedProcessor);
            expectedProcessor.addPredecessor(parentNode);
        }
        return expectedProcessor;
    }

    /**
     * Adds a sink writing to a fixed topic to the topology under test and builds the matching
     * expected description node wired to its parents.
     *
     * @param sinkName  name of the sink node
     * @param sinkTopic topic the sink writes to
     * @param parents   expected description nodes of the sink's parents
     * @return the expected description node for the new sink
     */
    private TopologyDescription.Sink addSink(String sinkName, String sinkTopic, TopologyDescription.Node ... parents) {
        String[] parentNames = Arrays.stream(parents)
            .map(TopologyDescription.Node::name)
            .toArray(String[]::new);
        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);

        // Mirror the real topology in the expected description: link the node to each parent.
        InternalTopologyBuilder.Sink expectedSink = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
        for (TopologyDescription.Node parentNode : parents) {
            ((InternalTopologyBuilder.AbstractNode)parentNode).addSuccessor((TopologyDescription.Node)expectedSink);
            expectedSink.addPredecessor(parentNode);
        }
        return expectedSink;
    }

    /**
     * Adds a global store (backed by a mocked key-value store builder) to the topology under
     * test and records the corresponding global-store entry in the expected description.
     *
     * @param globalStoreName name reported by the mocked store builder
     * @param sourceName      name of the global store's source node
     * @param globalTopicName topic the global store is populated from
     * @param processorName   name of the global store's processor node
     * @param id              id passed to the expected global-store description
     */
    @Deprecated
    private void addGlobalStoreToTopologyAndExpectedDescription(String globalStoreName, String sourceName, String globalTopicName, String processorName, int id) {
        KeyValueStoreBuilder mockedBuilder = (KeyValueStoreBuilder)Mockito.mock(KeyValueStoreBuilder.class);
        Mockito.when((Object)mockedBuilder.name()).thenReturn((Object)globalStoreName);
        topology.addGlobalStore((StoreBuilder)mockedBuilder, sourceName, null, null, null, globalTopicName, processorName, new MockProcessorSupplier());

        TopologyDescription.GlobalStore expectedStore = new InternalTopologyBuilder.GlobalStore(sourceName, processorName, globalStoreName, globalTopicName, id);
        expectedDescription.addGlobalStore(expectedStore);
    }

    /**
     * Adds a read-only state store to the topology and checks that the resulting description
     * (and its hash code) equals an expected description made of a single sub-topology with
     * id 0 that contains the store's source node and processor node.
     */
    @Test
    public void readOnlyStateStoresShouldHaveTheirOwnSubTopology() {
        final String sourceName = "source";
        final String storeName = "store";
        final String topicName = "topic";
        final String processorName = "processor";

        KeyValueStoreBuilder storeBuilder = (KeyValueStoreBuilder)Mockito.mock(KeyValueStoreBuilder.class);
        Mockito.when((Object)storeBuilder.name()).thenReturn((Object)storeName);
        this.topology.addReadOnlyStateStore((StoreBuilder)storeBuilder, sourceName, null, null, null, topicName, processorName, new MockProcessorSupplier());

        // Expected nodes: the source reads the topic, the processor owns the store.
        InternalTopologyBuilder.Source expectedSource = new InternalTopologyBuilder.Source(sourceName, Sets.newSet(topicName), null);
        InternalTopologyBuilder.Processor expectedProcessor = new InternalTopologyBuilder.Processor(processorName, Sets.newSet(storeName));
        expectedSource.addSuccessor((TopologyDescription.Node)expectedProcessor);
        expectedProcessor.addPredecessor((TopologyDescription.Node)expectedSource);

        Set<TopologyDescription.Node> allNodes = new HashSet<>();
        allNodes.add(expectedSource);
        allNodes.add(expectedProcessor);
        this.expectedDescription.addSubtopology((TopologyDescription.Subtopology)new InternalTopologyBuilder.SubtopologyDescription(0, allNodes));

        MatcherAssert.assertThat((Object)this.topology.describe(), (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription));
        MatcherAssert.assertThat((Object)this.topology.describe().hashCode(), (Matcher)CoreMatchers.equalTo((Object)this.expectedDescription.hashCode()));
    }

    /**
     * Adds a read-only state store to the topology and checks that the store factory
     * registered under the store's name reports logging as disabled.
     */
    @Test
    public void readOnlyStateStoresShouldNotLog() {
        final String sourceName = "source";
        final String storeName = "store";
        final String topicName = "topic";
        final String processorName = "processor";

        KeyValueStoreBuilder storeBuilder = (KeyValueStoreBuilder)Mockito.mock(KeyValueStoreBuilder.class);
        Mockito.when((Object)storeBuilder.name()).thenReturn((Object)storeName);
        this.topology.addReadOnlyStateStore((StoreBuilder)storeBuilder, sourceName, null, null, null, topicName, processorName, new MockProcessorSupplier());

        // Look the factory up by the same name the mocked builder reports.
        StoreFactory stateStoreFactory = (StoreFactory)this.topology.internalTopologyBuilder.stateStores().get(storeName);
        MatcherAssert.assertThat((Object)stateStoreFactory.loggingEnabled(), (Matcher)CoreMatchers.equalTo((Object)false));
    }

    /**
     * Creates a {@code TopologyConfig} named {@code my-topology} on top of the test streams
     * configuration, with a topology-level override of {@code default.dsl.store}.
     *
     * @param defaultStore value to set for {@code default.dsl.store}
     * @return the topology config carrying the override
     */
    private TopologyConfig overrideDefaultStore(String defaultStore) {
        Properties overrides = new Properties();
        overrides.put("default.dsl.store", defaultStore);
        return new TopologyConfig("my-topology", new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig()), overrides);
    }

    private static class LocalMockProcessorSupplier
    implements ProcessorSupplier<Object, Object, Object, Object> {
        static final String STORE_NAME = "store";

        private LocalMockProcessorSupplier() {
        }

        public Processor<Object, Object, Object, Object> get() {
            return new Processor<Object, Object, Object, Object>(){

                public void init(ProcessorContext<Object, Object> context) {
                    context.getStateStore(LocalMockProcessorSupplier.STORE_NAME);
                }

                public void process(Record<Object, Object> record) {
                }
            };
        }
    }

    private static class MockProcessorSupplierProvidingStore<K, V>
    extends MockApiProcessorSupplier<K, V, Void, Void> {
        private final StoreBuilder<MockKeyValueStore> storeBuilder;

        public MockProcessorSupplierProvidingStore(StoreBuilder<MockKeyValueStore> storeBuilder) {
            this.storeBuilder = storeBuilder;
        }

        public Set<StoreBuilder<?>> stores() {
            return Collections.singleton(this.storeBuilder);
        }
    }
}

