/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepartitionOptimizingTest {
    // Logger for this test class.
    private final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class);

    // Topic names used by the topology built in runTest.
    private static final String INPUT_TOPIC = "input";
    private static final String COUNT_TOPIC = "outputTopic_0";
    private static final String AGGREGATION_TOPIC = "outputTopic_1";
    private static final String REDUCE_TOPIC = "outputTopic_2";
    private static final String JOINED_TOPIC = "joinedOutputTopic";

    // Expected number of repartition sinks in the topology description for each optimization mode.
    private static final int ONE_REPARTITION_TOPIC = 1;
    private static final int FOUR_REPARTITION_TOPICS = 4;

    // Serializer / deserializer shared by the test input and output topics.
    private final Serializer<String> stringSerializer = new StringSerializer();
    private final Deserializer<String> stringDeserializer = new StringDeserializer();

    // Matches one repartition sink line in the textual topology description.
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");

    // Re-created in setUp() / runTest() for every test.
    private Properties streamsConfiguration;
    private TopologyTestDriver topologyTestDriver;

    // Aggregation starts at 0 and adds the length of every value seen for a key.
    private final Initializer<Integer> initializer = () -> 0;
    private final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
    // Reduction concatenates values with ':' in arrival order.
    private final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;

    // Receives every value handed to SimpleProcessor; cleared in setUp().
    private final List<String> processorValueCollector = new ArrayList<>();

    // Expected results; runTest converts each list to a map before comparing it with the output topic.
    private final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(
        KeyValue.pair("A", 3L),
        KeyValue.pair("B", 3L),
        KeyValue.pair("C", 3L));
    private final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(
        KeyValue.pair("A", 9),
        KeyValue.pair("B", 9),
        KeyValue.pair("C", 9));
    private final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(
        KeyValue.pair("A", "foo:bar:baz"),
        KeyValue.pair("B", "foo:bar:baz"),
        KeyValue.pair("C", "foo:bar:baz"));
    // All three pairs share key "A"; once converted to a map only the last pair ("A", "baz:3") remains.
    private final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(
        KeyValue.pair("A", "foo:3"),
        KeyValue.pair("A", "bar:3"),
        KeyValue.pair("A", "baz:3"));
    // Values expected in processorValueCollector after the run (upper-cased values of key "B").
    private final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ");
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceStream (topics: [input])\n      --> source-map\n    Processor: source-map (stores: [])\n      --> process-filter, count-groupByKey-repartition-filter\n      <-- sourceStream\n    Processor: process-filter (stores: [])\n      --> process-mapValues\n      <-- source-map\n    Processor: count-groupByKey-repartition-filter (stores: [])\n      --> count-groupByKey-repartition-sink\n      <-- source-map\n    Processor: process-mapValues (stores: [])\n      --> process\n      <-- process-filter\n    Sink: count-groupByKey-repartition-sink (topic: count-groupByKey-repartition)\n      <-- count-groupByKey-repartition-filter\n    Processor: process (stores: [])\n      --> none\n      <-- process-mapValues\n\n  Sub-topology: 1\n    Source: count-groupByKey-repartition-source (topics: [count-groupByKey-repartition])\n      --> aggregate, count, join-filter, reduce-filter\n    Processor: count (stores: [count-store])\n      --> count-toStream\n      <-- count-groupByKey-repartition-source\n    Processor: count-toStream (stores: [])\n      --> join-other-windowed, count-to\n      <-- count\n    Processor: join-filter (stores: [])\n      --> join-this-windowed\n      <-- count-groupByKey-repartition-source\n    Processor: reduce-filter (stores: [])\n      --> reduce-peek\n      <-- count-groupByKey-repartition-source\n    Processor: join-other-windowed (stores: [other-join-store])\n      --> join-other-join\n      <-- count-toStream\n    Processor: join-this-windowed (stores: [join-store])\n      --> join-this-join\n      <-- join-filter\n    Processor: reduce-peek (stores: [])\n      --> reducer\n      <-- reduce-filter\n    Processor: aggregate (stores: [aggregate-store])\n      --> aggregate-toStream\n      <-- count-groupByKey-repartition-source\n    Processor: join-other-join (stores: [join-store])\n      --> join-merge\n      <-- join-other-windowed\n    
Processor: join-this-join (stores: [other-join-store])\n      --> join-merge\n      <-- join-this-windowed\n    Processor: reducer (stores: [reduce-store])\n      --> reduce-toStream\n      <-- reduce-peek\n    Processor: aggregate-toStream (stores: [])\n      --> reduce-to\n      <-- aggregate\n    Processor: join-merge (stores: [])\n      --> join-to\n      <-- join-this-join, join-other-join\n    Processor: reduce-toStream (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- reducer\n    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n      <-- reduce-toStream\n    Sink: count-to (topic: outputTopic_0)\n      <-- count-toStream\n    Sink: join-to (topic: joinedOutputTopic)\n      <-- join-merge\n    Sink: reduce-to (topic: outputTopic_1)\n      <-- aggregate-toStream\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceStream (topics: [input])\n      --> source-map\n    Processor: source-map (stores: [])\n      --> reduce-filter, process-filter, aggregate-groupByKey-repartition-filter, count-groupByKey-repartition-filter, join-filter\n      <-- sourceStream\n    Processor: reduce-filter (stores: [])\n      --> reduce-peek\n      <-- source-map\n    Processor: join-filter (stores: [])\n      --> join-left-repartition-filter\n      <-- source-map\n    Processor: process-filter (stores: [])\n      --> process-mapValues\n      <-- source-map\n    Processor: reduce-peek (stores: [])\n      --> reduce-groupByKey-repartition-filter\n      <-- reduce-filter\n    Processor: aggregate-groupByKey-repartition-filter (stores: [])\n      --> aggregate-groupByKey-repartition-sink\n      <-- source-map\n    Processor: count-groupByKey-repartition-filter (stores: [])\n      --> count-groupByKey-repartition-sink\n      <-- source-map\n    Processor: join-left-repartition-filter (stores: [])\n      --> join-left-repartition-sink\n      <-- join-filter\n    Processor: process-mapValues (stores: [])\n      --> process\n      <-- process-filter\n    Processor: reduce-groupByKey-repartition-filter (stores: [])\n      --> reduce-groupByKey-repartition-sink\n      <-- reduce-peek\n    Sink: aggregate-groupByKey-repartition-sink (topic: aggregate-groupByKey-repartition)\n      <-- aggregate-groupByKey-repartition-filter\n    Sink: count-groupByKey-repartition-sink (topic: count-groupByKey-repartition)\n      <-- count-groupByKey-repartition-filter\n    Sink: join-left-repartition-sink (topic: join-left-repartition)\n      <-- join-left-repartition-filter\n    Processor: process (stores: [])\n      --> none\n      <-- process-mapValues\n    Sink: reduce-groupByKey-repartition-sink (topic: reduce-groupByKey-repartition)\n      <-- reduce-groupByKey-repartition-filter\n\n  Sub-topology: 1\n    
Source: count-groupByKey-repartition-source (topics: [count-groupByKey-repartition])\n      --> count\n    Processor: count (stores: [count-store])\n      --> count-toStream\n      <-- count-groupByKey-repartition-source\n    Processor: count-toStream (stores: [])\n      --> join-other-windowed, count-to\n      <-- count\n    Source: join-left-repartition-source (topics: [join-left-repartition])\n      --> join-this-windowed\n    Processor: join-other-windowed (stores: [other-join-store])\n      --> join-other-join\n      <-- count-toStream\n    Processor: join-this-windowed (stores: [join-store])\n      --> join-this-join\n      <-- join-left-repartition-source\n    Processor: join-other-join (stores: [join-store])\n      --> join-merge\n      <-- join-other-windowed\n    Processor: join-this-join (stores: [other-join-store])\n      --> join-merge\n      <-- join-this-windowed\n    Processor: join-merge (stores: [])\n      --> join-to\n      <-- join-this-join, join-other-join\n    Sink: count-to (topic: outputTopic_0)\n      <-- count-toStream\n    Sink: join-to (topic: joinedOutputTopic)\n      <-- join-merge\n\n  Sub-topology: 2\n    Source: aggregate-groupByKey-repartition-source (topics: [aggregate-groupByKey-repartition])\n      --> aggregate\n    Processor: aggregate (stores: [aggregate-store])\n      --> aggregate-toStream\n      <-- aggregate-groupByKey-repartition-source\n    Processor: aggregate-toStream (stores: [])\n      --> reduce-to\n      <-- aggregate\n    Sink: reduce-to (topic: outputTopic_1)\n      <-- aggregate-toStream\n\n  Sub-topology: 3\n    Source: reduce-groupByKey-repartition-source (topics: [reduce-groupByKey-repartition])\n      --> reducer\n    Processor: reducer (stores: [reduce-store])\n      --> reduce-toStream\n      <-- reduce-groupByKey-repartition-source\n    Processor: reduce-toStream (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- reducer\n    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n      <-- 
reduce-toStream\n\n";

    /**
     * Prepares a fresh streams configuration with String key/value serdes, sets the
     * state-store cache size and commit interval, and empties the processor value collector.
     */
    @BeforeEach
    public void setUp() {
        Properties config = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
        config.setProperty("statestore.cache.max.bytes", String.valueOf(10240));
        config.setProperty("commit.interval.ms", String.valueOf(5000L));
        this.streamsConfiguration = config;

        this.processorValueCollector.clear();
    }

    /**
     * Closes the topology test driver created by {@code runTest}.
     *
     * <p>The driver is only assigned part-way through {@code runTest}; if the test failed
     * before that point the field is still {@code null}, so it is checked here to avoid a
     * NullPointerException being reported alongside the real failure.
     */
    @AfterEach
    public void tearDown() {
        if (this.topologyTestDriver != null) {
            this.topologyTestDriver.close();
        }
    }

    /**
     * Runs the topology with "topology.optimization" set to "all" and expects a single
     * repartition sink in the topology description.
     */
    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() {
        this.runTest("all", ONE_REPARTITION_TOPIC);
    }

    /**
     * Runs the topology with "topology.optimization" set to "none" and expects four
     * repartition sinks in the topology description.
     */
    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() {
        this.runTest("none", FOUR_REPARTITION_TOPICS);
    }

    /**
     * Builds the test topology, runs it through a {@link TopologyTestDriver} and verifies both
     * the topology description and the records written to every output topic.
     *
     * @param optimizationConfig value stored under the "topology.optimization" config key;
     *        "all" selects the optimized expected topology, any other value the unoptimized one
     * @param expectedNumberRepartitionTopics number of "Sink: ...-repartition" entries expected
     *        in the topology description
     */
    private void runTest(String optimizationConfig, int expectedNumberRepartitionTopics) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> sourceStream = builder.stream(
            INPUT_TOPIC,
            Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream"));

        // Keys are upper-cased here, so input keys a/b/c are seen as A/B/C by everything downstream.
        // Locale.ROOT keeps the mapping independent of the JVM's default locale.
        KStream<String, String> mappedStream = sourceStream.map(
            (k, v) -> KeyValue.pair(k.toUpperCase(Locale.ROOT), v),
            Named.as("source-map"));

        // Branch 1: collect the upper-cased values of key "B" in processorValueCollector.
        mappedStream
            .filter((k, v) -> k.equals("B"), Named.as("process-filter"))
            .mapValues(v -> v.toUpperCase(Locale.ROOT), Named.as("process-mapValues"))
            .process(() -> new SimpleProcessor(this.processorValueCollector), Named.as("process"));

        // Branch 2: count per key; the resulting stream also feeds the join below.
        KStream<String, Long> countStream = mappedStream
            .groupByKey(Grouped.as("count-groupByKey"))
            .count(
                Named.as("count"),
                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("count-store"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()))
            .toStream(Named.as("count-toStream"));
        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("count-to"));

        // Branch 3: sum of value lengths per key. The sink is named "reduce-to" although it writes
        // the aggregation topic; the expected topology strings rely on that name, so it must stay.
        mappedStream
            .groupByKey(Grouped.as("aggregate-groupByKey"))
            .aggregate(
                this.initializer,
                this.aggregator,
                Named.as("aggregate"),
                Materialized.<String, Integer>as(Stores.inMemoryKeyValueStore("aggregate-store"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Integer()))
            .toStream(Named.as("aggregate-toStream"))
            .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()).withName("reduce-to"));

        // Branch 4: reduce per key, with a pass-through filter and a peek ahead of groupByKey.
        // NOTE(review): presumably these extra operators exercise a repartition point that sits
        // further downstream of the key-changing map - confirm.
        mappedStream
            .filter((k, v) -> true, Named.as("reduce-filter"))
            .peek((k, v) -> System.out.println(k + ":" + v), Named.as("reduce-peek"))
            .groupByKey(Grouped.as("reduce-groupByKey"))
            .reduce(
                this.reducer,
                Named.as("reducer"),
                Materialized.as(Stores.inMemoryKeyValueStore("reduce-store")))
            .toStream(Named.as("reduce-toStream"))
            .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        // Branch 5: join the records of key "A" with the count stream.
        mappedStream
            .filter((k, v) -> k.equals("A"), Named.as("join-filter"))
            .join(
                countStream,
                (v1, v2) -> v1 + ":" + v2.toString(),
                JoinWindows.of(Duration.ofMillis(5000L)),
                StreamJoined.<String, String, Long>with(
                        Stores.inMemoryWindowStore("join-store", Duration.ofDays(1L), Duration.ofMillis(10000L), true),
                        Stores.inMemoryWindowStore("other-join-store", Duration.ofDays(1L), Duration.ofMillis(10000L), true))
                    .withName("join")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String())
                    .withOtherValueSerde(Serdes.Long()))
            .to(JOINED_TOPIC, Produced.as("join-to"));

        this.streamsConfiguration.setProperty("topology.optimization", optimizationConfig);
        Topology topology = builder.build(this.streamsConfiguration);
        this.topologyTestDriver = new TopologyTestDriver(topology, this.streamsConfiguration);

        TestInputTopic<String, String> inputTopicA =
            this.topologyTestDriver.createInputTopic(INPUT_TOPIC, this.stringSerializer, this.stringSerializer);
        TestOutputTopic<String, Long> countOutputTopic =
            this.topologyTestDriver.createOutputTopic(COUNT_TOPIC, this.stringDeserializer, new LongDeserializer());
        TestOutputTopic<String, Integer> aggregationOutputTopic =
            this.topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, this.stringDeserializer, new IntegerDeserializer());
        TestOutputTopic<String, String> reduceOutputTopic =
            this.topologyTestDriver.createOutputTopic(REDUCE_TOPIC, this.stringDeserializer, this.stringDeserializer);
        TestOutputTopic<String, String> joinedOutputTopic =
            this.topologyTestDriver.createOutputTopic(JOINED_TOPIC, this.stringDeserializer, this.stringDeserializer);

        inputTopicA.pipeKeyValueList(this.getKeyValues());

        // Verify the structure of the generated topology.
        String topologyString = topology.describe().toString();
        String expectedTopology = "all".equals(optimizationConfig)
            ? EXPECTED_OPTIMIZED_TOPOLOGY
            : EXPECTED_UNOPTIMIZED_TOPOLOGY;
        Assertions.assertEquals(expectedTopology, topologyString);
        Assertions.assertEquals(expectedNumberRepartitionTopics, this.getCountOfRepartitionTopicsFound(topologyString));

        // Verify the processed data. Hamcrest takes the actual value first, then the matcher.
        MatcherAssert.assertThat(this.processorValueCollector.size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(this.processorValueCollector, CoreMatchers.equalTo(this.expectedCollectedProcessorValues));
        // Outputs are compared as maps, so only one value per key takes part in each comparison.
        MatcherAssert.assertThat(countOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(this.keyValueListToMap(this.expectedCountKeyValues)));
        MatcherAssert.assertThat(aggregationOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(this.keyValueListToMap(this.expectedAggKeyValues)));
        MatcherAssert.assertThat(reduceOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(this.keyValueListToMap(this.expectedReduceKeyValues)));
        MatcherAssert.assertThat(joinedOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(this.keyValueListToMap(this.expectedJoinKeyValues)));
    }

    /**
     * Converts a list of key-value pairs into a map.
     *
     * <p>Pairs are inserted in list order, so when several pairs share a key the last one wins.
     * A plain loop is used rather than {@code Collectors.toMap}, which would throw on duplicate
     * keys.
     *
     * @param keyValuePairs pairs to convert
     * @param <K> key type
     * @param <V> value type
     * @return a new mutable map holding the last value seen for each key
     */
    private <K, V> Map<K, V> keyValueListToMap(List<KeyValue<K, V>> keyValuePairs) {
        Map<K, V> map = new HashMap<>();
        for (KeyValue<K, V> pair : keyValuePairs) {
            map.put(pair.key, pair.value);
        }
        return map;
    }

    /**
     * Counts how many times the repartition-sink pattern occurs in a topology description.
     *
     * @param topologyString textual topology description to scan
     * @return number of non-overlapping matches of {@code repartitionTopicPattern}
     */
    private int getCountOfRepartitionTopicsFound(String topologyString) {
        int occurrences = 0;
        for (Matcher m = this.repartitionTopicPattern.matcher(topologyString); m.find(); ) {
            occurrences++;
        }
        return occurrences;
    }

    /**
     * Builds the input records for the test: every key in {a, b, c} paired with every value in
     * {foo, bar, baz}, grouped by key (all values of "a" first, then "b", then "c").
     *
     * @return a new mutable list of nine key-value pairs
     */
    private List<KeyValue<String, String>> getKeyValues() {
        List<KeyValue<String, String>> keyValueList = new ArrayList<>();
        String[] keys = {"a", "b", "c"};
        String[] values = {"foo", "bar", "baz"};
        for (String key : keys) {
            for (String value : values) {
                keyValueList.add(KeyValue.pair(key, value));
            }
        }
        return keyValueList;
    }

    private static class SimpleProcessor
    implements Processor<String, String, Void, Void> {
        final List<String> valueList;

        SimpleProcessor(List<String> valueList) {
            this.valueList = valueList;
        }

        public void process(Record<String, String> record) {
            this.valueList.add((String)record.value());
        }
    }
}

