/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.test;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.spark.SparkBuildInfo$;
import org.apache.spark.sql.connect.SparkSession;
import org.apache.spark.sql.connect.SparkSession$;
import org.apache.spark.sql.connect.client.RetryPolicy;
import org.apache.spark.sql.connect.client.RetryPolicy$;
import org.apache.spark.sql.connect.client.SparkConnectClient$;
import org.apache.spark.sql.connect.common.config.ConnectCommon$;
import org.apache.spark.sql.connect.test.IntegrationTestUtils$;
import org.apache.spark.util.ArrayImplicits$;
import org.scalactic.source.Position;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.Futures$;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Builder;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.FiniteDuration$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Random$;

public final class SparkConnectServerUtils$ {
    /** The single instance of this class; the constructor is private. */
    public static final SparkConnectServerUtils$ MODULE$ = new SparkConnectServerUtils$();
    // Handle of the launched server process; written by sparkConnect$lzycompute().
    private static Process sparkConnect;
    // Port passed to the server as spark.connect.grpc.binding.port and used by the test client.
    private static final int port;
    // Set to true by stop(); start() refuses to run once it is set.
    private static volatile boolean stopped;
    // Stream connected to the server process's standard input; set when the process is launched.
    private static OutputStream consoleOut;
    // Text written to the server's standard input by stop() to ask it to shut down.
    private static final String serverStopCommand;
    // Initialization flag for sparkConnect: set to true right after sparkConnect is assigned.
    private static volatile boolean bitmap$0;

    static {
        // Base binding port plus the result of nextInt(1000) as an offset.
        // NOTE(review): presumably randomized so concurrent test runs do not collide -- confirm.
        port = ConnectCommon$.MODULE$.CONNECT_GRPC_BINDING_PORT() + Random$.MODULE$.nextInt(1000);
        stopped = false;
        serverStopCommand = "q";
    }

    /**
     * Returns the port selected in the static initializer.
     *
     * @return the port the test server is told to bind and the test client connects to
     */
    public int port() {
        return SparkConnectServerUtils$.port;
    }

    /**
     * Reads the volatile "stopped" flag.
     *
     * @return {@code true} once {@code stop()} has been invoked
     */
    private boolean stopped() {
        return SparkConnectServerUtils$.stopped;
    }

    /**
     * Writes the volatile "stopped" flag.
     *
     * @param x$1 the new flag value
     */
    private void stopped_$eq(boolean x$1) {
        SparkConnectServerUtils$.stopped = x$1;
    }

    /**
     * Returns the stream stored by {@code consoleOut_$eq}.
     *
     * @return the stored stream, or {@code null} if none has been stored yet
     */
    private OutputStream consoleOut() {
        return SparkConnectServerUtils$.consoleOut;
    }

    /**
     * Stores the stream later returned by {@code consoleOut()}.
     *
     * @param x$1 the stream to store
     */
    private void consoleOut_$eq(OutputStream x$1) {
        SparkConnectServerUtils$.consoleOut = x$1;
    }

    /**
     * Returns the stop command assigned in the static initializer.
     *
     * @return the text that {@code stop()} writes to the server process
     */
    private String serverStopCommand() {
        return SparkConnectServerUtils$.serverStopCommand;
    }

