/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming;

import java.io.File;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.BatchCounter;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.InputStreamsSuite;
import org.apache.spark.streaming.MultiThreadTestReceiver;
import org.apache.spark.streaming.MultiThreadTestReceiver$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestOutputStream;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.TestServer$;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfter;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Status;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.enablers.Retrying$;
import org.sparkproject.guava.io.Files;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.Iterable;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.StringOps$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.math.Numeric;
import scala.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u000512Aa\u0001\u0003\u0001\u001b!)A\u0004\u0001C\u0001;!)q\u0004\u0001C\u0001A\t\t\u0012J\u001c9viN#(/Z1ngN+\u0018\u000e^3\u000b\u0005\u00151\u0011!C:ue\u0016\fW.\u001b8h\u0015\t9\u0001\"A\u0003ta\u0006\u00148N\u0003\u0002\n\u0015\u00051\u0011\r]1dQ\u0016T\u0011aC\u0001\u0004_J<7\u0001A\n\u0005\u00019\u0011b\u0003\u0005\u0002\u0010!5\ta!\u0003\u0002\u0012\r\ti1\u000b]1sW\u001a+hnU;ji\u0016\u0004\"a\u0005\u000b\u000e\u0003\u0011I!!\u0006\u0003\u0003\u001bQ+7\u000f^*vSR,')Y:f!\t9\"$D\u0001\u0019\u0015\tI\"\"A\u0005tG\u0006d\u0017\r^3ti&\u00111\u0004\u0007\u0002\u000f\u0005\u00164wN]3B]\u0012\fe\r^3s\u0003\u0019a\u0014N\\5u}Q\ta\u0004\u0005\u0002\u0014\u0001\u0005qA/Z:u\r&dWm\u0015;sK\u0006lGCA\u0011(!\t\u0011S%D\u0001$\u0015\u0005!\u0013!B:dC2\f\u0017B\u0001\u0014$\u0005\u0011)f.\u001b;\t\u000b!\u0012\u0001\u0019A\u0015\u0002\u00199,wOR5mKN|e\u000e\\=\u0011\u0005\tR\u0013BA\u0016$\u0005\u001d\u0011un\u001c7fC:\u0004")
public class InputStreamsSuite
extends SparkFunSuite
implements TestSuiteBase,
BeforeAndAfter {
    private AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    private String checkpointDir;
    private SparkConf conf;
    private PatienceConfiguration.Timeout eventuallyTimeout;
    private volatile boolean bitmap$0;

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String testName, Args args) {
        return BeforeAndAfterEach.runTest$((BeforeAndAfterEach)this, (String)testName, (Args)args);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option testName, Args args) {
        return BeforeAndAfterAll.run$((BeforeAndAfterAll)this, (Option)testName, (Args)args);
    }

    public void before(Function0<Object> fun, Position pos) {
        BeforeAndAfter.before$((BeforeAndAfter)this, fun, (Position)pos);
    }

    public void after(Function0<Object> fun, Position pos) {
        BeforeAndAfter.after$((BeforeAndAfter)this, fun, (Position)pos);
    }

    public Status runTest(String testName, Args args) {
        return BeforeAndAfter.runTest$((BeforeAndAfter)this, (String)testName, (Args)args);
    }

    public Status run(Option<String> testName, Args args) {
        return BeforeAndAfter.run$((BeforeAndAfter)this, testName, (Args)args);
    }

    @Override
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$beforeEach() {
        BeforeAndAfterEach.beforeEach$((BeforeAndAfterEach)this);
    }

    @Override
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$afterEach() {
        BeforeAndAfterEach.afterEach$((BeforeAndAfterEach)this);
    }

    @Override
    public String framework() {
        return TestSuiteBase.framework$(this);
    }

    @Override
    public String master() {
        return TestSuiteBase.master$(this);
    }

    @Override
    public Duration batchDuration() {
        return TestSuiteBase.batchDuration$(this);
    }

    @Override
    public int numInputPartitions() {
        return TestSuiteBase.numInputPartitions$(this);
    }

    @Override
    public int maxWaitTimeMillis() {
        return TestSuiteBase.maxWaitTimeMillis$(this);
    }

    @Override
    public boolean useManualClock() {
        return TestSuiteBase.useManualClock$(this);
    }

    @Override
    public boolean actuallyWait() {
        return TestSuiteBase.actuallyWait$(this);
    }

    @Override
    public void beforeFunction() {
        TestSuiteBase.beforeFunction$(this);
    }

    @Override
    public void afterFunction() {
        TestSuiteBase.afterFunction$(this);
    }

    @Override
    public void beforeEach() {
        TestSuiteBase.beforeEach$(this);
    }

    @Override
    public void afterEach() {
        TestSuiteBase.afterEach$(this);
    }

    @Override
    public <R> R withStreamingContext(StreamingContext ssc, Function1<StreamingContext, R> block) {
        return (R)TestSuiteBase.withStreamingContext$(this, ssc, block);
    }

    @Override
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> block) {
        return (R)TestSuiteBase.withTestServer$(this, testServer, block);
    }

    @Override
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, int numPartitions, ClassTag<U> evidence$4, ClassTag<V> evidence$5) {
        return TestSuiteBase.setupStreams$(this, input, operation, numPartitions, evidence$4, evidence$5);
    }

    @Override
    public <U, V> int setupStreams$default$3() {
        return TestSuiteBase.setupStreams$default$3$(this);
    }

    @Override
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, ClassTag<U> evidence$6, ClassTag<V> evidence$7, ClassTag<W> evidence$8) {
        return TestSuiteBase.setupStreams$(this, input1, input2, operation, evidence$6, evidence$7, evidence$8);
    }

    @Override
    public <V> Seq<Seq<V>> runStreams(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$9) {
        return TestSuiteBase.runStreams$(this, ssc, numBatches, numExpectedOutput, preStop, evidence$9);
    }

    @Override
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        return TestSuiteBase.runStreams$default$4$(this);
    }

    @Override
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$10) {
        return TestSuiteBase.runStreamsWithPartitions$(this, ssc, numBatches, numExpectedOutput, preStop, evidence$10);
    }

    @Override
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        return TestSuiteBase.runStreamsWithPartitions$default$4$(this);
    }

    @Override
    public <V> void verifyOutput(Seq<Seq<V>> output, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<V> evidence$11) {
        TestSuiteBase.verifyOutput$(this, output, expectedOutput, useSet, evidence$11);
    }

    @Override
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<U> evidence$12, ClassTag<V> evidence$13) {
        TestSuiteBase.testOperation$(this, input, operation, expectedOutput, useSet, evidence$12, evidence$13);
    }

    @Override
    public <U, V> boolean testOperation$default$4() {
        return TestSuiteBase.testOperation$default$4$(this);
    }

    @Override
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$14, ClassTag<V> evidence$15) {
        TestSuiteBase.testOperation$(this, input, operation, expectedOutput, numBatches, useSet, evidence$14, evidence$15);
    }

    @Override
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, boolean useSet, ClassTag<U> evidence$16, ClassTag<V> evidence$17, ClassTag<W> evidence$18) {
        TestSuiteBase.testOperation$(this, input1, input2, operation, expectedOutput, useSet, evidence$16, evidence$17, evidence$18);
    }

    @Override
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$19, ClassTag<V> evidence$20, ClassTag<W> evidence$21) {
        TestSuiteBase.testOperation$(this, input1, input2, operation, expectedOutput, numBatches, useSet, evidence$19, evidence$20, evidence$21);
    }

    public final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    public final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean x$1) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = x$1;
    }

    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = x$1;
    }

    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = x$1;
    }

    private String checkpointDir$lzycompute() {
        InputStreamsSuite inputStreamsSuite = this;
        synchronized (inputStreamsSuite) {
            if (!this.bitmap$0) {
                this.checkpointDir = TestSuiteBase.checkpointDir$(this);
                this.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    @Override
    public String checkpointDir() {
        return !this.bitmap$0 ? this.checkpointDir$lzycompute() : this.checkpointDir;
    }

    @Override
    public SparkConf conf() {
        return this.conf;
    }

    @Override
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    @Override
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf x$1) {
        this.conf = x$1;
    }

    @Override
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout x$1) {
        this.eventuallyTimeout = x$1;
    }

    public void testFileStream(boolean newFilesOnly) {
        this.withTempDir((Function1 & Serializable)testDir -> {
            InputStreamsSuite.$anonfun$testFileStream$1(this, newFilesOnly, testDir);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ String $anonfun$new$4(int x$1) {
        return Integer.toString(x$1);
    }

    public static final /* synthetic */ Assertion $anonfun$new$17(InputStreamsSuite $this, String[] output$1, Seq expectedOutput$1, int i) {
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(output$1[i]);
        String $org_scalatest_assert_macro_right = (String)expectedOutput$1.apply(i);
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));
    }

    public static final /* synthetic */ void $anonfun$new$21(InputStreamsSuite $this, TestServer testServer$2, StreamingContext ssc) {
        BatchCounter batchCounter = new BatchCounter(ssc);
        ReceiverInputDStream networkStream = ssc.socketTextStream("localhost", testServer$2.port(), StorageLevel$.MODULE$.MEMORY_AND_DISK());
        ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
        TestOutputStream outputStream = new TestOutputStream(networkStream, outputQueue, ClassTag$.MODULE$.apply(String.class));
        outputStream.register();
        ssc.start();
        ManualClock clock = (ManualClock)ssc.scheduler().clock();
        clock.advance($this.batchDuration().milliseconds());
        if (!batchCounter.waitUntilBatchesCompleted(1, 30000L)) {
            throw $this.fail("Timeout: cannot finish all batches in 30 seconds", new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 120));
        }
        networkStream.generatedRDDs().foreach((Function1 & Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            RDD rdd = (RDD)tuple2._2();
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(rdd instanceof WriteAheadLogBackedBlockRDD, "rdd.isInstanceOf[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD[_]]", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 124));
            return assertion;
        });
    }

    public static final /* synthetic */ void $anonfun$new$20(InputStreamsSuite $this, TestServer testServer) {
        testServer.start();
        $this.withStreamingContext(new StreamingContext($this.conf(), $this.batchDuration()), (Function1 & Serializable)ssc -> {
            InputStreamsSuite.$anonfun$new$21($this, testServer, ssc);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ byte $anonfun$new$28(int i$1, byte b) {
        return (byte)(b + i$1);
    }

    public static final /* synthetic */ Assertion $anonfun$new$27(InputStreamsSuite $this, Duration batchDuration$1, File testDir$1, byte[] input$2, ManualClock clock$2, BatchCounter batchCounter$1, int i) {
        Thread.sleep(batchDuration$1.milliseconds());
        File file = new File(testDir$1, Integer.toString(i));
        Files.write((byte[])((byte[])ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.byteArrayOps(input$2), (Function1 & Serializable)b -> BoxesRunTime.boxToByte((byte)InputStreamsSuite.$anonfun$new$28(i, BoxesRunTime.unboxToByte((Object)b))), (ClassTag)ClassTag$.MODULE$.Byte())), (File)file);
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(file.setLastModified(clock$2.getTimeMillis()), "file.setLastModified(clock.getTimeMillis())", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 160));
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToLong((long)file.lastModified()));
        long $org_scalatest_assert_macro_right = clock$2.getTimeMillis();
        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 161));
        $this.logInfo((Function0 & Serializable)() -> new StringBuilder(13).append("Created file ").append(file).toString());
        clock$2.advance(batchDuration$1.milliseconds());
        return (Assertion)Eventually$.MODULE$.eventually($this.eventuallyTimeout(), (Function0 & Serializable)() -> {
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)batchCounter$1.getNumCompletedBatches()));
            int $org_scalatest_assert_macro_right = i;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 167));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 166));
    }

    public static final /* synthetic */ byte $anonfun$new$34(int i$2, byte b) {
        return (byte)(b + i$2);
    }

    public static final /* synthetic */ Assertion $anonfun$new$33(InputStreamsSuite $this, Seq obtainedOutput$1, byte[] input$2, int i) {
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(obtainedOutput$1.apply(i));
        byte[] $org_scalatest_assert_macro_right = (byte[])ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.byteArrayOps(input$2), (Function1 & Serializable)b -> BoxesRunTime.boxToByte((byte)InputStreamsSuite.$anonfun$new$34(i, BoxesRunTime.unboxToByte((Object)b))), (ClassTag)ClassTag$.MODULE$.Byte());
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 172));
    }

    public static final /* synthetic */ void $anonfun$new$26(InputStreamsSuite $this, File existingFile$1, Duration batchDuration$1, File testDir$1, StreamingContext ssc) {
        ManualClock clock = (ManualClock)ssc.scheduler().clock();
        clock.setTime(existingFile$1.lastModified() + batchDuration$1.milliseconds());
        BatchCounter batchCounter = new BatchCounter(ssc);
        DStream fileStream = ssc.binaryRecordsStream(testDir$1.toString(), 1);
        ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
        TestOutputStream outputStream = new TestOutputStream(fileStream, outputQueue, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)));
        outputStream.register();
        ssc.start();
        clock.advance(batchDuration$1.milliseconds() / 2L);
        int numCopies = 3;
        byte[] input = new byte[]{1, 2, 3, 4, 5};
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numCopies).foreach((Function1 & Serializable)i -> InputStreamsSuite.$anonfun$new$27($this, batchDuration$1, testDir$1, input, clock, batchCounter, BoxesRunTime.unboxToInt((Object)i)));
        Seq obtainedOutput = ((IterableOnceOps)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).map((Function1 & Serializable)x$2 -> (Seq)x$2.flatten((Function1 & Serializable)xs -> Predef$.MODULE$.wrapByteArray(xs)))).toSeq();
        obtainedOutput.indices().foreach((Function1 & Serializable)i -> InputStreamsSuite.$anonfun$new$33($this, obtainedOutput, input, BoxesRunTime.unboxToInt((Object)i)));
    }

    public static final /* synthetic */ void $anonfun$new$24(InputStreamsSuite $this, File testDir) {
        Bool bool;
        Duration batchDuration = Seconds$.MODULE$.apply(2L);
        File existingFile = new File(testDir, "0");
        Files.write((CharSequence)"0\n", (File)existingFile, (Charset)StandardCharsets.UTF_8);
        Bool $org_scalatest_assert_macro_left = Bool$.MODULE$.simpleMacroBool(existingFile.setLastModified(10000L), "existingFile.setLastModified(10000L)", Prettifier$.MODULE$.default());
        if ($org_scalatest_assert_macro_left.value()) {
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = $this.convertToEqualizer(BoxesRunTime.boxToLong((long)existingFile.lastModified()));
            int $org_scalatest_assert_macro_right = 10000;
            bool = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        } else {
            bool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
        }
        Bool $org_scalatest_assert_macro_right = bool;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "&&", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$amp$amp((Function0 & Serializable)() -> $org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 136));
        $this.withStreamingContext(new StreamingContext($this.conf(), batchDuration), (Function1 & Serializable)ssc -> {
            InputStreamsSuite.$anonfun$new$26($this, existingFile, batchDuration, testDir, ssc);
            return BoxedUnit.UNIT;
        });
    }

    private final void createFileAndAdvanceTime$1(int data, File dir, File testSubDir1$1, ManualClock clock$3, Duration batchDuration$2, BatchCounter batchCounter$2) {
        File file = new File(testSubDir1$1, Integer.toString(data));
        Files.write((CharSequence)new StringBuilder(1).append(data).append("\n").toString(), (File)file, (Charset)StandardCharsets.UTF_8);
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(file.setLastModified(clock$3.getTimeMillis()), "file.setLastModified(clock.getTimeMillis())", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 219));
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)file.lastModified()));
        long $org_scalatest_assert_macro_right = clock$3.getTimeMillis();
        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 220));
        this.logInfo((Function0 & Serializable)() -> new StringBuilder(13).append("Created file ").append(file).toString());
        clock$3.advance(batchDuration$2.milliseconds());
        Eventually$.MODULE$.eventually(this.eventuallyTimeout(), (Function0 & Serializable)() -> {
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)batchCounter$2.getNumCompletedBatches()));
            int $org_scalatest_assert_macro_right = data;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 226));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 225));
    }

    public static final /* synthetic */ String $anonfun$new$46(int x$4) {
        return Integer.toString(x$4);
    }

    public static final /* synthetic */ void $anonfun$new$38(InputStreamsSuite $this, File testDir) {
        Bool bool;
        Duration batchDuration = Seconds$.MODULE$.apply(2L);
        File testSubDir1 = Utils$.MODULE$.createDirectory(testDir.toString(), "tmp1");
        File testSubDir2 = Utils$.MODULE$.createDirectory(testDir.toString(), "tmp2");
        File existingFile = new File(testDir, "0");
        Files.write((CharSequence)"0\n", (File)existingFile, (Charset)StandardCharsets.UTF_8);
        Bool $org_scalatest_assert_macro_left = Bool$.MODULE$.simpleMacroBool(existingFile.setLastModified(10000L), "existingFile.setLastModified(10000L)", Prettifier$.MODULE$.default());
        if ($org_scalatest_assert_macro_left.value()) {
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = $this.convertToEqualizer(BoxesRunTime.boxToLong((long)existingFile.lastModified()));
            int $org_scalatest_assert_macro_right = 10000;
            bool = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        } else {
            bool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
        }
        Bool $org_scalatest_assert_macro_right = bool;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "&&", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$amp$amp((Function0 & Serializable)() -> $org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 195));
        String pathWithWildCard = new StringBuilder(3).append(testDir.toString()).append("/*/").toString();
        $this.withStreamingContext(new StreamingContext($this.conf(), batchDuration), (Function1 & Serializable)ssc -> {
            ManualClock clock = (ManualClock)ssc.scheduler().clock();
            clock.setTime(existingFile.lastModified() + batchDuration.milliseconds());
            BatchCounter batchCounter = new BatchCounter((StreamingContext)ssc);
            DStream fileStream = ssc.fileStream(pathWithWildCard, ClassTag$.MODULE$.apply(LongWritable.class), ClassTag$.MODULE$.apply(Text.class), ClassTag$.MODULE$.apply(TextInputFormat.class)).map((Function1 & Serializable)x$3 -> ((Text)x$3._2()).toString(), ClassTag$.MODULE$.apply(String.class));
            ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
            TestOutputStream outputStream = new TestOutputStream(fileStream, outputQueue, ClassTag$.MODULE$.apply(String.class));
            outputStream.register();
            ssc.start();
            clock.advance(batchDuration.milliseconds() / 2L);
            Seq input1 = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}));
            input1.foreach((Function1)(JFunction1.mcVI.sp & Serializable)i -> $this.createFileAndAdvanceTime$1(i, testSubDir1, testSubDir1, clock, batchDuration, batchCounter));
            Seq input2 = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{6, 7, 8, 9, 10}));
            input2.foreach((Function1)(JFunction1.mcVI.sp & Serializable)i -> $this.createFileAndAdvanceTime$1(i, testSubDir2, testSubDir1, clock, batchDuration, batchCounter));
            Set expectedOutput = ((IterableOnceOps)((IterableOps)input1.$plus$plus((IterableOnce)input2)).map((Function1 & Serializable)x$4 -> InputStreamsSuite.$anonfun$new$46(BoxesRunTime.unboxToInt((Object)x$4)))).toSet();
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(((IterableOnceOps)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).flatten(Predef$.MODULE$.$conforms())).toSet());
            Set $org_scalatest_assert_macro_right = expectedOutput;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 239));
        });
    }

    private static final void write$1(Path path, String text, FileSystem fs$1) {
        FSDataOutputStream out = fs$1.create(path, true);
        IOUtils.write((String)text, (OutputStream)out, (Charset)StandardCharsets.UTF_8);
        out.close();
    }

    public static final /* synthetic */ void $anonfun$new$48(InputStreamsSuite $this, File testDir) {
        Duration batchDuration = Seconds$.MODULE$.apply(2L);
        long durationMs = batchDuration.milliseconds();
        Path testPath = new Path(testDir.toURI());
        Path streamDir = new Path(testPath, "streaming");
        Path streamGlobPath = new Path(streamDir, "sub*");
        Path generatedDir = new Path(testPath, "generated");
        Path generatedSubDir = new Path(generatedDir, "subdir");
        Path renamedSubDir = new Path(streamDir, "subdir");
        $this.withStreamingContext(new StreamingContext($this.conf(), batchDuration), (Function1 & Serializable)ssc -> {
            SparkContext sparkContext = ssc.sparkContext();
            Configuration hc = sparkContext.hadoopConfiguration();
            FileSystem fs = FileSystem.get((URI)testPath.toUri(), (Configuration)hc);
            fs.delete(testPath, true);
            fs.mkdirs(testPath);
            fs.mkdirs(streamDir);
            fs.mkdirs(generatedSubDir);
            ManualClock clock = (ManualClock)ssc.scheduler().clock();
            Path existingFile = new Path(generatedSubDir, "existing");
            InputStreamsSuite.write$1(existingFile, "existing\n", fs);
            FileStatus status = fs.getFileStatus(existingFile);
            clock.setTime(status.getModificationTime() + durationMs);
            BatchCounter batchCounter = new BatchCounter((StreamingContext)ssc);
            DStream fileStream = ssc.textFileStream(streamGlobPath.toUri().toString());
            ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
            TestOutputStream outputStream = new TestOutputStream(fileStream, outputQueue, ClassTag$.MODULE$.apply(String.class));
            outputStream.register();
            ssc.start();
            clock.advance(durationMs);
            Eventually$.MODULE$.eventually($this.eventuallyTimeout(), (Function0 & Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)1));
                int $org_scalatest_assert_macro_right = batchCounter.getNumCompletedBatches();
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 285));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 284));
            Path textPath = new Path(generatedSubDir, "renamed.txt");
            InputStreamsSuite.write$1(textPath, "renamed\n", fs);
            long now = clock.getTimeMillis();
            fs.setTimes(textPath, now, now);
            FileStatus textFilestatus = fs.getFileStatus(existingFile);
            long $org_scalatest_assert_macro_left = textFilestatus.getModificationTime();
            long $org_scalatest_assert_macro_right = now + durationMs;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), "<", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left < $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 294));
            fs.rename(generatedSubDir, renamedSubDir);
            clock.advance(durationMs);
            Eventually$.MODULE$.eventually($this.eventuallyTimeout(), (Function0 & Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)2));
                int $org_scalatest_assert_macro_right = batchCounter.getNumCompletedBatches();
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 303));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 302));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = $this.convertToEqualizer(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"renamed"})));
            Set $org_scalatest_assert_macro_right2 = ((IterableOnceOps)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).flatten(Predef$.MODULE$.$conforms())).toSet();
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)$org_scalatest_assert_macro_right2, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 307));
        });
    }

    private static final Iterable output$2(ConcurrentLinkedQueue outputQueue$2) {
        return (Iterable)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)outputQueue$2).asScala()).flatten(Predef$.MODULE$.$conforms());
    }

    public static final /* synthetic */ void $anonfun$new$53(InputStreamsSuite $this, MultiThreadTestReceiver testReceiver$1, ConcurrentLinkedQueue outputQueue$2, int numTotalRecords$1, StreamingContext ssc) {
        ReceiverInputDStream networkStream = ssc.receiverStream((Receiver)testReceiver$1, (ClassTag)ClassTag$.MODULE$.Int());
        DStream countStream = networkStream.count();
        TestOutputStream outputStream = new TestOutputStream(countStream, outputQueue$2, ClassTag$.MODULE$.Long());
        outputStream.register();
        ssc.start();
        ManualClock clock = (ManualClock)ssc.scheduler().clock();
        long startTimeNs = System.nanoTime();
        while (!(MultiThreadTestReceiver$.MODULE$.haveAllThreadsFinished() && BoxesRunTime.unboxToLong((Object)InputStreamsSuite.output$2(outputQueue$2).sum((Numeric)Numeric.LongIsIntegral$.MODULE$)) >= (long)numTotalRecords$1 || System.nanoTime() - startTimeNs >= TimeUnit.SECONDS.toNanos(5L))) {
            Thread.sleep(100L);
            clock.advance($this.batchDuration().milliseconds());
        }
        Thread.sleep(1000L);
    }

    private static final Iterable output$3(ConcurrentLinkedQueue outputQueue$3) {
        return (Iterable)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)outputQueue$3).asScala()).filter((Function1 & Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)x$6.nonEmpty()));
    }

    public static final /* synthetic */ void $anonfun$new$63(InputStreamsSuite $this, ConcurrentLinkedQueue outputQueue$3, Seq input$3, StreamingContext ssc) {
        Queue queue = new Queue(Queue$.MODULE$.$lessinit$greater$default$1());
        InputDStream queueStream = ssc.queueStream(queue, true, ClassTag$.MODULE$.apply(String.class));
        TestOutputStream outputStream = new TestOutputStream(queueStream, outputQueue$3, ClassTag$.MODULE$.apply(String.class));
        outputStream.register();
        ssc.start();
        ManualClock clock = (ManualClock)ssc.scheduler().clock();
        Iterator inputIterator = input$3.iterator();
        input$3.indices().foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i2 -> {
            inputIterator.take(2).foreach((Function1 & Serializable)i -> {
                Queue queue;
                Queue queue2 = queue;
                synchronized (queue2) {
                    SparkContext qual$1 = ssc.sparkContext();
                    Seq x$1 = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{i}));
                    int x$2 = qual$1.makeRDD$default$2();
                    queue = (Queue)queue.$plus$eq((Object)qual$1.makeRDD(x$1, x$2, ClassTag$.MODULE$.apply(String.class)));
                }
                return queue;
            });
            clock.advance($this.batchDuration().milliseconds());
        });
        Thread.sleep(1000L);
    }

    private static final Iterable output$4(ConcurrentLinkedQueue outputQueue$4) {
        return (Iterable)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)outputQueue$4).asScala()).filter((Function1 & Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)x$7.nonEmpty()));
    }

    public static final /* synthetic */ void $anonfun$new$79(InputStreamsSuite $this, ConcurrentLinkedQueue outputQueue$4, Seq input$4, StreamingContext ssc) {
        Queue queue = new Queue(Queue$.MODULE$.$lessinit$greater$default$1());
        InputDStream queueStream = ssc.queueStream(queue, false, ClassTag$.MODULE$.apply(String.class));
        TestOutputStream outputStream = new TestOutputStream(queueStream, outputQueue$4, ClassTag$.MODULE$.apply(String.class));
        outputStream.register();
        ssc.start();
        ManualClock clock = (ManualClock)ssc.scheduler().clock();
        Iterator inputIterator = input$4.iterator();
        inputIterator.take(3).foreach((Function1 & Serializable)i -> {
            Queue queue;
            Queue queue2 = queue;
            synchronized (queue2) {
                SparkContext qual$2 = ssc.sparkContext();
                Seq x$3 = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{i}));
                int x$4 = qual$2.makeRDD$default$2();
                queue = (Queue)queue.$plus$eq((Object)qual$2.makeRDD(x$3, x$4, ClassTag$.MODULE$.apply(String.class)));
            }
            return queue;
        });
        clock.advance($this.batchDuration().milliseconds());
        Thread.sleep(1000L);
        inputIterator.foreach((Function1 & Serializable)i -> {
            Queue queue;
            Queue queue2 = queue;
            synchronized (queue2) {
                SparkContext qual$3 = ssc.sparkContext();
                Seq x$5 = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{i}));
                int x$6 = qual$3.makeRDD$default$2();
                queue = (Queue)queue.$plus$eq((Object)qual$3.makeRDD(x$5, x$6, ClassTag$.MODULE$.apply(String.class)));
            }
            return queue;
        });
        clock.advance($this.batchDuration().milliseconds());
        Thread.sleep(1000L);
    }

    public static final /* synthetic */ int $anonfun$new$97(TestReceiverInputDStream.1 x$9) {
        public class Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1
        extends ReceiverInputDStream<String> {
            public Receiver<String> getReceiver() {
                return null;
            }

            public Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1(InputStreamsSuite $outer, StreamingContext ssc$4) {
                super(ssc$4, ClassTag$.MODULE$.apply(String.class));
            }
        }
        return x$9.id();
    }

    public static final /* synthetic */ boolean $anonfun$testFileStream$4(Path x) {
        return true;
    }

    public static final /* synthetic */ Assertion $anonfun$testFileStream$6(InputStreamsSuite $this, File testDir$2, ManualClock clock$5, Duration batchDuration$3, BatchCounter batchCounter$4, int i) {
        File file = new File(testDir$2, Integer.toString(i));
        Files.write((CharSequence)new StringBuilder(1).append(i).append("\n").toString(), (File)file, (Charset)StandardCharsets.UTF_8);
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(file.setLastModified(clock$5.getTimeMillis()), "file.setLastModified(clock.getTimeMillis())", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 506));
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToLong((long)file.lastModified()));
        long $org_scalatest_assert_macro_right = clock$5.getTimeMillis();
        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 507));
        $this.logInfo((Function0 & Serializable)() -> new StringBuilder(13).append("Created file ").append(file).toString());
        clock$5.advance(batchDuration$3.milliseconds());
        return (Assertion)Eventually$.MODULE$.eventually($this.eventuallyTimeout(), (Function0 & Serializable)() -> {
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)batchCounter$4.getNumCompletedBatches()));
            int $org_scalatest_assert_macro_right = i;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 513));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 512));
    }

    public static final /* synthetic */ String $anonfun$testFileStream$9(int x$11) {
        return Integer.toString(x$11);
    }

    public static final /* synthetic */ String $anonfun$testFileStream$10(int x$12) {
        return Integer.toString(x$12);
    }

    public static final /* synthetic */ void $anonfun$testFileStream$1(InputStreamsSuite $this, boolean newFilesOnly$1, File testDir) {
        Bool bool;
        Duration batchDuration = Seconds$.MODULE$.apply(2L);
        File existingFile = new File(testDir, "0");
        Files.write((CharSequence)"0\n", (File)existingFile, (Charset)StandardCharsets.UTF_8);
        Bool $org_scalatest_assert_macro_left = Bool$.MODULE$.simpleMacroBool(existingFile.setLastModified(10000L), "existingFile.setLastModified(10000L)", Prettifier$.MODULE$.default());
        if ($org_scalatest_assert_macro_left.value()) {
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = $this.convertToEqualizer(BoxesRunTime.boxToLong((long)existingFile.lastModified()));
            int $org_scalatest_assert_macro_right = 10000;
            bool = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        } else {
            bool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
        }
        Bool $org_scalatest_assert_macro_right = bool;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "&&", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$amp$amp((Function0 & Serializable)() -> $org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 482));
        $this.withStreamingContext(new StreamingContext($this.conf(), batchDuration), (Function1 & Serializable)ssc -> {
            ManualClock clock = (ManualClock)ssc.scheduler().clock();
            clock.setTime(existingFile.lastModified() + batchDuration.milliseconds());
            BatchCounter batchCounter = new BatchCounter((StreamingContext)ssc);
            DStream fileStream = ssc.fileStream(testDir.toString(), (Function1 & Serializable)x -> BoxesRunTime.boxToBoolean((boolean)InputStreamsSuite.$anonfun$testFileStream$4(x)), newFilesOnly$1, ClassTag$.MODULE$.apply(LongWritable.class), ClassTag$.MODULE$.apply(Text.class), ClassTag$.MODULE$.apply(TextInputFormat.class)).map((Function1 & Serializable)x$10 -> ((Text)x$10._2()).toString(), ClassTag$.MODULE$.apply(String.class));
            ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
            TestOutputStream outputStream = new TestOutputStream(fileStream, outputQueue, ClassTag$.MODULE$.apply(String.class));
            outputStream.register();
            ssc.start();
            clock.advance(batchDuration.milliseconds() / 2L);
            Seq input = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}));
            input.foreach((Function1 & Serializable)i -> InputStreamsSuite.$anonfun$testFileStream$6($this, testDir, clock, batchDuration, batchCounter, BoxesRunTime.unboxToInt((Object)i)));
            Set expectedOutput = newFilesOnly$1 ? ((IterableOnceOps)input.map((Function1 & Serializable)x$11 -> InputStreamsSuite.$anonfun$testFileStream$9(BoxesRunTime.unboxToInt((Object)x$11)))).toSet() : ((IterableOnceOps)((IterableOps)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})).$plus$plus((IterableOnce)input)).map((Function1 & Serializable)x$12 -> InputStreamsSuite.$anonfun$testFileStream$10(BoxesRunTime.unboxToInt((Object)x$12)))).toSet();
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(((IterableOnceOps)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).flatten(Predef$.MODULE$.$conforms())).toSet());
            Set $org_scalatest_assert_macro_right = expectedOutput;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 523));
        });
    }

    public InputStreamsSuite() {
        TestSuiteBase.$init$(this);
        BeforeAndAfter.$init$((BeforeAndAfter)this);
        this.test("socket input stream", (Seq)Nil$.MODULE$, (Function0 & Serializable)() -> (Assertion)this.withTestServer(new TestServer(TestServer$.MODULE$.$lessinit$greater$default$1()), (Function1 & Serializable)testServer -> {
            testServer.start();
            return (Assertion)this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable)ssc -> {
                Seq input = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}));
                BatchCounter batchCounter = new BatchCounter((StreamingContext)ssc);
                ReceiverInputDStream networkStream = ssc.socketTextStream("localhost", testServer.port(), StorageLevel$.MODULE$.MEMORY_AND_DISK());
                ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
                TestOutputStream outputStream = new TestOutputStream(networkStream, outputQueue, ClassTag$.MODULE$.apply(String.class));
                outputStream.register();
                ssc.start();
                ManualClock clock = (ManualClock)ssc.scheduler().clock();
                Seq expectedOutput = (Seq)input.map((Function1 & Serializable)x$1 -> InputStreamsSuite.$anonfun$new$4(BoxesRunTime.unboxToInt((Object)x$1)));
                input.indices().foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> {
                    testServer.send(new StringBuilder(1).append(input.apply(i).toString()).append("\n").toString());
                    clock.advance(this.batchDuration().milliseconds());
                });
                Eventually$.MODULE$.eventually(this.eventuallyTimeout(), (Function0)(JFunction0.mcV.sp & Serializable)() -> {
                    clock.advance(this.batchDuration().milliseconds());
                    this.logInfo((Function0 & Serializable)() -> "--------------------------------");
                    this.logInfo((Function0 & Serializable)() -> new StringBuilder(14).append("output.size = ").append(outputQueue.size()).toString());
                    this.logInfo((Function0 & Serializable)() -> "output");
                    ((IterableOnceOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)outputQueue).asScala()).foreach((Function1 & Serializable)x -> {
                        this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(x.mkString(",")).append("]").toString());
                        return BoxedUnit.UNIT;
                    });
                    this.logInfo((Function0 & Serializable)() -> new StringBuilder(23).append("expected output.size = ").append(expectedOutput.size()).toString());
                    this.logInfo((Function0 & Serializable)() -> "expected output");
                    expectedOutput.foreach((Function1 & Serializable)x -> {
                        this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(StringOps$.MODULE$.mkString$extension(Predef$.MODULE$.augmentString(x), ",")).append("]").toString());
                        return BoxedUnit.UNIT;
                    });
                    this.logInfo((Function0 & Serializable)() -> "--------------------------------");
                    String[] output = (String[])((IterableOnceOps)((IterableOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)outputQueue).asScala()).flatten(Predef$.MODULE$.$conforms())).toArray(ClassTag$.MODULE$.apply(String.class));
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)output.length));
                    int $org_scalatest_assert_macro_right = expectedOutput.size();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 88));
                    ArrayOps$.MODULE$.indices$extension(Predef$.MODULE$.refArrayOps((Object[])output)).foreach((Function1 & Serializable)i -> InputStreamsSuite.$anonfun$new$17(this, output, expectedOutput, BoxesRunTime.unboxToInt((Object)i)));
                }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 73));
                return (Assertion)Eventually$.MODULE$.eventually(this.eventuallyTimeout(), (Function0 & Serializable)() -> {
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)ssc.progressListener().numTotalReceivedRecords()));
                    int $org_scalatest_assert_macro_right = input.length();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 95));
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)ssc.progressListener().numTotalProcessedRecords()));
                    int $org_scalatest_assert_macro_right2 = input.length();
                    Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 96));
                }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 94));
            });
        }), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 48));
        this.test("socket input stream - no block in a batch", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> this.withTestServer(new TestServer(TestServer$.MODULE$.$lessinit$greater$default$1()), (Function1 & Serializable)testServer -> {
            InputStreamsSuite.$anonfun$new$20(this, testServer);
            return BoxedUnit.UNIT;
        }), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 102));
        this.test("binary records stream", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> this.withTempDir((Function1 & Serializable)testDir -> {
            InputStreamsSuite.$anonfun$new$24(this, testDir);
            return BoxedUnit.UNIT;
        }), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 130));
        this.test("file input stream - newFilesOnly = true", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> this.testFileStream(true), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 178));
        this.test("file input stream - newFilesOnly = false", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> this.testFileStream(false), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 182));
        this.test("file input stream - wildcard", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> this.withTempDir((Function1 & Serializable)testDir -> {
            InputStreamsSuite.$anonfun$new$38(this, testDir);
            return BoxedUnit.UNIT;
        }), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 186));
        this.test("Modified files are correctly detected.", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> this.withTempDir((Function1 & Serializable)testDir -> {
            InputStreamsSuite.$anonfun$new$48(this, testDir);
            return BoxedUnit.UNIT;
        }), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 244));
        this.test("multi-thread receiver", (Seq)Nil$.MODULE$, (Function0 & Serializable)() -> {
            int numThreads = 10;
            int numRecordsPerThread = 1000;
            int numTotalRecords = numThreads * numRecordsPerThread;
            MultiThreadTestReceiver testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread);
            MultiThreadTestReceiver$.MODULE$.haveAllThreadsFinished_$eq(false);
            ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable)ssc -> {
                InputStreamsSuite.$anonfun$new$53(this, testReceiver, outputQueue, numTotalRecords, ssc);
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> "--------------------------------");
            this.logInfo((Function0 & Serializable)() -> new StringBuilder(14).append("output.size = ").append(outputQueue.size()).toString());
            this.logInfo((Function0 & Serializable)() -> "output");
            ((IterableOnceOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).foreach((Function1 & Serializable)x -> {
                this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(x.mkString(",")).append("]").toString());
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> "--------------------------------");
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(InputStreamsSuite.output$2(outputQueue).sum((Numeric)Numeric.LongIsIntegral$.MODULE$));
            int $org_scalatest_assert_macro_right = numTotalRecords;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 348));
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 312));
        this.test("queue input stream - oneAtATime = true", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> {
            Seq input = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"1", "2", "3", "4", "5"}));
            Seq expectedOutput = (Seq)input.map((Function1 & Serializable)x$5 -> (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{x$5})));
            ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable)ssc -> {
                InputStreamsSuite.$anonfun$new$63(this, outputQueue, input, ssc);
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> "--------------------------------");
            this.logInfo((Function0 & Serializable)() -> new StringBuilder(14).append("output.size = ").append(outputQueue.size()).toString());
            this.logInfo((Function0 & Serializable)() -> "output");
            ((IterableOnceOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).foreach((Function1 & Serializable)x -> {
                this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(x.mkString(",")).append("]").toString());
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> new StringBuilder(23).append("expected output.size = ").append(expectedOutput.size()).toString());
            this.logInfo((Function0 & Serializable)() -> "expected output");
            expectedOutput.foreach((Function1 & Serializable)x -> {
                this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(x.mkString(",")).append("]").toString());
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> "--------------------------------");
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)InputStreamsSuite.output$3(outputQueue).size()));
            int $org_scalatest_assert_macro_right = expectedOutput.size();
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 392));
            ((IterableOnceOps)InputStreamsSuite.output$3(outputQueue).zipWithIndex()).foreach((Function1 & Serializable)x0$2 -> {
                Tuple2 tuple2 = x0$2;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                Seq e = (Seq)tuple2._1();
                int i = tuple2._2$mcI$sp();
                Seq $org_scalatest_assert_macro_left = e;
                Seq $org_scalatest_assert_macro_right = (Seq)expectedOutput.apply(i);
                Seq seq = $org_scalatest_assert_macro_left;
                Seq seq2 = $org_scalatest_assert_macro_right;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(seq != null ? !seq.equals(seq2) : seq2 != null), Prettifier$.MODULE$.default());
                Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 393));
                return assertion;
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 351));
        this.test("queue input stream - oneAtATime = false", (Seq)Nil$.MODULE$, (Function0)(JFunction0.mcV.sp & Serializable)() -> {
            ConcurrentLinkedQueue outputQueue = new ConcurrentLinkedQueue();
            Seq input = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"1", "2", "3", "4", "5"}));
            Seq expectedOutput = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Seq[]{(Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"1", "2", "3"})), (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"4", "5"}))}));
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable)ssc -> {
                InputStreamsSuite.$anonfun$new$79(this, outputQueue, input, ssc);
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> "--------------------------------");
            this.logInfo((Function0 & Serializable)() -> new StringBuilder(14).append("output.size = ").append(outputQueue.size()).toString());
            this.logInfo((Function0 & Serializable)() -> "output");
            ((IterableOnceOps)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputQueue).asScala()).foreach((Function1 & Serializable)x -> {
                this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(x.mkString(",")).append("]").toString());
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> new StringBuilder(23).append("expected output.size = ").append(expectedOutput.size()).toString());
            this.logInfo((Function0 & Serializable)() -> "expected output");
            expectedOutput.foreach((Function1 & Serializable)x -> {
                this.logInfo((Function0 & Serializable)() -> new StringBuilder(2).append("[").append(x.mkString(",")).append("]").toString());
                return BoxedUnit.UNIT;
            });
            this.logInfo((Function0 & Serializable)() -> "--------------------------------");
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)InputStreamsSuite.output$4(outputQueue).size()));
            int $org_scalatest_assert_macro_right = expectedOutput.size();
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 444));
            ((IterableOnceOps)InputStreamsSuite.output$4(outputQueue).zipWithIndex()).foreach((Function1 & Serializable)x0$3 -> {
                Tuple2 tuple2 = x0$3;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                Seq e = (Seq)tuple2._1();
                int i = tuple2._2$mcI$sp();
                Seq $org_scalatest_assert_macro_left = e;
                Seq $org_scalatest_assert_macro_right = (Seq)expectedOutput.apply(i);
                Seq seq = $org_scalatest_assert_macro_left;
                Seq seq2 = $org_scalatest_assert_macro_right;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(seq != null ? !seq.equals(seq2) : seq2 != null), Prettifier$.MODULE$.default());
                Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 445));
                return assertion;
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 396));
        this.test("test track the number of input stream", (Seq)Nil$.MODULE$, (Function0 & Serializable)() -> (Assertion)this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable)ssc -> {
            Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1[] receiverInputStreams = (Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1[])((Object[])new Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1[]{new Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1(null, (StreamingContext)ssc), new Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1(null, (StreamingContext)ssc)});
            public class Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1
            extends InputDStream<String> {
                public void start() {
                }

                public void stop() {
                }

                public Option<RDD<String>> compute(Time validTime) {
                    return None$.MODULE$;
                }

                public Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1(InputStreamsSuite $outer, StreamingContext ssc$4) {
                    super(ssc$4, ClassTag$.MODULE$.apply(String.class));
                }
            }
            Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1[] inputStreams = (Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1[])((Object[])new Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1[]{new Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1(null, (StreamingContext)ssc), new Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1(null, (StreamingContext)ssc), new Org_apache_spark_streaming_InputStreamsSuite$TestInputDStream$1(null, (StreamingContext)ssc)});
            InputDStream[] $org_scalatest_assert_macro_left = ssc.graph().getInputStreams();
            int $org_scalatest_assert_macro_right = receiverInputStreams.length + inputStreams.length;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left, "length", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left.length), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 467));
            ReceiverInputDStream[] $org_scalatest_assert_macro_left2 = ssc.graph().getReceiverInputStreams();
            int $org_scalatest_assert_macro_right2 = receiverInputStreams.length;
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.lengthSizeMacroBool((Object)$org_scalatest_assert_macro_left2, "length", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2.length), (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 469));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.convertToEqualizer(ssc.graph().getReceiverInputStreams());
            Org_apache_spark_streaming_InputStreamsSuite$TestReceiverInputDStream$1[] $org_scalatest_assert_macro_right3 = receiverInputStreams;
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)$org_scalatest_assert_macro_right3, $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)$org_scalatest_assert_macro_right3, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 470));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = this.convertToEqualizer(ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[])ssc.graph().getInputStreams()), (Function1 & Serializable)x$8 -> BoxesRunTime.boxToInteger((int)x$8.id()), (ClassTag)ClassTag$.MODULE$.Int()));
            int[] $org_scalatest_assert_macro_right4 = (int[])Array$.MODULE$.tabulate(5, (Function1)(JFunction1.mcII.sp & Serializable)i -> i, (ClassTag)ClassTag$.MODULE$.Int());
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)$org_scalatest_assert_macro_right4, $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)$org_scalatest_assert_macro_right4, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 471));
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left5 = this.convertToEqualizer(ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[])receiverInputStreams), (Function1 & Serializable)x$9 -> BoxesRunTime.boxToInteger((int)InputStreamsSuite.$anonfun$new$97(x$9)), (ClassTag)ClassTag$.MODULE$.Int()));
            int[] $org_scalatest_assert_macro_right5 = new int[]{0, 1};
            Bool $org_scalatest_assert_macro_expr5 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left5, "===", (Object)$org_scalatest_assert_macro_right5, $org_scalatest_assert_macro_left5.$eq$eq$eq((Object)$org_scalatest_assert_macro_right5, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr5, (Object)"", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 472));
        }), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 448));
        Statics.releaseFence();
    }
}

