/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.tool;

import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.ExecutableApplication;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionBuilder;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line tool that copies streaming Spark diagnostics from HDFS into a local directory.
 *
 * <p>Artifacts are written under {@code <dir>/streaming_spark_logs/}:
 * <ul>
 *   <li>{@code spark_driver/<project>/<jobId>/<startedTime>/} - driver logs read from the job's
 *       streaming tmp output store path;</li>
 *   <li>{@code spark_executor/<project>/<jobId>/<startedTime>/} - executor logs read from
 *       {@code <hdfs working dir>streaming/spark_logs/<project>};</li>
 *   <li>{@code spark_checkpoint/<project>/} - a model's checkpoint directory read from
 *       {@code <hdfs working dir>streaming/checkpoint/<modelId>}.</li>
 * </ul>
 *
 * <p>Two modes are supported:
 * <ul>
 *   <li>{@code -project} + {@code -job}: dump one job. Only the last 3 run directories (in
 *       lexicographic order of their names) are copied.</li>
 *   <li>{@code -startTime} + {@code -endTime}: dump every streaming job of every project whose
 *       driver log file-name timestamps fall in the inclusive range. The range may span at most
 *       31 whole days; the check divides by 86,400,000, so the values are presumably epoch
 *       milliseconds.</li>
 * </ul>
 */
public class StreamingSparkLogTool extends ExecutableApplication {
    @Generated
    private static final Logger log = LoggerFactory.getLogger("diag");
    private static final Option OPTION_STREAMING_DIR = OptionBuilder.getInstance().hasArg().withArgName("DESTINATION_DIR").withDescription("Specify the directory to save the streaming spark logs").isRequired(true).create("dir");
    private static final Option OPTION_STREAMING_JOB = OptionBuilder.getInstance().hasArg().withArgName("JOB_ID").withDescription("Specify the job").isRequired(false).create("job");
    private static final Option OPTION_STREAMING_PROJECT = OptionBuilder.getInstance().hasArg().withArgName("OPTION_PROJECT").withDescription("Specify project").isRequired(false).create("project");
    private static final Option OPTION_STREAMING_START_TIME = OptionBuilder.getInstance().withArgName("startTime").hasArg().isRequired(false).withDescription("specify the start of time range to extract logs. ").create("startTime");
    private static final Option OPTION_STREAMING_END_TIME = OptionBuilder.getInstance().withArgName("endTime").hasArg().isRequired(false).withDescription("specify the end of time range to extract logs. ").create("endTime");
    /** Milliseconds in one day. */
    private static final long DAY = 86400000L;
    /** Maximum number of whole days a time-range dump may span. */
    private static final long MAX_DAY = 31L;
    /** Number of most recent run directories copied when dumping a single job. */
    private static final int JOB_LOG_COUNT = 3;
    private static final String STREAMING_LOG_ROOT_DIR = "streaming_spark_logs";
    private static final String STREAMING_SPARK_DRIVER_DIR = "spark_driver";
    private static final String STREAMING_SPARK_EXECUTOR_DIR = "spark_executor";
    private static final String STREAMING_SPARK_CHECKPOINT_DIR = "spark_checkpoint";
    private final Options options;
    private final KylinConfig kylinConfig;

    StreamingSparkLogTool() {
        this(KylinConfig.getInstanceFromEnv());
    }

    public StreamingSparkLogTool(KylinConfig kylinConfig) {
        this.kylinConfig = kylinConfig;
        this.options = new Options();
        this.initOptions();
    }

    /**
     * Returns the last {@link #JOB_LOG_COUNT} elements of {@code stream}, in encounter order.
     * A sliding window is kept, so memory use is bounded regardless of stream length.
     */
    private static <T> List<T> lastN(Stream<T> stream) {
        ArrayDeque<T> window = new ArrayDeque<>(JOB_LOG_COUNT);
        stream.forEachOrdered(item -> {
            if (window.size() == JOB_LOG_COUNT) {
                window.pollFirst();
            }
            window.addLast(item);
        });
        return new ArrayList<>(window);
    }

