/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.pipelines;

import java.io.Serializable;
import org.apache.spark.connect.proto.DatasetType;
import org.apache.spark.connect.proto.ExecutePlanResponse;
import org.apache.spark.connect.proto.PipelineCommand;
import org.apache.spark.connect.proto.PipelineEvent;
import org.apache.spark.connect.proto.Plan;
import org.apache.spark.connect.proto.SparkConnectServiceGrpc;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.connect.client.CloseableIterator;
import org.apache.spark.sql.connect.client.SparkConnectClient;
import org.apache.spark.sql.connect.pipelines.SparkDeclarativePipelinesServerTest;
import org.apache.spark.sql.connect.pipelines.TestPipelineDefinition;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.Tag;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.StringOps$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.util.matching.Regex;

/**
 * Decompiled form of the Scala test suite {@code PipelineEventStreamSuite}.
 *
 * <p>The constructor registers three tests. Each one creates a dataflow graph through a raw
 * blocking stub, registers two datasets ({@code a} and {@code b}), starts a run through a
 * {@link SparkConnectClient} and checks the pipeline events carried by the responses.
 *
 * <p>The {@code $anonfun$new$N} methods are the compiler-generated bodies of the Scala closures;
 * their names and signatures are kept exactly as emitted.
 */
@ScalaSignature(bytes="\u0006\u0005a1AAA\u0002\u0001!!)Q\u0003\u0001C\u0001-\tA\u0002+\u001b9fY&tW-\u0012<f]R\u001cFO]3b[N+\u0018\u000e^3\u000b\u0005\u0011)\u0011!\u00039ja\u0016d\u0017N\\3t\u0015\t1q!A\u0004d_:tWm\u0019;\u000b\u0005!I\u0011aA:rY*\u0011!bC\u0001\u0006gB\f'o\u001b\u0006\u0003\u00195\ta!\u00199bG\",'\"\u0001\b\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001\t\u0002C\u0001\n\u0014\u001b\u0005\u0019\u0011B\u0001\u000b\u0004\u0005\r\u001a\u0006/\u0019:l\t\u0016\u001cG.\u0019:bi&4X\rU5qK2Lg.Z:TKJ4XM\u001d+fgR\fa\u0001P5oSRtD#A\f\u0011\u0005I\u0001\u0001")
public class PipelineEventStreamSuite
extends SparkDeclarativePipelinesServerTest {
    /**
     * Drains {@code responses} and appends the event of every response that carries a pipeline
     * event result to {@code sink}; responses without one are skipped.
     *
     * <p>Any exception thrown by the iterator propagates to the caller, which the
     * flow-resolution-failure test relies on.
     *
     * @param responses iterator returned by {@code SparkConnectClient.execute}
     * @param sink buffer that receives the {@link PipelineEvent}s in arrival order
     */
    private static void collectPipelineEvents(CloseableIterator responses, ArrayBuffer sink) {
        while (responses.hasNext()) {
            ExecutePlanResponse response = (ExecutePlanResponse)responses.next();
            if (response.hasPipelineEventResult()) {
                sink.append((Object)response.getPipelineEventResult().getEvent());
            }
        }
    }

    /** Returns whether the message of {@code e} contains {@code eventMessage$1}. */
    public static final /* synthetic */ boolean $anonfun$new$5(String eventMessage$1, PipelineEvent e) {
        return e.getMessage().contains(eventMessage$1);
    }

    /**
     * Starts a run of graph {@code graphId$1}, records the streamed events into
     * {@code capturedEvents$1} and asserts that each expected progress message appears in at
     * least one of them.
     */
    public static final /* synthetic */ void $anonfun$new$3(PipelineEventStreamSuite $this, String graphId$1, ArrayBuffer capturedEvents$1, SparkConnectClient client) {
        Plan startRunRequest = $this.buildStartRunPlan(PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId$1).build());
        PipelineEventStreamSuite.collectPipelineEvents(client.execute(startRunRequest, client.execute$default$2()), capturedEvents$1);
        Set expectedEventMessages = (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Flow spark_catalog.default.a is QUEUED", "Flow spark_catalog.default.b is QUEUED", "Flow spark_catalog.default.a is PLANNING", "Flow spark_catalog.default.a is STARTING", "Flow spark_catalog.default.a is RUNNING", "Flow spark_catalog.default.a has COMPLETED", "Flow spark_catalog.default.b is STARTING", "Flow spark_catalog.default.b is RUNNING", "Flow spark_catalog.default.b has COMPLETED", "Run is COMPLETED"}));
        expectedEventMessages.foreach((Function1<String, Object> & Serializable)eventMessage -> {
            Bool messageSeen = Bool$.MODULE$.simpleMacroBool(capturedEvents$1.exists((Function1<PipelineEvent, Object> & Serializable)e -> BoxesRunTime.boxToBoolean((boolean)PipelineEventStreamSuite.$anonfun$new$5(eventMessage, e))), "capturedEvents.exists(((e: org.apache.spark.connect.proto.PipelineEvent) => e.getMessage().contains(eventMessage)))", Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(messageSeen, (Object)("Did not receive expected event: " + eventMessage), Prettifier$.MODULE$.default(), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 65));
        });
    }

    /**
     * Body of the test "expected events are streamed back to the client": registers a
     * materialized view {@code a} and a table {@code b} reading from {@code STREAM a}, then
     * delegates the run and the assertions to {@link #$anonfun$new$3}.
     */
    public static final /* synthetic */ void $anonfun$new$2(PipelineEventStreamSuite $this, SparkConnectServiceGrpc.SparkConnectServiceBlockingStub stub) {
        String graphId = $this.createDataflowGraph(stub);
        TestPipelineDefinition pipeline = new TestPipelineDefinition(null, graphId){
            {
                this.createTable("a", DatasetType.MATERIALIZED_VIEW, (Option<String>)new Some((Object)"SELECT * FROM RANGE(5)"));
                this.createTable("b", DatasetType.TABLE, (Option<String>)new Some((Object)"SELECT * FROM STREAM a"));
            }
        };
        $this.registerPipelineDatasets(pipeline, stub);
        ArrayBuffer capturedEvents = new ArrayBuffer();
        $this.withClient((Function1<SparkConnectClient, BoxedUnit> & Serializable)client -> {
            PipelineEventStreamSuite.$anonfun$new$3($this, graphId, capturedEvents, client);
            return BoxedUnit.UNIT;
        });
    }

    /** Returns whether the message of {@code e} does NOT match {@code runFailureErrorMsg$1}. */
    public static final /* synthetic */ boolean $anonfun$new$13(Regex runFailureErrorMsg$1, PipelineEvent e) {
        return !runFailureErrorMsg$1.matches((CharSequence)e.getMessage());
    }

    /**
     * Starts a run (dry or not, per {@code dry$1}) of a graph whose flows cannot be resolved and
     * asserts that:
     * <ul>
     *   <li>executing the plan throws an {@link AnalysisException} whose message reports the
     *       pipeline-level resolution failure;</li>
     *   <li>the events captured before the failure contain the per-flow failure messages;</li>
     *   <li>no captured event repeats the pipeline-level failure message.</li>
     * </ul>
     */
    public static final /* synthetic */ void $anonfun$new$9(PipelineEventStreamSuite $this, String graphId$2, boolean dry$1, ArrayBuffer capturedEvents$2, SparkConnectClient client) {
        Plan startRunRequest = $this.buildStartRunPlan(PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId$2).setDry(dry$1).build());
        // Events are captured inside the intercepted closure so that everything streamed before
        // the exception is still available to the assertions below.
        AnalysisException ex = (AnalysisException)$this.intercept((Function0)(JFunction0.mcV.sp & Serializable)() -> PipelineEventStreamSuite.collectPipelineEvents(client.execute(startRunRequest, client.execute$default$2()), capturedEvents$2), ClassTag$.MODULE$.apply(AnalysisException.class), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 99));
        Regex runFailureErrorMsg = StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("(?s).*Failed to resolve flows in the pipeline.*"));
        Bool exceptionReportsRunFailure = Bool$.MODULE$.simpleMacroBool(runFailureErrorMsg.matches((CharSequence)ex.getMessage()), "runFailureErrorMsg.matches(ex.getMessage())", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert(exceptionReportsRunFailure, (Object)"", Prettifier$.MODULE$.default(), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 110));
        // The square brackets are escaped so the error-condition tag is matched literally; left
        // unescaped they form a character class that matches any single listed character, which
        // makes the assertion pass on almost any "Failed to resolve flow" message.
        Set expectedLogPatterns = (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Regex[]{StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("(?s).*Failed to resolve flow.*Failed to read dataset 'spark_catalog.default.a'.*")), StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("(?s).*Failed to resolve flow.*\\[TABLE_OR_VIEW_NOT_FOUND\\].*"))}));
        expectedLogPatterns.foreach((Function1<Regex, Object> & Serializable)logPattern -> {
            Bool patternSeen = Bool$.MODULE$.simpleMacroBool(capturedEvents$2.exists((Function1<PipelineEvent, Object> & Serializable)e -> BoxesRunTime.boxToBoolean((boolean)logPattern.matches((CharSequence)e.getMessage()))), "capturedEvents.exists(((e: org.apache.spark.connect.proto.PipelineEvent) => logPattern.matches(e.getMessage())))", Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(patternSeen, (Object)("Did not receive expected event matching pattern: " + logPattern), Prettifier$.MODULE$.default(), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 115));
        });
        Bool noEventReportsRunFailure = Bool$.MODULE$.simpleMacroBool(capturedEvents$2.forall((Function1<PipelineEvent, Object> & Serializable)e -> BoxesRunTime.boxToBoolean((boolean)PipelineEventStreamSuite.$anonfun$new$13(runFailureErrorMsg, e))), "capturedEvents.forall(((e: org.apache.spark.connect.proto.PipelineEvent) => runFailureErrorMsg.matches(e.getMessage()).unary_!))", Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert(noEventReportsRunFailure, (Object)"", Prettifier$.MODULE$.default(), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 120));
    }

    /**
     * Body of the test "flow resolution failure" for one value of {@code dry$1}: registers a
     * materialized view {@code a} selecting from {@code unknown_table} and a table {@code b}
     * reading from {@code STREAM a}, then delegates to {@link #$anonfun$new$9}.
     */
    public static final /* synthetic */ void $anonfun$new$8(PipelineEventStreamSuite $this, boolean dry$1, SparkConnectServiceGrpc.SparkConnectServiceBlockingStub stub) {
        String graphId = $this.createDataflowGraph(stub);
        TestPipelineDefinition pipeline = new TestPipelineDefinition(null, graphId){
            {
                this.createTable("a", DatasetType.MATERIALIZED_VIEW, (Option<String>)new Some((Object)"SELECT * FROM unknown_table"));
                this.createTable("b", DatasetType.TABLE, (Option<String>)new Some((Object)"SELECT * FROM STREAM a"));
            }
        };
        $this.registerPipelineDatasets(pipeline, stub);
        ArrayBuffer capturedEvents = new ArrayBuffer();
        $this.withClient((Function1<SparkConnectClient, BoxedUnit> & Serializable)client -> {
            PipelineEventStreamSuite.$anonfun$new$9($this, graphId, dry$1, capturedEvents, client);
            return BoxedUnit.UNIT;
        });
    }

    /** Returns whether the message of {@code e} contains {@code eventMessage$2}. */
    public static final /* synthetic */ boolean $anonfun$new$18(String eventMessage$2, PipelineEvent e) {
        return e.getMessage().contains(eventMessage$2);
    }

    /**
     * Starts a dry run of graph {@code graphId$3}, records the streamed events into
     * {@code capturedEvents$3} and asserts that a "Run is COMPLETED" message was received.
     */
    public static final /* synthetic */ void $anonfun$new$16(PipelineEventStreamSuite $this, String graphId$3, ArrayBuffer capturedEvents$3, SparkConnectClient client) {
        Plan startRunRequest = $this.buildStartRunPlan(PipelineCommand.StartRun.newBuilder().setDataflowGraphId(graphId$3).setDry(true).build());
        PipelineEventStreamSuite.collectPipelineEvents(client.execute(startRunRequest, client.execute$default$2()), capturedEvents$3);
        Set expectedEventMessages = (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Run is COMPLETED"}));
        expectedEventMessages.foreach((Function1<String, Object> & Serializable)eventMessage -> {
            Bool messageSeen = Bool$.MODULE$.simpleMacroBool(capturedEvents$3.exists((Function1<PipelineEvent, Object> & Serializable)e -> BoxesRunTime.boxToBoolean((boolean)PipelineEventStreamSuite.$anonfun$new$18(eventMessage, e))), "capturedEvents.exists(((e: org.apache.spark.connect.proto.PipelineEvent) => e.getMessage().contains(eventMessage)))", Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(messageSeen, (Object)("Did not receive expected event: " + eventMessage), Prettifier$.MODULE$.default(), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 158));
        });
    }

    /**
     * Body of the test "successful dry run": registers the same resolvable datasets as the first
     * test, performs a dry run via {@link #$anonfun$new$16}, and afterwards asserts that none of
     * the captured events contains "is QUEUED".
     */
    public static final /* synthetic */ void $anonfun$new$15(PipelineEventStreamSuite $this, SparkConnectServiceGrpc.SparkConnectServiceBlockingStub stub) {
        String graphId = $this.createDataflowGraph(stub);
        TestPipelineDefinition pipeline = new TestPipelineDefinition(null, graphId){
            {
                this.createTable("a", DatasetType.MATERIALIZED_VIEW, (Option<String>)new Some((Object)"SELECT * FROM RANGE(5)"));
                this.createTable("b", DatasetType.TABLE, (Option<String>)new Some((Object)"SELECT * FROM STREAM a"));
            }
        };
        $this.registerPipelineDatasets(pipeline, stub);
        ArrayBuffer capturedEvents = new ArrayBuffer();
        $this.withClient((Function1<SparkConnectClient, BoxedUnit> & Serializable)client -> {
            PipelineEventStreamSuite.$anonfun$new$16($this, graphId, capturedEvents, client);
            return BoxedUnit.UNIT;
        });
        capturedEvents.foreach((Function1<PipelineEvent, Object> & Serializable)event -> {
            String actualMessage = event.getMessage();
            String forbiddenFragment = "is QUEUED";
            Bool notQueued = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool((Object)actualMessage, "contains", (Object)forbiddenFragment, actualMessage.contains(forbiddenFragment), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(notQueued, (Object)"", Prettifier$.MODULE$.default(), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 166));
        });
    }

    /**
     * Registers the three tests of the suite. The "flow resolution failure" test runs its body
     * twice, once with {@code dry = true} and once with {@code dry = false}.
     */
    public PipelineEventStreamSuite() {
        this.test("expected events are streamed back to the client", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> this.withRawBlockingStub((Function1<SparkConnectServiceGrpc.SparkConnectServiceBlockingStub, BoxedUnit> & Serializable)stub -> {
            PipelineEventStreamSuite.$anonfun$new$2(this, stub);
            return BoxedUnit.UNIT;
        }), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 27));
        this.test("flow resolution failure", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> {
            Seq dryOptions = (Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapBooleanArray(new boolean[]{true, false}));
            dryOptions.foreach((Function1<Object, Object> & Serializable)dry -> {
                this.withRawBlockingStub((Function1<SparkConnectServiceGrpc.SparkConnectServiceBlockingStub, BoxedUnit> & Serializable)stub -> {
                    // Elements of the Scala Seq arrive boxed; unbox before passing on.
                    PipelineEventStreamSuite.$anonfun$new$8(this, BoxesRunTime.unboxToBoolean(dry), stub);
                    return BoxedUnit.UNIT;
                });
                return BoxedUnit.UNIT;
            });
        }, new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 73));
        this.test("successful dry run", (Seq<Tag>)Nil$.MODULE$, (Function0<Object>)(JFunction0.mcV.sp & Serializable)() -> this.withRawBlockingStub((Function1<SparkConnectServiceGrpc.SparkConnectServiceBlockingStub, BoxedUnit> & Serializable)stub -> {
            PipelineEventStreamSuite.$anonfun$new$15(this, stub);
            return BoxedUnit.UNIT;
        }), new Position("PipelineEventStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 126));
    }
}

