/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that builds a branched topology and checks that a
 * {@link KafkaStreams} instance running it reaches the {@code RUNNING} state.
 *
 * <p>The topology reads one input topic, splits it into two branches, sends
 * each branch through two consecutive {@code map -> groupByKey -> reduce}
 * stages, and finally joins the two resulting tables.
 * NOTE(review): judging by the class name, each {@code map} before a
 * {@code groupByKey} is expected to introduce a repartition topic, so the two
 * branches form connected sub-topologies -- confirm against the Streams DSL docs.
 */
@Category({IntegrationTest.class})
public class BranchedMultiLevelRepartitionConnectedTopologyTest {
    private static final Logger log = LoggerFactory.getLogger(BranchedMultiLevelRepartitionConnectedTopologyTest.class);

    /** Name of the topic the topology under test consumes from. */
    private static final String INPUT_STREAM = "input-stream";

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    /** Clock of the embedded cluster; passed to the producer helper when seeding records. */
    private final MockTime mockTime = CLUSTER.time;

    private KafkaStreams kafkaStreams;
    private Properties streamsConfiguration;

    /**
     * Prepares the streams configuration, creates the input topic (3 partitions,
     * replication factor 1) and purges local state left under that configuration.
     *
     * @throws Exception if topic creation or the state purge fails
     */
    @Before
    public void setUp() throws Exception {
        final Properties overrides = new Properties();
        overrides.put("auto.offset.reset", "earliest");
        overrides.put("topology.optimization", "none");

        final String byteArraySerde = Serdes.ByteArray().getClass().getName();
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            "branched-repartition-topic-test",
            CLUSTER.bootstrapServers(),
            byteArraySerde,
            byteArraySerde,
            overrides);

        CLUSTER.createTopic(INPUT_STREAM, 3, 1);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Builds the branched topology, produces a single record to the input topic,
     * starts the application and waits until it reports {@code RUNNING}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ExecutionException   if producing the seed record fails
     */
    @Test
    public void testTopologyBuild() throws InterruptedException, ExecutionException {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<byte[], byte[]> input = builder.stream(INPUT_STREAM);

        // branch(...) takes a varargs array of a parameterized type, so the call
        // site creates a generic array; the unchecked warning is expected here.
        @SuppressWarnings("unchecked")
        final KStream<byte[], byte[]>[] branches = input
            .flatMapValues(value -> Collections.singletonList(new byte[0]))
            .branch((k, v) -> true, (k, v) -> false);

        final KTable<byte[], byte[]> b1 = branches[0]
            .map(KeyValue::new)
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue, Materialized.as("odd_store"))
            .toStream()
            .peek((k, v) -> { })
            .map(KeyValue::new)
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue, Materialized.as("odd_store_2"));

        final KTable<byte[], byte[]> b2 = branches[1]
            .map(KeyValue::new)
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue, Materialized.as("even_store"))
            .toStream()
            .peek((k, v) -> { })
            .map(KeyValue::new)
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue, Materialized.as("even_store_2"));

        b1.join(b2, (v1, v2) -> v1, Materialized.as("joined_store")).toStream();

        final Topology topology = builder.build(streamsConfiguration);
        log.info("Built topology: {}", topology.describe());

        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), ByteArraySerializer.class, ByteArraySerializer.class);
        final List<KeyValue<byte[], byte[]>> initialKeyValues =
            Collections.singletonList(KeyValue.pair(new byte[1], new byte[1]));
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_STREAM, initialKeyValues, producerConfig, mockTime);

        kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
        try {
            kafkaStreams.cleanUp();
            kafkaStreams.start();
            TestUtils.waitForCondition(
                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
                "Failed to observe stream transits to RUNNING");
        } finally {
            // Close even when startup or the wait fails, so the instance is
            // not left running after a failed test.
            kafkaStreams.close();
        }
    }
}

