/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.interpreter.launcher;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.apache.zeppelin.util.Util;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkInterpreterLauncherTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
    private String sparkHome;
    private String zeppelinHome;

    @BeforeEach
    public void setUp() {
        for (ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
            System.clearProperty(confVar.getVarName());
        }
        this.sparkHome = DownloadUtils.downloadSpark("3.4.1", "3");
        System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath());
        this.zeppelinHome = ZeppelinConfiguration.create().getZeppelinHome();
        LOGGER.info("ZEPPELIN_HOME: " + this.zeppelinHome);
    }

    @Test
    public void testConnectTimeOut() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
        InterpreterOption option = new InterpreterOption();
        option.setUserImpersonate(true);
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"name", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertEquals((Object)(this.zeppelinHome + "/interpreter/groupName"), (Object)interpreterProcess.getInterpreterDir());
            Assertions.assertEquals((Object)(this.zeppelinHome + "/local-repo/groupId"), (Object)interpreterProcess.getLocalRepoDir());
            Assertions.assertEquals((int)10000, (int)interpreterProcess.getConnectTimeout());
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 2 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)true, (Object)interpreterProcess.isUserImpersonated());
        }
    }

    @Test
    public void testLocalMode() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("ENV_1", "");
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "local[*]");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"spark", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertTrue((boolean)interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
            Assertions.assertTrue((boolean)interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 2 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)this.sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
            Assertions.assertFalse((boolean)interpreterProcess.getEnv().containsKey("ENV_1"));
            String expected = "--conf|spark.files=file_1|--conf|spark.jars=jar_1|--conf|spark.app.name=intpGroupId|--conf|spark.master=local[*]";
            Assertions.assertTrue((boolean)CollectionUtils.isEqualCollection(Arrays.asList(expected.split("\\|")), Arrays.asList(((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).split("\\|"))));
        }
    }

    @Test
    public void testYarnClientMode_1() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn-client");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"spark", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertTrue((boolean)interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
            Assertions.assertTrue((boolean)interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 2 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)this.sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
            String sparkJars = "jar_1";
            String sparkrZip = this.sparkHome + "/R/lib/sparkr.zip#sparkr";
            String sparkFiles = "file_1";
            String expected = "--conf|spark.yarn.dist.archives=" + sparkrZip + "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-client";
            Assertions.assertTrue((boolean)CollectionUtils.isEqualCollection(Arrays.asList(expected.split("\\|")), Arrays.asList(((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).split("\\|"))));
        }
    }

    @Test
    public void testYarnClientMode_2() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "client");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"spark", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertTrue((boolean)interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
            Assertions.assertTrue((boolean)interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 2 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)this.sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
            String sparkJars = "jar_1";
            String sparkrZip = this.sparkHome + "/R/lib/sparkr.zip#sparkr";
            String sparkFiles = "file_1";
            String expected = "--conf|spark.yarn.dist.archives=" + sparkrZip + "|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars + "|--conf|spark.submit.deployMode=client|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn";
            Assertions.assertTrue((boolean)CollectionUtils.isEqualCollection(Arrays.asList(expected.split("\\|")), Arrays.asList(((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).split("\\|"))));
        }
    }

    @Test
    public void testYarnClusterMode_1() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn-cluster");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"spark", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertTrue((boolean)interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
            Assertions.assertTrue((boolean)interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 3 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)this.sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
            Assertions.assertEquals((Object)"true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
            String sparkJars = "jar_1," + this.zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + ".jar," + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
            String sparkrZip = this.sparkHome + "/R/lib/sparkr.zip#sparkr";
            String sparkFiles = "file_1," + this.zeppelinHome + "/conf/log4j_yarn_cluster.properties";
            String expected = "--conf|spark.yarn.dist.archives=" + sparkrZip + "|--conf|spark.yarn.maxAppAttempts=1|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars + "|--conf|spark.yarn.isPython=true|--conf|spark.yarn.submit.waitAppCompletion=false|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-cluster";
            Assertions.assertTrue((boolean)CollectionUtils.isEqualCollection(Arrays.asList(expected.split("\\|")), Arrays.asList(((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).split("\\|"))));
        }
    }

    @Test
    public void testYarnClusterMode_2() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "cluster");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        option.setUserImpersonate(true);
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
        FileUtils.deleteDirectory((File)localRepoPath.toFile());
        Files.createDirectories(localRepoPath, new FileAttribute[0]);
        Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"), new FileAttribute[0]);
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"spark", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertTrue((boolean)interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
            Assertions.assertTrue((boolean)interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 3 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)this.sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
            Assertions.assertEquals((Object)"true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
            String sparkJars = "jar_1," + Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString() + "," + this.zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + ".jar," + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
            String sparkrZip = this.sparkHome + "/R/lib/sparkr.zip#sparkr";
            String sparkFiles = "file_1," + this.zeppelinHome + "/conf/log4j_yarn_cluster.properties";
            String expected = "--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrZip + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.yarn.maxAppAttempts=1|--conf|spark.master=yarn|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars + "|--conf|spark.submit.deployMode=cluster|--conf|spark.yarn.submit.waitAppCompletion=false";
            Assertions.assertTrue((boolean)CollectionUtils.isEqualCollection(Arrays.asList(expected.split("\\|")), Arrays.asList(((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).split("\\|"))));
            Assertions.assertTrue((boolean)((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).startsWith("--proxy-user|user1"));
        }
        Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"));
        FileUtils.deleteDirectory((File)localRepoPath.toFile());
    }

    @Test
    public void testYarnClusterMode_3() throws IOException {
        ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "cluster");
        properties.setProperty("spark.files", "{}");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        option.setUserImpersonate(true);
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
        FileUtils.deleteDirectory((File)localRepoPath.toFile());
        Files.createDirectories(localRepoPath, new FileAttribute[0]);
        InterpreterClient client = launcher.launch(context);
        Assertions.assertTrue((boolean)(client instanceof ExecRemoteInterpreterProcess));
        try (ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess)client;){
            Assertions.assertEquals((Object)"spark", (Object)interpreterProcess.getInterpreterSettingName());
            Assertions.assertTrue((boolean)interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
            Assertions.assertTrue((boolean)interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
            Assertions.assertEquals((Object)zConf.getInterpreterRemoteRunnerPath(), (Object)interpreterProcess.getInterpreterRunner());
            Assertions.assertTrue((interpreterProcess.getEnv().size() >= 3 ? 1 : 0) != 0);
            Assertions.assertEquals((Object)this.sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
            Assertions.assertEquals((Object)"true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
            String sparkJars = "jar_1," + this.zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + ".jar," + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
            String sparkrZip = this.sparkHome + "/R/lib/sparkr.zip#sparkr";
            String sparkFiles = "{}," + this.zeppelinHome + "/conf/log4j_yarn_cluster.properties";
            String expected = "--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrZip + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.yarn.maxAppAttempts=1|--conf|spark.master=yarn|--conf|spark.files=" + sparkFiles + "|--conf|spark.jars=" + sparkJars + "|--conf|spark.submit.deployMode=cluster|--conf|spark.yarn.submit.waitAppCompletion=false";
            Assertions.assertTrue((boolean)CollectionUtils.isEqualCollection(Arrays.asList(expected.split("\\|")), Arrays.asList(((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).split("\\|"))));
            Assertions.assertTrue((boolean)((String)interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")).startsWith("--proxy-user|user1"));
        }
        FileUtils.deleteDirectory((File)localRepoPath.toFile());
    }
}

