/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
import org.apache.kafka.streams.kstream.internals.KTableRepartitionMap;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionSendNode;
import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class InternalStreamsBuilderTest {
    /** Application id used whenever a {@link StreamsConfig} is built for a topology rewrite. */
    private static final String APP_ID = "app-id";
    /** Builder under test; being an instance field, each test instance gets its own. */
    private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
    /** Default consumed specification passed to sources that need no special settings. */
    private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
    /** Prefix string passed to every {@link MaterializedInternal} created in this test. */
    private final String storePrefix = "prefix-";
    /** Materialization named {@code "test-store"}, reused by the table-based tests. */
    private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized =
        new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
    /** Streams properties; the self-join tests add the optimization setting to them. */
    private final Properties props = StreamsTestUtils.getStreamsConfig();

    /**
     * Checks that {@code newProcessorName} appends a ten-digit, zero-padded counter to the
     * prefix, that the counter advances by one per call regardless of prefix, and that a
     * second builder starts counting from zero again.
     */
    @Test
    public void testNewName() {
        Assertions.assertEquals("X-0000000000", builder.newProcessorName("X-"));
        Assertions.assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
        Assertions.assertEquals("Z-0000000002", builder.newProcessorName("Z-"));

        // A separate builder keeps its own counter.
        final InternalStreamsBuilder freshBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());

        Assertions.assertEquals("X-0000000000", freshBuilder.newProcessorName("X-"));
        Assertions.assertEquals("Y-0000000001", freshBuilder.newProcessorName("Y-"));
        Assertions.assertEquals("Z-0000000002", freshBuilder.newProcessorName("Z-"));
    }

    /**
     * Checks that {@code newStoreName} produces {@code <prefix>STATE-STORE-<ten-digit counter>},
     * that the counter advances by one per call, and that a second builder starts from zero.
     */
    @Test
    public void testNewStoreName() {
        Assertions.assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
        Assertions.assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
        Assertions.assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));

        // A separate builder keeps its own counter.
        final InternalStreamsBuilder freshBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());

        Assertions.assertEquals("X-STATE-STORE-0000000000", freshBuilder.newStoreName("X-"));
        Assertions.assertEquals("Y-STATE-STORE-0000000001", freshBuilder.newStoreName("Y-"));
        Assertions.assertEquals("Z-STATE-STORE-0000000002", freshBuilder.newStoreName("Z-"));
    }

    /**
     * Merges three source streams (two of them passed through stateless operators first),
     * counts the merged stream into the store {@code "my-table"}, and expects that store to be
     * mapped to all three source topics, in order.
     */
    @Test
    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";
        final String topic3 = "topic-3";
        final KStream<String, String> source1 = builder.stream(Collections.singleton(topic1), consumed);
        final KStream<String, String> source2 = builder.stream(Collections.singleton(topic2), consumed);
        final KStream<String, String> source3 = builder.stream(Collections.singleton(topic3), consumed);

        // Identity / always-true operators: they change the graph, not the records.
        final KStream<String, String> processedSource1 = source1.mapValues(v -> v).filter((k, v) -> true);
        final KStream<String, String> processedSource2 = source2.filter((k, v) -> true);

        final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
        merged.groupByKey().count(Materialized.as("my-table"));
        builder.buildAndOptimizeTopology();

        final Map<String, List<String>> actual =
            builder.internalTopologyBuilder.stateStoreNameToFullSourceTopicNames();
        Assertions.assertEquals(Arrays.asList(topic1, topic2, topic3), actual.get("my-table"));
    }

    /**
     * Builds a table from a materialization that names no store and expects the resulting
     * topology to contain no state stores, no changelog mappings, and a {@code null}
     * queryable store name on the table.
     */
    @Test
    public void shouldNotMaterializeSourceKTableIfNotRequired() {
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(Materialized.with(null, null), builder, storePrefix);
        final KTable<String, String> table1 = builder.table("topic2", consumed, materializedInternal);

        builder.buildAndOptimizeTopology();
        final ProcessorTopology topology = builder.internalTopologyBuilder
            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
            .buildTopology();

        Assertions.assertEquals(0, topology.stateStores().size());
        Assertions.assertEquals(0, topology.storeToChangelogTopic().size());
        Assertions.assertNull(table1.queryableStoreName());
    }

    /**
     * Builds a global table from a materialization that names no store and expects its
     * queryable store name to be {@code null}.
     */
    @Test
    public void shouldBuildGlobalTableWithNonQueryableStoreName() {
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(Materialized.with(null, null), builder, storePrefix);

        final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal);

        Assertions.assertNull(table1.queryableStoreName());
    }

    /**
     * Builds a global table materialized as {@code "globalTable"} and expects that name to be
     * reported as its queryable store name.
     *
     * <p>NOTE(review): the method name contains a typo ("QueryaIble"); it is left unchanged so
     * the test keeps its existing identifier.
     */
    @Test
    public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(Materialized.as("globalTable"), builder, storePrefix);

        final GlobalKTable<String, String> table1 = builder.globalTable("topic2", consumed, materializedInternal);

        Assertions.assertEquals("globalTable", table1.queryableStoreName());
    }

    /**
     * Registers one global table materialized as {@code "globalTable"} and expects the global
     * state topology to expose exactly one global state store with that name.
     */
    @Test
    public void shouldBuildSimpleGlobalTableTopology() {
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(Materialized.as("globalTable"), builder, storePrefix);
        builder.globalTable("table", consumed, materializedInternal);

        builder.buildAndOptimizeTopology();
        final ProcessorTopology topology = builder.internalTopologyBuilder
            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
            .buildGlobalStateTopology();

        // Typed list: no element cast needed to read the store name.
        final List<StateStore> stateStores = topology.globalStateStores();
        Assertions.assertEquals(1, stateStores.size());
        Assertions.assertEquals("globalTable", stateStores.get(0).name());
    }

    /**
     * Expects {@code globalTable} to reject a materialization backed by a persistent versioned
     * key-value store supplier with a {@link TopologyException}.
     */
    @Test
    public void shouldThrowOnVersionedStoreSupplierForGlobalTable() {
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(
                Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ZERO)),
                builder,
                storePrefix);

        Assertions.assertThrows(
            TopologyException.class,
            () -> builder.globalTable("table", consumed, materializedInternal));
    }

    /**
     * Builds the global state topology from the current builder state and asserts that it
     * reads from exactly the topics {@code "table"} and {@code "table2"} and holds two global
     * state stores. Callers must have registered the global tables and built the topology.
     */
    private void doBuildGlobalTopologyWithAllGlobalTables() {
        final ProcessorTopology topology = builder.internalTopologyBuilder
            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
            .buildGlobalStateTopology();

        final List<StateStore> stateStores = topology.globalStateStores();
        final Set<String> sourceTopics = topology.sourceTopics();

        Assertions.assertEquals(Utils.mkSet("table", "table2"), sourceTopics);
        Assertions.assertEquals(2, stateStores.size());
    }

    /**
     * Registers two global tables ({@code "table"} as {@code "global1"}, {@code "table2"} as
     * {@code "global2"}) and delegates to {@code doBuildGlobalTopologyWithAllGlobalTables} to
     * verify the global state topology.
     */
    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTables() {
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> firstMaterialized =
            new MaterializedInternal<>(Materialized.as("global1"), builder, storePrefix);
        builder.globalTable("table", consumed, firstMaterialized);

        // Created only after the first table is registered, matching the original ordering.
        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> secondMaterialized =
            new MaterializedInternal<>(Materialized.as("global2"), builder, storePrefix);
        builder.globalTable("table2", consumed, secondMaterialized);

        builder.buildAndOptimizeTopology();
        doBuildGlobalTopologyWithAllGlobalTables();
    }

    /**
     * Registers two global tables, one regular table and two streams that each left-join one
     * of the global tables, then expects every sub-topology built from the node groups to
     * expose both global state stores.
     */
    @Test
    public void shouldAddGlobalTablesToEachGroup() {
        final String one = "globalTable";
        final String two = "globalTable2";

        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(Materialized.as(one), builder, storePrefix);
        final GlobalKTable<String, String> globalTable =
            builder.globalTable("table", consumed, materializedInternal);

        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal2 =
            new MaterializedInternal<>(Materialized.as(two), builder, storePrefix);
        final GlobalKTable<String, String> globalTable2 =
            builder.globalTable("table2", consumed, materializedInternal2);

        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternalNotGlobal =
            new MaterializedInternal<>(Materialized.as("not-global"), builder, storePrefix);
        builder.table("not-global", consumed, materializedInternalNotGlobal);

        // The join key for the global-table lookup is the stream record's value.
        final KeyValueMapper<String, String, String> kvMapper = (key, value) -> value;

        final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t2"), consumed);
        stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);

        final Map<Integer, Set<String>> nodeGroups = builder.internalTopologyBuilder.nodeGroups();
        for (final Integer groupId : nodeGroups.keySet()) {
            final ProcessorTopology topology = builder.internalTopologyBuilder.buildSubtopology(groupId);
            final List<StateStore> stateStores = topology.globalStateStores();
            final Set<String> names = new HashSet<>();
            for (final StateStore stateStore : stateStores) {
                names.add(stateStore.name());
            }

            Assertions.assertEquals(2, stateStores.size());
            Assertions.assertTrue(names.contains(one));
            Assertions.assertTrue(names.contains(two));
        }
    }

    /**
     * Re-keys an event stream, left-joins it with a table and counts the result, then expects
     * the table's store to map to the table topic and the count store to map to the
     * application-prefixed repartition topic created for the map operation.
     */
    @Test
    public void shouldMapStateStoresToCorrectSourceTopics() {
        final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);

        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(Materialized.as("table-store"), builder, storePrefix);
        final KTable<String, String> table = builder.table("table-topic", consumed, materializedInternal);

        final KStream<String, String> mapped = playEvents.map(MockMapper.selectValueKeyValueMapper());
        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.as("count"));

        builder.buildAndOptimizeTopology();
        builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));

        Assertions.assertEquals(
            Collections.singletonList("table-topic"),
            builder.internalTopologyBuilder.sourceTopicsForStore("table-store"));
        // The expected name is derived from APP_ID so it follows the configured application id.
        Assertions.assertEquals(
            Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"),
            builder.internalTopologyBuilder.sourceTopicsForStore("count"));
    }

    /**
     * A stream source consumed with {@code AutoOffsetReset.EARLIEST} must report
     * {@link OffsetResetStrategy#EARLIEST} for its topic.
     */
    @Test
    public void shouldAddTopicToEarliestAutoOffsetResetList() {
        final String topicName = "topic-1";
        // Named differently from the field so the shared default is not shadowed.
        final ConsumedInternal<String, String> earliestConsumed =
            new ConsumedInternal<>(Consumed.with(Topology.AutoOffsetReset.EARLIEST));

        builder.stream(Collections.singleton(topicName), earliestConsumed);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicName),
            CoreMatchers.equalTo(OffsetResetStrategy.EARLIEST));
    }

    /**
     * A stream source consumed with {@code AutoOffsetReset.LATEST} must report
     * {@link OffsetResetStrategy#LATEST} for its topic.
     */
    @Test
    public void shouldAddTopicToLatestAutoOffsetResetList() {
        final String topicName = "topic-1";
        // Named differently from the field so the shared default is not shadowed.
        final ConsumedInternal<String, String> latestConsumed =
            new ConsumedInternal<>(Consumed.with(Topology.AutoOffsetReset.LATEST));

        builder.stream(Collections.singleton(topicName), latestConsumed);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicName),
            CoreMatchers.equalTo(OffsetResetStrategy.LATEST));
    }

    /**
     * A table source consumed with {@code AutoOffsetReset.EARLIEST} must report
     * {@link OffsetResetStrategy#EARLIEST} for its topic.
     */
    @Test
    public void shouldAddTableToEarliestAutoOffsetResetList() {
        final String topicName = "topic-1";
        final ConsumedInternal<String, String> earliestConsumed =
            new ConsumedInternal<>(Consumed.with(Topology.AutoOffsetReset.EARLIEST));

        builder.table(topicName, earliestConsumed, materialized);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicName),
            CoreMatchers.equalTo(OffsetResetStrategy.EARLIEST));
    }

    /**
     * A table source consumed with {@code AutoOffsetReset.LATEST} must report
     * {@link OffsetResetStrategy#LATEST} for its topic.
     */
    @Test
    public void shouldAddTableToLatestAutoOffsetResetList() {
        final String topicName = "topic-1";
        final ConsumedInternal<String, String> latestConsumed =
            new ConsumedInternal<>(Consumed.with(Topology.AutoOffsetReset.LATEST));

        builder.table(topicName, latestConsumed, materialized);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicName),
            CoreMatchers.equalTo(OffsetResetStrategy.LATEST));
    }

    /**
     * A table source consumed with the default settings must report
     * {@link OffsetResetStrategy#NONE} for its topic.
     */
    @Test
    public void shouldNotAddTableToOffsetResetLists() {
        final String topicName = "topic-1";

        builder.table(topicName, consumed, materialized);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicName),
            CoreMatchers.equalTo(OffsetResetStrategy.NONE));
    }

    /**
     * A pattern-subscribed stream consumed with the default settings must report
     * {@link OffsetResetStrategy#NONE} for a topic name that matches the pattern.
     */
    @Test
    public void shouldNotAddRegexTopicsToOffsetResetLists() {
        final Pattern topicPattern = Pattern.compile("topic-\\d");
        final String topic = "topic-5";

        builder.stream(topicPattern, consumed);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topic),
            CoreMatchers.equalTo(OffsetResetStrategy.NONE));
    }

    /**
     * A pattern-subscribed stream consumed with {@code AutoOffsetReset.EARLIEST} must report
     * {@link OffsetResetStrategy#EARLIEST} for a topic name that matches the pattern.
     */
    @Test
    public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
        final Pattern topicPattern = Pattern.compile("topic-\\d+");
        final String topicTwo = "topic-500000";
        final ConsumedInternal<String, String> earliestConsumed =
            new ConsumedInternal<>(Consumed.with(Topology.AutoOffsetReset.EARLIEST));

        builder.stream(topicPattern, earliestConsumed);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicTwo),
            CoreMatchers.equalTo(OffsetResetStrategy.EARLIEST));
    }

    /**
     * A pattern-subscribed stream consumed with {@code AutoOffsetReset.LATEST} must report
     * {@link OffsetResetStrategy#LATEST} for a topic name that matches the pattern.
     */
    @Test
    public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
        final Pattern topicPattern = Pattern.compile("topic-\\d+");
        final String topicTwo = "topic-1000000";
        final ConsumedInternal<String, String> latestConsumed =
            new ConsumedInternal<>(Consumed.with(Topology.AutoOffsetReset.LATEST));

        builder.stream(topicPattern, latestConsumed);
        builder.buildAndOptimizeTopology();

        MatcherAssert.assertThat(
            builder.internalTopologyBuilder.offsetResetStrategy(topicTwo),
            CoreMatchers.equalTo(OffsetResetStrategy.LATEST));
    }

    /**
     * A stream source created with the default consumed settings must have no timestamp
     * extractor on its source node.
     */
    @Test
    public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
        builder.stream(Collections.singleton("topic"), consumed);
        builder.buildAndOptimizeTopology();

        final InternalTopologyBuilder topologyBuilder = builder.internalTopologyBuilder;
        final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        topologyBuilder.rewriteTopology(config);
        final ProcessorTopology processorTopology = topologyBuilder.buildTopology();

        Assertions.assertNull(processorTopology.source("topic").getTimestampExtractor());
    }

    /**
     * A stream source consumed with a {@link MockTimestampExtractor} must expose an extractor
     * of that type on its source node.
     */
    @Test
    public void shouldUseProvidedTimestampExtractor() {
        // Named differently from the field so the shared default is not shadowed.
        final ConsumedInternal<String, String> consumedWithExtractor =
            new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));

        builder.stream(Collections.singleton("topic"), consumedWithExtractor);
        builder.buildAndOptimizeTopology();
        final ProcessorTopology processorTopology = builder.internalTopologyBuilder
            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
            .buildTopology();

        MatcherAssert.assertThat(
            processorTopology.source("topic").getTimestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * A table source created with the default consumed settings must have no timestamp
     * extractor on its source node.
     */
    @Test
    public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
        builder.table("topic", consumed, materialized);
        builder.buildAndOptimizeTopology();

        final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID));
        final ProcessorTopology processorTopology =
            builder.internalTopologyBuilder.rewriteTopology(config).buildTopology();

        Assertions.assertNull(processorTopology.source("topic").getTimestampExtractor());
    }

    /**
     * A table source consumed with a {@link MockTimestampExtractor} must expose an extractor
     * of that type on its source node.
     */
    @Test
    public void ktableShouldUseProvidedTimestampExtractor() {
        // Named differently from the field so the shared default is not shadowed.
        final ConsumedInternal<String, String> consumedWithExtractor =
            new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));

        builder.table("topic", consumedWithExtractor, materialized);
        builder.buildAndOptimizeTopology();
        final ProcessorTopology processorTopology = builder.internalTopologyBuilder
            .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
            .buildTopology();

        MatcherAssert.assertThat(
            processorTopology.source("topic").getTimestampExtractor(),
            IsInstanceOf.instanceOf(MockTimestampExtractor.class));
    }

    /**
     * Joins one stream with itself under full topology optimization and expects the join node
     * to be flagged as a self-join, with {@code countJoinWindowNodes} reporting one node.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinSingleStream() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
        stream.join(
            stream,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>());
        Assertions.assertNotNull(join);
        Assertions.assertTrue(((StreamStreamJoinNode) join).getSelfJoin());
        // The join must still hang off at least one parent node after optimization.
        Assertions.assertFalse(join.parentNodes().isEmpty(), "join node should have a parent");

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(1, count.get());
    }

    /**
     * Joins two separately created streams over the same topic under full optimization and
     * expects the join to be flagged as a self-join, with one join-window node counted.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinTwoStreams() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>());
        Assertions.assertNotNull(join);
        Assertions.assertTrue(((StreamStreamJoinNode) join).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(1, count.get());
    }

    /**
     * Merges two streams over different topics and joins the merged stream with itself;
     * expects the join to be flagged as a self-join, with one join-window node counted.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinMergeTwoStreams() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t2"), consumed);
        final KStream<String, String> stream3 = stream1.merge(stream2);
        stream3.join(
            stream3,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>());
        Assertions.assertNotNull(join);
        Assertions.assertTrue(((StreamStreamJoinNode) join).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(1, count.get());
    }

    /**
     * Chains two joins: two streams over {@code "t1"} first, then the result with a stream
     * over {@code "t3"}. Expects the first join node returned by {@code getNodesByType} to be
     * a self-join and the second not, with three join-window nodes counted.
     */
    @Test
    public void shouldMarkFirstStreamStreamJoinAsSelfJoin3WayJoin() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream3 = builder.stream(Collections.singleton("t3"), consumed);
        stream1
            .join(stream2, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .join(stream3, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final List<GraphNode> result = new ArrayList<>();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>(), result);
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(2, result.size());
        Assertions.assertTrue(((StreamStreamJoinNode) result.get(0)).getSelfJoin());
        Assertions.assertFalse(((StreamStreamJoinNode) result.get(1)).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        Assertions.assertEquals(3, count.get());
    }

    /**
     * Builds two independent same-topic joins ({@code "t1"} with {@code "t1"}, {@code "t2"}
     * with {@code "t2"}) and merges their results. Expects both join nodes to be flagged as
     * self-joins, with two join-window nodes counted.
     */
    @Test
    public void shouldMarkAllStreamStreamJoinsAsSelfJoin() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream3 = builder.stream(Collections.singleton("t2"), consumed);
        final KStream<String, String> stream4 = builder.stream(Collections.singleton("t2"), consumed);

        final KStream<String, String> firstResult = stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        final KStream<String, String> secondResult = stream3.join(
            stream4,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        firstResult.merge(secondResult);

        builder.buildAndOptimizeTopology(props);

        final List<GraphNode> result = new ArrayList<>();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>(), result);
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(2, result.size());
        Assertions.assertTrue(((StreamStreamJoinNode) result.get(0)).getSelfJoin());
        Assertions.assertTrue(((StreamStreamJoinNode) result.get(1)).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        Assertions.assertEquals(2, count.get());
    }

    /**
     * Chains two joins across three streams that all read {@code "t1"}. Expects the first join
     * node returned by {@code getNodesByType} to be a self-join and the second not, with three
     * join-window nodes counted.
     */
    @Test
    public void shouldMarkFirstStreamStreamJoinAsSelfJoinNwaySameSource() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream3 = builder.stream(Collections.singleton("t1"), consumed);
        stream1
            .join(stream2, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .join(stream3, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final List<GraphNode> joinNodes = new ArrayList<>();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>(), joinNodes);
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        Assertions.assertEquals(3, count.get());
    }

    /**
     * Chains two joins: two streams over {@code "t1"} first, then the result with a stream
     * over {@code "t2"}. Expects the first join node returned by {@code getNodesByType} to be
     * a self-join and the second not, with three join-window nodes counted.
     */
    @Test
    public void shouldMarkFirstStreamStreamJoinAsSelfJoinNway() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream3 = builder.stream(Collections.singleton("t2"), consumed);
        stream1
            .join(stream2, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
            .join(stream3, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final List<GraphNode> joinNodes = new ArrayList<>();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>(), joinNodes);
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertFalse(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        Assertions.assertEquals(3, count.get());
    }

    /**
     * Adds a filter to one stream as a side branch whose result is not used, then joins the
     * unfiltered stream with a second stream over the same topic. Expects the join to be
     * flagged as a self-join, with one join-window node counted.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinTwoStreamsWithNoOpFilter() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);

        // The filtered stream is discarded on purpose: the join below reads stream1 directly.
        stream1.filter((key, value) -> value != null);
        stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>());
        Assertions.assertNotNull(join);
        Assertions.assertTrue(((StreamStreamJoinNode) join).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(1, count.get());
    }

    /**
     * Builds two independent joins, each between two streams over {@code "t1"}. Expects both
     * join nodes to be flagged as self-joins, with two join-window nodes counted.
     */
    @Test
    public void shouldMarkStreamStreamJoinAsSelfJoinTwoJoinsSameSource() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream3 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream4 = builder.stream(Collections.singleton("t1"), consumed);
        stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
        stream3.join(
            stream4,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final List<GraphNode> joinNodes = new ArrayList<>();
        getNodesByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>(), joinNodes);
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(2, joinNodes.size());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(0)).getSelfJoin());
        Assertions.assertTrue(((StreamStreamJoinNode) joinNodes.get(1)).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        Assertions.assertEquals(2, count.get());
    }

    /**
     * Joins a filtered stream with a second stream over the same topic. Because the filter
     * sits on the join path, the join must not be flagged as a self-join; two join-window
     * nodes are expected.
     */
    @Test
    public void shouldNotMarkStreamStreamJoinAsSelfJoinTwoStreamsWithFilter() {
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream<String, String> stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t1"), consumed);
        stream1
            .filter((key, value) -> value != null)
            .join(stream2, MockValueJoiner.TOSTRING_JOINER,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        builder.buildAndOptimizeTopology(props);

        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<GraphNode>());
        Assertions.assertNotNull(join);
        Assertions.assertFalse(((StreamStreamJoinNode) join).getSelfJoin());

        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<GraphNode>());
        // Expected value first, so a failure message reads the right way round.
        Assertions.assertEquals(2, count.get());
    }

    /**
     * With all optimizations enabled, joins a {@code mapValues}-derived stream on topic "t1"
     * against a plain stream on the same topic. The join node must not be flagged as a
     * self-join and two {@code WindowedStreamProcessorNode}s are expected in the graph.
     */
    @Test
    public void shouldNotMarkStreamStreamJoinAsSelfJoinOneStreamWithMap() {
        // Given: the left input is the result of an identity mapValues on stream1.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream stream3 = stream1.mapValues(v -> v);
        final KStream stream2 = builder.stream(Collections.singleton("t1"), consumed);
        stream3.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        // When:
        builder.buildAndOptimizeTopology(props);

        // Then:
        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(join);
        Assertions.assertFalse(((StreamStreamJoinNode) join).getSelfJoin());
        final AtomicInteger windowNodeCount = new AtomicInteger();
        countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
        // assertEquals takes (expected, actual).
        Assertions.assertEquals(2, windowNodeCount.get());
    }

    /**
     * With all optimizations enabled, joins a stream on topic "t1" with a stream on topic
     * "t2". The join node must not be flagged as a self-join and two
     * {@code WindowedStreamProcessorNode}s are expected in the graph.
     */
    @Test
    public void shouldNotMarkStreamStreamJoinAsSelfJoinMultipleSources() {
        // Given: the two join inputs read different topics.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final KStream stream1 = builder.stream(Collections.singleton("t1"), consumed);
        final KStream stream2 = builder.stream(Collections.singleton("t2"), consumed);
        stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        // When:
        builder.buildAndOptimizeTopology(props);

        // Then:
        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(join);
        Assertions.assertFalse(((StreamStreamJoinNode) join).getSelfJoin());
        final AtomicInteger windowNodeCount = new AtomicInteger();
        countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
        // assertEquals takes (expected, actual).
        Assertions.assertEquals(2, windowNodeCount.get());
    }

    /**
     * Enables only the single-store self-join optimization and joins a stream with itself.
     * The join node is expected to be flagged as a self-join, with a single
     * {@code WindowedStreamProcessorNode} left in the graph.
     */
    @Test
    public void shouldOptimizeJoinWhenInConfig() {
        // Given:
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.SINGLE_STORE_SELF_JOIN);
        final KStream stream1 = builder.stream(Collections.singleton("t1"), consumed);
        stream1.join(stream1, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        // When:
        builder.buildAndOptimizeTopology(props);

        // Then:
        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(join);
        Assertions.assertTrue(((StreamStreamJoinNode) join).getSelfJoin());
        final AtomicInteger windowNodeCount = new AtomicInteger();
        countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
        // assertEquals takes (expected, actual).
        Assertions.assertEquals(1, windowNodeCount.get());
    }

    /**
     * Enables only the KTable-source-topic reuse and repartition-topic merge optimizations
     * and joins a stream with itself. The join node must not be flagged as a self-join and
     * two {@code WindowedStreamProcessorNode}s are expected in the graph.
     */
    @Test
    public void shouldNotOptimizeJoinWhenNotInConfig() {
        // Given: the self-join optimization is absent from the configured list.
        final String optimizations = String.join(
            ",",
            StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
            StreamsConfig.MERGE_REPARTITION_TOPICS);
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizations);
        final KStream stream1 = builder.stream(Collections.singleton("t1"), consumed);
        stream1.join(stream1, MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));

        // When:
        builder.buildAndOptimizeTopology(props);

        // Then:
        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(join);
        Assertions.assertFalse(((StreamStreamJoinNode) join).getSelfJoin());
        final AtomicInteger windowNodeCount = new AtomicInteger();
        countJoinWindowNodes(windowNodeCount, builder.root, new HashSet<>());
        // assertEquals takes (expected, actual).
        Assertions.assertEquals(2, windowNodeCount.get());
    }

    /**
     * Filters a table that is materialized in a persistent versioned store and expects the
     * filter's {@link KTableFilter} supplier to report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableFilter() {
        // Given: a source table backed by a versioned store, followed by a filter.
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        source.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Builds versioned table -> {@code mapValues} (not materialized) -> filter and expects
     * the filter's {@link KTableFilter} supplier to report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable mapped = source.mapValues(v -> v != null ? v + v : null);
        mapped.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Builds versioned table -> {@code mapValues} materialized under the plain store name
     * "unversioned" -> filter and expects the filter's {@link KTableFilter} supplier NOT to
     * report versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final MaterializedInternal mappedMaterialized =
            new MaterializedInternal(Materialized.as("unversioned"), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable mapped = source.mapValues(v -> v != null ? v + v : null, (Materialized) mappedMaterialized);
        mapped.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, false);
    }

    /**
     * Both the source table and the intermediate {@code mapValues} result are materialized
     * in versioned stores; the downstream filter's {@link KTableFilter} supplier is expected
     * to report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
        // Given:
        final KeyValueBytesStoreSupplier sourceStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(sourceStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier mappedStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal mappedMaterialized =
            new MaterializedInternal(Materialized.as(mappedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable mapped = source.mapValues(v -> v != null ? v + v : null, (Materialized) mappedMaterialized);
        mapped.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Builds versioned table -> {@code groupBy().count()} (no explicit materialization) ->
     * filter and expects the filter's {@link KTableFilter} supplier NOT to report versioned
     * semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable counted = source.groupBy(KeyValue::new).count();
        counted.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, false);
    }

    /**
     * Builds versioned table -> {@code groupBy().count()} materialized in a second versioned
     * store -> filter and expects the filter's {@link KTableFilter} supplier to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
        // Given:
        final KeyValueBytesStoreSupplier sourceStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(sourceStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier countStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal countMaterialized =
            new MaterializedInternal(Materialized.as(countStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable counted = source.groupBy(KeyValue::new).count((Materialized) countMaterialized);
        counted.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Joins two versioned tables without materializing the result, filters the joined table,
     * and expects the filter's {@link KTableFilter} supplier NOT to report versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
        // Given:
        final KeyValueBytesStoreSupplier leftStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal leftMaterialized =
            new MaterializedInternal(Materialized.as(leftStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier rightStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal rightMaterialized =
            new MaterializedInternal(Materialized.as(rightStore), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, leftMaterialized);
        final KTable right = builder.table("t2", consumed, rightMaterialized);
        final KTable joined = left.join(right, (v1, v2) -> v1 + v2);
        joined.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, false);
    }

    /**
     * Joins two versioned tables, materializes the join result in a third versioned store,
     * filters it, and expects the filter's {@link KTableFilter} supplier to report versioned
     * semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
        // Given:
        final KeyValueBytesStoreSupplier leftStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal leftMaterialized =
            new MaterializedInternal(Materialized.as(leftStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier rightStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal rightMaterialized =
            new MaterializedInternal(Materialized.as(rightStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier joinStore =
            Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5L));
        final MaterializedInternal joinMaterialized =
            new MaterializedInternal(Materialized.as(joinStore), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, leftMaterialized);
        final KTable right = builder.table("t2", consumed, rightMaterialized);
        final KTable joined = left.join(right, (v1, v2) -> v1 + v2, (Materialized) joinMaterialized);
        joined.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Foreign-key joins two versioned tables without materializing the result, filters the
     * joined table, and expects the filter's {@link KTableFilter} supplier NOT to report
     * versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
        // Given:
        final KeyValueBytesStoreSupplier leftStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal leftMaterialized =
            new MaterializedInternal(Materialized.as(leftStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier rightStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal rightMaterialized =
            new MaterializedInternal(Materialized.as(rightStore), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, leftMaterialized);
        final KTable right = builder.table("t2", consumed, rightMaterialized);
        // The extra key-extractor argument selects the foreign-key join overload.
        final KTable joined = left.join(right, v -> v, (v1, v2) -> v1 + v2);
        joined.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, false);
    }

    /**
     * Foreign-key joins two versioned tables, materializes the result in a third versioned
     * store, filters it, and expects the filter's {@link KTableFilter} supplier to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
        // Given:
        final KeyValueBytesStoreSupplier leftStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal leftMaterialized =
            new MaterializedInternal(Materialized.as(leftStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier rightStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal rightMaterialized =
            new MaterializedInternal(Materialized.as(rightStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier joinStore =
            Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5L));
        final MaterializedInternal joinMaterialized =
            new MaterializedInternal(Materialized.as(joinStore), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, leftMaterialized);
        final KTable right = builder.table("t2", consumed, rightMaterialized);
        final KTable joined = left.join(right, v -> v, (v1, v2) -> v1 + v2, (Materialized) joinMaterialized);
        joined.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Converts a versioned table to a stream and back to a table (no explicit
     * materialization), filters it, and expects the filter's {@link KTableFilter} supplier
     * NOT to report versioned semantics.
     */
    @Test
    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable roundTripped = source.toStream().toTable();
        roundTripped.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, false);
    }

    /**
     * Converts a versioned table to a stream and back to a table materialized in a second
     * versioned store, filters it, and expects the filter's {@link KTableFilter} supplier to
     * report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
        // Given:
        final KeyValueBytesStoreSupplier sourceStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(sourceStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier roundTripStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal roundTripMaterialized =
            new MaterializedInternal(Materialized.as(roundTripStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable roundTripped = source.toStream().toTable((Materialized) roundTripMaterialized);
        roundTripped.filter((key, value) -> value != null);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode filterNode = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
        Assertions.assertNotNull(filterNode);
        verifyVersionedSemantics((TableFilterNode) filterNode, true);
    }

    /**
     * Groups and counts a versioned table and expects the resulting
     * {@code TableRepartitionMapNode}'s {@link KTableRepartitionMap} supplier to report
     * versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        source.groupBy(KeyValue::new).count();

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode repartitionMapNode =
            getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
        Assertions.assertNotNull(repartitionMapNode);
        verifyVersionedSemantics((TableRepartitionMapNode) repartitionMapNode, true);
    }

    /**
     * Inserts a filter and a {@code mapValues} between a versioned table and a
     * {@code groupBy().count()}, and expects the {@code TableRepartitionMapNode}'s
     * {@link KTableRepartitionMap} supplier to report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        final KTable transformed = source
            .filter((key, value) -> value != null)
            .mapValues(v -> v + v);
        transformed.groupBy(KeyValue::new).count();

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode repartitionMapNode =
            getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
        Assertions.assertNotNull(repartitionMapNode);
        verifyVersionedSemantics((TableRepartitionMapNode) repartitionMapNode, true);
    }

    /**
     * Joins two versioned tables and expects both processor suppliers of the
     * {@code KTableKTableJoinNode} to report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableJoin() {
        // Given:
        final KeyValueBytesStoreSupplier leftStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal leftMaterialized =
            new MaterializedInternal(Materialized.as(leftStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier rightStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal rightMaterialized =
            new MaterializedInternal(Materialized.as(rightStore), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, leftMaterialized);
        final KTable right = builder.table("t2", consumed, rightMaterialized);
        left.join(right, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode joinNode = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode) joinNode, true, true);
    }

    /**
     * Joins a versioned left table with a right table materialized under the plain store
     * name "unversioned"; expects versioned semantics on the join node's "this" supplier
     * only.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableJoinLeftOnly() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal versionedMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final MaterializedInternal unversionedMaterialized =
            new MaterializedInternal(Materialized.as("unversioned"), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, versionedMaterialized);
        final KTable right = builder.table("t2", consumed, unversionedMaterialized);
        left.join(right, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode joinNode = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode) joinNode, true, false);
    }

    /**
     * Joins a left table materialized under the plain store name "unversioned" with a
     * versioned right table; expects versioned semantics on the join node's "other"
     * supplier only.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableJoinRightOnly() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal versionedMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final MaterializedInternal unversionedMaterialized =
            new MaterializedInternal(Materialized.as("unversioned"), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, unversionedMaterialized);
        final KTable right = builder.table("t2", consumed, versionedMaterialized);
        left.join(right, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode joinNode = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode) joinNode, false, true);
    }

    /**
     * Joins a versioned table with itself and expects both processor suppliers of the
     * {@code KTableKTableJoinNode} to report versioned semantics.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableSelfJoin() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        source.join(source, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode joinNode = getNodeByType(builder.root, KTableKTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(joinNode);
        verifyVersionedSemantics((KTableKTableJoinNode) joinNode, true, true);
    }

    /**
     * Foreign-key joins two versioned tables and expects versioned semantics on both the
     * {@link ForeignJoinSubscriptionSendNode} and the {@link ForeignTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignJoin() {
        // Given:
        final KeyValueBytesStoreSupplier leftStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal leftMaterialized =
            new MaterializedInternal(Materialized.as(leftStore), builder, "prefix-");
        final KeyValueBytesStoreSupplier rightStore =
            Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5L));
        final MaterializedInternal rightMaterialized =
            new MaterializedInternal(Materialized.as(rightStore), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, leftMaterialized);
        final KTable right = builder.table("t2", consumed, rightMaterialized);
        left.join(right, v -> v, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then: check the subscription-send node first, then the foreign-table join node.
        final GraphNode subscriptionSendNode =
            getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(subscriptionSendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode) subscriptionSendNode, true);

        final GraphNode foreignTableJoinNode =
            getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignTableJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode) foreignTableJoinNode, true);
    }

    /**
     * Foreign-key joins a versioned left table with a right table materialized under the
     * plain store name "unversioned"; expects versioned semantics on the
     * {@link ForeignJoinSubscriptionSendNode} but not on the {@link ForeignTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignJoinLeftOnly() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal versionedMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final MaterializedInternal unversionedMaterialized =
            new MaterializedInternal(Materialized.as("unversioned"), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, versionedMaterialized);
        final KTable right = builder.table("t2", consumed, unversionedMaterialized);
        left.join(right, v -> v, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode subscriptionSendNode =
            getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(subscriptionSendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode) subscriptionSendNode, true);

        final GraphNode foreignTableJoinNode =
            getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignTableJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode) foreignTableJoinNode, false);
    }

    /**
     * Foreign-key joins a left table materialized under the plain store name "unversioned"
     * with a versioned right table; expects versioned semantics on the
     * {@link ForeignTableJoinNode} but not on the {@link ForeignJoinSubscriptionSendNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignJoinRightOnly() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal versionedMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final MaterializedInternal unversionedMaterialized =
            new MaterializedInternal(Materialized.as("unversioned"), builder, "prefix-");
        final KTable left = builder.table("t1", consumed, unversionedMaterialized);
        final KTable right = builder.table("t2", consumed, versionedMaterialized);
        left.join(right, v -> v, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode subscriptionSendNode =
            getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(subscriptionSendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode) subscriptionSendNode, false);

        final GraphNode foreignTableJoinNode =
            getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignTableJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode) foreignTableJoinNode, true);
    }

    /**
     * Foreign-key joins a versioned table with itself and expects versioned semantics on both
     * the {@link ForeignJoinSubscriptionSendNode} and the {@link ForeignTableJoinNode}.
     */
    @Test
    public void shouldSetUseVersionedSemanticsOnTableForeignSelfJoin() {
        // Given:
        final KeyValueBytesStoreSupplier versionedStore =
            Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5L));
        final MaterializedInternal sourceMaterialized =
            new MaterializedInternal(Materialized.as(versionedStore), builder, "prefix-");
        final KTable source = builder.table("t1", consumed, sourceMaterialized);
        source.join(source, v -> v, (v1, v2) -> v1 + v2);

        // When:
        builder.buildAndOptimizeTopology();

        // Then:
        final GraphNode subscriptionSendNode =
            getNodeByType(builder.root, ForeignJoinSubscriptionSendNode.class, new HashSet<>());
        Assertions.assertNotNull(subscriptionSendNode);
        verifyVersionedSemantics((ForeignJoinSubscriptionSendNode) subscriptionSendNode, true);

        final GraphNode foreignTableJoinNode =
            getNodeByType(builder.root, ForeignTableJoinNode.class, new HashSet<>());
        Assertions.assertNotNull(foreignTableJoinNode);
        verifyVersionedSemantics((ForeignTableJoinNode) foreignTableJoinNode, true);
    }

    /**
     * Asserts that the processor supplier attached to {@code filterNode} is a
     * {@link KTableFilter} whose {@code isUseVersionedSemantics()} equals
     * {@code expectedValue}.
     *
     * @param filterNode    graph node whose processor supplier is inspected
     * @param expectedValue expected value of the versioned-semantics flag
     */
    private void verifyVersionedSemantics(TableFilterNode<?, ?> filterNode, boolean expectedValue) {
        // assertInstanceOf both checks the type and hands back the value already cast.
        final KTableFilter filterSupplier = Assertions.assertInstanceOf(
            KTableFilter.class,
            filterNode.processorParameters().processorSupplier());
        Assertions.assertEquals(expectedValue, filterSupplier.isUseVersionedSemantics());
    }

    /**
     * Asserts that the processor supplier attached to {@code repartitionMapNode} is a
     * {@link KTableRepartitionMap} whose {@code isUseVersionedSemantics()} equals
     * {@code expectedValue}.
     *
     * @param repartitionMapNode graph node whose processor supplier is inspected
     * @param expectedValue      expected value of the versioned-semantics flag
     */
    private void verifyVersionedSemantics(TableRepartitionMapNode<?, ?> repartitionMapNode, boolean expectedValue) {
        // assertInstanceOf both checks the type and hands back the value already cast.
        final KTableRepartitionMap mapSupplier = Assertions.assertInstanceOf(
            KTableRepartitionMap.class,
            repartitionMapNode.processorParameters().processorSupplier());
        Assertions.assertEquals(expectedValue, mapSupplier.isUseVersionedSemantics());
    }

    /**
     * Asserts that both processor suppliers of {@code joinNode} are
     * {@link KTableKTableAbstractJoin} instances and that their
     * {@code isUseVersionedSemantics()} flags match the expected values. The "this" side is
     * checked before the "other" side.
     *
     * @param joinNode           table-table join node under inspection
     * @param expectedValueLeft  expected flag on the supplier from {@code thisProcessorParameters()}
     * @param expectedValueRight expected flag on the supplier from {@code otherProcessorParameters()}
     */
    private void verifyVersionedSemantics(KTableKTableJoinNode<?, ?, ?, ?> joinNode, boolean expectedValueLeft, boolean expectedValueRight) {
        final KTableKTableAbstractJoin leftJoin = Assertions.assertInstanceOf(
            KTableKTableAbstractJoin.class,
            joinNode.thisProcessorParameters().processorSupplier());
        Assertions.assertEquals(expectedValueLeft, leftJoin.isUseVersionedSemantics());

        final KTableKTableAbstractJoin rightJoin = Assertions.assertInstanceOf(
            KTableKTableAbstractJoin.class,
            joinNode.otherProcessorParameters().processorSupplier());
        Assertions.assertEquals(expectedValueRight, rightJoin.isUseVersionedSemantics());
    }

    /**
     * Asserts that the processor supplier attached to {@code joinThisNode} is a
     * {@link SubscriptionSendProcessorSupplier} whose {@code isUseVersionedSemantics()}
     * equals {@code expectedValue}.
     *
     * @param joinThisNode  subscription-send node of a foreign-key join
     * @param expectedValue expected value of the versioned-semantics flag
     */
    private void verifyVersionedSemantics(ForeignJoinSubscriptionSendNode<?, ?> joinThisNode, boolean expectedValue) {
        // assertInstanceOf both checks the type and hands back the value already cast.
        final SubscriptionSendProcessorSupplier sendSupplier = Assertions.assertInstanceOf(
            SubscriptionSendProcessorSupplier.class,
            joinThisNode.processorParameters().processorSupplier());
        Assertions.assertEquals(expectedValue, sendSupplier.isUseVersionedSemantics());
    }

    /**
     * Asserts that the processor supplier attached to {@code joinOtherNode} is a
     * {@link ForeignTableJoinProcessorSupplier} whose {@code isUseVersionedSemantics()}
     * equals {@code expectedValue}.
     *
     * @param joinOtherNode foreign-table join node of a foreign-key join
     * @param expectedValue expected value of the versioned-semantics flag
     */
    private void verifyVersionedSemantics(ForeignTableJoinNode<?, ?> joinOtherNode, boolean expectedValue) {
        // assertInstanceOf both checks the type and hands back the value already cast.
        final ForeignTableJoinProcessorSupplier foreignJoinSupplier = Assertions.assertInstanceOf(
            ForeignTableJoinProcessorSupplier.class,
            joinOtherNode.processorParameters().processorSupplier());
        Assertions.assertEquals(expectedValue, foreignJoinSupplier.isUseVersionedSemantics());
    }

    /**
     * Depth-first search, starting at {@code currentNode}, for the first node whose class is
     * {@code clazz} or a subclass of it.
     *
     * <p>Children already recorded in {@code visited} are skipped, so a node reachable
     * through several parents is explored at most once and a cyclic graph cannot cause
     * unbounded recursion. Skipping is safe because the search returns as soon as a match is
     * found: any child whose exploration has already finished produced no match.
     *
     * @param currentNode node to test; its descendants are searched if it does not match
     * @param clazz       node type to look for
     * @param visited     mutable set of children already explored; updated by this call
     * @return the first matching node in pre-order, or {@code null} when none is found
     */
    private GraphNode getNodeByType(GraphNode currentNode, Class<? extends GraphNode> clazz, Set<GraphNode> visited) {
        if (clazz.isInstance(currentNode)) {
            return currentNode;
        }
        for (final GraphNode child : currentNode.children()) {
            // Set.add returns false for an element that is already present.
            if (!visited.add(child)) {
                continue;
            }
            final GraphNode match = getNodeByType(child, clazz, visited);
            if (match != null) {
                return match;
            }
        }
        return null;
    }

    /**
     * Collects, in depth-first pre-order, every node reachable from {@code currentNode}
     * (inclusive) whose class is {@code clazz} or a subclass of it.
     *
     * @param currentNode node to test; its unvisited children are searched recursively
     * @param clazz       node type to collect
     * @param visited     mutable set of children already explored; updated by this call
     * @param result      list that matching nodes are appended to
     */
    private void getNodesByType(GraphNode currentNode, Class<? extends GraphNode> clazz, Set<GraphNode> visited, List<GraphNode> result) {
        if (clazz.isInstance(currentNode)) {
            result.add(currentNode);
        }
        for (final GraphNode child : currentNode.children()) {
            // Set.add returns true only for a child that has not been seen before.
            if (visited.add(child)) {
                getNodesByType(child, clazz, visited, result);
            }
        }
    }

    /**
     * Increments {@code count} once for every {@code WindowedStreamProcessorNode} reachable
     * from {@code currentNode} (inclusive), visiting each child at most once.
     *
     * @param count       counter that is incremented for each matching node
     * @param currentNode node to test; its unvisited children are searched recursively
     * @param visited     mutable set of children already explored; updated by this call
     */
    private void countJoinWindowNodes(AtomicInteger count, GraphNode currentNode, Set<GraphNode> visited) {
        final boolean isWindowNode = currentNode instanceof WindowedStreamProcessorNode;
        if (isWindowNode) {
            count.incrementAndGet();
        }
        for (final GraphNode child : currentNode.children()) {
            // Set.add returns true only for a child that has not been seen before.
            if (visited.add(child)) {
                countJoinWindowNodes(count, child, visited);
            }
        }
    }
}

