/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.graph;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class StreamsGraphTest {
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    private Initializer<String> initializer;
    private Aggregator<String, String, String> aggregator;
    private final String expectedJoinedTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> none\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";
    private final String expectedJoinedFilteredTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> none\n      <-- KSTREAM-MERGE-0000000006\n\n";
    private final String expectedFullTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000008\n      <-- KSTREAM-MERGE-0000000006\n    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000009\n      <-- KSTREAM-FILTER-0000000007\n    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n      <-- KSTREAM-MAPVALUES-0000000008\n\n";
    private final String expectedMergeOptimizedTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n      --> KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MERGE-0000000005 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-SINK-0000000007\n      <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n      <-- KSTREAM-MERGE-0000000006\n\n";
    private final String expectedComplexMergeOptimizeTopology = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n      --> KSTREAM-TRANSFORM-0000000001\n    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n      <-- KSTREAM-TRANSFORM-0000000001\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> KTABLE-SUPPRESS-0000000007\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n    Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n      --> KSTREAM-PEEK-0000000020\n    Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n      --> KTABLE-TOSTREAM-0000000009\n      <-- KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000021\n      <-- KSTREAM-SOURCE-0000000019\n    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n      --> KSTREAM-FLATMAP-0000000010\n      <-- KTABLE-SUPPRESS-0000000007\n    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KTABLE-TOSTREAM-0000000009\n    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KSTREAM-PEEK-0000000020\n    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n      --> KSTREAM-FILTER-0000000024\n      <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- KSTREAM-MERGE-0000000022\n    Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n      <-- KSTREAM-FILTER-0000000024\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n      --> KSTREAM-FLATMAP-0000000012\n    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n      <-- KSTREAM-SOURCE-0000000011\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink\n      <-- KSTREAM-FLATMAP-0000000012\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n\n  Sub-topology: 3\n    Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n      --> KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> KSTREAM-BRANCH-0000000027\n      <-- KSTREAM-SOURCE-0000000025\n    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n      --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n      <-- KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-BRANCH-00000000270 (stores: [])\n      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-BRANCH-00000000271 (stores: [])\n      --> KSTREAM-MAP-0000000029\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n      --> KSTREAM-PEEK-0000000034\n      <-- KSTREAM-BRANCH-00000000270\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n      --> KSTREAM-PEEK-0000000013\n    Processor: KSTREAM-MAP-0000000029 (stores: [])\n      --> KSTREAM-PEEK-0000000030\n      <-- KSTREAM-BRANCH-00000000271\n    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000035\n      <-- KSTREAM-FILTER-0000000033\n    Processor: KSTREAM-MAP-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-BRANCH-00000000270\n    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n      --> KSTREAM-SINK-0000000036\n      <-- KSTREAM-PEEK-0000000034\n    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n      --> KSTREAM-AGGREGATE-0000000015\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source\n    Processor: KSTREAM-PEEK-0000000030 (stores: [])\n      --> KSTREAM-SINK-0000000031\n      <-- KSTREAM-MAP-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> none\n      <-- KSTREAM-PEEK-0000000013\n    Sink: KSTREAM-SINK-0000000031 (topic: external-command)\n      <-- KSTREAM-PEEK-0000000030\n    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n      <-- KSTREAM-MAPVALUES-0000000035\n    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n      <-- KSTREAM-MAP-0000000037\n\n";

    @Test
    public void shouldBeAbleToBuildTopologyIncrementally() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream("topic");
        KStream streamII = builder.stream("other-topic");
        ValueJoiner valueJoiner = (v, v2) -> v + v2;
        KStream joinedStream = stream.join(streamII, valueJoiner, JoinWindows.of((Duration)Duration.ofMillis(5000L)));
        Assertions.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> none\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n", (Object)builder.build().describe().toString());
        KStream filteredJoinStream = joinedStream.filter((k, v) -> v.equals("foo"));
        Assertions.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> none\n      <-- KSTREAM-MERGE-0000000006\n\n", (Object)builder.build().describe().toString());
        filteredJoinStream.mapValues(v -> v + "some value").to("output-topic");
        Assertions.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n      --> KSTREAM-WINDOWED-0000000002\n    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n      --> KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-JOINTHIS-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-JOINOTHER-0000000005\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000003\n    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-WINDOWED-0000000002\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-FILTER-0000000007\n      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000008\n      <-- KSTREAM-MERGE-0000000006\n    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])\n      --> KSTREAM-SINK-0000000009\n      <-- KSTREAM-FILTER-0000000007\n    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n      <-- KSTREAM-MAPVALUES-0000000008\n\n", (Object)builder.build().describe().toString());
    }

    @Test
    public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "test-application");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("topology.optimization", "all");
        StreamsBuilder builder = new StreamsBuilder();
        KStream inputStream = builder.stream("inputTopic");
        KStream changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0, 5));
        changedKeyStream.groupByKey(Grouped.as((String)"count-repartition")).count(Materialized.as((String)"count-store")).toStream().to("count-topic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        changedKeyStream.groupByKey(Grouped.as((String)"windowed-repartition")).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofSeconds(5L))).count(Materialized.as((String)"windowed-count-store")).toStream().map((k, v) -> KeyValue.pair((Object)k.key(), (Object)v)).to("windowed-count", Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        builder.build(properties);
    }

    @Test
    public void shouldNotThrowNPEWithMergeNodes() {
        Properties properties = new Properties();
        properties.setProperty("application.id", "test-application");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("topology.optimization", "all");
        StreamsBuilder builder = new StreamsBuilder();
        this.initializer = () -> "";
        this.aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
        TransformerSupplier transformSupplier = () -> new Transformer<String, String, KeyValue<String, String>>(){

            public void init(ProcessorContext context) {
            }

            public KeyValue<String, String> transform(String key, String value) {
                return KeyValue.pair((Object)key, (Object)value);
            }

            public void close() {
            }
        };
        KStream retryStream = builder.stream("retryTopic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).transform(transformSupplier, new String[0]).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).aggregate(this.initializer, this.aggregator, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).suppress(Suppressed.untilTimeLimit((Duration)Duration.ofSeconds(500L), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxBytes((long)64000000L))).toStream().flatMap((k, v) -> new ArrayList());
        KTable idTable = builder.stream("id-table-topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).flatMap((k, v) -> new ArrayList()).peek((subscriptionId, recipientId) -> System.out.println("data " + subscriptionId + " " + recipientId)).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).aggregate(this.initializer, this.aggregator, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream joinStream = builder.stream("internal-topic-command", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).peek((subscriptionId, command) -> System.out.println("stdoutput")).mapValues((k, v) -> v).merge(retryStream).leftJoin(idTable, (v1, v2) -> v1 + v2, Joined.with((Serde)Serdes.String(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joinStream.split().branch((k, v) -> v.equals("some-value"), Branched.withConsumer(ks -> ks.map(KeyValue::pair).peek((recipientId, command) -> System.out.println("printing out")).to("external-command", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String())))).defaultBranch(Branched.withConsumer(ks -> {
            ks.filter((k, v) -> v != null).peek((subscriptionId, wrapper) -> System.out.println("Printing output")).mapValues((k, v) -> v).to("dlq-topic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
            ks.map(KeyValue::pair).to("retryTopic", Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        }));
        Topology topology = builder.build(properties);
        Assertions.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n      --> KSTREAM-TRANSFORM-0000000001\n    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n      <-- KSTREAM-TRANSFORM-0000000001\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n      --> KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> KTABLE-SUPPRESS-0000000007\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n    Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n      --> KSTREAM-PEEK-0000000020\n    Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n      --> KTABLE-TOSTREAM-0000000009\n      <-- KSTREAM-AGGREGATE-0000000003\n    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000021\n      <-- KSTREAM-SOURCE-0000000019\n    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n      --> KSTREAM-FLATMAP-0000000010\n      <-- KTABLE-SUPPRESS-0000000007\n    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KTABLE-TOSTREAM-0000000009\n    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n      --> KSTREAM-MERGE-0000000022\n      <-- KSTREAM-PEEK-0000000020\n    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n      --> KSTREAM-FILTER-0000000024\n      <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- KSTREAM-MERGE-0000000022\n    Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n      <-- KSTREAM-FILTER-0000000024\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n      --> KSTREAM-FLATMAP-0000000012\n    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n      <-- KSTREAM-SOURCE-0000000011\n    Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter (stores: [])\n      --> KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink\n      <-- KSTREAM-FLATMAP-0000000012\n    Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n\n  Sub-topology: 3\n    Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n      --> KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> KSTREAM-BRANCH-0000000027\n      <-- KSTREAM-SOURCE-0000000025\n    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n      --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n      <-- KSTREAM-LEFTJOIN-0000000026\n    Processor: KSTREAM-BRANCH-00000000270 (stores: [])\n      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-BRANCH-00000000271 (stores: [])\n      --> KSTREAM-MAP-0000000029\n      <-- KSTREAM-BRANCH-0000000027\n    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n      --> KSTREAM-PEEK-0000000034\n      <-- KSTREAM-BRANCH-00000000270\n    Source: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n      --> KSTREAM-PEEK-0000000013\n    Processor: KSTREAM-MAP-0000000029 (stores: [])\n      --> KSTREAM-PEEK-0000000030\n      <-- KSTREAM-BRANCH-00000000271\n    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000035\n      <-- KSTREAM-FILTER-0000000033\n    Processor: KSTREAM-MAP-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-BRANCH-00000000270\n    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n      --> KSTREAM-SINK-0000000036\n      <-- KSTREAM-PEEK-0000000034\n    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n      --> KSTREAM-AGGREGATE-0000000015\n      <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source\n    Processor: KSTREAM-PEEK-0000000030 (stores: [])\n      --> KSTREAM-SINK-0000000031\n      <-- KSTREAM-MAP-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n      --> none\n      <-- KSTREAM-PEEK-0000000013\n    Sink: KSTREAM-SINK-0000000031 (topic: external-command)\n      <-- KSTREAM-PEEK-0000000030\n    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n      <-- KSTREAM-MAPVALUES-0000000035\n    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n      <-- KSTREAM-MAP-0000000037\n\n", (Object)topology.describe().toString());
    }

    @Test
    public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
        Topology attemptedOptimize = this.getTopologyWithChangingValuesAfterChangingKey("all");
        Topology noOptimization = this.getTopologyWithChangingValuesAfterChangingKey("none");
        Assertions.assertEquals((Object)attemptedOptimize.describe().toString(), (Object)noOptimization.describe().toString());
        Assertions.assertEquals((int)2, (int)this.getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
        Assertions.assertEquals((int)2, (int)this.getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
    }

    @Deprecated
    @Test
    public void shouldNotOptimizeWhenAThroughOperationIsDone() {
        Topology attemptedOptimize = this.getTopologyWithThroughOperation("all");
        Topology noOptimization = this.getTopologyWithThroughOperation("none");
        Assertions.assertEquals((Object)attemptedOptimize.describe().toString(), (Object)noOptimization.describe().toString());
        Assertions.assertEquals((int)0, (int)this.getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
        Assertions.assertEquals((int)0, (int)this.getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
    }

    @Test
    public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream parentStream = streamsBuilder.stream("input_topic", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).selectKey(Integer::sum);
        KStream childStream1 = parentStream.mapValues(v -> v + 1);
        KStream childStream2 = parentStream.mapValues(v -> v + 2);
        KStream childStream3 = parentStream.mapValues(v -> v + 3);
        childStream1.merge(childStream2).merge(childStream3).to("output_topic");
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        Topology topology = streamsBuilder.build(properties);
        Assertions.assertEquals((Object)"Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n      --> KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-MERGE-0000000005\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-KEY-SELECT-0000000001\n    Processor: KSTREAM-MERGE-0000000005 (stores: [])\n      --> KSTREAM-MERGE-0000000006\n      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003\n    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n      --> KSTREAM-SINK-0000000007\n      <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n      <-- KSTREAM-MERGE-0000000006\n\n", (Object)topology.describe().toString());
    }

    @Test
    public void shouldNotOptimizeWhenRepartitionOperationIsDone() {
        Topology attemptedOptimize = this.getTopologyWithRepartitionOperation("all");
        Topology noOptimization = this.getTopologyWithRepartitionOperation("none");
        Assertions.assertEquals((Object)attemptedOptimize.describe().toString(), (Object)noOptimization.describe().toString());
        Assertions.assertEquals((int)2, (int)this.getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
        Assertions.assertEquals((int)2, (int)this.getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
    }

    private Topology getTopologyWithChangingValuesAfterChangingKey(String optimizeConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", optimizeConfig);
        KStream inputStream = builder.stream("input");
        KStream mappedKeyStream = inputStream.selectKey((k, v) -> k + v);
        mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
        mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return builder.build(properties);
    }

    @Deprecated
    private Topology getTopologyWithThroughOperation(String optimizeConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", optimizeConfig);
        KStream inputStream = builder.stream("input");
        KStream mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");
        mappedKeyStream.groupByKey().count().toStream().to("output");
        mappedKeyStream.groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return builder.build(properties);
    }

    private Topology getTopologyWithRepartitionOperation(String optimizeConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.put("topology.optimization", optimizeConfig);
        KStream inputStream = builder.stream("input").selectKey((k, v) -> k + v);
        inputStream.repartition().groupByKey().count().toStream().to("output");
        inputStream.repartition().groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(5000L))).count().toStream().to("windowed-output");
        return builder.build(properties);
    }

    private int getCountOfRepartitionTopicsFound(String topologyString) {
        Matcher matcher = this.repartitionTopicPattern.matcher(topologyString);
        ArrayList<String> repartitionTopicsFound = new ArrayList<String>();
        while (matcher.find()) {
            repartitionTopicsFound.add(matcher.group());
        }
        return repartitionTopicsFound.size();
    }
}

