/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs.scheduler;

import com.google.common.collect.Maps;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.kylin.cluster.ClusterManagerFactory;
import org.apache.kylin.cluster.IClusterManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.JobKiller;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically compares the streaming jobs recorded in metadata with the jobs the
 * cluster manager reports as running, and kills/marks as {@code ERROR} the jobs that
 * stay missing from the cluster for several consecutive checks.
 *
 * <p>Bookkeeping is kept in four in-memory maps keyed by job id:
 * <ul>
 *   <li>{@code jobMap} - tracked jobs and how many consecutive checks each one has been
 *       missing from the cluster;</li>
 *   <li>{@code startingJobMap} / {@code stoppingJobMap} - grace counters for untracked jobs
 *       that are missing while in {@code STARTING} / {@code STOPPING} status;</li>
 *   <li>{@code killedJobMap} - the time (epoch millis) at which a job was killed by this
 *       watcher; such jobs are ignored until {@code JOB_KEEP_TIMEOUT} minutes have passed.</li>
 * </ul>
 *
 * <p>{@link #schedule()} and {@link #execute(List)} are {@code synchronized}, which is what
 * guards the plain {@code HashMap}s above.
 */
public class StreamingJobStatusWatcher {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingJobStatusWatcher.class);
    /** Metadata statuses for which a job is compared against the cluster's running-job list. */
    private static List<JobStatusEnum> STATUS_LIST = Arrays.asList(JobStatusEnum.ERROR, JobStatusEnum.RUNNING, JobStatusEnum.STARTING, JobStatusEnum.STOPPING);
    /** Initial delay and delay between two checks, in minutes. */
    private static int WATCH_INTERVAL = 5;
    /** Minutes a killed job is ignored, and maximum age of an ERROR job's last update for it to be tracked. */
    private static int JOB_KEEP_TIMEOUT = 30;
    /** Counter value at which a missing job is promoted (grace maps) or killed (tracked jobs). */
    private static final int MISSING_CHECK_THRESHOLD = 3;
    private Map<String, AtomicInteger> startingJobMap = Maps.newHashMap();
    private Map<String, AtomicInteger> stoppingJobMap = Maps.newHashMap();
    private Map<String, AtomicInteger> jobMap = Maps.newHashMap();
    private Map<String, Long> killedJobMap = Maps.newHashMap();
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private boolean init = false;

    /**
     * Starts the periodic check once. Later calls are no-ops.
     *
     * <p>The task is only scheduled when {@code StreamingUtils.isJobOnCluster(config)} is true and
     * {@code config.getStreamingJobStatusWatchEnabled()} equals {@code "true"}; the watcher is
     * marked as initialized either way.
     */
    public synchronized void schedule() {
        if (this.init) {
            return;
        }
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (StreamingUtils.isJobOnCluster(config) && "true".equals(config.getStreamingJobStatusWatchEnabled())) {
            this.scheduledExecutorService.scheduleWithFixedDelay(this::watchOnce, WATCH_INTERVAL, WATCH_INTERVAL, TimeUnit.MINUTES);
        }
        this.init = true;
    }

    /**
     * Runs a single check as the body of the scheduled task.
     *
     * <p>Exceptions are caught and logged here on purpose: an exception escaping a task
     * submitted via {@code scheduleWithFixedDelay} suppresses all subsequent executions, which
     * would silently stop the watcher after one failed cluster or metadata call.
     */
    private void watchOnce() {
        try {
            this.execute(this.getRunningJobs());
        } catch (Exception e) {
            log.error("Streaming job status check failed; it will be retried on the next scheduled run", e);
        }
    }

    /**
     * Asks the cluster manager for the running jobs, passing an empty set as the argument.
     *
     * @return whatever {@code IClusterManager.getRunningJobs} returns
     */
    private List<String> getRunningJobs() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        IClusterManager cm = ClusterManagerFactory.create(config);
        return cm.getRunningJobs(Collections.emptySet());
    }

    /**
     * Performs one reconciliation pass over the streaming jobs of every project.
     *
     * @param runningJobsOnYarn ids of the jobs currently reported as running by the cluster
     */
    public synchronized void execute(List<String> runningJobsOnYarn) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        NProjectManager.getInstance(config).listAllProjects()
                .forEach(projectInstance -> this.checkProject(config, projectInstance.getName(), runningJobsOnYarn));
    }

    /**
     * Reconciles every streaming job of one project against the running-job list.
     *
     * @param config            the Kylin configuration used to obtain the job manager
     * @param project           name of the project to check
     * @param runningJobsOnYarn ids of the jobs currently reported as running by the cluster
     */
    private void checkProject(KylinConfig config, String project, List<String> runningJobsOnYarn) {
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, project);
        for (StreamingJobMeta meta : mgr.listAllStreamingJobMeta()) {
            String jobId = StreamingUtils.getJobId(meta.getModelId(), meta.getJobType().name());

            // Jobs killed by this watcher are ignored until the keep timeout has elapsed;
            // after that their bookkeeping is dropped and they are looked at again on the next pass.
            Long killedAt = this.killedJobMap.get(jobId);
            if (killedAt != null) {
                long keepTime = System.currentTimeMillis() - killedAt;
                if (keepTime > TimeUnit.MINUTES.toMillis(JOB_KEEP_TIMEOUT)) {
                    this.jobMap.remove(jobId);
                    this.killedJobMap.remove(jobId);
                }
                continue;
            }

            if (!STATUS_LIST.contains(meta.getCurrentStatus())) {
                continue;
            }

            if (runningJobsOnYarn.contains(jobId)) {
                // Seen on the cluster: start tracking it, or reset its missing counter.
                this.jobMap.computeIfAbsent(jobId, key -> new AtomicInteger(0)).set(0);
            } else {
                this.processMissingJobsFromYarn(meta, jobId);
            }
        }
    }

    /**
     * Handles a job whose metadata status is being watched but which is not in the
     * running-job list.
     *
     * <ul>
     *   <li>already tracked: count the miss, or kill once the threshold was reached;</li>
     *   <li>{@code STARTING} / {@code STOPPING}: count the miss in the matching grace map;</li>
     *   <li>{@code RUNNING}: start tracking immediately;</li>
     *   <li>any other status (with the filter applied by the caller this is {@code ERROR}):
     *       start tracking only if the last update is at most {@code JOB_KEEP_TIMEOUT}
     *       minutes old.</li>
     * </ul>
     *
     * @param meta  metadata of the missing job
     * @param jobId id of the missing job
     */
    private void processMissingJobsFromYarn(StreamingJobMeta meta, String jobId) {
        JobStatusEnum status = meta.getCurrentStatus();
        if (this.jobMap.containsKey(jobId)) {
            this.killStreamingDriverProcess(jobId, meta.getProject(), meta);
        } else if (status == JobStatusEnum.STARTING) {
            this.moveJobId(this.startingJobMap, jobId);
        } else if (status == JobStatusEnum.STOPPING) {
            this.moveJobId(this.stoppingJobMap, jobId);
        } else if (status == JobStatusEnum.RUNNING) {
            this.jobMap.put(jobId, new AtomicInteger(0));
        } else {
            this.trackIfRecentlyUpdated(meta, jobId);
        }
    }

    /**
     * Starts tracking the job when its last update time is present, parseable as
     * {@code yyyy-MM-dd HH:mm:ss}, and at most {@code JOB_KEEP_TIMEOUT} whole minutes old.
     * A failure while evaluating the timestamp is logged and the job is left untracked.
     */
    private void trackIfRecentlyUpdated(StreamingJobMeta meta, String jobId) {
        String lastUpdateTime = meta.getLastUpdateTime();
        if (lastUpdateTime == null) {
            return;
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
        try {
            Date lastDateTime = simpleFormat.parse(lastUpdateTime);
            long diff = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - lastDateTime.getTime());
            if (diff <= (long) JOB_KEEP_TIMEOUT) {
                this.jobMap.put(jobId, new AtomicInteger(0));
            }
        } catch (Exception e) {
            log.error("Failed to evaluate last update time '{}' of streaming job {}", lastUpdateTime, jobId, e);
        }
    }

    /**
     * Counts one more missed check for a tracked job, or - once its counter has already
     * reached {@code MISSING_CHECK_THRESHOLD} - calls {@code JobKiller.killProcess}, sets the
     * job state to {@code ERROR} if it is not already, and records the kill time.
     *
     * @param jobId   id of the tracked job; must be present in {@code jobMap}
     * @param project project the job belongs to
     * @param meta    metadata of the job
     */
    private void killStreamingDriverProcess(String jobId, String project, StreamingJobMeta meta) {
        AtomicInteger cnt = this.jobMap.get(jobId);
        if (cnt.get() < MISSING_CHECK_THRESHOLD) {
            cnt.getAndIncrement();
            return;
        }
        log.info("Begin to find & kill streaming job:{}", jobId);
        int statusCode = JobKiller.killProcess(meta);
        log.info("{} statusCode={}", jobId, statusCode);
        if (meta.getCurrentStatus() != JobStatusEnum.ERROR) {
            MetaInfoUpdater.updateJobState(project, jobId, JobStatusEnum.ERROR);
        }
        this.killedJobMap.put(jobId, System.currentTimeMillis());
    }

    /**
     * Counts a missed check in the given grace map. The first miss registers the job with a
     * counter of 0; each later miss increments it, and when it reaches
     * {@code MISSING_CHECK_THRESHOLD} the job is moved from the grace map into {@code jobMap}
     * with a fresh counter.
     *
     * @param tmpMap grace map to update ({@code startingJobMap} or {@code stoppingJobMap})
     * @param jobId  id of the missing job
     */
    private void moveJobId(Map<String, AtomicInteger> tmpMap, String jobId) {
        AtomicInteger counter = tmpMap.get(jobId);
        if (counter == null) {
            tmpMap.put(jobId, new AtomicInteger(0));
            return;
        }
        if (counter.incrementAndGet() >= MISSING_CHECK_THRESHOLD) {
            this.jobMap.put(jobId, new AtomicInteger(0));
            tmpMap.remove(jobId);
        }
    }
}

