/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class JavaDirectKafkaStreamSuite
implements Serializable {
    private transient JavaStreamingContext ssc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
        this.ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds((long)200L));
    }

    @After
    public void tearDown() {
        if (this.ssc != null) {
            this.ssc.stop();
            this.ssc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaStream() throws InterruptedException {
        String topic1 = "topic1";
        String topic2 = "topic2";
        final AtomicReference offsetRanges = new AtomicReference();
        String[] topic1data = this.createTopicAndSendData("topic1");
        String[] topic2data = this.createTopicAndSendData("topic2");
        HashSet<String> sent = new HashSet<String>();
        sent.addAll(Arrays.asList(topic1data));
        sent.addAll(Arrays.asList(topic2data));
        Random random = new Random();
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", this.kafkaTestUtils.brokerAddress());
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() + "-" + System.currentTimeMillis());
        JavaInputDStream istream1 = KafkaUtils.createDirectStream(this.ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList("topic1"), kafkaParams));
        JavaDStream stream1 = istream1.transform((Function)new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<ConsumerRecord<String, String>>>(){

            public JavaRDD<ConsumerRecord<String, String>> call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                Assert.assertEquals((Object)"topic1", (Object)offsets[0].topic());
                return rdd;
            }
        }).map((Function)new Function<ConsumerRecord<String, String>, String>(){

            public String call(ConsumerRecord<String, String> r) {
                return r.value();
            }
        });
        HashMap<String, Object> kafkaParams2 = new HashMap<String, Object>(kafkaParams);
        kafkaParams2.put("group.id", "java-test-consumer-" + random.nextInt() + "-" + System.currentTimeMillis());
        JavaInputDStream istream2 = KafkaUtils.createDirectStream(this.ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList("topic2"), kafkaParams2));
        JavaDStream stream2 = istream2.transform((Function)new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<ConsumerRecord<String, String>>>(){

            public JavaRDD<ConsumerRecord<String, String>> call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                Assert.assertEquals((Object)"topic2", (Object)offsets[0].topic());
                return rdd;
            }
        }).map((Function)new Function<ConsumerRecord<String, String>, String>(){

            public String call(ConsumerRecord<String, String> r) {
                return r.value();
            }
        });
        JavaDStream unifiedStream = stream1.union(stream2);
        final Set result = Collections.synchronizedSet(new HashSet());
        unifiedStream.foreachRDD((VoidFunction)new VoidFunction<JavaRDD<String>>(){

            public void call(JavaRDD<String> rdd) {
                result.addAll(rdd.collect());
            }
        });
        this.ssc.start();
        long startTime = System.currentTimeMillis();
        boolean matches = false;
        while (!matches && System.currentTimeMillis() - startTime < 20000L) {
            matches = sent.size() == result.size();
            Thread.sleep(50L);
        }
        Assert.assertEquals(sent, result);
        this.ssc.stop();
    }

    private String[] createTopicAndSendData(String topic) {
        String[] data = new String[]{topic + "-1", topic + "-2", topic + "-3"};
        this.kafkaTestUtils.createTopic(topic);
        this.kafkaTestUtils.sendMessages(topic, data);
        return data;
    }
}