    /**
     * Launches the Spark Connect server process the first time it is called and
     * caches the resulting {@link Process}; later calls return the cached handle.
     *
     * <p>The server is started through {@code bin/spark-submit} from the Spark home
     * directory, with the connect jar on the driver class path, the catalyst test
     * jar passed via {@code --jars}, the binding port set to {@link #port()}, and
     * the extra settings from {@code testConfigs()} and the debug configs. The
     * {@code SPARK_DIST_CLASSPATH} variable is removed from the child environment.
     * A shutdown hook that calls {@code stop()} is registered after the launch.
     *
     * <p>NOTE(review): outside debug mode the child's stdout/stderr are left as
     * pipes and nothing in this class reads them; a very chatty server could fill
     * the pipe buffer and block. Consider discarding or draining the output.
     *
     * @return the server process handle
     */
    private Process sparkConnect$lzycompute() {
        synchronized (this) {
            if (!bitmap$0) {
                IntegrationTestUtils$.MODULE$.debug("Starting the Spark Connect Server...");
                String connectJar = IntegrationTestUtils$.MODULE$.findJar("sql/connect/server", "spark-connect-assembly", "spark-connect", IntegrationTestUtils$.MODULE$.findJar$default$4()).getCanonicalPath();
                String catalystTestJar = IntegrationTestUtils$.MODULE$.findJar("sql/catalyst", "spark-catalyst", "spark-catalyst", true).getCanonicalPath();

                // Assemble the spark-submit command line; the application jar goes last.
                Builder command = package$.MODULE$.Seq().newBuilder();
                command.$plus$eq((Object)"bin/spark-submit");
                command.$plus$eq((Object)"--driver-class-path").$plus$eq((Object)connectJar);
                command.$plus$eq((Object)"--class").$plus$eq((Object)"org.apache.spark.sql.connect.SimpleSparkConnectService");
                command.$plus$eq((Object)"--jars").$plus$eq((Object)catalystTestJar);
                command.$plus$eq((Object)"--conf").$plus$eq((Object)("spark.connect.grpc.binding.port=" + this.port()));
                command.$plus$plus$eq(this.testConfigs());
                command.$plus$plus$eq(IntegrationTestUtils$.MODULE$.debugConfigs());
                command.$plus$eq((Object)connectJar);

                ProcessBuilder builder = new ProcessBuilder((String[])((IterableOnceOps)command.result()).toArray(ClassTag$.MODULE$.apply(String.class)));
                builder.directory(new File(IntegrationTestUtils$.MODULE$.sparkHome()));
                builder.environment().remove("SPARK_DIST_CLASSPATH");
                if (IntegrationTestUtils$.MODULE$.isDebug()) {
                    // In debug mode the server's output is shown on this process's console.
                    builder.redirectError(ProcessBuilder.Redirect.INHERIT);
                    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
                }

                Process process = builder.start();
                // Keep the child's stdin so stop() can send the stop command.
                this.consoleOut_$eq(process.getOutputStream());
                scala.sys.package$.MODULE$.addShutdownHook((Function0)(JFunction0.mcV.sp & Serializable)() -> MODULE$.stop());
                sparkConnect = process;
                // Publish the handle: the flag is written only after the field is assigned.
                bitmap$0 = true;
            }
        }
        return sparkConnect;
    }

    /**
     * Returns the server process handle, launching the server through
     * {@code sparkConnect$lzycompute()} when the initialization flag is not yet set.
     *
     * @return the server process handle
     */
    private Process sparkConnect() {
        return bitmap$0 ? sparkConnect : this.sparkConnect$lzycompute();
    }

    private Seq<String> testConfigs() {
        String string;
        if (IntegrationTestUtils$.MODULE$.isSparkHiveJarAvailable()) {
            string = "hive";
        } else {
            Predef$.MODULE$.println((Object)"Will start Spark Connect server with `spark.sql.catalogImplementation=in-memory`, some tests that rely on Hive will be ignored. If you don't want to skip them:\n1. Test with maven: run `build/mvn install -DskipTests -Phive` before testing\n2. Test with sbt: run test with `-Phive` profile");
            IntegrationTestUtils$.MODULE$.cleanUpHiveClassesDirIfNeeded();
            string = "in-memory";
        }
        String catalogImplementation = string;
        return (Seq)new .colon.colon((Object)"spark.sql.catalog.testcat=org.apache.spark.sql.connector.catalog.InMemoryTableCatalog", (List)new .colon.colon((Object)("spark.sql.catalogImplementation=" + catalogImplementation), (List)new .colon.colon((Object)"spark.connect.execute.reattachable.senderMaxStreamDuration=1s", (List)new .colon.colon((Object)"spark.connect.execute.reattachable.senderMaxStreamSize=123", (List)new .colon.colon((Object)("spark.connect.grpc.arrow.maxBatchSize=" + 0xA00000), (List)new .colon.colon((Object)"spark.ui.enabled=false", (List)Nil$.MODULE$)))))).flatMap((Function1 & Serializable)v -> {
            String string = v;
            return Nil$.MODULE$.$colon$colon((Object)string).$colon$colon((Object)"--conf");
        });
    }

    public void start() {
        Predef$.MODULE$.assert(!this.stopped());
        this.sparkConnect();
    }

    /*
     * WARNING - void declaration
     */
    public int stop() {
        int n;
        this.stopped_$eq(true);
        IntegrationTestUtils$.MODULE$.debug("Stopping the Spark Connect Server...");
        try {
            void var2_1;
            this.consoleOut().write(this.serverStopCommand().getBytes());
            this.consoleOut().flush();
            this.consoleOut().close();
            Object object = !this.sparkConnect().waitFor(2L, TimeUnit.SECONDS) ? BoxesRunTime.boxToBoolean((boolean)this.sparkConnect().destroyForcibly().waitFor(2L, TimeUnit.SECONDS)) : BoxedUnit.UNIT;
            int code = this.sparkConnect().exitValue();
            IntegrationTestUtils$.MODULE$.debug("Spark Connect Server is stopped with exit code: " + code);
            n = var2_1;
        }
        catch (Throwable throwable) {
            IOException iOException;
            Throwable throwable2 = throwable;
            if (throwable2 instanceof IOException && (iOException = (IOException)throwable2).getMessage().contains("Stream closed")) {
                n = -1;
            }
            if (throwable2 != null) {
                Throwable throwable3 = throwable2;
                IntegrationTestUtils$.MODULE$.debug(throwable3);
                this.sparkConnect().destroyForcibly();
                throw throwable3;
            }
            throw throwable;
        }
        return n;
    }

