/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.job;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigBase;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.engine.spark.job.EmptyPlaceholderJob;
import org.apache.kylin.engine.spark.job.ISparkJobHandler;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.engine.spark.job.SparkAppDescription;
import org.apache.kylin.engine.spark.merger.MetadataMerger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.JobStoppedException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ChainedStageExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.execution.StageBase;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.view.LogicalView;
import org.apache.kylin.metadata.view.LogicalViewManager;
import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NSparkExecutable
extends AbstractExecutable
implements ChainedStageExecutable {
    private static final Logger logger = LoggerFactory.getLogger(NSparkExecutable.class);
    private static final String AM_EXTRA_JAVA_OPTIONS = "spark.yarn.am.extraJavaOptions";
    private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
    private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
    private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";
    private static final String APP_JAR_NAME = "__app__.jar";
    private static final String SPARK_JARS_1 = "spark.jars";
    private static final String SPARK_JARS_2 = "spark.yarn.dist.jars";
    private static final String SPARK_FILES_1 = "spark.files";
    private static final String SPARK_FILES_2 = "spark.yarn.dist.files";
    private static final String COMMA = ",";
    private static final String COLON = ":";
    private static final String EMPTY = "";
    private static final String SPACE = " ";
    private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
    private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
    protected static final String SPARK_MASTER = "spark.master";
    protected static final String DEPLOY_MODE = "spark.submit.deployMode";
    protected static final String CLUSTER_MODE = "cluster";
    protected static final String SPARK_PLUGINS = "spark.plugins";
    protected ISparkJobHandler sparkJobHandler;
    private final transient List<StageBase> stages = Lists.newCopyOnWriteArrayList();
    private final Map<String, List<StageBase>> stagesMap = Maps.newConcurrentMap();

    public NSparkExecutable() {
        this.initHandler();
    }

    public NSparkExecutable(Object notSetId) {
        super(notSetId);
        this.initHandler();
    }

    public String getDataflowId() {
        return this.getParam("dataflowId");
    }

    protected void initHandler() {
        this.sparkJobHandler = (ISparkJobHandler)ClassUtil.newInstance((String)KylinConfig.getInstanceFromEnv().getSparkBuildJobHandlerClassName());
    }

    public void killApplicationIfExistsOrUpdateStepStatus() {
        NDefaultScheduler scheduler = NDefaultScheduler.getInstance((String)this.getProject());
        Optional.ofNullable(scheduler.getContext()).ifPresent(context -> Optional.ofNullable(context.getRunningJobThread((Executable)this)).ifPresent(taskThread -> {
            taskThread.interrupt();
            context.removeRunningJob((Executable)this);
        }));
        this.killOrphanApplicationIfExists(this.getId());
    }

    public Set<String> getSegmentIds() {
        return Sets.newHashSet((Object[])StringUtils.split((String)this.getParam("segmentIds"), (String)COMMA));
    }

    public Set<Long> getCuboidLayoutIds() {
        return NSparkCubingUtil.str2Longs((String)this.getParam("layoutIds"));
    }

    protected void setSparkSubmitClassName(String className) {
        if (KylinConfig.getInstanceFromEnv().getSparkEngineBuildStepsToSkip().contains(((Object)((Object)this)).getClass().getName())) {
            className = EmptyPlaceholderJob.class.getName();
        }
        this.setParam("className", className);
    }

    public String getSparkSubmitClassName() {
        return this.getParam("className");
    }

    public String getJars() {
        return this.getParam("jars");
    }

    private boolean isLocalFs() {
        String fs = HadoopUtil.getWorkingFileSystem().getUri().toString();
        return fs.startsWith("file:");
    }

    private String getDistMetaFs() {
        String result;
        String defaultFs = HadoopUtil.getWorkingFileSystem().getUri().toString();
        String engineWriteFs = KylinConfig.getInstanceFromEnv().getEngineWriteFs();
        String string = result = StringUtils.isBlank((CharSequence)engineWriteFs) ? defaultFs : engineWriteFs;
        if (result.startsWith("maprfs://")) {
            return "maprfs://";
        }
        return result;
    }

    protected void setDistMetaUrl(StorageURL storageURL) {
        String fs = this.getDistMetaFs();
        HashMap stringStringHashMap = Maps.newHashMap((Map)storageURL.getAllParameters());
        if (!this.isLocalFs()) {
            stringStringHashMap.put("path", fs + storageURL.getParameter("path"));
        }
        StorageURL copy = storageURL.copy((Map)stringStringHashMap);
        this.setParam("distMetaUrl", copy.toString());
        this.setParam("outputMetaUrl", copy + "_output");
    }

    public String getDistMetaUrl() {
        return this.getParam("distMetaUrl");
    }

    public void waiteForResourceStart(ExecutableContext context) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager manager = NSparkExecutable.getExecutableManager((String)this.getProject());
            manager.updateStageStatus(this.getId() + "_00", null, ExecutableState.RUNNING, null, null);
            manager.saveUpdatedJob();
            return 0;
        }, (String)this.project, (int)3, (long)context.getEpochId(), (String)this.getTempLockName());
    }

    public ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
        String hadoopConfDir;
        File hiveConfFile;
        String sparkHome;
        this.waiteForResourceStart(context);
        this.setLogPath(this.getSparkDriverLogHdfsPath(context.getConfig()));
        KylinConfig config = this.getConfig();
        String jobId = this.getId();
        if (!config.isDevOrUT()) {
            this.setDistMetaUrl(config.getJobTmpMetaStoreUrl(this.project, jobId));
        }
        if (StringUtils.isEmpty((CharSequence)(sparkHome = KylinConfigBase.getSparkHome())) && !config.isUTEnv()) {
            throw new RuntimeException("Missing spark home");
        }
        String kylinJobJar = config.getKylinJobJarPath();
        if (StringUtils.isEmpty((CharSequence)kylinJobJar) && !config.isUTEnv()) {
            throw new RuntimeException("Missing kylin job jar");
        }
        if (!config.isUTEnv()) {
            this.sparkJobHandler.checkApplicationJar(config);
        }
        if (!(hiveConfFile = new File(hadoopConfDir = HadoopUtil.getHadoopConfDir(), "hive-site.xml")).exists() && !config.isUTEnv()) {
            throw new RuntimeException("Cannot find hive-site.xml in kylin_hadoop_conf_dir: " + hadoopConfDir + ". In order to enable spark cubing, you must set kylin.env.hadoop-conf-dir to a dir which contains at least core-site.xml, hdfs-site.xml, hive-site.xml, mapred-site.xml, yarn-site.xml");
        }
        this.deleteSnapshotDirectoryOnExists();
        this.deleteJobTmpDirectoryOnExists();
        this.onExecuteStart();
        try {
            this.attachMetadataAndKylinProps(config, this.isResumable());
        }
        catch (IOException e) {
            throw new ExecuteException("meta dump failed", (Throwable)e);
        }
        if (!this.isResumable()) {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                NExecutableManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)this.project).setJobResumable(this.getId());
                return 0;
            }, (String)this.project, (int)3, (long)context.getEpochId(), (String)this.getTempLockName());
        }
        this.sparkJobHandler.prepareEnviroment(this.project, jobId, this.getParams());
        String argsPath = this.createArgsFileOnHDFS(config, jobId);
        this.checkParentJobStatus();
        if (config.isUTEnv()) {
            return this.runLocalMode(argsPath);
        }
        return this.runSparkSubmit(hadoopConfDir, kylinJobJar, "-className " + this.getSparkSubmitClassName() + SPACE + argsPath);
    }

    protected void onExecuteStart() throws JobStoppedException {
        this.wrapWithCheckQuit(() -> {
            Map<String, String> sparkConf = this.getSparkConf();
            HashMap jobParams = Maps.newHashMap();
            jobParams.put("job_params", JsonUtil.writeValueAsString(sparkConf));
            this.updateJobOutput(this.project, this.getId(), ExecutableState.RUNNING, jobParams, null, null);
        });
    }

    protected String createArgsFileOnHDFS(KylinConfig config, String jobId) throws ExecuteException {
        return this.sparkJobHandler.createArgsFileOnRemoteFileSystem(config, this.getProject(), jobId, this.getParams());
    }

    @VisibleForTesting
    Map<String, String> filterEmptySegments(Map<String, String> originParams) {
        HashMap copied = Maps.newHashMap(originParams);
        String originSegments = (String)copied.get("segmentIds");
        String dfId = this.getDataflowId();
        NDataflow dataFlow = NDataflowManager.getInstance((KylinConfig)this.getConfig(), (String)this.getProject()).getDataflow(dfId);
        if (Objects.isNull(dataFlow) || StringUtils.isBlank((CharSequence)originSegments)) {
            return copied;
        }
        String newSegments = Stream.of(StringUtils.split((String)originSegments, (String)COMMA)).filter(id -> Objects.nonNull(dataFlow.getSegment(id))).collect(Collectors.joining(COMMA));
        copied.put("segmentIds", newSegments);
        return copied;
    }

    public String getSparkDriverLogHdfsPath(KylinConfig config) {
        return String.format(Locale.ROOT, "%s.%s.log", config.getJobTmpOutputStorePath(this.getProject(), this.getId()), System.currentTimeMillis());
    }

    private Boolean checkHadoopWorkingDir() {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        String hdfsWorkingDirectory = kylinConfig.getHdfsWorkingDirectory();
        Properties properties = KylinConfig.buildSiteProperties();
        String hdfsWorkingDirectoryFromProperties = kylinConfig.getHdfsWorkingDirectoryFromProperties(properties);
        return StringUtils.equals((CharSequence)hdfsWorkingDirectory, (CharSequence)hdfsWorkingDirectoryFromProperties);
    }

    protected KylinConfig getConfig() {
        String yarnQueue;
        KylinConfig originalConfig = KylinConfig.getInstanceFromEnv();
        if (!originalConfig.isDevOrUT() && !this.checkHadoopWorkingDir().booleanValue()) {
            KylinConfig.getInstanceFromEnv().reloadKylinConfigPropertiesFromSiteProperties();
        }
        String project = this.getProject();
        Preconditions.checkState((boolean)StringUtils.isNotBlank((CharSequence)project), (Object)("job " + this.getId() + " project info is empty"));
        KylinConfigExt kylinConfigExt = this.getKylinConfigExt(originalConfig, project);
        HashMap jobOverrides = Maps.newHashMap();
        String parentId = this.getParentId();
        jobOverrides.put("job.id", StringUtils.defaultIfBlank((CharSequence)parentId, (CharSequence)this.getId()));
        jobOverrides.put("job.project", project);
        if (StringUtils.isNotBlank((CharSequence)originalConfig.getMountSparkLogDir())) {
            jobOverrides.put("job.mountDir", originalConfig.getMountSparkLogDir());
        }
        if (StringUtils.isNotBlank((CharSequence)parentId)) {
            jobOverrides.put("job.stepId", this.getId());
        }
        jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
        jobOverrides.put("spark.driver.log4j.appender.hdfs.File", Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
        jobOverrides.putAll(kylinConfigExt.getExtendedOverrides());
        if (this.getParent() != null && !StringUtils.isEmpty((CharSequence)(yarnQueue = this.getParent().getSparkYarnQueue()))) {
            jobOverrides.put("kylin.engine.spark-conf.spark.yarn.queue", yarnQueue);
        }
        return KylinConfigExt.createInstance((KylinConfig)kylinConfigExt, (Map)jobOverrides);
    }

    public KylinConfigExt getKylinConfigExt(KylinConfig originalConfig, String project) {
        NDataflowManager dataflowManager;
        NDataflow dataflow;
        String dataflowId = this.getParam("dataflowId");
        if (StringUtils.isNotBlank((CharSequence)dataflowId) && null != (dataflow = (dataflowManager = NDataflowManager.getInstance((KylinConfig)originalConfig, (String)project)).getDataflow(dataflowId))) {
            return dataflow.getConfig();
        }
        ProjectInstance projectInstance = NProjectManager.getInstance((KylinConfig)originalConfig).getProject(project);
        return projectInstance.getConfig();
    }

    public SparkAppDescription getSparkAppDesc() {
        SparkAppDescription desc = new SparkAppDescription();
        KylinConfig conf = this.getConfig();
        desc.setJobNamePrefix(this.getJobNamePrefix());
        desc.setProject(this.getProject());
        desc.setJobId(this.getId());
        desc.setStepId(this.getStepId());
        desc.setSparkSubmitClassName(this.getSparkSubmitClassName());
        Map<String, String> sparkConf = this.getSparkConf(conf);
        desc.setSparkConf(sparkConf);
        desc.setComma(COMMA);
        desc.setSparkJars(this.getSparkJars(conf, sparkConf));
        desc.setSparkFiles(this.getSparkFiles(conf, sparkConf));
        return desc;
    }

    protected ExecuteResult runSparkSubmit(String hadoopConfDir, String kylinJobJar, String appArgs) throws JobStoppedException {
        this.sparkJobHandler.killOrphanApplicationIfExists(this.project, this.getId(), this.getConfig(), Boolean.valueOf(true), this.getSparkConf());
        try {
            SparkAppDescription desc = this.getSparkAppDesc();
            desc.setHadoopConfDir(hadoopConfDir);
            desc.setKylinJobJar(kylinJobJar);
            desc.setAppArgs(appArgs);
            Object cmd = this.sparkJobHandler.generateSparkCmd(KylinConfig.getInstanceFromEnv(), desc);
            Map updateInfo = this.sparkJobHandler.runSparkSubmit(cmd, this.getParentId());
            String output = (String)updateInfo.get("output");
            if (StringUtils.isNotEmpty((CharSequence)((CharSequence)updateInfo.get("process_id")))) {
                try {
                    updateInfo.remove("output");
                    EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                        NExecutableManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)this.project).updateJobOutput(this.getParentId(), this.getStatus(), updateInfo, null, null);
                        return null;
                    }, (String)this.project, (int)3, (long)this.getEpochId(), (String)this.getTempLockName());
                }
                catch (Exception e) {
                    logger.warn("failed to record process id.");
                }
            }
            return ExecuteResult.createSucceed((String)output);
        }
        catch (Exception e) {
            this.checkNeedQuit(true);
            logger.warn("failed to execute spark submit command.");
            this.wrapWithExecuteExceptionUpdateJobError(e);
            return ExecuteResult.createError((Throwable)e);
        }
    }

    public void killOrphanApplicationIfExists(String jobStepId) {
        this.sparkJobHandler.killOrphanApplicationIfExists(this.project, jobStepId, this.getConfig(), Boolean.valueOf(false), this.getSparkConf());
    }

    protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
        Map confMap = config.getSparkConfigOverride();
        String driverMemConf = "spark.driver.memory";
        if (!confMap.containsKey("spark.driver.memory")) {
            confMap.put("spark.driver.memory", this.computeStepDriverMemory() + "m");
        }
        if (UserGroupInformation.isSecurityEnabled()) {
            confMap.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
        }
        return confMap;
    }

    private ExecuteResult runLocalMode(String appArgs) {
        try {
            Class appClz = ClassUtil.forName((String)this.getSparkSubmitClassName(), Object.class);
            appClz.getMethod("main", String[].class).invoke(appClz.newInstance(), new Object[]{new String[]{appArgs}});
            return ExecuteResult.createSucceed();
        }
        catch (Exception e) {
            return ExecuteResult.createError((Throwable)e);
        }
    }

    protected Set<String> getMetadataDumpList(KylinConfig config) {
        return Collections.emptySet();
    }

    protected Set<String> getLogicalViewMetaDumpList(KylinConfig config) {
        LogicalView logicalView;
        LinkedHashSet<String> dumpList = new LinkedHashSet<String>();
        if (!config.isDDLLogicalViewEnabled()) {
            return dumpList;
        }
        String table = this.getParam("table");
        String dataflowId = this.getDataflowId();
        LogicalViewManager viewManager = LogicalViewManager.getInstance((KylinConfig)config);
        if (StringUtils.isNotBlank((CharSequence)dataflowId)) {
            Set viewsMeta = viewManager.findLogicalViewsInModel(this.getProject(), this.getDataflowId()).stream().map(LogicalView::getResourcePath).collect(Collectors.toSet());
            dumpList.addAll(viewsMeta);
        }
        if (StringUtils.isNotBlank((CharSequence)table) && (logicalView = viewManager.findLogicalViewInProject(this.getProject(), table)) != null) {
            dumpList.add(logicalView.getResourcePath());
        }
        return dumpList;
    }

    void attachMetadataAndKylinProps(KylinConfig config) throws IOException {
        this.attachMetadataAndKylinProps(config, false);
    }

    protected void attachMetadataAndKylinProps(KylinConfig config, boolean kylinPropsOnly) throws IOException {
        String metaDumpUrl = this.getDistMetaUrl();
        if (StringUtils.isEmpty((CharSequence)metaDumpUrl)) {
            throw new RuntimeException("Missing metaUrl");
        }
        File tmpDir = File.createTempFile("kylin_job_meta", EMPTY);
        FileUtils.forceDelete((File)tmpDir);
        Properties props = config.exportToProperties();
        props.setProperty("kylin.metadata.url", metaDumpUrl);
        this.modifyDump(props);
        if (kylinPropsOnly) {
            ResourceStore.dumpKylinProps((File)tmpDir, (Properties)props);
        } else {
            Map dumpMap = (Map)EnhancedUnitOfWork.doInTransactionWithCheckAndRetry((UnitOfWorkParams)UnitOfWorkParams.builder().readonly(true).unitName(this.getProject()).maxRetry(1).processor(() -> {
                HashMap retMap = Maps.newHashMap();
                for (String resPath : this.getMetadataDumpList(config)) {
                    ResourceStore resourceStore = ResourceStore.getKylinMetaStore((KylinConfig)config);
                    RawResource rawResource = resourceStore.getResource(resPath);
                    retMap.put(resPath, rawResource);
                }
                return retMap;
            }).build());
            if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) {
                return;
            }
            ResourceStore.dumpResourceMaps((File)tmpDir, (Map)dumpMap, (Properties)props);
        }
        Properties propsForMetaStore = config.exportToProperties();
        propsForMetaStore.setProperty("kylin.metadata.url", metaDumpUrl);
        KylinConfig dstConfig = KylinConfig.createKylinConfig((Properties)propsForMetaStore);
        MetadataStore.createMetadataStore((KylinConfig)dstConfig).uploadFromFile(tmpDir);
        logger.debug("Copied metadata to the target metaUrl, delete the temp dir: {}", (Object)tmpDir);
        FileUtils.forceDelete((File)tmpDir);
    }

    private void modifyDump(Properties props) {
        this.sparkJobHandler.modifyDump(props);
        this.removeUnNecessaryDump(props);
    }

    private void removeUnNecessaryDump(Properties props) {
        props.remove("kylin.engine.spark-conf.spark.jars");
        props.remove("kylin.engine.spark-conf.spark.yarn.dist.jars");
        props.remove("kylin.engine.spark-conf.spark.files");
        props.remove("kylin.engine.spark-conf.spark.yarn.dist.files");
        props.remove("kylin.engine.spark-conf.spark.driver.extraJavaOptions");
        props.remove("kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions");
        props.remove("kylin.engine.spark-conf.spark.executor.extraJavaOptions");
        props.remove("kylin.engine.spark-conf.spark.driver.extraClassPath");
        props.remove("kylin.engine.spark-conf.spark.executor.extraClassPath");
        props.remove("kylin.query.async-query.spark-conf.spark.yarn.am.extraJavaOptions");
        props.remove("kylin.query.async-query.spark-conf.spark.executor.extraJavaOptions");
        props.remove("kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions");
        props.remove("kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions");
    }

    private void deleteSnapshotDirectoryOnExists() {
        if (this.isResumable()) {
            return;
        }
        KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
        String snapshotPath = kylinConf.getSnapshotCheckPointDir(this.getProject(), this.getId().split("_")[0]);
        try {
            Path path = new Path(snapshotPath);
            HadoopUtil.deletePath((Configuration)HadoopUtil.getCurrentConfiguration(), (Path)path);
        }
        catch (Exception e) {
            logger.error("delete snapshot checkpoint in path {} failed.", (Object)snapshotPath, (Object)e);
        }
    }

    private void deleteJobTmpDirectoryOnExists() {
        if (!this.getConfig().isDeleteJobTmpWhenRetry()) {
            return;
        }
        if (this.isResumable()) {
            return;
        }
        StorageURL storageURL = StorageURL.valueOf((String)this.getDistMetaUrl());
        String metaPath = storageURL.getParameter("path");
        String[] directories = metaPath.split("/");
        String lastDirectory = directories[directories.length - 1];
        String taskPath = metaPath.substring(0, metaPath.length() - 1 - lastDirectory.length());
        try {
            Path path = new Path(taskPath);
            HadoopUtil.deletePath((Configuration)HadoopUtil.getCurrentConfiguration(), (Path)path);
        }
        catch (Exception e) {
            logger.error("delete job tmp in path {} failed.", (Object)taskPath, (Object)e);
        }
    }

    protected String getJobNamePrefix() {
        return "job_step_";
    }

    protected String getExtJar() {
        return EMPTY;
    }

    public boolean needMergeMetadata() {
        return false;
    }

    public void mergerMetadata(MetadataMerger merger) {
        throw new UnsupportedOperationException();
    }

    public AbstractExecutable addStage(AbstractExecutable step) {
        int stepId = this.stages.size();
        step.setId(this.getId() + "_" + String.format(Locale.ROOT, "%02d", stepId));
        step.setParent((AbstractExecutable)this);
        step.setStepId(stepId);
        this.stages.add((StageBase)step);
        return step;
    }

    public void setStageMap() {
        if (CollectionUtils.isEmpty(this.stages)) {
            return;
        }
        if (StringUtils.isBlank((CharSequence)this.getParam("segmentIds"))) {
            this.stagesMap.put(this.getId(), this.stages);
            return;
        }
        for (String segmentId : this.getSegmentIds()) {
            this.stagesMap.put(segmentId, this.stages);
        }
        if (StringUtils.isNotBlank((CharSequence)this.getParam("layoutIds"))) {
            int indexCount = StringUtil.splitAndTrim((String)this.getParam("layoutIds"), (String)COMMA).length;
            this.setParam("indexCount", String.valueOf(indexCount));
        }
    }

    public void setStageMapWithSegment(String id, List<StageBase> steps) {
        List<StageBase> old = this.stagesMap.getOrDefault(id, Lists.newCopyOnWriteArrayList());
        old.addAll(steps);
        this.stagesMap.put(id, steps);
    }

    public Map<String, List<StageBase>> getStagesMap() {
        return this.stagesMap;
    }

    private boolean isClusterMode(Map<String, String> sparkConf) {
        return CLUSTER_MODE.equals(sparkConf.get(DEPLOY_MODE));
    }

    private Map<String, String> getSparkConf() {
        return this.getSparkConf(this.getConfig());
    }

    private Map<String, String> getSparkConf(KylinConfig kylinConf) {
        KapConfig kapConf = KapConfig.wrap((KylinConfig)kylinConf);
        Map<String, String> sparkConf = this.getSparkConfigOverride(kylinConf);
        this.rewriteKerberosConf(kapConf, sparkConf);
        this.rewriteDriverExtraJavaOptions(kylinConf, kapConf, sparkConf);
        this.rewriteExecutorExtraJavaOptions(kylinConf, sparkConf);
        this.rewritePluginOptions(kylinConf, sparkConf);
        this.rewriteExtraClasspath(kylinConf, sparkConf);
        return Collections.unmodifiableMap(sparkConf);
    }

    private void rewriteDriverExtraJavaOptions(KylinConfig kylinConf, KapConfig kapConf, Map<String, String> sparkConf) {
        String logLocalWorkingDirectory;
        Map extendedOverrides;
        StringBuilder sb = new StringBuilder();
        if (sparkConf.containsKey(DRIVER_EXTRA_JAVA_OPTIONS)) {
            sb.append(sparkConf.get(DRIVER_EXTRA_JAVA_OPTIONS));
        }
        String hdfsWorkingDir = kylinConf.getHdfsWorkingDirectory();
        String sparkDriverHdfsLogPath = null;
        if (kylinConf instanceof KylinConfigExt && Objects.nonNull(extendedOverrides = ((KylinConfigExt)kylinConf).getExtendedOverrides())) {
            sparkDriverHdfsLogPath = (String)extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
        }
        if (kapConf.isCloud() && StringUtils.isNotBlank((CharSequence)(logLocalWorkingDirectory = kylinConf.getLogLocalWorkingDirectory()))) {
            hdfsWorkingDir = logLocalWorkingDirectory;
            sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
        }
        sb.append(SPACE).append("-Dkylin.hdfs.working.dir=").append(hdfsWorkingDir);
        sb.append(SPACE).append("-Dspark.driver.log4j.appender.hdfs.File=").append(sparkDriverHdfsLogPath);
        this.rewriteDriverLog4jConf(sb, kylinConf, sparkConf);
        sb.append(SPACE).append("-Dspark.driver.rest.server.address=").append(kylinConf.getServerAddress());
        sb.append(SPACE).append("-Dspark.driver.param.taskId=").append(this.getId());
        sb.append(SPACE).append("-Dspark.driver.local.logDir=").append(KapConfig.getKylinLogDirAtBestEffort()).append("/spark");
        if (kapConf.getPlatformZKEnable()) {
            sb.append(SPACE).append("-Djava.security.auth.login.config=").append(kapConf.getKerberosJaasConfPath());
        }
        if (kylinConf.buildJobProfilingEnabled()) {
            sb.append(SPACE).append("-Dspark.profiler.flagsDir=").append(kylinConf.getJobTmpProfilerFlagsDir(this.project, this.getId()));
            sb.append(SPACE).append("-Dspark.profiler.collection.timeout=").append(kylinConf.buildJobProfilingResultTimeout());
            sb.append(SPACE).append("-Dspark.profiler.profiling.timeout=").append(kylinConf.buildJobProfilingProfileTimeout());
        }
        sparkConf.put(DRIVER_EXTRA_JAVA_OPTIONS, sb.toString().trim());
    }

    @VisibleForTesting
    public String getDriverExtraJavaOptions(KylinConfig kylinConf) {
        KapConfig kapConf = KapConfig.wrap((KylinConfig)kylinConf);
        Map<String, String> sparkConf = this.getSparkConfigOverride(kylinConf);
        this.rewriteDriverExtraJavaOptions(kylinConf, kapConf, sparkConf);
        return sparkConf.get(DRIVER_EXTRA_JAVA_OPTIONS);
    }

    private void rewriteKerberosConf(KapConfig kapConf, final Map<String, String> sparkConf) {
        if (Boolean.FALSE.equals(kapConf.isKerberosEnabled())) {
            return;
        }
        sparkConf.put("spark.kerberos.principal", kapConf.getKerberosPrincipal());
        sparkConf.put("spark.kerberos.keytab", kapConf.getKerberosKeytabPath());
        String remoteKrb5 = HADOOP_CONF_PATH + kapConf.getKerberosKrb5Conf();
        ConfMap confMap = new ConfMap(){

            @Override
            public String get(String key) {
                return (String)sparkConf.get(key);
            }

            @Override
            public void set(String key, String value) {
                sparkConf.put(key, value);
            }
        };
        if (this.isClusterMode(sparkConf)) {
            this.rewriteSpecifiedKrb5Conf(DRIVER_EXTRA_JAVA_OPTIONS, remoteKrb5, confMap);
        } else {
            this.rewriteSpecifiedKrb5Conf(DRIVER_EXTRA_JAVA_OPTIONS, kapConf.getKerberosKrb5ConfPath(), confMap);
        }
        this.rewriteSpecifiedKrb5Conf(AM_EXTRA_JAVA_OPTIONS, remoteKrb5, confMap);
        this.rewriteSpecifiedKrb5Conf(EXECUTOR_EXTRA_JAVA_OPTIONS, remoteKrb5, confMap);
    }

    private void rewriteExecutorExtraJavaOptions(KylinConfig kylinConf, Map<String, String> sparkConf) {
        StringBuilder sb = new StringBuilder();
        if (sparkConf.containsKey(EXECUTOR_EXTRA_JAVA_OPTIONS)) {
            sb.append(sparkConf.get(EXECUTOR_EXTRA_JAVA_OPTIONS));
        }
        sb.append(SPACE).append("-Dkylin.dictionary.globalV2-store-class-name=").append(kylinConf.getGlobalDictV2StoreImpl());
        sparkConf.put(EXECUTOR_EXTRA_JAVA_OPTIONS, sb.toString().trim());
    }

    private void rewriteSpecifiedKrb5Conf(String key, String value, ConfMap confMap) {
        String originOptions = confMap.get(key);
        if (Objects.isNull(originOptions)) {
            originOptions = EMPTY;
        }
        if (originOptions.contains("-Djava.security.krb5.conf")) {
            return;
        }
        String newOptions = "-Djava.security.krb5.conf=" + value + SPACE + originOptions;
        confMap.set(key, newOptions.trim());
    }

    private void rewritePluginOptions(KylinConfig kylinConf, Map<String, String> sparkConf) {
        if (kylinConf.buildJobProfilingEnabled()) {
            sparkConf.computeIfPresent(SPARK_PLUGINS, (pluginKey, pluginValue) -> pluginValue + COMMA + BuildAsyncProfilerSparkPlugin.class.getCanonicalName());
            sparkConf.computeIfAbsent(SPARK_PLUGINS, pluginKey -> BuildAsyncProfilerSparkPlugin.class.getCanonicalName());
        }
    }

    private void rewriteExtraClasspath(KylinConfig kylinConf, Map<String, String> sparkConf) {
        if (this.isClusterMode(sparkConf)) {
            LinkedHashSet sparkJars = Sets.newLinkedHashSet();
            sparkJars.add(APP_JAR_NAME);
            sparkJars.addAll(this.getSparkJars(kylinConf, sparkConf));
            String jointJarNames = String.join((CharSequence)COLON, sparkJars.stream().map(jar -> Paths.get(jar, new String[0]).getFileName().toString()).collect(Collectors.toSet()));
            sparkConf.put(DRIVER_EXTRA_CLASSPATH, jointJarNames);
            sparkConf.put(EXECUTOR_EXTRA_CLASSPATH, jointJarNames);
            return;
        }
        Set<String> sparkJars = this.getSparkJars(kylinConf, sparkConf);
        sparkConf.put(DRIVER_EXTRA_CLASSPATH, String.join((CharSequence)COLON, sparkJars));
        sparkConf.put(EXECUTOR_EXTRA_CLASSPATH, String.join((CharSequence)COLON, sparkJars.stream().map(jar -> Paths.get(jar, new String[0]).getFileName().toString()).collect(Collectors.toSet())));
    }

    private void rewriteDriverLog4jConf(StringBuilder sb, KylinConfig config, Map<String, String> sparkConf) {
        String localLog4j = config.getLogSparkDriverPropertiesFile();
        String remoteLog4j = Paths.get(localLog4j, new String[0]).getFileName().toString();
        if (this.isClusterMode(sparkConf) || config.getSparkMaster().startsWith("k8s")) {
            sb.append(SPACE).append("-Dlog4j.configurationFile=").append(remoteLog4j);
        } else {
            sb.append(SPACE).append("-Dlog4j.configurationFile=file:").append(localLog4j);
        }
    }

    private Set<String> getSparkJars(KylinConfig kylinConf, Map<String, String> sparkConf) {
        LinkedHashSet jarPaths = Sets.newLinkedHashSet();
        jarPaths.add(kylinConf.getKylinJobJarPath());
        jarPaths.add(kylinConf.getExtraJarsPath());
        jarPaths.add(this.getJars());
        jarPaths.add(this.getExtJar());
        jarPaths.add(sparkConf.get(SPARK_JARS_1));
        jarPaths.add(sparkConf.get(SPARK_JARS_2));
        LinkedHashSet sparkJars = jarPaths.stream().filter(StringUtils::isNotEmpty).flatMap(p -> Arrays.stream(StringUtils.split((String)p, (String)COMMA))).filter(jar -> jar.endsWith(".jar")).collect(Collectors.toCollection(LinkedHashSet::new));
        return Collections.unmodifiableSet(sparkJars);
    }

    private Set<String> getSparkFiles(KylinConfig kylinConf, Map<String, String> sparkConf) {
        LinkedHashSet filePaths = Sets.newLinkedHashSet();
        filePaths.add(kylinConf.getLogSparkAppMasterPropertiesFile());
        filePaths.add(kylinConf.getLogSparkDriverPropertiesFile());
        filePaths.add(kylinConf.getLogSparkExecutorPropertiesFile());
        if (kylinConf.buildJobProfilingEnabled()) {
            try {
                filePaths.add(kylinConf.getAsyncProfilerFiles());
            }
            catch (IOException e) {
                logger.error("Add SparkPluginFile failed.", (Throwable)e);
            }
        }
        filePaths.add(sparkConf.get(SPARK_FILES_1));
        filePaths.add(sparkConf.get(SPARK_FILES_2));
        LinkedHashSet sparkFiles = filePaths.stream().filter(StringUtils::isNotEmpty).flatMap(p -> Arrays.stream(StringUtils.split((String)p, (String)COMMA))).filter(StringUtils::isNotEmpty).collect(Collectors.toCollection(LinkedHashSet::new));
        return Collections.unmodifiableSet(sparkFiles);
    }

    public void cancelJob() {
        this.killOrphanApplicationIfExists(this.getId());
    }

    private static interface ConfMap {
        public String get(String var1);

        public void set(String var1, String var2);
    }
}

