/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.streaming.BatchCounter;
import org.apache.spark.streaming.Checkpoint$;
import org.apache.spark.streaming.DStreamCheckpointTester;
import org.apache.spark.streaming.DStreamCheckpointTester$;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.StreamingContext$;
import org.apache.spark.streaming.TestInputStream;
import org.apache.spark.streaming.TestOutputStreamWithPartitions;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.time.SpanSugar$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenIterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

public abstract class DStreamCheckpointTester$class {
    public static void testCheckpointedOperation(SparkFunSuite $this, Seq input, Function1 operation2, Seq expectedOutput, int numBatchesBeforeRestart, Duration batchDuration, boolean stopSparkContextAfterTest, ClassTag evidence$1, ClassTag evidence$2) {
        Predef$.MODULE$.require(numBatchesBeforeRestart < expectedOutput.size(), (Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Number of batches before context restart less than number of expected output (i.e. number of total batches to run)";
            }
        });
        Predef$.MODULE$.require(StreamingContext$.MODULE$.getActive().isEmpty(), (Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Cannot run test with already active streaming context";
            }
        });
        int totalNumBatches = input.size();
        long batchDurationMillis = batchDuration.milliseconds();
        String x$29 = $this.getClass().getSimpleName();
        String x$30 = Utils$.MODULE$.createTempDir$default$1();
        String checkpointDir2 = Utils$.MODULE$.createTempDir(x$30, x$29).toString();
        $this.logDebug((Function0)new Serializable($this, checkpointDir2){
            public static final long serialVersionUID = 0L;
            private final String checkpointDir$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Using checkpoint directory ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.checkpointDir$1}));
            }
            {
                this.checkpointDir$1 = checkpointDir$1;
            }
        });
        StreamingContext ssc = ((DStreamCheckpointTester)$this).createContextForCheckpointOperation(batchDuration);
        Predef$.MODULE$.require($this.convertToEqualizer((Object)ssc.conf().get("spark.streaming.clock")).$eq$eq$eq((Object)ManualClock.class.getName(), Equality$.MODULE$.default()), (Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Cannot run test without manual clock in the conf";
            }
        });
        TestInputStream inputStream = new TestInputStream(ssc, input, 2, evidence$1);
        DStream operatedStream = (DStream)operation2.apply(inputStream);
        operatedStream.print();
        TestOutputStreamWithPartitions outputStream = new TestOutputStreamWithPartitions(operatedStream, new ConcurrentLinkedQueue(), evidence$2);
        outputStream.register();
        ssc.checkpoint(checkpointDir2);
        Seq beforeRestartOutput = ((DStreamCheckpointTester)$this).generateOutput(ssc, new Time(batchDurationMillis * (long)numBatchesBeforeRestart), checkpointDir2, stopSparkContextAfterTest, evidence$2);
        DStreamCheckpointTester$class.assertOutput((SparkFunSuite)((DStreamCheckpointTester)$this), beforeRestartOutput, expectedOutput, true, evidence$2);
        $this.logInfo((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "\n-------------------------------------------\n        Restarting stream computation          \n-------------------------------------------\n";
            }
        });
        StreamingContext restartedSsc = new StreamingContext(checkpointDir2);
        Seq afterRestartOutput = ((DStreamCheckpointTester)$this).generateOutput(restartedSsc, new Time(batchDurationMillis * (long)totalNumBatches), checkpointDir2, stopSparkContextAfterTest, evidence$2);
        DStreamCheckpointTester$class.assertOutput((SparkFunSuite)((DStreamCheckpointTester)$this), afterRestartOutput, expectedOutput, false, evidence$2);
    }

    public static Duration testCheckpointedOperation$default$5(SparkFunSuite $this) {
        return Milliseconds$.MODULE$.apply(500L);
    }

    public static boolean testCheckpointedOperation$default$6(SparkFunSuite $this) {
        return true;
    }

    public static StreamingContext createContextForCheckpointOperation(SparkFunSuite $this, Duration batchDuration) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName($this.getClass().getSimpleName());
        conf.set("spark.streaming.clock", ManualClock.class.getName());
        return new StreamingContext(SparkContext$.MODULE$.getOrCreate(conf), batchDuration);
    }

    public static TestOutputStreamWithPartitions getTestOutputStream(SparkFunSuite $this, DStream[] streams, ClassTag evidence$3) {
        return (TestOutputStreamWithPartitions)((Object)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])streams).collect((PartialFunction)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final <A1 extends DStream<?>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 instanceof TestOutputStreamWithPartitions) {
                    TestOutputStreamWithPartitions testOutputStreamWithPartitions = (TestOutputStreamWithPartitions)A1;
                    object = testOutputStreamWithPartitions;
                } else {
                    object = function1.apply(x1);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(DStream<?> x1) {
                DStream<?> dStream = x1;
                boolean bl = dStream instanceof TestOutputStreamWithPartitions;
                return bl;
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(TestOutputStreamWithPartitions.class)))).head());
    }

    public static Seq generateOutput(SparkFunSuite $this, StreamingContext ssc, Time targetBatchTime, String checkpointDir2, boolean stopSparkContext, ClassTag evidence$4) {
        try {
            BatchCounter batchCounter = new BatchCounter(ssc);
            ssc.start();
            ManualClock clock = (ManualClock)ssc.scheduler().clock();
            $this.logInfo((Function0)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final ManualClock clock$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Manual clock before advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.getTimeMillis())).toString();
                }
                {
                    this.clock$1 = clock$1;
                }
            });
            clock.setTime(targetBatchTime.milliseconds());
            $this.logInfo((Function0)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final ManualClock clock$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Manual clock after advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.getTimeMillis())).toString();
                }
                {
                    this.clock$1 = clock$1;
                }
            });
            TestOutputStreamWithPartitions outputStream = ((DStreamCheckpointTester)$this).getTestOutputStream(ssc.graph().getOutputStreams(), evidence$4);
            Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(10).seconds()), (Function0)new Serializable($this, batchCounter, ssc, targetBatchTime){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ SparkFunSuite $outer;
                private final BatchCounter batchCounter$1;
                private final StreamingContext ssc$2;
                private final Time targetBatchTime$1;

                public final Assertion apply() {
                    this.ssc$2.awaitTerminationOrTimeout(10L);
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.convertToEqualizer((Object)this.batchCounter$1.getLastCompletedBatchTime());
                    Time $org_scalatest_assert_macro_right = this.targetBatchTime$1;
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    return this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("CheckpointSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 167));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.batchCounter$1 = batchCounter$1;
                    this.ssc$2 = ssc$2;
                    this.targetBatchTime$1 = targetBatchTime$1;
                }
            }, Eventually$.MODULE$.patienceConfig(), new Position("CheckpointSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 165));
            Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(10).seconds()), (Function0)new Serializable($this, clock, checkpointDir2){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ SparkFunSuite $outer;
                public final ManualClock clock$1;
                private final String checkpointDir$2;

                public final Assertion apply() {
                    Seq checkpointFilesOfLatestTime = (Seq)Checkpoint$.MODULE$.getCheckpointFiles(this.checkpointDir$2, Checkpoint$.MODULE$.getCheckpointFiles$default$2()).filter((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ DStreamCheckpointTester$.anonfun.generateOutput.4 $outer;

                        public final boolean apply(Path x$1) {
                            return x$1.getName().contains(((Object)BoxesRunTime.boxToLong((long)this.$outer.clock$1.getTimeMillis())).toString());
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    });
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.convertToEqualizer((Object)BoxesRunTime.boxToInteger((int)checkpointFilesOfLatestTime.size()));
                    int $org_scalatest_assert_macro_right = 2;
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                    return this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("CheckpointSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 176));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.clock$1 = clock$1;
                    this.checkpointDir$2 = checkpointDir$2;
                }
            }, Eventually$.MODULE$.patienceConfig(), new Position("CheckpointSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 170));
            return ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(outputStream.output()).asScala()).map((Function1)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final Seq<V> apply(Seq<Seq<V>> x$2) {
                    return (Seq)x$2.flatten((Function1)Predef$.MODULE$.$conforms());
                }
            }, Iterable$.MODULE$.canBuildFrom())).toSeq();
        }
        finally {
            ssc.stop(stopSparkContext);
        }
    }

    private static void assertOutput(SparkFunSuite $this, Seq output2, Seq expectedOutput, boolean beforeRestart, ClassTag evidence$5) {
        Seq expectedPartialOutput = beforeRestart ? (Seq)expectedOutput.take(output2.size()) : (Seq)expectedOutput.takeRight(output2.size());
        boolean setComparison = ((IterableLike)output2.zip((GenIterable)expectedPartialOutput, Seq$.MODULE$.canBuildFrom())).forall((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SparkFunSuite $outer;

            public final boolean apply(Tuple2<Seq<Object>, Seq<Object>> x0$1) {
                Tuple2<Seq<Object>, Seq<Object>> tuple2 = x0$1;
                if (tuple2 != null) {
                    Seq o = (Seq)tuple2._1();
                    Seq e = (Seq)tuple2._2();
                    boolean bl = this.$outer.convertToEqualizer((Object)o.toSet()).$eq$eq$eq((Object)e.toSet(), Equality$.MODULE$.default());
                    return bl;
                }
                throw new MatchError(tuple2);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(setComparison, "setComparison", Prettifier$.MODULE$.default());
        $this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"set comparison failed\\n"})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Expected output items:\\n", "\\n"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{expectedPartialOutput.mkString("\n")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Generated output items: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{output2.mkString("\n")}))).toString(), Prettifier$.MODULE$.default(), new Position("CheckpointSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 197));
    }

    public static void $init$(SparkFunSuite $this) {
    }
}

