/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@code KStream#split()}.
 *
 * <p>Every test builds a topology on the shared {@link #source} stream and then runs it through
 * {@link #withDriver(Consumer)}, which pipes records with the integer keys -1 through 7 and the
 * values {@code "V" + key} into the input topic. The assertions then check which output topic
 * each value ended up in.
 */
public class KStreamSplitTest {
    private final String topicName = "topic";
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final StreamsBuilder builder = new StreamsBuilder();

    // Predicates on the record key only; the value is ignored.
    private final Predicate<Integer, String> isEven = (key, value) -> key % 2 == 0;
    private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> key % 3 == 0;
    private final Predicate<Integer, String> isMultipleOfFive = (key, value) -> key % 5 == 0;
    private final Predicate<Integer, String> isMultipleOfSeven = (key, value) -> key % 7 == 0;
    private final Predicate<Integer, String> isNegative = (key, value) -> key < 0;

    private final KStream<Integer, String> source =
        builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));

    /**
     * Splits into three consumer-backed branches without a default branch.
     *
     * <p>The expected lists require each record to appear only in the first branch whose
     * predicate matches (e.g. key 6 is expected in "x2" but not in "x3"), and the returned map
     * is expected to be empty.
     */
    @Test
    public void testKStreamSplit() {
        final Map<String, KStream<Integer, String>> branches = source.split()
            .branch(isEven, Branched.withConsumer(ks -> ks.to("x2")))
            .branch(isMultipleOfThree, Branched.withConsumer(ks -> ks.to("x3")))
            .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("x5")))
            .noDefaultBranch();

        Assertions.assertEquals(0, branches.size());

        // withDriver() builds the topology again; this early build is kept from the original test.
        builder.build();

        withDriver(driver -> {
            final TestOutputTopic<Integer, String> x2 =
                driver.createOutputTopic("x2", new IntegerDeserializer(), new StringDeserializer());
            final TestOutputTopic<Integer, String> x3 =
                driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
            final TestOutputTopic<Integer, String> x5 =
                driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
            Assertions.assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
            Assertions.assertEquals(Arrays.asList("V3"), x3.readValuesToList());
            Assertions.assertEquals(Arrays.asList("V5"), x5.readValuesToList());
        });
    }

    /**
     * Builds the topology from {@link #builder}, feeds the fixed input records into
     * {@link #topicName} and then hands the driver to {@code test}.
     *
     * <p>The driver is closed when {@code test} returns or throws.
     *
     * @param test callback that reads the output topics and performs the assertions
     */
    private void withDriver(final Consumer<TopologyTestDriver> test) {
        final int[] inputKeys = {-1, 0, 1, 2, 3, 4, 5, 6, 7};
        final Topology topology = builder.build();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final TestInputTopic<Integer, String> inputTopic =
                driver.createInputTopic(topicName, new IntegerSerializer(), new StringSerializer());
            for (final int key : inputKeys) {
                inputTopic.pipeInput(key, "V" + key);
            }
            test.accept(driver);
        }
    }

    /**
     * Compile-time check: predicates declared on supertypes of the stream's key and value types
     * ({@code Number}/{@code Object} for an {@code Integer}/{@code String} stream) must be
     * accepted by {@code branch}. There is nothing to assert at runtime.
     */
    @Test
    public void testTypeVariance() {
        final Predicate<Number, Object> positive = (key, value) -> key.doubleValue() > 0.0;
        final Predicate<Number, Object> negative = (key, value) -> key.doubleValue() < 0.0;
        new StreamsBuilder()
            .<Integer, String>stream("empty")
            .split()
            .branch(positive)
            .branch(negative);
    }

    /**
     * Checks which branches show up in the map returned by {@code defaultBranch()} and under
     * which names, by routing every returned stream to a topic named after its map key.
     */
    @Test
    public void testResultingMap() {
        final Map<String, KStream<Integer, String>> branches = source.split(Named.as("foo-"))
            .branch(isEven, Branched.as("bar"))                           // expected as "foo-bar"
            .branch(isMultipleOfThree, Branched.withConsumer(ks -> { }))  // consumer: no entry expected
            .branch(isMultipleOfFive, Branched.withFunction(ks -> null))  // function returns null: no entry expected
            .branch(isNegative, Branched.withFunction(ks -> ks))          // expected as "foo-4"
            .branch(isMultipleOfSeven)                                    // expected as "foo-5"
            .defaultBranch();                                             // expected as "foo-0"

        Assertions.assertEquals(4, branches.size());

        // Send each returned branch to the topic that carries the branch's name.
        for (final Map.Entry<String, KStream<Integer, String>> branch : branches.entrySet()) {
            branch.getValue().to(branch.getKey());
        }

        // withDriver() builds the topology again; this early build is kept from the original test.
        builder.build();

        withDriver(driver -> {
            final TestOutputTopic<Integer, String> even =
                driver.createOutputTopic("foo-bar", new IntegerDeserializer(), new StringDeserializer());
            final TestOutputTopic<Integer, String> negative =
                driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
            final TestOutputTopic<Integer, String> x7 =
                driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
            final TestOutputTopic<Integer, String> defaultBranch =
                driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
            Assertions.assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
            Assertions.assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
            Assertions.assertEquals(Arrays.asList("V7"), x7.readValuesToList());
            Assertions.assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
        });
    }

    /**
     * The split chain here is never finished with {@code defaultBranch()} or
     * {@code noDefaultBranch()}; the consumer-backed branches are still expected to deliver
     * their records to the shared output topic.
     */
    @Test
    public void testBranchingWithNoTerminalOperation() {
        final String outputTopicName = "output";
        source.split()
            .branch(isEven, Branched.withConsumer(ks -> ks.to(outputTopicName)))
            .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to(outputTopicName)));

        // withDriver() builds the topology again; this early build is kept from the original test.
        builder.build();

        withDriver(driver -> {
            final TestOutputTopic<Integer, String> outputTopic =
                driver.createOutputTopic(outputTopicName, new IntegerDeserializer(), new StringDeserializer());
            Assertions.assertEquals(Arrays.asList("V0", "V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
        });
    }
}