    private void initOptions() {
        this.options.addOption(OPTION_STREAMING_JOB);
        this.options.addOption(OPTION_STREAMING_PROJECT);
        this.options.addOption(OPTION_STREAMING_DIR);
        this.options.addOption(OPTION_STREAMING_END_TIME);
        this.options.addOption(OPTION_STREAMING_START_TIME);
    }

    protected Options getOptions() {
        return this.options;
    }

    /**
     * Entry point. The project+job mode takes precedence over the time-range mode. If neither
     * pair is fully specified, a warning is logged and nothing is dumped.
     *
     * @throws NumberFormatException if {@code -startTime}/{@code -endTime} are not valid longs
     */
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String dir = optionsHelper.getOptionValue(OPTION_STREAMING_DIR);
        String jobId = optionsHelper.getOptionValue(OPTION_STREAMING_JOB);
        String project = optionsHelper.getOptionValue(OPTION_STREAMING_PROJECT);
        String startTimeStr = optionsHelper.getOptionValue(OPTION_STREAMING_START_TIME);
        String endTimeStr = optionsHelper.getOptionValue(OPTION_STREAMING_END_TIME);
        if (StringUtils.isNotEmpty(project) && StringUtils.isNotEmpty(jobId)) {
            dumpSingleJob(project, jobId, dir);
            return;
        }
        if (StringUtils.isNotEmpty(startTimeStr) && StringUtils.isNotEmpty(endTimeStr)) {
            dumpTimeRange(startTimeStr, endTimeStr, dir);
            return;
        }
        log.warn("Nothing to dump, please specify either both project and job, or both startTime and endTime");
    }

    /** Dumps driver and executor logs of one job, plus its model checkpoint for non-merge jobs. */
    private void dumpSingleJob(String project, String jobId, String dir) {
        log.info("start dump streaming spark driver/executor/checkpoint job log, project: {}, jobId: {}", project, jobId);
        Map<String, Map<String, Set<String>>> projectJobMap = dumpJobDriverLog(project, jobId, dir, null, null);
        dumpExecutorLog(projectJobMap, dir);
        if (jobId.contains("_merge")) {
            log.warn("Only build job have checkpoint, current job: {}", jobId);
            return;
        }
        // The model id is the part of the job id before the first '_'.
        dumpCheckPoint(project, StringUtils.split(jobId, "_")[0], dir);
    }

    /**
     * Dumps the logs and checkpoints of every streaming job whose driver logs fall in the time
     * range. Inverted ranges and ranges longer than {@link #MAX_DAY} whole days are rejected.
     */
    private void dumpTimeRange(String startTimeStr, String endTimeStr, String dir) {
        long startTime = Long.parseLong(startTimeStr);
        long endTime = Long.parseLong(endTimeStr);
        if (startTime > endTime) {
            // Previously an inverted range slipped through the day check (negative days)
            // and silently fell back to dumping the latest run of every job.
            log.error("time range is invalid, startTime: {} is later than endTime: {}", startTime, endTime);
            return;
        }
        long days = (endTime - startTime) / DAY;
        if (days > MAX_DAY) {
            log.error("time range is too large, startTime: {}, endTime: {}, days: {}", startTime, endTime, days);
            return;
        }
        log.info("start dump streaming spark driver/executor/checkpoint full log, startTime: {}, endTime: {}", startTimeStr, endTimeStr);
        Map<String, Map<String, Set<String>>> projectJobMap = dumpAllDriverLog(dir, startTimeStr, endTimeStr);
        if (projectJobMap.isEmpty()) {
            return;
        }
        dumpExecutorLog(projectJobMap, dir);
        dumpAllCheckPoint(dir, projectJobMap);
    }

    /**
     * Copies one job's driver logs from HDFS to {@code <exportDir>/streaming_spark_logs/spark_driver/<project>/<jobId>/}.
     *
     * <p>Log paths are grouped by their parent directory name ("started time").
     * <ul>
     *   <li>When both {@code startTime} and {@code endTime} are empty, the last
     *       {@link #JOB_LOG_COUNT} started-time directories (sorted by name) are copied.</li>
     *   <li>Otherwise, every started-time directory containing a log whose file-name timestamp
     *       lies in {@code [startTime, endTime]} is copied. If none match, the directory with
     *       the greatest name is copied instead.</li>
     * </ul>
     *
     * @return {@code {project -> {jobId -> selected started-time directory names}}}; empty when
     *         the HDFS directory is missing or empty, or when copying fails
     */
    private Map<String, Map<String, Set<String>>> dumpJobDriverLog(String project, String jobId, String exportDir, String startTime, String endTime) {
        Map<String, Map<String, Set<String>>> projectJobMap = new HashMap<>();
        String outputStoreDirPath = this.kylinConfig.getStreamingJobTmpOutputStorePath(project, jobId);
        NExecutableManager executableManager = NExecutableManager.getInstance(this.kylinConfig, project);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        if (!executableManager.isHdfsPathExists(outputStoreDirPath)) {
            log.warn("The job driver log file on HDFS has not been generated yet, jobId: {}, filePath: {}", jobId, outputStoreDirPath);
            return projectJobMap;
        }
        List<String> logFilePathList = executableManager.getFilePathsFromHDFSDir(outputStoreDirPath, true);
        if (CollectionUtils.isEmpty(logFilePathList)) {
            log.warn("There is no file in the current job HDFS directory: {}", outputStoreDirPath);
            return projectJobMap;
        }

        List<Path> allLogPaths = new ArrayList<>();
        Set<String> allStartedTimes = new HashSet<>();
        for (String rawPath : logFilePathList) {
            if (StringUtils.isEmpty(rawPath)) {
                continue;
            }
            Path logPath = new Path(rawPath);
            allLogPaths.add(logPath);
            allStartedTimes.add(logPath.getParent().getName());
        }
        if (allLogPaths.isEmpty()) {
            return projectJobMap;
        }

        boolean isJob = StringUtils.isEmpty(startTime) && StringUtils.isEmpty(endTime);
        Set<String> selectedStartedTimes;
        if (isJob) {
            selectedStartedTimes = new HashSet<>(lastN(allStartedTimes.stream().sorted()));
        } else {
            selectedStartedTimes = collectStartedTimesInRange(allLogPaths, Long.parseLong(startTime), Long.parseLong(endTime));
            if (selectedStartedTimes.isEmpty()) {
                // Nothing in range: fall back to the greatest (by name) run directory.
                selectedStartedTimes.add(Collections.max(allStartedTimes));
            }
        }

        File driverDir = new File(exportDir, String.format(Locale.ROOT, "/%s/%s/%s/%s", STREAMING_LOG_ROOT_DIR, STREAMING_SPARK_DRIVER_DIR, project, jobId));
        try {
            FileUtils.forceMkdir(driverDir);
            for (Path logPath : allLogPaths) {
                String startedTime = logPath.getParent().getName();
                if (!selectedStartedTimes.contains(startedTime)) {
                    continue;
                }
                File startedTimeDir = new File(driverDir, startedTime);
                FileUtils.forceMkdir(startedTimeDir);
                fs.copyToLocalFile(logPath, new Path(startedTimeDir.getAbsolutePath()));
            }
            Map<String, Set<String>> jobTimeMap = new HashMap<>();
            jobTimeMap.put(jobId, selectedStartedTimes);
            projectJobMap.put(project, jobTimeMap);
        } catch (IOException e) {
            log.error("dump streaming driver log failed. ", e);
        }
        return projectJobMap;
    }

    /**
     * Returns the parent-directory names of logs whose file-name timestamp lies in
     * {@code [startTime, endTime]}. The returned set is mutable.
     */
    private static Set<String> collectStartedTimesInRange(List<Path> logPaths, long startTime, long endTime) {
        Set<String> startedTimes = new HashSet<>();
        for (Path logPath : logPaths) {
            if (isLogInTimeRange(logPath, startTime, endTime)) {
                startedTimes.add(logPath.getParent().getName());
            }
        }
        return startedTimes;
    }

    /**
     * Checks whether the timestamp encoded as the second '.'-separated token of the log file name
     * lies in {@code [startTime, endTime]}. File names without a numeric second token are skipped
     * with a warning instead of aborting the whole dump.
     */
    private static boolean isLogInTimeRange(Path logPath, long startTime, long endTime) {
        String fileName = logPath.getName();
        String[] nameParts = StringUtils.split(fileName, '.');
        if (nameParts == null || nameParts.length < 2) {
            log.warn("Skip streaming driver log without timestamp in file name: {}", logPath);
            return false;
        }
        try {
            long logTimestamp = Long.parseLong(nameParts[1]);
            return logTimestamp >= startTime && logTimestamp <= endTime;
        } catch (NumberFormatException e) {
            log.warn("Skip streaming driver log with non-numeric timestamp in file name: {}", logPath);
            return false;
        }
    }

    /**
     * Dumps driver logs for every streaming job of every project, restricted to the time range.
     *
     * @return {@code {project -> {jobId -> selected started-time directory names}}}; projects
     *         with no dumped job are omitted
     */
    private Map<String, Map<String, Set<String>>> dumpAllDriverLog(String exportDir, String startTime, String endTime) {
        NProjectManager projectManager = NProjectManager.getInstance(this.kylinConfig);
        Map<String, Map<String, Set<String>>> projectJobMap = new HashMap<>();
        for (ProjectInstance projectInstance : projectManager.listAllProjects()) {
            String project = projectInstance.getName();
            Map<String, Set<String>> jobTimeMap = new HashMap<>();
            StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(this.kylinConfig, project);
            for (StreamingJobMeta jobMeta : streamingJobManager.listAllStreamingJobMeta()) {
                String jobId = jobMeta.getId();
                Map<String, Set<String>> dumpedJobs = dumpJobDriverLog(project, jobId, exportDir, startTime, endTime).get(project);
                if (dumpedJobs != null && dumpedJobs.containsKey(jobId)) {
                    jobTimeMap.put(jobId, dumpedJobs.get(jobId));
                }
            }
            if (!jobTimeMap.isEmpty()) {
                projectJobMap.put(project, jobTimeMap);
            }
        }
        return projectJobMap;
    }

    /** Dumps the checkpoint of each model whose {@code <modelId>_build} job appears in {@code projectJobMap}. */
    private void dumpAllCheckPoint(String exportDir, Map<String, Map<String, Set<String>>> projectJobMap) {
        NProjectManager projectManager = NProjectManager.getInstance(this.kylinConfig);
        for (ProjectInstance projectInstance : projectManager.listAllProjects()) {
            String project = projectInstance.getName();
            Map<String, Set<String>> jobTimeMap = projectJobMap.get(project);
            if (jobTimeMap == null) {
                continue;
            }
            Set<String> jobIds = jobTimeMap.keySet();
            StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(this.kylinConfig, project);
            streamingJobManager.listAllStreamingJobMeta().stream()
                    .map(StreamingJobMeta::getModelId)
                    .distinct()
                    .filter(modelId -> jobIds.contains(modelId.concat("_build")))
                    .forEach(modelId -> dumpCheckPoint(project, modelId, exportDir));
        }
    }

    /**
     * Copies {@code <hdfs working dir>streaming/checkpoint/<modelId>} into
     * {@code <exportDir>/streaming_spark_logs/spark_checkpoint/<project>/} via
     * {@link FileSystem#copyToLocalFile}. A missing or empty HDFS directory is logged and skipped.
     */
    private void dumpCheckPoint(String project, String modelId, String exportDir) {
        String hdfsStreamLogRootPath = this.kylinConfig.getHdfsWorkingDirectoryWithoutScheme();
        String hdfsStreamJobCheckPointPath = String.format(Locale.ROOT, "%s%s%s", hdfsStreamLogRootPath, "streaming/checkpoint/", modelId);
        NExecutableManager executableManager = NExecutableManager.getInstance(this.kylinConfig, project);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        if (!executableManager.isHdfsPathExists(hdfsStreamJobCheckPointPath)) {
            log.warn("The job checkpoint file on HDFS has not been generated yet, modelId: {}, filePath: {}", modelId, hdfsStreamJobCheckPointPath);
            return;
        }
        List<String> checkpointFiles = executableManager.getFilePathsFromHDFSDir(hdfsStreamJobCheckPointPath, true);
        if (CollectionUtils.isEmpty(checkpointFiles)) {
            log.warn("There is no file in the current job HDFS directory: {}", hdfsStreamJobCheckPointPath);
            return;
        }
        File checkpointDir = new File(exportDir, String.format(Locale.ROOT, "/%s/%s/%s", STREAMING_LOG_ROOT_DIR, STREAMING_SPARK_CHECKPOINT_DIR, project));
        try {
            FileUtils.forceMkdir(checkpointDir);
            fs.copyToLocalFile(new Path(hdfsStreamJobCheckPointPath), new Path(checkpointDir.getAbsolutePath()));
        } catch (IOException e) {
            log.error("dump streaming checkpoint failed. ", e);
        }
    }

    /** Dumps executor logs for every (project, job, started time) in {@code projectJobMap}. */
    private void dumpExecutorLog(Map<String, Map<String, Set<String>>> projectJobMap, String exportDir) {
        if (projectJobMap == null || projectJobMap.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Map<String, Set<String>>> projectEntry : projectJobMap.entrySet()) {
            dumpProjectExecutorLog(projectEntry.getKey(), projectEntry.getValue(), exportDir);
        }
    }

    /**
     * Copies executor logs of the given jobs in one project.
     *
     * <p>The project's HDFS executor log directory is listed once and shared by all jobs. The old
     * code re-listed it recursively once per job. Each log file's grandparent directory name is
     * taken as the job id and its parent directory name as the started time. A file is copied to
     * {@code spark_executor/<project>/<jobId>/<startedTime>/} when that pair was selected by the
     * driver dump.
     */
    private void dumpProjectExecutorLog(String project, Map<String, Set<String>> jobTimeMap, String exportDir) {
        if (jobTimeMap == null || jobTimeMap.isEmpty()) {
            return;
        }
        String hdfsStreamLogRootPath = this.kylinConfig.getHdfsWorkingDirectoryWithoutScheme();
        String hdfsStreamLogProjectPath = String.format(Locale.ROOT, "%s%s%s", hdfsStreamLogRootPath, "streaming/spark_logs/", project);
        NExecutableManager executableManager = NExecutableManager.getInstance(this.kylinConfig, project);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        if (!executableManager.isHdfsPathExists(hdfsStreamLogProjectPath)) {
            for (String jobId : jobTimeMap.keySet()) {
                log.warn("The job executor log file on HDFS has not been generated yet, jobId: {}, filePath: {}", jobId, hdfsStreamLogProjectPath);
            }
            return;
        }
        List<String> executorLogPaths = executableManager.getFilePathsFromHDFSDir(hdfsStreamLogProjectPath, true);
        if (CollectionUtils.isEmpty(executorLogPaths)) {
            log.warn("There is no file in the current job HDFS directory: {}", hdfsStreamLogProjectPath);
            return;
        }
        for (String rawPath : executorLogPaths) {
            if (StringUtils.isEmpty(rawPath)) {
                continue;
            }
            Path logPath = new Path(rawPath);
            String executorDateTime = logPath.getParent().getName();
            String logJobId = logPath.getParent().getParent().getName();
            Set<String> jobStartedSet = jobTimeMap.get(logJobId);
            if (jobStartedSet == null || !jobStartedSet.contains(executorDateTime)) {
                continue;
            }
            File executorDir = new File(exportDir, String.format(Locale.ROOT, "/%s/%s/%s/%s/%s", STREAMING_LOG_ROOT_DIR, STREAMING_SPARK_EXECUTOR_DIR, project, logJobId, executorDateTime));
            try {
                FileUtils.forceMkdir(executorDir);
                fs.copyToLocalFile(logPath, new Path(executorDir.getAbsolutePath()));
            } catch (IOException e) {
                log.error("dump streaming executor log failed. ", e);
            }
        }
    }
}

