/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.application;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.spark.common.util.TimeZoneUtils;
import org.apache.kylin.engine.spark.job.BuildJobInfos;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.StandaloneAppClient;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.apache.spark.util.Utils;
import org.apache.spark.utils.ResourceUtils;
import org.apache.spark.utils.SparkVersionUtils;
import org.apache.spark.utils.YarnInfoFetcherUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Spark applications launched by Kylin.
 *
 * <p>The entry point {@link #execute(String[])} reads a JSON parameter map from the file named
 * by the first argument, then {@link #execute()} loads the Kylin configuration, prepares the
 * {@link SparkConf}, creates the {@link SparkSession}, reports the tracking URL to the REST
 * server and finally delegates to {@link #doExecute()}, which subclasses implement.
 */
public abstract class SparkApplication {
    private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);

    /** Number of times {@link #updateSparkJobExtraInfo} tries to reach the REST server. */
    private static final int UPDATE_JOB_INFO_MAX_ATTEMPTS = 3;

    /** Pause between two attempts of {@link #updateSparkJobExtraInfo}, in milliseconds. */
    private static final long UPDATE_JOB_INFO_RETRY_INTERVAL_MS = 3000L;

    /** Job parameters; replaced by the parsed args file content in {@link #execute(String[])}. */
    private Map<String, String> params = Maps.newHashMap();
    protected volatile KylinConfig config;
    protected volatile String jobId;
    protected SparkSession ss;
    protected String project;
    /** Value of the "cuboidsNum" parameter, or -1 when that parameter is absent. */
    protected int layoutSize = -1;
    protected BuildJobInfos infos;

    /**
     * Reads the job parameters from the file at {@code args[0]} and runs the application.
     *
     * <p>The first line of the file must be a JSON object; it becomes the parameter map.
     *
     * @param args command line arguments; {@code args[0]} is the path of the args file
     * @throws RuntimeException wrapping any failure while reading the file or running the job
     */
    public void execute(String[] args) {
        Path path = new Path(args[0]);
        FileSystem fs = HadoopUtil.getFileSystem(path);
        try {
            // The stream is closed as soon as the line is read instead of staying open for the whole job.
            String argsLine = readArgsLine(fs, path);
            // readLine() yields null (not "") when the file has no content at all.
            if (argsLine == null || argsLine.isEmpty()) {
                throw new RuntimeException("Args file is empty");
            }
            this.params = JsonUtil.readValueAsMap(argsLine);
            this.checkArgs();
            logger.info("Executor task {} with args : {}", this.getClass().getName(), argsLine);
            this.execute();
        } catch (Exception e) {
            logger.error("The spark job execute failed!", e);
            throw new RuntimeException("Error execute " + this.getClass().getName(), e);
        }
    }

    /** Returns the first line of the UTF-8 encoded file at {@code path}, or null if the file is empty. */
    private static String readArgsLine(FileSystem fs, Path path) throws IOException {
        try (FSDataInputStream inputStream = fs.open(path);
             InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
             BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
            return bufferedReader.readLine();
        }
    }

    /** Returns the live (mutable) parameter map of this application. */
    public final Map<String, String> getParams() {
        return this.params;
    }

    /** Returns the value of parameter {@code key}, or null when it is not set. */
    public final String getParam(String key) {
        return this.params.get(key);
    }

    /** Sets parameter {@code key} to {@code value}. */
    public final void setParam(String key, String value) {
        this.params.put(key, value);
    }

    /** Returns whether a parameter named {@code key} is present. */
    public final boolean contains(String key) {
        return this.params.containsKey(key);
    }

    /**
     * Hook called after the parameters are loaded and before the job runs.
     * The default implementation does nothing.
     */
    public void checkArgs() {
    }

    /**
     * Looks up the tracking URL of a YARN application via {@link YarnInfoFetcherUtils}.
     *
     * @param yarnAppId the application id
     * @return the tracking URL reported by {@link YarnInfoFetcherUtils#getTrackingUrl}
     */
    public String getTrackingUrl(String yarnAppId) throws IOException, YarnException {
        return YarnInfoFetcherUtils.getTrackingUrl(yarnAppId);
    }

    /**
     * Sends {@code json} with an HTTP PUT to {@code url} on the REST server.
     *
     * <p>The server address is taken from the system property
     * {@code spark.driver.rest.server.address} and defaults to {@code 127.0.0.1:7070}.
     *
     * @param url  path of the REST endpoint, appended to the server address
     * @param json request body
     * @return true only when the server answers with status 200; false on any other status
     *         or when the request fails with an {@link IOException}
     */
    public Boolean updateSparkJobInfo(String url, String json) {
        String serverAddress = System.getProperty("spark.driver.rest.server.address", "127.0.0.1:7070");
        String requestApi = String.format(Locale.ROOT, "http://%s%s", serverAddress, url);
        DefaultHttpClient httpClient = new DefaultHttpClient();
        try {
            HttpPut httpPut = new HttpPut(requestApi);
            httpPut.addHeader("Content-Type", "application/json");
            httpPut.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
            HttpResponse response = httpClient.execute(httpPut);
            int code = response.getStatusLine().getStatusCode();
            if (code == 200) {
                return true;
            }
            // A response is not required to carry a body.
            HttpEntity entity = response.getEntity();
            String responseContent = "";
            if (entity != null) {
                try (InputStream inputStream = entity.getContent()) {
                    // Decode explicitly instead of relying on the platform default charset.
                    responseContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8.name());
                }
            }
            logger.warn("update spark job failed, info: {}", responseContent);
        } catch (IOException e) {
            logger.error("http request {} failed!", requestApi, e);
        } finally {
            // Release the client's connections; a new client is created on every call.
            httpClient.getConnectionManager().shutdown();
        }
        return false;
    }

    /**
     * Reports extra information about the Spark job to the REST server, retrying on failure.
     *
     * <p>The payload contains {@code project}, a {@code taskId} (system property
     * {@code spark.driver.param.taskId}, falling back to {@code jobId}) and every entry of
     * {@code extraInfo}. Up to three attempts are made, three seconds apart.
     *
     * @param url       path of the REST endpoint
     * @param project   project name put into the payload
     * @param jobId     fallback task id
     * @param extraInfo additional payload entries; null is treated as empty
     * @return {@link Boolean#TRUE} when one attempt succeeded, {@link Boolean#FALSE} otherwise
     */
    public Boolean updateSparkJobExtraInfo(String url, String project, String jobId, Map<String, String> extraInfo) {
        Map<String, String> payload = new HashMap<>(5);
        payload.put("project", project);
        payload.put("taskId", System.getProperty("spark.driver.param.taskId", jobId));
        if (extraInfo != null) {
            payload.putAll(extraInfo);
        }
        try {
            String payloadJson = JsonUtil.writeValueAsString(payload);
            for (int attempt = 1; attempt <= UPDATE_JOB_INFO_MAX_ATTEMPTS; ++attempt) {
                // Boolean.TRUE.equals tolerates an overriding method that returns null.
                if (Boolean.TRUE.equals(this.updateSparkJobInfo(url, payloadJson))) {
                    return Boolean.TRUE;
                }
                // Only wait when another attempt will actually follow.
                if (attempt < UPDATE_JOB_INFO_MAX_ATTEMPTS) {
                    Thread.sleep(UPDATE_JOB_INFO_RETRY_INTERVAL_MS);
                    logger.warn("retry request rest api update spark extra job info");
                }
            }
        } catch (InterruptedException e) {
            // Stop retrying and keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            logger.error("update spark job extra info failed!", e);
        } catch (Exception e) {
            logger.error("update spark job extra info failed!", e);
        }
        return Boolean.FALSE;
    }

    /**
     * Replaces the host name in {@code url} with its resolved IP address.
     *
     * <p>Only the host component of the authority is rewritten; identical text elsewhere in the
     * URL is left alone. When the URL cannot be parsed, has no host, or the host cannot be
     * resolved, the original URL is returned unchanged.
     */
    private String tryReplaceHostAddress(String url) {
        String originHost = null;
        try {
            URI uri = URI.create(url);
            originHost = uri.getHost();
            if (originHost == null || originHost.isEmpty()) {
                logger.warn("no host found in tracking url {}, keep the origin tracking url.", url);
                return url;
            }
            String hostAddress = InetAddress.getByName(originHost).getHostAddress();
            if (hostAddress.indexOf(':') >= 0) {
                // IPv6 literals must be bracketed inside a URL.
                hostAddress = "[" + hostAddress + "]";
            }
            // A URI with a host has the form [scheme:]//[userinfo@]host[:port]...
            int hostStart = url.indexOf("//") + 2;
            String rawUserInfo = uri.getRawUserInfo();
            if (rawUserInfo != null) {
                hostStart += rawUserInfo.length() + 1;
            }
            if (!url.startsWith(originHost, hostStart)) {
                // Defensive: never rewrite the URL when the host is not where it is expected.
                return url;
            }
            return url.substring(0, hostStart) + hostAddress + url.substring(hostStart + originHost.length());
        } catch (UnknownHostException uhe) {
            logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost, uhe);
            return url;
        } catch (IllegalArgumentException iae) {
            logger.error("failed to parse tracking url {}, step back to use the origin tracking url.", url, iae);
            return url;
        }
    }

    /**
     * Builds the extra info map holding the tracking URL of the running application.
     *
     * @param ipAddressPreferred whether the host name in the URL should be replaced by its IP
     * @param sparkMaster        value of {@code spark.master}; a value starting with "spark"
     *                           selects {@link StandaloneAppClient}, anything else
     *                           {@link #getTrackingUrl(String)}
     * @return a map with key {@code yarnAppUrl}, or an empty map when no URL could be obtained
     */
    private Map<String, String> getTrackingInfo(boolean ipAddressPreferred, String sparkMaster) {
        String applicationId = this.ss.sparkContext().applicationId();
        Map<String, String> extraInfo = new HashMap<>();
        try {
            // Query only the source that matches the master, so a failing YARN lookup
            // cannot discard the URL of a standalone application.
            String trackingUrl = sparkMaster.startsWith("spark")
                    ? StandaloneAppClient.getAppUrl(applicationId, sparkMaster)
                    : this.getTrackingUrl(applicationId);
            if (StringUtils.isBlank(trackingUrl)) {
                logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
                return extraInfo;
            }
            if (ipAddressPreferred) {
                trackingUrl = this.tryReplaceHostAddress(trackingUrl);
            }
            extraInfo.put("yarnAppUrl", trackingUrl);
        } catch (Exception e) {
            logger.error("get tracking url failed!", e);
        }
        return extraInfo;
    }

    /**
     * Runs the application with the parameters that are already loaded.
     *
     * <p>Steps, in order: load the Kylin config from the "distMetaUrl" parameter, prepare the
     * Spark configuration, wait for cluster resources when running on a cluster, create the
     * Spark session, report the tracking URL, and call {@link #doExecute()}.
     * {@code infos.jobEnd()} is always called at the end when {@code infos} was initialised.
     *
     * @throws Exception any failure raised while preparing or running the job
     */
    protected final void execute() throws Exception {
        String hdfsMetalUrl = this.getParam("distMetaUrl");
        this.jobId = this.getParam("jobId");
        this.project = this.getParam("project");
        String cuboidsNum = this.getParam("cuboidsNum");
        if (cuboidsNum != null) {
            this.layoutSize = Integer.parseInt(cuboidsNum);
        }
        try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
                .setAndUnsetThreadLocalConfig(MetaDumpUtil.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
            this.config = autoCloseConfig.get();
            this.config.setProperty("kylin.source.provider.0", "org.apache.kylin.engine.spark.source.HiveSource");
            KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(this.config);
            this.infos = buildEnv.buildJobInfos();
            SparkConf sparkConf = buildEnv.sparkConf();
            if (this.config.isAutoSetSparkConf() && this.isJobOnCluster(sparkConf)) {
                try {
                    this.autoSetSparkConf(sparkConf);
                } catch (Exception e) {
                    logger.warn("Auto set spark conf failed. Load spark conf from system properties", e);
                }
                // User-defined settings are applied last so they win over the generated ones.
                for (Map.Entry<String, String> entry : this.config.getSparkConfigOverride().entrySet()) {
                    logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
                    sparkConf.set(entry.getKey(), entry.getValue());
                }
            } else if (!this.isJobOnCluster(sparkConf)) {
                sparkConf.set("spark.master", "local");
                if (!this.config.getSparkConfigOverride().containsKey("spark.sql.shuffle.partitions")) {
                    sparkConf.set("spark.sql.shuffle.partitions", "1");
                }
            }
            if (SparkVersionUtils.isLessThanSparkVersion("2.4", true)) {
                sparkConf.set("spark.sql.adaptive.enabled", "false");
            }
            TimeZoneUtils.setDefaultTimeZone(this.config);
            if (this.isJobOnCluster(sparkConf)) {
                logger.info("Sleep for random seconds to avoid submitting too many spark job at the same time.");
                // Random delay of up to one minute.
                Thread.sleep((long) (Math.random() * 60.0 * 1000.0));
                try {
                    while (!ResourceUtils.checkResource(sparkConf, buildEnv.clusterInfoFetcher())) {
                        // Random wait of up to ten minutes before checking again.
                        long waitTime = (long) (Math.random() * 10.0 * 60.0 * 1000.0);
                        logger.info("Current available resource in cluster is not sufficient, wait for a period: {}.", waitTime);
                        Thread.sleep(waitTime);
                    }
                } catch (Throwable throwable) {
                    // Deliberately best-effort: a failed resource check must not block the job.
                    // NOTE(review): this also swallows an InterruptedException from the sleep above
                    // without restoring the interrupt flag - confirm that is intended.
                    logger.warn("Error occurred when check resource. Ignore it and try to submit this job. ", throwable);
                }
            }
            this.ss = SparkSession.builder()
                    .enableHiveSupport()
                    .config(sparkConf)
                    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
                    .getOrCreate();
            if (this.isJobOnCluster(sparkConf)) {
                Map<String, String> trackingInfo = this.getTrackingInfo(
                        this.config.isTrackingUrlIpAddressEnabled(), sparkConf.get("spark.master"));
                this.updateSparkJobExtraInfo("/kylin/api/jobs/spark", this.project, this.jobId, trackingInfo);
            }
            UdfManager.create(this.ss);
            if (!this.config.isUTEnv()) {
                System.setProperty("kylin.env", this.config.getDeployEnv());
            }
            this.infos.startJob();
            this.doExecute();
        } finally {
            if (this.infos != null) {
                this.infos.jobEnd();
            }
        }
    }

    /**
     * Returns whether the job runs on a cluster: the master in {@code conf} is not a local
     * master and the Kylin config is not a unit-test environment.
     */
    public boolean isJobOnCluster(SparkConf conf) {
        return !Utils.isLocalMaster(conf) && !this.config.isUTEnv();
    }

    /** Performs the actual work of the application; called once the Spark session exists. */
    protected abstract void doExecute() throws Exception;

    /** Returns the value passed as the "required_cores" option when Spark conf is auto-set. */
    protected String calculateRequiredCores() throws Exception {
        return this.config.getSparkEngineRequiredTotalCores();
    }

    /**
     * Generates Spark settings with {@link SparkConfHelper} and applies them to
     * {@code sparkConf}. Does nothing when the job's shared temp directory does not exist.
     */
    private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
        logger.info("Start set spark conf automatically.");
        SparkConfHelper helper = new SparkConfHelper();
        helper.setFetcher(KylinBuildEnv.get().clusterInfoFetcher());
        Path shareDir = this.config.getJobTmpShareDir(this.project, this.jobId);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        if (!fs.exists(shareDir)) {
            logger.info("this not exist {}", shareDir.toUri().getPath());
            return;
        }
        String contentSize = this.chooseContentSize(shareDir);
        helper.setOption("source_table_size", contentSize);
        helper.setOption("layout_size", Integer.toString(this.layoutSize));
        Map<String, String> configOverride = this.config.getSparkConfigOverride();
        helper.setConf("spark.yarn.queue", configOverride.get("spark.yarn.queue"));
        helper.setOption("required_cores", this.calculateRequiredCores());
        helper.setConf("count_distinct", this.hasCountDistinct().toString());
        helper.generateSparkConf();
        helper.applySparkConf(sparkConf);
    }

    /**
     * Returns the source size used for auto-configuration: the maximum resource size found
     * under {@code shareDir}, suffixed with "b".
     */
    protected String chooseContentSize(Path shareDir) throws IOException {
        return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b";
    }

    /**
     * Reads the count-distinct marker file from the job's shared temp directory.
     *
     * @return the value stored in the file, or false when the file does not exist
     */
    protected Boolean hasCountDistinct() throws IOException {
        Path countDistinct = new Path(this.config.getJobTmpShareDir(this.project, this.jobId),
                ResourceDetectUtils.countDistinctSuffix());
        FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration());
        Boolean exist;
        if (fileSystem.exists(countDistinct)) {
            exist = (Boolean) ResourceDetectUtils.readResourcePathsAs(countDistinct);
        } else {
            exist = false;
            logger.info("File count_distinct.json doesn't exist, set hasCountDistinct to false.");
        }
        logger.info("Exist count distinct measure: {}", exist);
        return exist;
    }

    /** Logs the text from {@link #generateInfo()}; a failure to build it is logged, not thrown. */
    public void logJobInfo() {
        try {
            logger.info(this.generateInfo());
        } catch (Exception e) {
            logger.warn("Error occurred when generate job info.", e);
        }
    }

    /** Returns the job description that {@link #logJobInfo()} writes to the log. */
    protected String generateInfo() {
        return LogJobInfoUtils.sparkApplicationInfo();
    }
}

