/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReliabilityTest
extends Configured
implements Tool {
    private String dir;
    private static final Log LOG = LogFactory.getLog(ReliabilityTest.class);

    /**
     * Logs the command-line usage text at INFO level and then terminates the JVM
     * with exit status -1. This method never returns normally.
     */
    private void displayUsage() {
        // The pieces below concatenate to the exact usage message, one piece per output line.
        final String usage =
            "This must be run in only the distributed mode (LocalJobRunner not supported).\n"
            + "\tUsage: MRReliabilityTest -libjars <path to hadoop-examples.jar> [-scratchdir <dir>]\n"
            + "[-scratchdir] points to a scratch space on this host where temp files for this test will be created. Defaults to current working dir. \n"
            + "Passwordless SSH must be set up between this host and the nodes which the test is going to use.\n"
            + "The test should be run on a free cluster with no parallel job submission going on, as the test requires to restart TaskTrackers and kill tasks any job submission while the tests are running can cause jobs/tests to fail";
        LOG.info(usage);
        System.exit(-1);
    }

    /**
     * Tool entry point: validates the environment and arguments, then runs the
     * sleep-job test followed by the sort-job tests.
     *
     * <p>Usage is printed (and the JVM exits) when {@code mapreduce.jobtracker.address}
     * is unset or {@code "local"}, or when the arguments remaining after generic
     * option parsing are anything other than nothing or {@code -scratchdir <dir>}.
     *
     * @param args raw command-line arguments, including generic Hadoop options
     * @return 0 once both test groups have returned
     * @throws Exception if option parsing, client creation or a test step fails
     */
    public int run(String[] args) throws Exception {
        final Configuration conf = getConf();
        final String trackerAddress = conf.get("mapreduce.jobtracker.address", "local");
        if ("local".equals(trackerAddress)) {
            displayUsage();
        }

        final String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        switch (otherArgs.length) {
            case 0:
                // No scratch dir given: fall back to the current working directory.
                this.dir = System.getProperty("user.dir");
                break;
            case 2:
                if (otherArgs[0].equals("-scratchdir")) {
                    this.dir = otherArgs[1];
                } else {
                    displayUsage();
                }
                break;
            default:
                displayUsage();
                break;
        }

        // Allow each task up to 10 attempts so that the deliberate kills do not fail the job.
        conf.setInt("mapreduce.map.maxattempts", 10);
        conf.setInt("mapreduce.reduce.maxattempts", 10);

        final JobClient sleepJobClient = new JobClient(new JobConf(conf));
        runSleepJobTest(sleepJobClient, conf);
        final JobClient sortJobClient = new JobClient(new JobConf(conf));
        runSortJobTests(sortJobClient, conf);
        return 0;
    }

    /**
     * Runs {@code org.apache.hadoop.mapreduce.SleepJob} while killing tasks and
     * suspending TaskTrackers.
     *
     * <p>The number of maps and reduces is twice the cluster's reported maximum map
     * task count; the map and reduce sleep times are both set to the cluster's
     * reported TaskTracker expiry interval.
     *
     * @param jc   client used to query the cluster and the running job
     * @param conf configuration the job is launched with
     * @throws Exception if the cluster query or the test run fails
     */
    private void runSleepJobTest(JobClient jc, Configuration conf) throws Exception {
        final ClusterStatus cluster = jc.getClusterStatus();
        final String taskCount = Integer.toString(cluster.getMaxMapTasks() * 2);
        final String sleepTime = Integer.toString((int) cluster.getTTExpiryInterval());

        final String[] sleepJobArgs = {
            "-m", taskCount,
            "-r", taskCount,
            "-mt", sleepTime,
            "-rt", sleepTime,
        };
        final KillTaskThread taskKiller = new KillTaskThread(jc, 2, 0.2f, false, 2);
        final KillTrackerThread trackerKiller = new KillTrackerThread(jc, 2, 0.4f, false, 1);
        runTest(jc, conf, "org.apache.hadoop.mapreduce.SleepJob", sleepJobArgs, taskKiller, trackerKiller);
        LOG.info("SleepJob done");
    }

    /**
     * Runs the RandomWriter, Sort and SortValidator tests in sequence over a fixed
     * pair of input/output paths, after recursively deleting anything already at
     * those paths.
     *
     * @param jc   client used to obtain the file system and drive the tests
     * @param conf configuration the jobs are launched with
     * @throws Exception if the cleanup or any of the three tests fails
     */
    private void runSortJobTests(JobClient jc, Configuration conf) throws Exception {
        final String inputPath = "my_reliability_test_input";
        final String outputPath = "my_reliability_test_output";

        // Clear leftovers from an earlier run: input first, then output.
        final FileSystem fs = jc.getFs();
        for (String stale : new String[]{inputPath, outputPath}) {
            fs.delete(new Path(stale), true);
        }

        runRandomWriterTest(jc, conf, inputPath);
        runSortTest(jc, conf, inputPath, outputPath);
        runSortValidatorTest(jc, conf, inputPath, outputPath);
    }

    /**
     * Runs {@code org.apache.hadoop.examples.RandomWriter} writing to {@code inputPath}
     * while TaskTrackers are suspended/resumed; no task-kill thread is used.
     *
     * @param jc        client used to monitor the job
     * @param conf      configuration the job is launched with
     * @param inputPath path handed to RandomWriter as its only argument
     * @throws Exception if the test run fails
     */
    private void runRandomWriterTest(JobClient jc, Configuration conf, String inputPath) throws Exception {
        final String[] writerArgs = {inputPath};
        final KillTrackerThread trackerKiller = new KillTrackerThread(jc, 0, 0.4f, false, 1);
        runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter", writerArgs, null, trackerKiller);
        LOG.info("RandomWriter job done");
    }

    /**
     * Runs {@code org.apache.hadoop.examples.Sort} from {@code inputPath} to
     * {@code outputPath} while tasks are killed and TaskTrackers are suspended/resumed.
     *
     * @param jc         client used to monitor the job
     * @param conf       configuration the job is launched with
     * @param inputPath  first argument passed to Sort
     * @param outputPath second argument passed to Sort
     * @throws Exception if the test run fails
     */
    private void runSortTest(JobClient jc, Configuration conf, String inputPath, String outputPath) throws Exception {
        final String[] sortArgs = {inputPath, outputPath};
        final KillTaskThread taskKiller = new KillTaskThread(jc, 2, 0.2f, false, 2);
        final KillTrackerThread trackerKiller = new KillTrackerThread(jc, 2, 0.8f, false, 1);
        runTest(jc, conf, "org.apache.hadoop.examples.Sort", sortArgs, taskKiller, trackerKiller);
        LOG.info("Sort job done");
    }

    /**
     * Runs {@code org.apache.hadoop.mapred.SortValidator} over the sort input and
     * output while tasks are killed and TaskTrackers are suspended/resumed.
     *
     * @param jc         client used to monitor the job
     * @param conf       configuration the job is launched with
     * @param inputPath  value passed after {@code -sortInput}
     * @param outputPath value passed after {@code -sortOutput}
     * @throws Exception if the test run fails
     */
    private void runSortValidatorTest(JobClient jc, Configuration conf, String inputPath, String outputPath) throws Exception {
        final String[] validatorArgs = {"-sortInput", inputPath, "-sortOutput", outputPath};
        final KillTaskThread taskKiller = new KillTaskThread(jc, 2, 0.2f, false, 1);
        final KillTrackerThread trackerKiller = new KillTrackerThread(jc, 2, 0.8f, false, 1);
        runTest(jc, conf, "org.apache.hadoop.mapred.SortValidator", validatorArgs, taskKiller, trackerKiller);
        LOG.info("SortValidator job done");
    }

    /**
     * Prefixes {@code command} with the value of the {@code HADOOP_PREFIX}
     * environment variable and a {@code /} separator when that variable is set.
     *
     * @param command command path relative to the Hadoop installation
     * @return {@code $HADOOP_PREFIX/command}, or {@code command} unchanged if the
     *         variable is not set
     */
    private String normalizeCommandPath(String command) {
        final String hadoopHome = System.getenv("HADOOP_PREFIX");
        return hadoopHome == null ? command : hadoopHome + "/" + command;
    }

    /**
     * Logs the outcome of a job and, for a non-zero status, terminates the JVM
     * using that status as the exit code.
     *
     * @param status  exit status returned by the job's tool run
     * @param jobName name used in the log message
     */
    private void checkJobExitStatus(int status, String jobName) {
        if (status == 0) {
            LOG.info(jobName + " done.");
            return;
        }
        LOG.info(jobName + " job failed with status: " + status);
        System.exit(status);
    }

    /**
     * Launches a job on a background daemon thread and, while it runs, executes the
     * optional task-kill and tracker-kill threads against it, one after the other.
     *
     * <p>Steps: start the job via {@link ToolRunner}; poll {@code jc.jobsToComplete()}
     * once a second until it is non-empty; treat the last entry as the job under
     * test; wait for it to leave the PREP state; run {@code killTaskThread} to
     * completion, then {@code killTrackerThread} to completion; finally wait for the
     * job thread itself.
     *
     * <p>The JVM is terminated with status -1 if the job class cannot be run or if
     * the selected job is already complete; a non-zero job status terminates the JVM
     * through {@code checkJobExitStatus}.
     *
     * @param jc                client used to look up the job
     * @param conf              configuration used to load and run the job class
     * @param jobClass          fully qualified name of a {@link Tool} implementation
     * @param args              arguments passed to the tool
     * @param killTaskThread    task-kill thread, or {@code null} to skip that phase
     * @param killTrackerThread tracker-kill thread, or {@code null} to skip that phase
     * @throws Exception if a JobTracker query fails or the calling thread is interrupted
     */
    private void runTest(JobClient jc, final Configuration conf, final String jobClass, final String[] args, KillTaskThread killTaskThread, KillTrackerThread killTrackerThread) throws Exception {
        JobStatus[] jobs;
        Thread t = new Thread("Job Test"){

            @Override
            public void run() {
                try {
                    Class<?> jobClassObj = conf.getClassByName(jobClass);
                    int status = ToolRunner.run(conf, (Tool)jobClassObj.newInstance(), args);
                    ReliabilityTest.this.checkJobExitStatus(status, jobClass);
                }
                catch (Exception e) {
                    // Pass the exception to the logger so the failure cause is recorded
                    // before the JVM exits.
                    LOG.fatal("JOB " + jobClass + " failed to run", e);
                    System.exit(-1);
                }
            }
        };
        t.setDaemon(true);
        t.start();

        // Wait until the JobTracker lists at least one incomplete job.
        while ((jobs = jc.jobsToComplete()).length == 0) {
            LOG.info("Waiting for the job " + jobClass + " to start");
            Thread.sleep(1000L);
        }

        // The last listed job is taken to be the one just submitted.
        JobID jobId = jobs[jobs.length - 1].getJobID();
        RunningJob rJob = jc.getJob(jobId);
        if (rJob.isComplete()) {
            LOG.error("The last job returned by the querying JobTracker is complete :" + rJob.getJobID() + " .Exiting the test");
            System.exit(-1);
        }

        // Re-fetch the job handle every second until it has left the PREP state.
        while (rJob.getJobState() == JobStatus.PREP) {
            LOG.info("JobID : " + jobId + " not started RUNNING yet");
            Thread.sleep(1000L);
            rJob = jc.getJob(jobId);
        }

        if (killTaskThread != null) {
            killTaskThread.setRunningJob(rJob);
            killTaskThread.start();
            killTaskThread.join();
            LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
        }
        if (killTrackerThread != null) {
            killTrackerThread.setRunningJob(rJob);
            killTrackerThread.start();
            killTrackerThread.join();
            LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
        }
        t.join();
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} with a
     * fresh {@link Configuration} and exits the JVM with the returned status.
     *
     * @param args command-line arguments
     * @throws Exception if the tool run throws
     */
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        final Tool tool = new ReliabilityTest();
        final int exitCode = ToolRunner.run(conf, tool, args);
        System.exit(exitCode);
    }

    /**
     * Synthetic accessor (marked as such by the decompiler) that exposes the private
     * {@code dir} field of the given instance to code outside the instance's own
     * methods.
     *
     * @param x0 instance whose scratch directory is read
     * @return the value of {@code x0.dir}
     */
    static /* synthetic */ String access$200(ReliabilityTest x0) {
        final String scratchDir = x0.dir;
        return scratchDir;
    }

    /**
     * Synthetic accessor (marked as such by the decompiler) that forwards to the
     * private {@code normalizeCommandPath} method of the given instance.
     *
     * @param x0 instance to invoke the method on
     * @param x1 command path to normalize
     * @return the result of {@code x0.normalizeCommandPath(x1)}
     */
    static /* synthetic */ String access$300(ReliabilityTest x0, String x1) {
        final String resolved = x0.normalizeCommandPath(x1);
        return resolved;
    }

    /**
     * Daemon thread that kills (or fails) running task attempts of a job each time
     * the job's progress reaches a threshold.
     *
     * <p>It first watches map progress and then, unless {@code onlyMapsProgress} is
     * set, reduce progress. In each phase the threshold starts at {@code threshold}
     * and is multiplied by {@code thresholdMultiplier} after every kill round; a
     * phase ends when the job completes, {@code numIterations} rounds have been
     * done, or the thread is killed/interrupted.
     */
    private class KillTaskThread
    extends Thread {
        /** Delay between two progress polls, in milliseconds. */
        private static final long POLL_INTERVAL_MS = 5000L;

        private volatile boolean killed = false;
        private RunningJob rJob;
        private final JobClient jc;
        private final int thresholdMultiplier;
        private final float threshold;
        private final boolean onlyMapsProgress;
        private final int numIterations;

        /**
         * @param jc                  client used to query job, task and cluster state
         * @param thresholdMultiplier factor applied to the threshold after each kill round
         * @param threshold           initial progress threshold
         * @param onlyMapsProgress    if true, skip the reduce-progress phase
         * @param numIterations       number of kill rounds per phase
         */
        public KillTaskThread(JobClient jc, int thresholdMultiplier, float threshold, boolean onlyMapsProgress, int numIterations) {
            this.jc = jc;
            this.thresholdMultiplier = thresholdMultiplier;
            this.threshold = threshold;
            this.onlyMapsProgress = onlyMapsProgress;
            this.numIterations = numIterations;
            this.setDaemon(true);
        }

        /** Sets the job whose tasks will be killed; call before {@code start()}. */
        public void setRunningJob(RunningJob rJob) {
            this.rJob = rJob;
        }

        /** Asks the polling loop to stop at its next check. */
        public void kill() {
            this.killed = true;
        }

        @Override
        public void run() {
            this.killBasedOnProgress(true);
            if (!this.onlyMapsProgress) {
                this.killBasedOnProgress(false);
            }
        }

        /**
         * Polls the job's progress and runs a kill round whenever the progress has
         * reached the current threshold.
         *
         * @param considerMaps true to watch map progress, false to watch reduce progress
         */
        private void killBasedOnProgress(boolean considerMaps) {
            // Becomes true on the second round and is never reset: from then on
            // attempts are failed rather than killed.
            boolean fail = false;
            if (considerMaps) {
                LOG.info("Will kill tasks based on Maps' progress");
            } else {
                LOG.info("Will kill tasks based on Reduces' progress");
            }
            LOG.info("Initial progress threshold: " + this.threshold + ". Threshold Multiplier: " + this.thresholdMultiplier + ". Number of iterations: " + this.numIterations);
            float thresholdVal = this.threshold;
            int numIterationsDone = 0;
            while (!this.killed) {
                try {
                    RunningJob current = this.jc.getJob(this.rJob.getID());
                    if (current.isComplete() || numIterationsDone == this.numIterations) {
                        break;
                    }
                    float progress = considerMaps ? current.mapProgress() : current.reduceProgress();
                    if (progress >= thresholdVal) {
                        numIterationsDone++;
                        if (numIterationsDone % 2 == 0) {
                            fail = true;
                        }
                        ClusterStatus c = this.jc.getClusterStatus();
                        LOG.info(new Date() + " Killing a few tasks");
                        int minRunning = c.getTaskTrackers() / 2;
                        this.killRunningAttempts(this.jc.getMapTaskReports(this.rJob.getID()), minRunning, fail);
                        this.killRunningAttempts(this.jc.getReduceTaskReports(this.rJob.getID()), minRunning, fail);
                        thresholdVal *= (float)this.thresholdMultiplier;
                    }
                }
                catch (Exception e) {
                    LOG.fatal(StringUtils.stringifyException(e));
                }
                // Sleep outside the try so that a failing query is retried after the
                // poll interval instead of spinning in a tight loop.
                this.pauseBetweenPolls();
            }
        }

        /**
         * Collects the running attempts of all RUNNING tasks in {@code reports} and,
         * if there are more than {@code minRunning} of them, kills attempts in list
         * order until the zero-based index of the attempt just killed exceeds half
         * the list size.
         *
         * @param reports    task reports to scan
         * @param minRunning number of running attempts that must be exceeded before any is killed
         * @param fail       passed to {@code RunningJob.killTask} as its second argument
         * @throws IOException if a kill request fails
         */
        private void killRunningAttempts(TaskReport[] reports, int minRunning, boolean fail) throws IOException {
            ArrayList<TaskAttemptID> runningTasks = new ArrayList<TaskAttemptID>();
            for (TaskReport report : reports) {
                if (report.getCurrentStatus() == TIPStatus.RUNNING) {
                    runningTasks.addAll(report.getRunningTaskAttempts());
                }
            }
            if (runningTasks.size() <= minRunning) {
                return;
            }
            int count = 0;
            for (TaskAttemptID t : runningTasks) {
                LOG.info(new Date() + " Killed task : " + t);
                this.rJob.killTask(t, fail);
                // NOTE(review): this stops after size/2 + 2 kills, slightly more than
                // half; kept as is to preserve the existing test behaviour.
                if (count++ > runningTasks.size() / 2) {
                    break;
                }
            }
        }

        /** Sleeps for one poll interval; an interrupt stops the thread's loop. */
        private void pauseBetweenPolls() {
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            }
            catch (InterruptedException ie) {
                this.killed = true;
                // Restore the flag so code further up can observe the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Daemon thread that suspends and later resumes TaskTracker processes each time
     * the job's progress reaches a threshold.
     *
     * <p>It first watches map progress and then, unless {@code onlyMapsProgress} is
     * set, reduce progress. In each round it writes a randomly chosen subset of the
     * active tracker hosts to a slaves file, runs the STOP command through
     * {@code bin/slaves.sh} with {@code HADOOP_SLAVES} pointing at that file, waits
     * 1.5 times the TaskTracker expiry interval, and then runs the CONT command.
     */
    private class KillTrackerThread
    extends Thread {
        /** Delay between two progress polls, in milliseconds. */
        private static final long POLL_INTERVAL_MS = 5000L;
        /** Prefix stripped from tracker names to obtain the host name. */
        private static final String TRACKER_PREFIX = "tracker_";

        private volatile boolean killed = false;
        private final JobClient jc;
        private RunningJob rJob;
        private final int thresholdMultiplier;
        private final float threshold;
        private final boolean onlyMapsProgress;
        private final int numIterations;
        /** File in the scratch directory listing the hosts to operate on. */
        private final String slavesFile = ReliabilityTest.this.dir + "/_reliability_test_slaves_file_";
        final String shellCommand = ReliabilityTest.this.normalizeCommandPath("bin/slaves.sh");
        /** Pipeline that sends SIGSTOP to the processes whose ps line matches the TaskTracker class. */
        private final String STOP_COMMAND = "ps uwwx | grep java | grep org.apache.hadoop.mapred.TaskTracker | grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
        /** Pipeline that sends SIGCONT to the processes whose ps line matches the TaskTracker class. */
        private final String RESUME_COMMAND = "ps uwwx | grep java | grep org.apache.hadoop.mapred.TaskTracker | grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";

        /**
         * @param jc                  client used to query job and cluster state
         * @param threshaldMultiplier factor applied to the threshold after each round
         * @param threshold           initial progress threshold
         * @param onlyMapsProgress    if true, skip the reduce-progress phase
         * @param numIterations       number of suspend/resume rounds per phase
         */
        public KillTrackerThread(JobClient jc, int threshaldMultiplier, float threshold, boolean onlyMapsProgress, int numIterations) {
            this.jc = jc;
            this.thresholdMultiplier = threshaldMultiplier;
            this.threshold = threshold;
            this.onlyMapsProgress = onlyMapsProgress;
            this.numIterations = numIterations;
            this.setDaemon(true);
        }

        /** Sets the job whose progress is watched; call before {@code start()}. */
        public void setRunningJob(RunningJob rJob) {
            this.rJob = rJob;
        }

        /** Asks the polling loop to stop at its next check. */
        public void kill() {
            this.killed = true;
        }

        @Override
        public void run() {
            this.stopStartTrackers(true);
            if (!this.onlyMapsProgress) {
                this.stopStartTrackers(false);
            }
        }

        /**
         * Polls the job's progress and runs a suspend/resume round whenever the
         * progress has reached the current threshold.
         *
         * @param considerMaps true to watch map progress, false to watch reduce progress
         */
        private void stopStartTrackers(boolean considerMaps) {
            if (considerMaps) {
                LOG.info("Will STOP/RESUME tasktrackers based on Maps' progress");
            } else {
                LOG.info("Will STOP/RESUME tasktrackers based on Reduces' progress");
            }
            LOG.info("Initial progress threshold: " + this.threshold + ". Threshold Multiplier: " + this.thresholdMultiplier + ". Number of iterations: " + this.numIterations);
            float thresholdVal = this.threshold;
            int numIterationsDone = 0;
            while (!this.killed) {
                try {
                    RunningJob current = this.jc.getJob(this.rJob.getID());
                    if (current.isComplete() || numIterationsDone == this.numIterations) {
                        break;
                    }
                    float progress = considerMaps ? current.mapProgress() : current.reduceProgress();
                    if (progress >= thresholdVal) {
                        ++numIterationsDone;
                        ClusterStatus c = this.jc.getClusterStatus(true);
                        this.stopTaskTrackers(c);
                        try {
                            Thread.sleep((int)Math.ceil(1.5 * (double)c.getTTExpiryInterval()));
                        }
                        catch (InterruptedException ie) {
                            // Record the stop request now so it survives even if the
                            // resume below throws and replaces this exception.
                            this.killed = true;
                            throw ie;
                        }
                        finally {
                            // Always resume: an interrupt here must not leave the
                            // suspended TaskTrackers stopped.
                            this.startTaskTrackers();
                        }
                        thresholdVal *= (float)this.thresholdMultiplier;
                    }
                }
                catch (InterruptedException ie) {
                    this.killed = true;
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    LOG.fatal(StringUtils.stringifyException(e));
                }
                // Sleep outside the try so that a failing round is retried after the
                // poll interval instead of spinning in a tight loop.
                if (!this.killed) {
                    this.pauseBetweenPolls();
                }
            }
        }

        /** Sleeps for one poll interval; an interrupt stops the thread's loop. */
        private void pauseBetweenPolls() {
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            }
            catch (InterruptedException ie) {
                this.killed = true;
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Writes a shuffled selection of the active tracker hosts to the slaves file
         * (the first {@code size/2 + 1} entries, or all of them if fewer) and then
         * runs the suspend operation.
         *
         * @param c cluster status supplying the active tracker names
         * @throws Exception if the file cannot be written or the command fails
         */
        private void stopTaskTrackers(ClusterStatus c) throws Exception {
            ArrayList<String> trackerNamesList = new ArrayList<String>(c.getActiveTrackerNames());
            Collections.shuffle(trackerNamesList);
            int count = 0;
            FileOutputStream fos = new FileOutputStream(new File(this.slavesFile));
            try {
                LOG.info(new Date() + " Stopping a few trackers");
                for (String tracker : trackerNamesList) {
                    String host = this.convertTrackerNameToHostName(tracker);
                    LOG.info(new Date() + " Marking tracker on host: " + host);
                    fos.write((host + "\n").getBytes());
                    if (count++ >= trackerNamesList.size() / 2) {
                        break;
                    }
                }
            }
            finally {
                // Close even when a write fails so the file handle is not leaked.
                fos.close();
            }
            this.runOperationOnTT("suspend");
        }

        /**
         * Runs the resume operation and then deletes the slaves file.
         *
         * @throws Exception if the command fails
         */
        private void startTaskTrackers() throws Exception {
            LOG.info(new Date() + " Resuming the stopped trackers");
            this.runOperationOnTT("resume");
            new File(this.slavesFile).delete();
        }

        /**
         * Executes {@code shellCommand} followed by the space-separated tokens of
         * the STOP or CONT pipeline, with {@code HADOOP_SLAVES} set to the slaves
         * file, and logs any non-empty output.
         *
         * @param operation {@code "suspend"} selects the STOP pipeline; any other
         *                  value selects the CONT pipeline
         * @throws IOException if the command cannot be executed or fails
         */
        private void runOperationOnTT(String operation) throws IOException {
            HashMap<String, String> hMap = new HashMap<String, String>();
            hMap.put("HADOOP_SLAVES", this.slavesFile);
            String pipeline = operation.equals("suspend") ? this.STOP_COMMAND : this.RESUME_COMMAND;
            StringTokenizer strToken = new StringTokenizer(pipeline, " ");
            String[] commandArgs = new String[strToken.countTokens() + 1];
            int i = 0;
            commandArgs[i++] = this.shellCommand;
            while (strToken.hasMoreTokens()) {
                commandArgs[i++] = strToken.nextToken();
            }
            String output = Shell.execCommand(hMap, commandArgs);
            if (output != null && !output.equals("")) {
                LOG.info(output);
            }
        }

        /**
         * Derives a host name from a tracker name by dropping everything from the
         * first {@code ':'} onwards and removing a leading {@code "tracker_"}.
         *
         * @param trackerName tracker name as reported by the cluster status
         * @return the host part; if the prefix is absent the text before the colon
         *         is returned unchanged rather than being truncated
         */
        private String convertTrackerNameToHostName(String trackerName) {
            int indexOfColon = trackerName.indexOf(':');
            String trackerHostName = indexOfColon == -1 ? trackerName : trackerName.substring(0, indexOfColon);
            if (!trackerHostName.startsWith(TRACKER_PREFIX)) {
                return trackerHostName;
            }
            return trackerHostName.substring(TRACKER_PREFIX.length());
        }
    }
}

