/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class RegexSourceIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";
    private static final String FA_TOPIC = "fa";
    private static final String FOO_TOPIC = "foo";
    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
    private Properties streamsConfiguration;
    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
    private KafkaStreams streams;

    public RegexSourceIntegrationTest() {
        this.mockTime = RegexSourceIntegrationTest.CLUSTER.time;
    }

    @BeforeClass
    public static void startKafkaCluster() throws InterruptedException {
        CLUSTER.createTopics(TOPIC_1, TOPIC_2, TOPIC_A, TOPIC_C, TOPIC_Y, TOPIC_Z, FA_TOPIC, FOO_TOPIC, DEFAULT_OUTPUT_TOPIC);
        CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
        CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
    }

    @Before
    public void setUp() {
        Properties properties = new Properties();
        properties.put("internal.leave.group.on.close", (Object)true);
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, properties);
    }

    @After
    public void tearDown() throws IOException {
        if (this.streams != null) {
            this.streams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
        Serde<String> stringSerde = Serdes.String();
        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
        StreamsConfig streamsConfig = new StreamsConfig(this.streamsConfiguration);
        CLUSTER.createTopic("TEST-TOPIC-1");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        final ArrayList assignedTopics = new ArrayList();
        this.streams = new KafkaStreams(builder.build(), streamsConfig, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

            @Override
            public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                    @Override
                    public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                        super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(assignedTopics, listener));
                    }
                };
            }
        });
        this.streams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return assignedTopics.equals(expectedFirstAssignment);
            }
        }, (String)STREAM_TASKS_NOT_UPDATED);
        CLUSTER.createTopic("TEST-TOPIC-2");
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return assignedTopics.equals(expectedSecondAssignment);
            }
        }, (String)STREAM_TASKS_NOT_UPDATED);
    }

    @Test
    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
        Serde<String> stringSerde = Serdes.String();
        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
        StreamsConfig streamsConfig = new StreamsConfig(this.streamsConfiguration);
        CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        final ArrayList assignedTopics = new ArrayList();
        this.streams = new KafkaStreams(builder.build(), streamsConfig, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

            @Override
            public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                    @Override
                    public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                        super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(assignedTopics, listener));
                    }
                };
            }
        });
        this.streams.start();
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return assignedTopics.equals(expectedFirstAssignment);
            }
        }, (String)STREAM_TASKS_NOT_UPDATED);
        CLUSTER.deleteTopic("TEST-TOPIC-A");
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return assignedTopics.equals(expectedSecondAssignment);
            }
        }, (String)STREAM_TASKS_NOT_UPDATED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
        MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
        MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
        long thirtySecondTimeout = 30000L;
        final TopologyBuilder builder = new TopologyBuilder().addSource("ingest", Pattern.compile("topic-\\d+")).addProcessor("my-processor", processorSupplier, "ingest").addStateStore(stateStoreSupplier, "my-processor");
        this.streams = new KafkaStreams(builder, this.streamsConfiguration);
        try {
            this.streams.start();
            TestCondition stateStoreNameBoundToSourceTopic = new TestCondition(){

                public boolean conditionMet() {
                    Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
                    List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
                    return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals(RegexSourceIntegrationTest.TOPIC_1);
                }
            };
            TestUtils.waitForCondition((TestCondition)stateStoreNameBoundToSourceTopic, (long)30000L, (String)"Did not find topic: [topic-1] connected to state store: [testStateStore]");
        }
        finally {
            this.streams.close();
        }
    }

    @Test
    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
        String topic1TestMessage = "topic-1 test";
        String topic2TestMessage = "topic-2 test";
        String topicATestMessage = "topic-A test";
        String topicCTestMessage = "topic-C test";
        String topicYTestMessage = "topic-Y test";
        String topicZTestMessage = "topic-Z test";
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
        KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));
        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.streams.start();
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("topic-1 test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList("topic-2 test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList("topic-A test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList("topic-C test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList("topic-Y test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList("topic-Z test"), producerConfig, (Time)this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        List<String> expectedReceivedValues = Arrays.asList("topic-A test", "topic-1 test", "topic-2 test", "topic-C test", "topic-Y test", "topic-Z test");
        List receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
        ArrayList actualValues = new ArrayList(6);
        for (KeyValue receivedKeyValue : receivedKeyValues) {
            actualValues.add(receivedKeyValue.value);
        }
        Collections.sort(actualValues);
        Collections.sort(expectedReceivedValues);
        Assert.assertThat(actualValues, (Matcher)CoreMatchers.equalTo(expectedReceivedValues));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
        KafkaStreams partitionedStreamsLeader = null;
        KafkaStreams partitionedStreamsFollower = null;
        try {
            Serde<String> stringSerde = Serdes.String();
            StreamsBuilder builderLeader = new StreamsBuilder();
            StreamsBuilder builderFollower = new StreamsBuilder();
            final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
            KStream<String, String> partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
            KStream<String, String> partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));
            partitionedStreamLeader.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
            partitionedStreamFollower.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
            final ArrayList leaderAssignment = new ArrayList();
            final ArrayList followerAssignment = new ArrayList();
            StreamsConfig config = new StreamsConfig(this.streamsConfiguration);
            partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), config, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                @Override
                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        @Override
                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(leaderAssignment, listener));
                        }
                    };
                }
            });
            partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), config, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                @Override
                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        @Override
                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(followerAssignment, listener));
                        }
                    };
                }
            });
            partitionedStreamsLeader.start();
            partitionedStreamsFollower.start();
            TestUtils.waitForCondition((TestCondition)new TestCondition(){

                public boolean conditionMet() {
                    return followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment);
                }
            }, (String)"topic assignment not completed");
        }
        finally {
            if (partitionedStreamsLeader != null) {
                partitionedStreamsLeader.close();
            }
            if (partitionedStreamsFollower != null) {
                partitionedStreamsFollower.close();
            }
        }
    }

    @Test(expected=AssertionError.class)
    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
        String fooMessage = "fooMessage";
        String fMessage = "fMessage";
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.streams.start();
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList("fMessage"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList("fooMessage"), producerConfig, (Time)this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000L);
        Assert.fail((String)"Should not get here");
    }

    private static class TheConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private final List<String> assignedTopics;
        private final ConsumerRebalanceListener listener;

        TheConsumerRebalanceListener(List<String> assignedTopics, ConsumerRebalanceListener listener) {
            this.assignedTopics = assignedTopics;
            this.listener = listener;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.assignedTopics.clear();
            this.listener.onPartitionsRevoked(partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                this.assignedTopics.add(partition.topic());
            }
            Collections.sort(this.assignedTopics);
            this.listener.onPartitionsAssigned(partitions);
        }
    }
}

