/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.tasks.job;

import com.netflix.genie.common.dto.Job;
import com.netflix.genie.common.dto.JobExecution;
import com.netflix.genie.common.dto.JobStatus;
import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.common.exceptions.GenieServerException;
import com.netflix.genie.common.internal.util.GenieHostInfo;
import com.netflix.genie.web.events.GenieEventBus;
import com.netflix.genie.web.events.JobFinishedEvent;
import com.netflix.genie.web.events.JobFinishedReason;
import com.netflix.genie.web.events.JobStartedEvent;
import com.netflix.genie.web.properties.JobsProperties;
import com.netflix.genie.web.services.JobSearchService;
import com.netflix.genie.web.services.JobSubmitterService;
import com.netflix.genie.web.services.impl.JobStateServiceImpl;
import com.netflix.genie.web.tasks.job.JobMonitor;
import com.netflix.genie.web.util.ProcessChecker;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Primary;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;

/**
 * Coordinates the {@link JobMonitor} tasks for jobs running on this host.
 *
 * <p>The coordinator reacts to three application events:
 * <ul>
 *   <li>{@link ContextRefreshedEvent}: looks up the jobs recorded as active on this host and
 *       re-attaches a monitor to each one that is still {@link JobStatus#RUNNING};</li>
 *   <li>{@link JobStartedEvent}: schedules a monitor for the newly started job;</li>
 *   <li>{@link JobFinishedEvent}: delegates to {@code done(String)} of the parent class.</li>
 * </ul>
 */
