/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.mllib.regression;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.mllib.linalg.Vectors$;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.mllib.util.LinearDataGenerator$;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.dstream.DStream;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfter;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.concurrent.PatienceConfiguration;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenIterable;
import scala.collection.IndexedSeqOptimized;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.math.Numeric;
import scala.math.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001q3A!\u0003\u0006\u0001+!)\u0001\u0005\u0001C\u0001C!)A\u0005\u0001C!K!IA\u0006\u0001a\u0001\u0002\u0004%\t!\f\u0005\nc\u0001\u0001\r\u00111A\u0005\u0002IB\u0011\u0002\u000f\u0001A\u0002\u0003\u0005\u000b\u0015\u0002\u0018\t\u000be\u0002A\u0011\t\u001e\t\u000bm\u0002A\u0011\u0001\u001f\t\u000b\u0019\u0003A\u0011A$\u0003=M#(/Z1nS:<G*\u001b8fCJ\u0014Vm\u001a:fgNLwN\\*vSR,'BA\u0006\r\u0003)\u0011Xm\u001a:fgNLwN\u001c\u0006\u0003\u001b9\tQ!\u001c7mS\nT!a\u0004\t\u0002\u000bM\u0004\u0018M]6\u000b\u0005E\u0011\u0012AB1qC\u000eDWMC\u0001\u0014\u0003\ry'oZ\u0002\u0001'\r\u0001aC\u0007\t\u0003/ai\u0011AD\u0005\u000339\u0011Qb\u00159be.4UO\\*vSR,\u0007CA\u000e\u001f\u001b\u0005a\"BA\u000f\u000f\u0003%\u0019HO]3b[&tw-\u0003\u0002 9\tiA+Z:u'VLG/\u001a\"bg\u0016\fa\u0001P5oSRtD#\u0001\u0012\u0011\u0005\r\u0002Q\"\u0001\u0006\u0002#5\f\u0007pV1jiRKW.Z'jY2L7/F\u0001'!\t9#&D\u0001)\u0015\u0005I\u0013!B:dC2\f\u0017BA\u0016)\u0005\rIe\u000e^\u0001\u0004gN\u001cW#\u0001\u0018\u0011\u0005my\u0013B\u0001\u0019\u001d\u0005A\u0019FO]3b[&twmQ8oi\u0016DH/A\u0004tg\u000e|F%Z9\u0015\u0005M2\u0004CA\u00145\u0013\t)\u0004F\u0001\u0003V]&$\bbB\u001c\u0005\u0003\u0003\u0005\rAL\u0001\u0004q\u0012\n\u0014\u0001B:tG\u0002\nQ\"\u00194uKJ4UO\\2uS>tG#A\u001a\u0002\u0017\u0005\u001c8/\u001a:u\u000bF,\u0018\r\u001c\u000b\u0005gu\u0012E\tC\u0003?\u000f\u0001\u0007q(\u0001\u0002wcA\u0011q\u0005Q\u0005\u0003\u0003\"\u0012a\u0001R8vE2,\u0007\"B\"\b\u0001\u0004y\u0014A\u0001<3\u0011\u0015)u\u00011\u0001@\u0003\u001d)\u0007o]5m_:\f!C^1mS\u0012\fG/\u001a)sK\u0012L7\r^5p]R\u00191\u0007\u0013,\t\u000b%C\u0001\u0019\u0001&\u0002\u0017A\u0014X\rZ5di&|gn\u001d\t\u0004\u0017N{dB\u0001'R\u001d\ti\u0005+D\u0001O\u0015\tyE#\u0001\u0004=e>|GOP\u0005\u0002S%\u0011!\u000bK\u0001\ba\u0006\u001c7.Y4f\u0013\t!VKA\u0002TKFT!A\u0015\u0015\t\u000b]C\u0001\u0019\u0001-\u0002\u000b%t\u0007/\u001e;\u0011\u0007-\u001b\u0016\f\u0005\u0002$5&\u00111L\u0003\u0002\r\u0019\u0006\u0014W\r\\3e!>Lg\u000e\u
001e")
public class StreamingLinearRegressionSuite
extends SparkFunSuite
implements TestSuiteBase {
    // Streaming context used by the tests; read via ssc(), written via ssc_$eq(), stopped in afterFunction().
    private StreamingContext ssc;
    // Cached result of TestSuiteBase.checkpointDir$; only valid once bitmap$0 is true.
    private String checkpointDir;
    // Assigned through org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq and exposed by conf().
    private final SparkConf conf;
    // Assigned through org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq and exposed by eventuallyTimeout().
    private final PatienceConfiguration.Timeout eventuallyTimeout;
    // BeforeAndAfter state: holder for the registered "before" function, assigned through its _setter_ method.
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    // BeforeAndAfter state: holder for the registered "after" function, assigned through its _setter_ method.
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    // BeforeAndAfter state flag, read and written through the accessor pair of the same name.
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    // Initialization flag for checkpointDir: set to true by checkpointDir$lzycompute() after the value is stored.
    private volatile boolean bitmap$0;

    /**
     * Forwards to the static method {@code TestSuiteBase.framework$}, passing this suite.
     *
     * @return whatever {@code TestSuiteBase.framework$} returns for this suite
     */
    public String framework() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.framework$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.master$}, passing this suite.
     *
     * @return whatever {@code TestSuiteBase.master$} returns for this suite
     */
    public String master() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.master$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.batchDuration$}, passing this suite.
     *
     * @return the {@link Duration} returned by {@code TestSuiteBase.batchDuration$}
     */
    public Duration batchDuration() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.batchDuration$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.numInputPartitions$}, passing this suite.
     *
     * @return the value returned by {@code TestSuiteBase.numInputPartitions$}
     */
    public int numInputPartitions() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.numInputPartitions$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.useManualClock$}, passing this suite.
     *
     * @return the flag returned by {@code TestSuiteBase.useManualClock$}
     */
    public boolean useManualClock() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.useManualClock$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.actuallyWait$}, passing this suite.
     *
     * @return the flag returned by {@code TestSuiteBase.actuallyWait$}
     */
    public boolean actuallyWait() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.actuallyWait$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.beforeFunction$}, passing this suite.
     */
    public void beforeFunction() {
        TestSuiteBase self = (TestSuiteBase)this;
        TestSuiteBase.beforeFunction$(self);
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.withStreamingContext$}.
     *
     * @param ssc   streaming context handed to the delegate
     * @param block function over the streaming context handed to the delegate
     * @return the delegate's result, cast to {@code R}
     */
    public <R> R withStreamingContext(StreamingContext ssc, Function1<StreamingContext, R> block) {
        TestSuiteBase self = (TestSuiteBase)this;
        Object result = TestSuiteBase.withStreamingContext$(self, ssc, block);
        return (R)result;
    }

    /**
     * Forwards to the static method {@code TestSuiteBase.withTestServer$}.
     *
     * @param testServer test server handed to the delegate
     * @param block      function over the test server handed to the delegate
     * @return the delegate's result, cast to {@code R}
     */
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> block) {
        TestSuiteBase self = (TestSuiteBase)this;
        Object result = TestSuiteBase.withTestServer$(self, testServer, block);
        return (R)result;
    }

    /**
     * Single-input variant: forwards every argument unchanged to the static method
     * {@code TestSuiteBase.setupStreams$}.
     *
     * @param input         batches of input elements
     * @param operation     transformation from the input stream to the output stream
     * @param numPartitions partition count passed to the delegate
     * @param evidence$4    class tag for {@code U}
     * @param evidence$5    class tag for {@code V}
     * @return the {@link StreamingContext} returned by the delegate
     */
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, int numPartitions, ClassTag<U> evidence$4, ClassTag<V> evidence$5) {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.setupStreams$(self, input, operation, numPartitions, evidence$4, evidence$5);
    }

    /**
     * Default value for the third parameter of the single-input {@code setupStreams};
     * obtained from the static method {@code TestSuiteBase.setupStreams$default$3$}.
     *
     * @return the default supplied by the delegate
     */
    public <U, V> int setupStreams$default$3() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.setupStreams$default$3$(self);
    }

    /**
     * Two-input variant: forwards every argument unchanged to the static method
     * {@code TestSuiteBase.setupStreams$}.
     *
     * @param input1     batches for the first input stream
     * @param input2     batches for the second input stream
     * @param operation  transformation combining both input streams into the output stream
     * @param evidence$6 class tag for {@code U}
     * @param evidence$7 class tag for {@code V}
     * @param evidence$8 class tag for {@code W}
     * @return the {@link StreamingContext} returned by the delegate
     */
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, ClassTag<U> evidence$6, ClassTag<V> evidence$7, ClassTag<W> evidence$8) {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.setupStreams$(self, input1, input2, operation, evidence$6, evidence$7, evidence$8);
    }

    /**
     * Forwards every argument unchanged to the static method {@code TestSuiteBase.runStreams$}.
     *
     * @param ssc               streaming context to run
     * @param numBatches        batch count passed to the delegate
     * @param numExpectedOutput expected output count passed to the delegate
     * @param preStop           callback passed to the delegate
     * @param evidence$9        class tag for {@code V}
     * @return the nested sequence returned by the delegate
     */
    public <V> Seq<Seq<V>> runStreams(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$9) {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.runStreams$(self, ssc, numBatches, numExpectedOutput, preStop, evidence$9);
    }

    /**
     * Default value for the fourth parameter ({@code preStop}) of {@code runStreams};
     * obtained from the static method {@code TestSuiteBase.runStreams$default$4$}.
     *
     * @return the default callback supplied by the delegate
     */
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.runStreams$default$4$(self);
    }

    /**
     * Forwards every argument unchanged to the static method
     * {@code TestSuiteBase.runStreamsWithPartitions$}.
     *
     * @param ssc               streaming context to run
     * @param numBatches        batch count passed to the delegate
     * @param numExpectedOutput expected output count passed to the delegate
     * @param preStop           callback passed to the delegate
     * @param evidence$10       class tag for {@code V}
     * @return the nested sequence returned by the delegate
     */
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$10) {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.runStreamsWithPartitions$(self, ssc, numBatches, numExpectedOutput, preStop, evidence$10);
    }

    /**
     * Default value for the fourth parameter ({@code preStop}) of {@code runStreamsWithPartitions};
     * obtained from the static method {@code TestSuiteBase.runStreamsWithPartitions$default$4$}.
     *
     * @return the default callback supplied by the delegate
     */
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.runStreamsWithPartitions$default$4$(self);
    }

    /**
     * Forwards every argument unchanged to the static method {@code TestSuiteBase.verifyOutput$}.
     *
     * @param output         produced output batches
     * @param expectedOutput expected output batches
     * @param useSet         flag passed to the delegate
     * @param evidence$11    class tag for {@code V}
     */
    public <V> void verifyOutput(Seq<Seq<V>> output, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<V> evidence$11) {
        TestSuiteBase self = (TestSuiteBase)this;
        TestSuiteBase.verifyOutput$(self, output, expectedOutput, useSet, evidence$11);
    }

    /**
     * Unary-operation variant without an explicit batch count: forwards every argument
     * unchanged to the static method {@code TestSuiteBase.testOperation$}.
     *
     * @param input          input batches
     * @param operation      transformation under test
     * @param expectedOutput expected output batches
     * @param useSet         flag passed to the delegate
     * @param evidence$12    class tag for {@code U}
     * @param evidence$13    class tag for {@code V}
     */
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<U> evidence$12, ClassTag<V> evidence$13) {
        TestSuiteBase self = (TestSuiteBase)this;
        TestSuiteBase.testOperation$(self, input, operation, expectedOutput, useSet, evidence$12, evidence$13);
    }

    /**
     * Default value for the fourth parameter of {@code testOperation}; obtained from
     * the static method {@code TestSuiteBase.testOperation$default$4$}.
     *
     * @return the default flag supplied by the delegate
     */
    public <U, V> boolean testOperation$default$4() {
        TestSuiteBase self = (TestSuiteBase)this;
        return TestSuiteBase.testOperation$default$4$(self);
    }

    /**
     * Unary-operation variant with an explicit batch count: forwards every argument
     * unchanged to the static method {@code TestSuiteBase.testOperation$}.
     *
     * @param input          input batches
     * @param operation      transformation under test
     * @param expectedOutput expected output batches
     * @param numBatches     batch count passed to the delegate
     * @param useSet         flag passed to the delegate
     * @param evidence$14    class tag for {@code U}
     * @param evidence$15    class tag for {@code V}
     */
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$14, ClassTag<V> evidence$15) {
        TestSuiteBase self = (TestSuiteBase)this;
        TestSuiteBase.testOperation$(self, input, operation, expectedOutput, numBatches, useSet, evidence$14, evidence$15);
    }

    /**
     * Binary-operation variant without an explicit batch count: forwards every argument
     * unchanged to the static method {@code TestSuiteBase.testOperation$}.
     *
     * @param input1         batches for the first input stream
     * @param input2         batches for the second input stream
     * @param operation      two-stream transformation under test
     * @param expectedOutput expected output batches
     * @param useSet         flag passed to the delegate
     * @param evidence$16    class tag for {@code U}
     * @param evidence$17    class tag for {@code V}
     * @param evidence$18    class tag for {@code W}
     */
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, boolean useSet, ClassTag<U> evidence$16, ClassTag<V> evidence$17, ClassTag<W> evidence$18) {
        TestSuiteBase self = (TestSuiteBase)this;
        TestSuiteBase.testOperation$(self, input1, input2, operation, expectedOutput, useSet, evidence$16, evidence$17, evidence$18);
    }

    /**
     * Binary-operation variant with an explicit batch count: forwards every argument
     * unchanged to the static method {@code TestSuiteBase.testOperation$}.
     *
     * @param input1         batches for the first input stream
     * @param input2         batches for the second input stream
     * @param operation      two-stream transformation under test
     * @param expectedOutput expected output batches
     * @param numBatches     batch count passed to the delegate
     * @param useSet         flag passed to the delegate
     * @param evidence$19    class tag for {@code U}
     * @param evidence$20    class tag for {@code V}
     * @param evidence$21    class tag for {@code W}
     */
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$19, ClassTag<V> evidence$20, ClassTag<W> evidence$21) {
        TestSuiteBase self = (TestSuiteBase)this;
        TestSuiteBase.testOperation$(self, input1, input2, operation, expectedOutput, numBatches, useSet, evidence$19, evidence$20, evidence$21);
    }

    /**
     * Synthetic accessor: invokes the static method {@code FunSuiteLike.runTest$} for this suite.
     *
     * @param testName name of the test, passed through to the delegate
     * @param args     run arguments, passed through to the delegate
     * @return the {@link Status} returned by the delegate
     */
    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String testName, Args args) {
        FunSuiteLike self = (FunSuiteLike)this;
        return FunSuiteLike.runTest$(self, testName, args);
    }

    /**
     * Synthetic accessor: invokes the static method {@code BeforeAndAfterAll.run$} for this suite.
     *
     * @param testName optional test name, passed through to the delegate
     * @param args     run arguments, passed through to the delegate
     * @return the {@link Status} returned by the delegate
     */
    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option testName, Args args) {
        BeforeAndAfterAll self = (BeforeAndAfterAll)this;
        return BeforeAndAfterAll.run$(self, (Option)testName, args);
    }

    /**
     * Forwards to the static method {@code BeforeAndAfter.before$}.
     *
     * @param fun function passed to the delegate
     * @param pos source position passed to the delegate
     */
    public void before(Function0<Object> fun, Position pos) {
        BeforeAndAfter self = (BeforeAndAfter)this;
        BeforeAndAfter.before$(self, fun, pos);
    }

    /**
     * Forwards to the static method {@code BeforeAndAfter.after$}.
     *
     * @param fun function passed to the delegate
     * @param pos source position passed to the delegate
     */
    public void after(Function0<Object> fun, Position pos) {
        BeforeAndAfter self = (BeforeAndAfter)this;
        BeforeAndAfter.after$(self, fun, pos);
    }

    /**
     * Runs a single test by forwarding to the static method {@code BeforeAndAfter.runTest$}.
     *
     * @param testName name of the test, passed through to the delegate
     * @param args     run arguments, passed through to the delegate
     * @return the {@link Status} returned by the delegate
     */
    public Status runTest(String testName, Args args) {
        BeforeAndAfter self = (BeforeAndAfter)this;
        return BeforeAndAfter.runTest$(self, testName, args);
    }

    /**
     * Runs this suite by forwarding to the static method {@code BeforeAndAfter.run$}.
     *
     * @param testName optional test name, passed through to the delegate
     * @param args     run arguments, passed through to the delegate
     * @return the {@link Status} returned by the delegate
     */
    public Status run(Option<String> testName, Args args) {
        BeforeAndAfter self = (BeforeAndAfter)this;
        return BeforeAndAfter.run$(self, testName, args);
    }

    /**
     * Slow path of {@code checkpointDir()}: while holding this instance's monitor,
     * stores the result of {@code TestSuiteBase.checkpointDir$} in the
     * {@code checkpointDir} field unless {@code bitmap$0} shows it was already stored.
     *
     * @return the value of the {@code checkpointDir} field after initialization
     */
    private String checkpointDir$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialized = this.bitmap$0;
            if (!alreadyInitialized) {
                // Store the value first, then publish it by flipping the flag.
                this.checkpointDir = TestSuiteBase.checkpointDir$((TestSuiteBase)this);
                this.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    /**
     * Returns the checkpoint directory, computing it on first use.
     *
     * @return the cached field when {@code bitmap$0} is set, otherwise the result of
     *         {@code checkpointDir$lzycompute()}
     */
    public String checkpointDir() {
        if (this.bitmap$0) {
            return this.checkpointDir;
        }
        return this.checkpointDir$lzycompute();
    }

    /**
     * Accessor for the {@code conf} field.
     *
     * @return the {@link SparkConf} held by this suite
     */
    public SparkConf conf() {
        return conf;
    }

    /**
     * Accessor for the {@code eventuallyTimeout} field.
     *
     * @return the timeout held by this suite
     */
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return eventuallyTimeout;
    }

    /**
     * Setter that stores the given configuration in the {@code conf} field.
     *
     * @param x$1 configuration to store
     */
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf x$1) {
        conf = x$1;
    }

    /**
     * Setter that stores the given timeout in the {@code eventuallyTimeout} field.
     *
     * @param x$1 timeout to store
     */
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout x$1) {
        eventuallyTimeout = x$1;
    }

    /**
     * Accessor for the {@code org$scalatest$BeforeAndAfter$$beforeFunctionAtomic} field.
     *
     * @return the atomic holder of the optional "before" function
     */
    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    /**
     * Accessor for the {@code org$scalatest$BeforeAndAfter$$afterFunctionAtomic} field.
     *
     * @return the atomic holder of the optional "after" function
     */
    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    /**
     * Accessor for the volatile {@code org$scalatest$BeforeAndAfter$$runHasBeenInvoked} flag.
     *
     * @return the current value of the flag
     */
    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    /**
     * Mutator for the volatile {@code org$scalatest$BeforeAndAfter$$runHasBeenInvoked} flag.
     *
     * @param x$1 new value of the flag
     */
    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean x$1) {
        org$scalatest$BeforeAndAfter$$runHasBeenInvoked = x$1;
    }

    /**
     * Setter that stores the given holder in the
     * {@code org$scalatest$BeforeAndAfter$$beforeFunctionAtomic} field.
     *
     * @param x$1 atomic holder to store
     */
    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = x$1;
    }

    /**
     * Setter that stores the given holder in the
     * {@code org$scalatest$BeforeAndAfter$$afterFunctionAtomic} field.
     *
     * @param x$1 atomic holder to store
     */
    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        org$scalatest$BeforeAndAfter$$afterFunctionAtomic = x$1;
    }

    /**
     * Maximum wait time, in milliseconds, reported by this suite.
     *
     * @return 20000 (twenty seconds)
     */
    public int maxWaitTimeMillis() {
        final int twentySecondsInMillis = 20 * 1000;
        return twentySecondsInMillis;
    }

    /**
     * Accessor for the {@code ssc} field.
     *
     * @return the streaming context currently held by this suite; may be {@code null}
     *         (see the null check in {@code afterFunction()})
     */
    public StreamingContext ssc() {
        return ssc;
    }

    /**
     * Mutator for the {@code ssc} field.
     *
     * @param x$1 streaming context to store
     */
    public void ssc_$eq(StreamingContext x$1) {
        ssc = x$1;
    }

    /**
     * Runs {@code TestSuiteBase.afterFunction$} for this suite, then stops the streaming
     * context held by this suite if there is one, using the default argument of
     * {@code StreamingContext.stop}.
     */
    public void afterFunction() {
        TestSuiteBase.afterFunction$((TestSuiteBase)this);
        if (this.ssc() != null) {
            StreamingContext context = this.ssc();
            // stop$default$1() supplies the default value of stop's first parameter.
            boolean defaultStopArgument = context.stop$default$1();
            context.stop(defaultStopArgument);
        }
    }

    /**
     * Asserts that two doubles differ by at most {@code epsilon}, i.e. that
     * {@code |v1 - v2| <= epsilon}. On failure the clue built by {@code errorMessage$1}
     * is attached to the assertion.
     *
     * @param v1      first value
     * @param v2      second value
     * @param epsilon maximum allowed absolute difference (inclusive)
     */
    public void assertEqual(double v1, double v2, double epsilon) {
        double absoluteDifference = package$.MODULE$.abs(v1 - v2);
        boolean withinTolerance = absoluteDifference <= epsilon;
        Bool comparison = Bool$.MODULE$.binaryMacroBool(
                (Object)BoxesRunTime.boxToDouble((double)absoluteDifference),
                "<=",
                (Object)BoxesRunTime.boxToDouble((double)epsilon),
                withinTolerance,
                Prettifier$.MODULE$.default());
        this.assertionsHelper().macroAssert(
                comparison,
                (Object)StreamingLinearRegressionSuite.errorMessage$1(v1, v2),
                Prettifier$.MODULE$.default(),
                new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 45));
    }

    /**
     * Asserts that strictly fewer than {@code input.length() / 5} (integer division)
     * predictions are "off". A prediction is off when
     * {@code $anonfun$validatePrediction$1} returns true for its (prediction, input) pair.
     *
     * @param predictions predicted values, paired positionally with {@code input}
     * @param input       labeled points the predictions are compared against
     */
    public void validatePrediction(Seq<Object> predictions, Seq<LabeledPoint> input) {
        TraversableOnce pairs = (TraversableOnce)predictions.zip(input, Seq$.MODULE$.canBuildFrom());
        int numOffPredictions = pairs.count((Function1 & Serializable & scala.Serializable)pair -> BoxesRunTime.boxToBoolean((boolean)StreamingLinearRegressionSuite.$anonfun$validatePrediction$1(pair)));
        int offLimit = input.length() / 5;
        boolean fewEnoughOff = numOffPredictions < offLimit;
        Bool comparison = Bool$.MODULE$.binaryMacroBool(
                (Object)BoxesRunTime.boxToInteger((int)numOffPredictions),
                "<",
                (Object)BoxesRunTime.boxToInteger((int)offLimit),
                fewEnoughOff,
                Prettifier$.MODULE$.default());
        this.assertionsHelper().macroAssert(
                comparison,
                (Object)"",
                Prettifier$.MODULE$.default(),
                new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 55));
    }

    /**
     * Builds the failure clue used by {@code assertEqual}:
     * {@code "<v1> did not equal <v2>"}.
     *
     * @param v1$1 first value
     * @param v2$1 second value
     * @return the formatted message
     */
    private static final String errorMessage$1(double v1$1, double v2$1) {
        String leftText = ((Object)BoxesRunTime.boxToDouble((double)v1$1)).toString();
        String rightText = ((Object)BoxesRunTime.boxToDouble((double)v2$1)).toString();
        StringBuilder message = new StringBuilder(15);
        message.append(leftText);
        message.append(" did not equal ");
        message.append(rightText);
        return message.toString();
    }

    /**
     * Predicate used by {@code validatePrediction}: a (prediction, labeled point) pair
     * is "off" when the prediction is more than 0.5 away from the point's label.
     *
     * @param x0$1 pair of (prediction, expected labeled point)
     * @return true when {@code |prediction - label| > 0.5}
     * @throws MatchError if the pair itself is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$validatePrediction$1(Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        double prediction = x0$1._1$mcD$sp();
        LabeledPoint expected = (LabeledPoint)x0$1._2();
        double distance = package$.MODULE$.abs(prediction - expected.label());
        return distance > 0.5;
    }

    /**
     * Predicts the label for one labeled point using the streaming model's latest model.
     *
     * @param model$1 streaming model whose {@code latestModel()} is used
     * @param row     labeled point whose features are fed to {@code predict}
     * @return the predicted value
     */
    public static final /* synthetic */ double $anonfun$new$4(StreamingLinearRegressionWithSGD model$1, LabeledPoint row) {
        double prediction = model$1.latestModel().predict(row.features());
        return prediction;
    }

    /**
     * Predicate over a pair of doubles: true when the first element exceeds the
     * second by at most 0.1.
     *
     * @param x pair of doubles
     * @return {@code x._1 - x._2 <= 0.1}
     */
    public static final /* synthetic */ boolean $anonfun$new$9(Tuple2 x) {
        double difference = x._1$mcD$sp() - x._2$mcD$sp();
        return difference <= 0.1;
    }

    /**
     * Indicator over a pair of doubles: 1 when the first element is strictly smaller
     * than the second, otherwise 0.
     *
     * @param x pair of doubles
     * @return 1 if {@code x._1 - x._2 < 0.0}, else 0
     */
    public static final /* synthetic */ int $anonfun$new$10(Tuple2 x) {
        double difference = x._1$mcD$sp() - x._2$mcD$sp();
        if (difference < 0.0) {
            return 1;
        }
        return 0;
    }

    /**
     * Computes the sum of {@code |_1 - _2|} over all pairs in the batch, divided by
     * {@code nPoints$1}.
     *
     * @param nPoints$1 divisor applied to the summed absolute differences
     * @param batch     sequence of double pairs
     * @return summed absolute difference divided by {@code nPoints$1}
     */
    public static final /* synthetic */ double $anonfun$new$15(int nPoints$1, Seq batch) {
        TraversableOnce absoluteErrors = (TraversableOnce)batch.map((Function1 & Serializable & scala.Serializable)pair -> BoxesRunTime.boxToDouble((double)package$.MODULE$.abs(pair._1$mcD$sp() - pair._2$mcD$sp())), Seq$.MODULE$.canBuildFrom());
        double totalError = BoxesRunTime.unboxToDouble((Object)absoluteErrors.sum((Numeric)Numeric.DoubleIsFractional$.MODULE$));
        return totalError / (double)nPoints$1;
    }

    /**
     * Computes the sum of {@code |_1 - _2|} over all pairs in the batch, divided by
     * {@code nPoints$2}.
     *
     * @param nPoints$2 divisor applied to the summed absolute differences
     * @param batch     sequence of double pairs
     * @return summed absolute difference divided by {@code nPoints$2}
     */
    public static final /* synthetic */ double $anonfun$new$22(int nPoints$2, Seq batch) {
        TraversableOnce absoluteErrors = (TraversableOnce)batch.map((Function1 & Serializable & scala.Serializable)pair -> BoxesRunTime.boxToDouble((double)package$.MODULE$.abs(pair._1$mcD$sp() - pair._2$mcD$sp())), Seq$.MODULE$.canBuildFrom());
        double totalError = BoxesRunTime.unboxToDouble((Object)absoluteErrors.sum((Numeric)Numeric.DoubleIsFractional$.MODULE$));
        return totalError / (double)nPoints$2;
    }

    /**
     * Runs the {@code BeforeAndAfter} and {@code TestSuiteBase} initializers for this
     * instance, then registers five tests via {@code test(...)}:
     * "parameter accuracy", "parameter convergence", "predictions",
     * "training and prediction" and "handling empty RDDs in a stream".
     *
     * Each test body is a lambda; nothing inside the lambdas executes during construction.
     * Every test stores the context returned by {@code setupStreams} through
     * {@code ssc_$eq}, which is the context {@code afterFunction()} later stops.
     */
    public StreamingLinearRegressionSuite() {
        BeforeAndAfter.$init$((BeforeAndAfter)this);
        TestSuiteBase.$init$((TestSuiteBase)this);
        // Test 1: train on 10 generated batches, then check the learned parameters and held-out predictions.
        this.test("parameter accuracy", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            // Two-dimensional model starting from weights (0.0, 0.0).
            StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors$.MODULE$.dense(0.0, (Seq)Predef$.MODULE$.wrapDoubleArray(new double[]{0.0}))).setStepSize(0.2).setNumIterations(25).setConvergenceTol(1.0E-4);
            int numBatches = 10;
            // One generated data set per batch; the 4th argument 42 * (i + 1) differs per batch
            // (presumably a random seed, with the preceding arguments being intercept, weights, nPoints -- confirm against LinearDataGenerator).
            IndexedSeq input = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numBatches).map((Function1 & Serializable & scala.Serializable)i -> LinearDataGenerator$.MODULE$.generateLinearInput(0.0, new double[]{10.0, 10.0}, 100, 42 * (BoxesRunTime.unboxToInt((Object)i) + 1), LinearDataGenerator$.MODULE$.generateLinearInput$default$5()), IndexedSeq$.MODULE$.canBuildFrom());
            this.ssc_$eq(this.setupStreams((Seq)input, (Function1)(Function1 & Serializable & scala.Serializable)inputDStream -> {
                model.trainOn(inputDStream);
                return inputDStream.count();
            }, this.setupStreams$default$3(), (ClassTag)ClassTag$.MODULE$.apply(LabeledPoint.class), (ClassTag)ClassTag$.MODULE$.Long()));
            this.runStreams(this.ssc(), numBatches, numBatches, this.runStreams$default$4(), ClassTag$.MODULE$.Nothing());
            // Intercept must be within 0.1 of 0.0 and both weights within 0.1 of 10.0.
            this.assertEqual(((LinearRegressionModel)model.latestModel()).intercept(), 0.0, 0.1);
            this.assertEqual(((LinearRegressionModel)model.latestModel()).weights().apply(0), 10.0, 0.1);
            this.assertEqual(((LinearRegressionModel)model.latestModel()).weights().apply(1), 10.0, 0.1);
            // Separate data set (4th argument 17) used to validate predictions of the trained model.
            Seq validationData = LinearDataGenerator$.MODULE$.generateLinearInput(0.0, new double[]{10.0, 10.0}, 100, 17, LinearDataGenerator$.MODULE$.generateLinearInput$default$5());
            this.validatePrediction((Seq<Object>)((Seq)validationData.map((Function1 & Serializable & scala.Serializable)row -> BoxesRunTime.boxToDouble((double)StreamingLinearRegressionSuite.$anonfun$new$4(model, row)), Seq$.MODULE$.canBuildFrom())), (Seq<LabeledPoint>)validationData);
        }, new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 59));
        // Test 2: track |weights(0) - 10.0| across batches and check how it evolves.
        this.test("parameter convergence", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            // One-dimensional model starting from weight 0.0 (the varargs tail is empty).
            StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors$.MODULE$.dense(0.0, (Seq)Predef$.MODULE$.wrapDoubleArray(new double[0]))).setStepSize(0.2).setNumIterations(25);
            int numBatches = 10;
            IndexedSeq input = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numBatches).map((Function1 & Serializable & scala.Serializable)i -> LinearDataGenerator$.MODULE$.generateLinearInput(0.0, new double[]{10.0}, 100, 42 * (BoxesRunTime.unboxToInt((Object)i) + 1), LinearDataGenerator$.MODULE$.generateLinearInput$default$5()), IndexedSeq$.MODULE$.canBuildFrom());
            // Collects one error value per RDD passed to the foreachRDD callback below.
            ArrayBuffer history = new ArrayBuffer(numBatches);
            this.ssc_$eq(this.setupStreams((Seq)input, (Function1)(Function1 & Serializable & scala.Serializable)inputDStream -> {
                model.trainOn(inputDStream);
                inputDStream.foreachRDD((Function1 & Serializable & scala.Serializable)x -> {
                    // Record the absolute distance of the first weight from 10.0.
                    history.$plus$eq((Object)BoxesRunTime.boxToDouble((double)package$.MODULE$.abs(((LinearRegressionModel)model.latestModel()).weights().apply(0) - 10.0)));
                    return BoxedUnit.UNIT;
                });
                return inputDStream.count();
            }, this.setupStreams$default$3(), (ClassTag)ClassTag$.MODULE$.apply(LabeledPoint.class), (ClassTag)ClassTag$.MODULE$.Long()));
            this.runStreams(this.ssc(), numBatches, numBatches, this.runStreams$default$4(), ClassTag$.MODULE$.Nothing());
            // Pair each recorded error with its predecessor: (history[i + 1], history[i]).
            ArrayBuffer deltas = (ArrayBuffer)((IndexedSeqOptimized)history.drop(1)).zip((GenIterable)history.dropRight(1), ArrayBuffer$.MODULE$.canBuildFrom());
            // Every step may increase the error by at most 0.1.
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(deltas.forall((Function1 & Serializable & scala.Serializable)x -> BoxesRunTime.boxToBoolean((boolean)StreamingLinearRegressionSuite.$anonfun$new$9(x))), "deltas.forall(((x: (Double, Double)) => x._1.-(x._2).<=(0.1)))", Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 120));
            // Count the steps on which the error strictly decreased; more than one is required.
            int $org_scalatest_assert_macro_left = BoxesRunTime.unboxToInt((Object)((TraversableOnce)deltas.map((Function1 & Serializable & scala.Serializable)x -> BoxesRunTime.boxToInteger((int)StreamingLinearRegressionSuite.$anonfun$new$10(x)), ArrayBuffer$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.IntIsIntegral$.MODULE$));
            int $org_scalatest_assert_macro_right = 1;
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 122));
        }, new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 92));
        // Test 3: prediction only (no trainOn call), with the model initialised to weights (10.0, 10.0).
        this.test("predictions", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors$.MODULE$.dense(10.0, (Seq)Predef$.MODULE$.wrapDoubleArray(new double[]{10.0}))).setStepSize(0.2).setNumIterations(25);
            int numBatches = 10;
            int nPoints = 100;
            IndexedSeq testInput = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numBatches).map((Function1 & Serializable & scala.Serializable)i -> LinearDataGenerator$.MODULE$.generateLinearInput(0.0, new double[]{10.0, 10.0}, nPoints, 42 * (BoxesRunTime.unboxToInt((Object)i) + 1), LinearDataGenerator$.MODULE$.generateLinearInput$default$5()), IndexedSeq$.MODULE$.canBuildFrom());
            // Each labeled point is mapped to a (label, features) pair before being passed to predictOnValues.
            this.ssc_$eq(this.setupStreams((Seq)testInput, (Function1)(Function1 & Serializable & scala.Serializable)inputDStream -> model.predictOnValues(inputDStream.map((Function1 & Serializable & scala.Serializable)x -> new Tuple2((Object)BoxesRunTime.boxToDouble((double)x.label()), (Object)x.features()), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.Double()), this.setupStreams$default$3(), (ClassTag)ClassTag$.MODULE$.apply(LabeledPoint.class), (ClassTag)ClassTag$.MODULE$.apply(Tuple2.class)));
            Seq output = this.runStreams(this.ssc(), numBatches, numBatches, this.runStreams$default$4(), ClassTag$.MODULE$.Nothing());
            // Per batch: sum of |_1 - _2| over the output pairs, divided by nPoints.
            Seq errors = (Seq)output.map((Function1 & Serializable & scala.Serializable)batch -> BoxesRunTime.boxToDouble((double)StreamingLinearRegressionSuite.$anonfun$new$15(nPoints, batch)), Seq$.MODULE$.canBuildFrom());
            // Every batch error must be at most 0.1.
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(errors.forall((Function1)(JFunction1.mcZD.sp & Serializable & scala.Serializable)x -> x <= 0.1), "errors.forall(((x: Double) => x.<=(0.1)))", Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 149));
        }, new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 126));
        // Test 4: train and predict on the same stream, starting from weights (0.0, 0.0).
        this.test("training and prediction", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors$.MODULE$.dense(0.0, (Seq)Predef$.MODULE$.wrapDoubleArray(new double[]{0.0}))).setStepSize(0.2).setNumIterations(25);
            int numBatches = 10;
            int nPoints = 100;
            IndexedSeq testInput = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numBatches).map((Function1 & Serializable & scala.Serializable)i -> LinearDataGenerator$.MODULE$.generateLinearInput(0.0, new double[]{10.0, 10.0}, nPoints, 42 * (BoxesRunTime.unboxToInt((Object)i) + 1), LinearDataGenerator$.MODULE$.generateLinearInput$default$5()), IndexedSeq$.MODULE$.canBuildFrom());
            this.ssc_$eq(this.setupStreams((Seq)testInput, (Function1)(Function1 & Serializable & scala.Serializable)inputDStream -> {
                model.trainOn(inputDStream);
                return model.predictOnValues(inputDStream.map((Function1 & Serializable & scala.Serializable)x -> new Tuple2((Object)BoxesRunTime.boxToDouble((double)x.label()), (Object)x.features()), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.Double());
            }, this.setupStreams$default$3(), (ClassTag)ClassTag$.MODULE$.apply(LabeledPoint.class), (ClassTag)ClassTag$.MODULE$.apply(Tuple2.class)));
            Seq output = this.runStreams(this.ssc(), numBatches, numBatches, this.runStreams$default$4(), ClassTag$.MODULE$.Nothing());
            // Per batch: sum of |_1 - _2| over the output pairs, divided by nPoints.
            List error = ((TraversableOnce)output.map((Function1 & Serializable & scala.Serializable)batch -> BoxesRunTime.boxToDouble((double)StreamingLinearRegressionSuite.$anonfun$new$22(nPoints, batch)), Seq$.MODULE$.canBuildFrom())).toList();
            // The first batch's error must exceed the last batch's error by more than 2.
            double $org_scalatest_assert_macro_left = BoxesRunTime.unboxToDouble((Object)error.head()) - BoxesRunTime.unboxToDouble((Object)error.last());
            int $org_scalatest_assert_macro_right = 2;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToDouble((double)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > (double)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 153 + 24));
        }, new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 153));
        // Test 5: run the train + predict pipeline over an empty input sequence; it makes no assertions,
        // so it passes as long as nothing throws.
        this.test("handling empty RDDs in a stream", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors$.MODULE$.dense(0.0, (Seq)Predef$.MODULE$.wrapDoubleArray(new double[]{0.0}))).setStepSize(0.2).setNumIterations(25);
            int numBatches = 10;
            int nPoints = 100;
            Seq emptyInput = (Seq)Seq$.MODULE$.empty();
            this.ssc_$eq(this.setupStreams(emptyInput, (Function1 & Serializable & scala.Serializable)inputDStream -> {
                model.trainOn(inputDStream);
                return model.predictOnValues(inputDStream.map((Function1 & Serializable & scala.Serializable)x -> new Tuple2((Object)BoxesRunTime.boxToDouble((double)x.label()), (Object)x.features()), ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.Double());
            }, this.setupStreams$default$3(), ClassTag$.MODULE$.apply(LabeledPoint.class), ClassTag$.MODULE$.apply(Tuple2.class)));
            Seq output = this.runStreams(this.ssc(), numBatches, numBatches, this.runStreams$default$4(), ClassTag$.MODULE$.Nothing());
        }, new Position("StreamingLinearRegressionSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 181));
    }
}

