/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContextImpl;
import org.apache.spark.TaskContextImpl$;
import org.apache.spark.streaming.kafka010.CacheKey;
import org.apache.spark.streaming.kafka010.InternalKafkaConsumer;
import org.apache.spark.streaming.kafka010.KafkaDataConsumer;
import org.apache.spark.streaming.kafka010.KafkaDataConsumer$;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.compatible.Assertion;
import org.scalatestplus.mockito.MockitoSugar;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.VolatileObjectRef;
import scala.util.Random$;

@ScalaSignature(bytes="\u0006\u0005-4AAD\b\u00015!)q\u0005\u0001C\u0001Q!I1\u0006\u0001a\u0001\u0002\u0004%I\u0001\f\u0005\na\u0001\u0001\r\u00111A\u0005\nEB\u0011B\u000f\u0001A\u0002\u0003\u0005\u000b\u0015B\u0017\t\u000fm\u0002!\u0019!C\u0005y!1Q\t\u0001Q\u0001\nuBqA\u0012\u0001C\u0002\u0013%q\t\u0003\u0004Q\u0001\u0001\u0006I\u0001\u0013\u0005\b#\u0002\u0011\r\u0011\"\u0003=\u0011\u0019\u0011\u0006\u0001)A\u0005{!)1\u000b\u0001C!)\")Q\u000b\u0001C!)\")a\u000b\u0001C\u0005/\n12*\u00194lC\u0012\u000bG/Y\"p]N,X.\u001a:Tk&$XM\u0003\u0002\u0011#\u0005A1.\u00194lCB\n\u0004G\u0003\u0002\u0013'\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003)U\tQa\u001d9be.T!AF\f\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005A\u0012aA8sO\u000e\u00011c\u0001\u0001\u001c?A\u0011A$H\u0007\u0002'%\u0011ad\u0005\u0002\u000e'B\f'o\u001b$v]N+\u0018\u000e^3\u0011\u0005\u0001*S\"A\u0011\u000b\u0005\t\u001a\u0013aB7pG.LGo\u001c\u0006\u0003I]\tQb]2bY\u0006$Xm\u001d;qYV\u001c\u0018B\u0001\u0014\"\u00051iunY6ji>\u001cVoZ1s\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0006\u0005\u0002+\u00015\tq\"A\u0005uKN$X\u000b^5mgV\tQ\u0006\u0005\u0002+]%\u0011qf\u0004\u0002\u000f\u0017\u000647.\u0019+fgR,F/\u001b7t\u00035!Xm\u001d;Vi&d7o\u0018\u0013fcR\u0011!\u0007\u000f\t\u0003gYj\u0011\u0001\u000e\u0006\u0002k\u0005)1oY1mC&\u0011q\u0007\u000e\u0002\u0005+:LG\u000fC\u0004:\u0007\u0005\u0005\t\u0019A\u0017\u0002\u0007a$\u0013'\u0001\u0006uKN$X\u000b^5mg\u0002\nQ\u0001^8qS\u000e,\u0012!\u0010\t\u0003}\rk\u0011a\u0010\u0006\u0003\u0001\u0006\u000bA\u0001\\1oO*\t!)\u0001\u0003kCZ\f\u0017B\u0001#@\u0005\u0019\u0019FO]5oO\u00061Ao\u001c9jG\u0002\na\u0002^8qS\u000e\u0004\u0016M\u001d;ji&|g.F\u0001I!\tIe*D\u0001K\u0015\tYE*\u0001\u0004d_6lwN\u001c\u0006\u0003\u001bV\tQa[1gW\u0006L!a\u0014&\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006yAo\u001c9jGB\u000b'\u000f^5uS>t\u0007%A\u0004he>,\b/\u00133\u0002\u0011\u001d\u0014x.\u001e9JI\u0002\n\u0011BY3g_J,\u0017\t\u001c7\u0015\u0003I\n\u0001\"\u00194uKJ\fE\u000e\\\u0001\u000fO\u0016$8*\u00194lCB\u000b'/Y7t)\u0005A\u0006\u0003B-]=\"l\u0011A\u0017\u0006\u00037\u0006\u000bA!\u001e;jY&\u0011QL\u0017\u0002\u0004\u001b\u0006\u0004\bCA0g\u001d\t\u0001G\r\u0005\u0002bi5\t!M\u0003\u0002d3\u00051AH]8pizJ!!\u001a\u001b\u0002\rA\u0013X\rZ3g\u0013\t!uM\u0003\u0002fiA\u0011a([\u0005\u0003U~\u0012aa\u00142kK\u000e$\b")
public class KafkaDataConsumerSuite
extends SparkFunSuite
implements MockitoSugar {
    private KafkaTestUtils testUtils;
    private final String topic;
    private final TopicPartition topicPartition;
    private final String groupId;

    public <T> T mock(ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, classTag);
    }

    public <T> T mock(Answer<?> defaultAnswer, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, defaultAnswer, classTag);
    }

    public <T> T mock(MockSettings mockSettings, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, (MockSettings)mockSettings, classTag);
    }

    public <T> T mock(String name, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, (String)name, classTag);
    }

    private KafkaTestUtils testUtils() {
        return this.testUtils;
    }

    private void testUtils_$eq(KafkaTestUtils x$1) {
        this.testUtils = x$1;
    }

    private String topic() {
        return this.topic;
    }

    private TopicPartition topicPartition() {
        return this.topicPartition;
    }

    private String groupId() {
        return this.groupId;
    }

    public void beforeAll() {
        super.beforeAll();
        SparkConf conf = new SparkConf();
        SparkEnv env = (SparkEnv)this.mock(ClassTag$.MODULE$.apply(SparkEnv.class));
        SparkEnv$.MODULE$.set(env);
        Mockito.when((Object)env.conf()).thenReturn((Object)conf);
        this.testUtils_$eq(new KafkaTestUtils());
        this.testUtils().setup();
        KafkaDataConsumer$.MODULE$.init(16, 64, 0.75f);
    }

    public void afterAll() {
        if (this.testUtils() != null) {
            this.testUtils().teardown();
            this.testUtils_$eq(null);
        }
        SparkEnv$.MODULE$.set(null);
        super.afterAll();
    }

    private java.util.Map<String, Object> getKafkaParams() {
        return (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"group.id"), (Object)this.groupId()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)this.testUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"key.deserializer"), (Object)ByteArrayDeserializer.class.getName()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"value.deserializer"), (Object)ByteArrayDeserializer.class.getName()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"enable.auto.commit"), (Object)"false")}))).asJava();
    }

    public static final /* synthetic */ String $anonfun$new$4(int x$2) {
        return Integer.toString(x$2);
    }

    public static final /* synthetic */ String $anonfun$new$5(KafkaDataConsumer consumer$1, int offset) {
        byte[] bytes = (byte[])consumer$1.get((long)offset, 10000L).value();
        return new String(bytes);
    }

    public final void org$apache$spark$streaming$kafka010$KafkaDataConsumerSuite$$consume$1(int i, java.util.Map kafkaParams$1, IndexedSeq data$1, VolatileObjectRef error$1) {
        boolean useCache = Random$.MODULE$.nextBoolean();
        TaskContextImpl taskContext = Random$.MODULE$.nextBoolean() ? new TaskContextImpl(0, 0, 0, 0L, Random$.MODULE$.nextInt(2), 1, null, null, null, TaskContextImpl$.MODULE$.$lessinit$greater$default$10(), TaskContextImpl$.MODULE$.$lessinit$greater$default$11(), TaskContextImpl$.MODULE$.$lessinit$greater$default$12()) : null;
        KafkaDataConsumer consumer = KafkaDataConsumer$.MODULE$.acquire(this.topicPartition(), kafkaParams$1, (TaskContext)taskContext, useCache);
        try {
            try {
                IndexedSeq rcvd;
                IndexedSeq $org_scalatest_assert_macro_left = rcvd = data$1.indices().map((Function1 & Serializable)offset -> KafkaDataConsumerSuite.$anonfun$new$5(consumer, BoxesRunTime.unboxToInt((Object)offset)));
                IndexedSeq $org_scalatest_assert_macro_right = data$1;
                IndexedSeq indexedSeq = $org_scalatest_assert_macro_left;
                IndexedSeq indexedSeq2 = $org_scalatest_assert_macro_right;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(indexedSeq != null ? !indexedSeq.equals(indexedSeq2) : indexedSeq2 != null), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 139));
            }
            catch (Throwable e) {
                error$1.elem = e;
                throw e;
            }
        }
        finally {
            consumer.release();
        }
    }

    public KafkaDataConsumerSuite() {
        MockitoSugar.$init$((MockitoSugar)this);
        this.topic = new StringBuilder(5).append("topic").append(Random$.MODULE$.nextInt()).toString();
        this.topicPartition = new TopicPartition(this.topic(), 0);
        this.groupId = "groupId";
        this.test("KafkaDataConsumer reuse in case of same groupId and TopicPartition", (Seq)Nil$.MODULE$, (Function0 & Serializable)() -> {
            InternalKafkaConsumer existingInternalConsumer;
            KafkaDataConsumer$.MODULE$.cache().clear();
            java.util.Map<String, Object> kafkaParams = this.getKafkaParams();
            KafkaDataConsumer consumer1 = KafkaDataConsumer$.MODULE$.acquire(this.topicPartition(), kafkaParams, null, true);
            consumer1.release();
            KafkaDataConsumer consumer2 = KafkaDataConsumer$.MODULE$.acquire(this.topicPartition(), kafkaParams, null, true);
            consumer2.release();
            java.util.Map $org_scalatest_assert_macro_left = KafkaDataConsumer$.MODULE$.cache();
            int $org_scalatest_assert_macro_right = 1;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 82));
            CacheKey key = new CacheKey(this.groupId(), this.topicPartition());
            InternalKafkaConsumer $org_scalatest_assert_macro_left2 = existingInternalConsumer = (InternalKafkaConsumer)KafkaDataConsumer$.MODULE$.cache().get(key);
            InternalKafkaConsumer $org_scalatest_assert_macro_right2 = consumer1.internalConsumer();
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "eq", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2 == $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 85));
            InternalKafkaConsumer $org_scalatest_assert_macro_left3 = existingInternalConsumer;
            InternalKafkaConsumer $org_scalatest_assert_macro_right3 = consumer2.internalConsumer();
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "eq", (Object)$org_scalatest_assert_macro_right3, $org_scalatest_assert_macro_left3 == $org_scalatest_assert_macro_right3, Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 86));
        }, new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 69));
        this.test("new KafkaDataConsumer instance in case of Task retry", (Seq)Nil$.MODULE$, (Function0 & Serializable)() -> {
            KafkaDataConsumer$.MODULE$.cache().clear();
            java.util.Map<String, Object> kafkaParams = this.getKafkaParams();
            CacheKey key = new CacheKey(this.groupId(), this.topicPartition());
            TaskContextImpl context1 = new TaskContextImpl(0, 0, 0, 0L, 0, 1, null, null, null, TaskContextImpl$.MODULE$.$lessinit$greater$default$10(), TaskContextImpl$.MODULE$.$lessinit$greater$default$11(), TaskContextImpl$.MODULE$.$lessinit$greater$default$12());
            KafkaDataConsumer consumer1 = KafkaDataConsumer$.MODULE$.acquire(this.topicPartition(), kafkaParams, (TaskContext)context1, true);
            consumer1.release();
            java.util.Map $org_scalatest_assert_macro_left = KafkaDataConsumer$.MODULE$.cache();
            int $org_scalatest_assert_macro_right = 1;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 100));
            InternalKafkaConsumer $org_scalatest_assert_macro_left2 = (InternalKafkaConsumer)KafkaDataConsumer$.MODULE$.cache().get(key);
            InternalKafkaConsumer $org_scalatest_assert_macro_right2 = consumer1.internalConsumer();
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "eq", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2 == $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 101));
            TaskContextImpl context2 = new TaskContextImpl(0, 0, 0, 0L, 1, 1, null, null, null, TaskContextImpl$.MODULE$.$lessinit$greater$default$10(), TaskContextImpl$.MODULE$.$lessinit$greater$default$11(), TaskContextImpl$.MODULE$.$lessinit$greater$default$12());
            KafkaDataConsumer consumer2 = KafkaDataConsumer$.MODULE$.acquire(this.topicPartition(), kafkaParams, (TaskContext)context2, true);
            consumer2.release();
            java.util.Map $org_scalatest_assert_macro_left3 = KafkaDataConsumer$.MODULE$.cache();
            int $org_scalatest_assert_macro_right3 = 0;
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left3, "size", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left3.size()), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 109));
            InternalKafkaConsumer $org_scalatest_assert_macro_left4 = consumer1.internalConsumer();
            InternalKafkaConsumer $org_scalatest_assert_macro_right4 = consumer2.internalConsumer();
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "ne", (Object)$org_scalatest_assert_macro_right4, $org_scalatest_assert_macro_left4 != $org_scalatest_assert_macro_right4, Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 110));
        }, new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 89));
        this.test("concurrent use of KafkaDataConsumer", (Seq)Nil$.MODULE$, (Function0 & Serializable)() -> {
            Assertion assertion;
            IndexedSeq data = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 1000).map((Function1 & Serializable)x$2 -> KafkaDataConsumerSuite.$anonfun$new$4(BoxesRunTime.unboxToInt((Object)x$2)));
            this.testUtils().createTopic(this.topic());
            this.testUtils().sendMessages(this.topic(), (String[])data.toArray(ClassTag$.MODULE$.apply(String.class)));
            java.util.Map<String, Object> kafkaParams = this.getKafkaParams();
            int numThreads = 100;
            int numConsumerUsages = 500;
            VolatileObjectRef error = VolatileObjectRef.create(null);
            ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
            try {
                IndexedSeq futures = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), numConsumerUsages).map((Function1 & Serializable)i -> threadPool.submit(new Runnable(this, BoxesRunTime.unboxToInt((Object)i), kafkaParams, data, error){
                    private final /* synthetic */ KafkaDataConsumerSuite $outer;
                    private final int i$1;
                    private final java.util.Map kafkaParams$1;
                    private final IndexedSeq data$1;
                    private final VolatileObjectRef error$1;

                    public void run() {
                        this.$outer.org$apache$spark$streaming$kafka010$KafkaDataConsumerSuite$$consume$1(this.i$1, this.kafkaParams$1, this.data$1, this.error$1);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.i$1 = i$1;
                        this.kafkaParams$1 = kafkaParams$1;
                        this.data$1 = data$1;
                        this.error$1 = error$1;
                    }
                }));
                futures.foreach((Function1 & Serializable)x$3 -> x$3.get(1L, TimeUnit.MINUTES));
                Throwable $org_scalatest_assert_macro_left = (Throwable)error.elem;
                Object $org_scalatest_assert_macro_right = null;
                Throwable throwable = $org_scalatest_assert_macro_left;
                Object var11_10 = null;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", null, !(throwable != null ? !throwable.equals(var11_10) : var11_10 != null), Prettifier$.MODULE$.default());
                assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 157));
            }
            finally {
                threadPool.shutdown();
            }
            return assertion;
        }, new Position("KafkaDataConsumerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 113));
    }
}