@Component
@Primary
public class JobMonitoringCoordinator
extends JobStateServiceImpl {
    private static final Logger log = LoggerFactory.getLogger(JobMonitoringCoordinator.class);
    private final String hostname;
    private final JobSearchService jobSearchService;
    private final File jobsDir;
    private final JobsProperties jobsProperties;
    private final ProcessChecker.Factory processCheckerFactory;
    private final Counter unableToReAttach;

    /**
     * Creates the coordinator.
     *
     * @param genieHostInfo         supplies the hostname used to look up jobs active on this node
     * @param jobSearchService      used to find active jobs and their execution details
     * @param genieEventBus         event bus passed to the parent class and to each job monitor
     * @param scheduler             task scheduler on which job monitors are scheduled
     * @param registry              meter registry used for the re-attach failure counter and by monitors
     * @param jobsDir               resource resolved to the directory holding per-job sub-directories
     * @param jobsProperties        job properties handed to each {@link JobMonitor}
     * @param jobSubmitterService   passed through to the parent class
     * @param processCheckerFactory factory for the per-job {@link ProcessChecker}
     * @throws IOException if {@code jobsDir} cannot be resolved to a {@link File}
     */
    @Autowired
    public JobMonitoringCoordinator(GenieHostInfo genieHostInfo, JobSearchService jobSearchService, GenieEventBus genieEventBus, @Qualifier(value="genieTaskScheduler") TaskScheduler scheduler, MeterRegistry registry, Resource jobsDir, JobsProperties jobsProperties, JobSubmitterService jobSubmitterService, ProcessChecker.Factory processCheckerFactory) throws IOException {
        super(jobSubmitterService, scheduler, genieEventBus, registry);
        this.hostname = genieHostInfo.getHostname();
        this.jobSearchService = jobSearchService;
        this.jobsDir = jobsDir.getFile();
        this.jobsProperties = jobsProperties;
        this.processCheckerFactory = processCheckerFactory;
        this.unableToReAttach = registry.counter("genie.jobs.unableToReAttach.rate");
    }

    /**
     * Re-attaches monitors to the jobs active on this host when the application context is refreshed.
     *
     * @param event the context refreshed event
     * @throws GenieException if the active jobs cannot be looked up or a job has no id
     */
    @EventListener
    public void onStartup(ContextRefreshedEvent event) throws GenieException {
        this.reAttach(event);
    }

    /**
     * Schedules a monitor for a job that has just started and records its memory and monitor task.
     *
     * @param event the job started event carrying the {@link JobExecution}
     * @throws IllegalArgumentException if the job execution has no id, process id or timeout
     */
    @EventListener
    public void onJobStarted(JobStartedEvent event) {
        JobExecution jobExecution = event.getJobExecution();
        String jobId = jobExecution.getId().orElseThrow(IllegalArgumentException::new);
        this.setMemoryAndTask(jobId, jobExecution.getMemory().orElse(0), this.scheduleMonitor(jobExecution));
    }

    /**
     * Marks the finished job as done via the parent class.
     *
     * @param event the job finished event
     * @throws GenieException propagated from {@code done(String)}
     */
    @EventListener
    public void onJobFinished(JobFinishedEvent event) throws GenieException {
        this.done(event.getId());
    }

    /**
     * Attempts to re-attach a monitor to every job the search service reports as active on this host.
     *
     * <p>Jobs that are already tracked are skipped. Jobs that are active but not yet
     * {@link JobStatus#RUNNING} are reported as finished due to a system crash. For running jobs a
     * monitor is scheduled; if that fails the job is likewise reported as crashed and the failure
     * counter is incremented.
     *
     * @param event the event that triggered re-attachment (only logged)
     * @throws GenieException if the active jobs cannot be looked up or a job has no id
     */
    private void reAttach(ApplicationEvent event) throws GenieException {
        log.info("Application is ready according to event {}. Attempting to re-attach to any active jobs", event);
        Set<Job> jobsOnHost = this.jobSearchService.getAllActiveJobsOnHost(this.hostname);
        if (jobsOnHost.isEmpty()) {
            log.info("No jobs currently active on this node.");
            return;
        }
        log.info("{} jobs currently active on this node at startup", jobsOnHost.size());
        for (Job job : jobsOnHost) {
            String id = job.getId().orElseThrow(() -> new GenieServerException("Job has no id!"));
            if (this.jobExists(id)) {
                log.info("Job {} is already being tracked. Ignoring.", id);
                continue;
            }
            if (job.getStatus() != JobStatus.RUNNING) {
                this.publishSystemCrash(id, "System crashed while job starting");
                continue;
            }
            try {
                JobExecution jobExecution = this.jobSearchService.getJobExecution(id);
                this.init(id);
                this.setMemoryAndTask(id, jobExecution.getMemory().orElse(0), this.scheduleMonitor(jobExecution));
                log.info("Re-attached a job monitor to job {}", id);
            }
            catch (GenieException | RuntimeException e) {
                // scheduleMonitor signals missing process id/timeout and unknown schedule types with
                // unchecked exceptions. Handling them here keeps one malformed job record from
                // aborting re-attachment of every remaining job on this host.
                log.error("Unable to re-attach to job {}.", id, e);
                this.publishSystemCrash(id, "Unable to re-attach on startup");
                this.unableToReAttach.increment();
            }
        }
    }

    /**
     * Publishes an asynchronous {@link JobFinishedEvent} with reason
     * {@link JobFinishedReason#SYSTEM_CRASH} for the given job.
     *
     * @param id      id of the job to report as finished
     * @param message human readable explanation attached to the event
     */
    private void publishSystemCrash(String id, String message) {
        this.genieEventBus.publishAsynchronousEvent(new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, message, this));
    }

    /**
     * Builds a {@link JobMonitor} for the job and schedules it according to the monitor's schedule type.
     *
     * @param jobExecution execution details of the job to monitor
     * @return the future of the scheduled monitor task
     * @throws IllegalArgumentException      if the job execution has no id, process id or timeout
     * @throws UnsupportedOperationException if the monitor reports an unhandled schedule type
     */
    private Future<?> scheduleMonitor(JobExecution jobExecution) {
        String jobId = jobExecution.getId().orElseThrow(IllegalArgumentException::new);
        File stdOut = new File(this.jobsDir, jobId + "/stdout");
        File stdErr = new File(this.jobsDir, jobId + "/stderr");
        int processId = jobExecution.getProcessId().orElseThrow(IllegalArgumentException::new);
        Instant timeout = jobExecution.getTimeout().orElseThrow(IllegalArgumentException::new);
        ProcessChecker processChecker = this.processCheckerFactory.get(processId, timeout);
        JobMonitor monitor = new JobMonitor(jobExecution, stdOut, stdErr, this.genieEventBus, this.registry, this.jobsProperties, processChecker);
        final ScheduledFuture<?> future;
        switch (monitor.getScheduleType()) {
            case TRIGGER: {
                future = this.scheduler.schedule(monitor, monitor.getTrigger());
                break;
            }
            case FIXED_DELAY: {
                future = this.scheduler.scheduleWithFixedDelay(monitor, monitor.getFixedDelay());
                break;
            }
            case FIXED_RATE: {
                future = this.scheduler.scheduleAtFixedRate(monitor, monitor.getFixedRate());
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown schedule type: " + monitor.getScheduleType());
            }
        }
        // Log the unwrapped id; the Optional itself would render as "Optional[<id>]".
        log.info("Scheduled job monitoring for Job {}", jobId);
        return future;
    }
}

