/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.scheduler;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.scheduler.AbstractScheduler;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteScheduler
extends AbstractScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteScheduler.class);
    private RemoteInterpreter remoteInterpreter;
    private ExecutorService executor;

    public RemoteScheduler(String name, ExecutorService executor, RemoteInterpreter remoteInterpreter) {
        super(name);
        this.executor = executor;
        this.remoteInterpreter = remoteInterpreter;
    }

    public void runJobInScheduler(Job job) {
        JobRunner jobRunner = new JobRunner(this, job);
        this.executor.execute(jobRunner);
        String executionMode = this.remoteInterpreter.getProperty(".execution.mode", "paragraph");
        if (executionMode.equals("paragraph")) {
            while (!jobRunner.isJobSubmittedInRemote()) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote queue.wait", (Throwable)e);
                }
            }
        } else if (executionMode.equals("note")) {
            while (!jobRunner.isJobExecuted()) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted queue.wait", (Throwable)e);
                }
            }
        } else {
            throw new RuntimeException("Invalid job execution.mode: " + executionMode + ", only 'note' and 'paragraph' are valid");
        }
    }

    public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) {
        super.stop();
    }

    private class JobRunner
    implements Runnable,
    JobListener {
        private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
        private RemoteScheduler scheduler;
        private Job job;
        private volatile boolean jobExecuted;
        private volatile boolean jobSubmittedRemotely;

        public JobRunner(RemoteScheduler scheduler, Job job) {
            this.scheduler = scheduler;
            this.job = job;
            this.jobExecuted = false;
            this.jobSubmittedRemotely = false;
        }

        public boolean isJobSubmittedInRemote() {
            return this.jobSubmittedRemotely;
        }

        public boolean isJobExecuted() {
            return this.jobExecuted;
        }

        @Override
        public void run() {
            JobStatusPoller jobStatusPoller = new JobStatusPoller(this.job, this, 100L);
            jobStatusPoller.start();
            this.scheduler.runJob(this.job);
            this.jobExecuted = true;
            this.jobSubmittedRemotely = true;
            jobStatusPoller.shutdown();
            try {
                jobStatusPoller.join();
            }
            catch (InterruptedException e) {
                this.logger.error("JobStatusPoller interrupted", (Throwable)e);
            }
        }

        public void onProgressUpdate(Job job, int progress) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onStatusChange(Job job, Job.Status before, Job.Status after) {
            if (!this.jobExecuted) {
                if (after == Job.Status.FINISHED || after == Job.Status.ABORT || after == Job.Status.ERROR) {
                    return;
                }
                if (after == Job.Status.RUNNING) {
                    this.jobSubmittedRemotely = true;
                    job.setStatus(Job.Status.RUNNING);
                }
            } else {
                this.jobSubmittedRemotely = true;
            }
            Job job2 = job;
            synchronized (job2) {
                if (after == Job.Status.RUNNING && job.getStatus() == Job.Status.PENDING) {
                    job.setStatus(Job.Status.RUNNING);
                }
            }
        }
    }

    private class JobStatusPoller
    extends Thread {
        private long checkIntervalMsec;
        private volatile boolean terminate;
        private JobListener listener;
        private Job job;
        private volatile Job.Status lastStatus;

        public JobStatusPoller(Job job, JobListener listener, long checkIntervalMsec) {
            this.setName("JobStatusPoller-" + job.getId());
            this.checkIntervalMsec = checkIntervalMsec;
            this.job = job;
            this.listener = listener;
            this.terminate = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!this.terminate) {
                Job.Status newStatus;
                JobStatusPoller jobStatusPoller = this;
                synchronized (jobStatusPoller) {
                    try {
                        this.wait(this.checkIntervalMsec);
                    }
                    catch (InterruptedException e) {
                        LOGGER.error("Exception in RemoteScheduler while run this.wait", (Throwable)e);
                    }
                }
                if (!this.terminate && (newStatus = this.getStatus()) != Job.Status.RUNNING && newStatus != Job.Status.FINISHED && newStatus != Job.Status.ERROR && newStatus != Job.Status.ABORT) continue;
                break;
            }
            this.terminate = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void shutdown() {
            this.terminate = true;
            JobStatusPoller jobStatusPoller = this;
            synchronized (jobStatusPoller) {
                this.notify();
            }
        }

        public Job.Status getStatus() {
            if (!RemoteScheduler.this.remoteInterpreter.isOpened()) {
                if (this.lastStatus != null) {
                    return this.lastStatus;
                }
                return this.job.getStatus();
            }
            Job.Status status = Job.Status.valueOf((String)RemoteScheduler.this.remoteInterpreter.getStatus(this.job.getId()));
            if (status == Job.Status.UNKNOWN) {
                return this.job.getStatus();
            }
            this.listener.onStatusChange(this.job, this.lastStatus, status);
            this.lastStatus = status;
            return status;
        }
    }
}