    /**
     * Uploads the test dependencies to the given session through its artifact manager.
     *
     * <p>First the connect client test class directory is added as a class
     * directory. Then every entry of the {@code java.class.path} system property
     * accepted by {@code $anonfun$syncTestDependencies$1} is converted to a URI
     * and added as an artifact.
     *
     * @param spark the session whose artifact manager receives the dependencies
     */
    public void syncTestDependencies(SparkSession spark) {
        Path testClassesPath = Paths.get(IntegrationTestUtils$.MODULE$.connectClientTestClassDir());
        spark.client().artifactManager().addClassDir(testClassesPath);

        Object[] classPathEntries = (Object[])StringOps$.MODULE$.split$extension(Predef$.MODULE$.augmentString(System.getProperty("java.class.path")), File.pathSeparatorChar);
        // The lambdas target the raw Function1 type, so their parameter is an Object
        // and must be cast back to String before use.
        Object[] requiredJars = (Object[])ArrayOps$.MODULE$.filter$extension(Predef$.MODULE$.refArrayOps(classPathEntries), (Function1 & Serializable)entry -> BoxesRunTime.boxToBoolean(SparkConnectServerUtils$.$anonfun$syncTestDependencies$1((String)entry)));
        URI[] jars = (URI[])ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(requiredJars), (Function1 & Serializable)entry -> Paths.get((String)entry).toUri(), ClassTag$.MODULE$.apply(URI.class));

        spark.client().artifactManager().addArtifacts((Seq)ArrayImplicits$.MODULE$.SparkArrayOps((Object)jars).toImmutableArraySeq());
    }

    public SparkSession createSparkSession() {
        this.start();
        RetryPolicy qual$1 = RetryPolicy$.MODULE$.defaultPolicy();
        Some x$1 = new Some((Object)BoxesRunTime.boxToInteger((int)10));
        Some x$2 = new Some((Object)FiniteDuration$.MODULE$.apply(30L, "s"));
        FiniteDuration x$3 = qual$1.copy$default$2();
        double x$4 = qual$1.copy$default$4();
        FiniteDuration x$5 = qual$1.copy$default$5();
        FiniteDuration x$6 = qual$1.copy$default$6();
        Function1 x$7 = qual$1.copy$default$7();
        String x$8 = qual$1.copy$default$8();
        boolean x$9 = qual$1.copy$default$9();
        Option x$10 = qual$1.copy$default$10();
        SparkSession spark = SparkSession$.MODULE$.builder().client(SparkConnectClient$.MODULE$.builder().userId("test").port(this.port()).retryPolicy(qual$1.copy((Option)x$1, x$3, (Option)x$2, x$4, x$5, x$6, x$7, x$8, x$9, x$10)).build()).create();
        Eventually$.MODULE$.eventually(Futures$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(1).minute()), (Function0)(JFunction0.mcV.sp & Serializable)() -> {
            String string = spark.version();
            String string2 = SparkBuildInfo$.MODULE$.spark_version();
            Predef$.MODULE$.assert(!(string != null ? !string.equals(string2) : string2 != null));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RemoteSparkSession.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 199));
        this.syncTestDependencies(spark);
        return spark;
    }

    /**
     * Decides whether a class-path entry is a jar that must be uploaded to the
     * test session.
     *
     * <p>Only the part after the last {@link File#separatorChar} is inspected. An
     * entry is accepted when that file name ends with {@code .jar} and either
     * starts with {@code scalatest} or {@code scalactic}, or starts with
     * {@code spark-catalyst} and ends with {@code -tests.jar}.
     *
     * <p>The last condition used to test for a {@code -tests} suffix, which can
     * never hold for a name already required to end with {@code .jar}, so the
     * catalyst tests jar was never selected.
     *
     * @param e a single class-path entry
     * @return {@code true} if the entry should be uploaded as an artifact
     */
    public static final /* synthetic */ boolean $anonfun$syncTestDependencies$1(String e) {
        String fileName = e.substring(e.lastIndexOf(File.separatorChar) + 1);
        if (!fileName.endsWith(".jar")) {
            return false;
        }
        boolean isScalaTestJar = fileName.startsWith("scalatest") || fileName.startsWith("scalactic");
        boolean isCatalystTestsJar = fileName.startsWith("spark-catalyst") && fileName.endsWith("-tests.jar");
        return isScalaTestJar || isCatalystTestsJar;
    }

    /** Private constructor: the only instance is the one held in {@code MODULE$}. */
    private SparkConnectServerUtils$() {
        // Nothing to initialize; all state lives in static fields.
    }
}

