/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.JobTrackerInstrumentation;
import org.apache.hadoop.mapred.JobTrackerNotYetInitializedException;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestRecoveryManager {
    // Logger shared by every test case in this class.
    private static final Log LOG = LogFactory.getLog(TestRecoveryManager.class);
    // Scratch directory for job input, output and signal files; rooted at the
    // "test.build.data" system property, falling back to "/tmp".
    private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "test-recovery-manager");
    // File system used by the tests. setUp() assigns the default file system; the
    // HDFS-backed tests overwrite it with the MiniDFSCluster's file system.
    private FileSystem fs;
    // Mini DFS cluster; only created by the tests that need HDFS. tearDown() shuts it down.
    private MiniDFSCluster dfs;
    // Mini MapReduce cluster under test. tearDown() shuts it down if its JobTracker is RUNNING.
    private MiniMRCluster mr;

    /**
     * Obtains the default file system and removes {@link #TEST_DIR} recursively
     * before each test.
     *
     * @throws IOException if the file system cannot be obtained or the delete fails
     */
    @Before
    public void setUp() throws IOException {
        Configuration defaults = new Configuration();
        fs = FileSystem.get(defaults);
        // Recursive delete; the boolean result is not checked.
        fs.delete(TEST_DIR, true);
    }

    /**
     * Starts the mini MapReduce cluster using a freshly created {@link JobConf}.
     *
     * @throws IOException if the cluster cannot be started
     */
    private void startCluster() throws IOException {
        JobConf defaultConf = new JobConf();
        startCluster(defaultConf);
    }

    /**
     * Starts the mini MapReduce cluster against the local file system and stores
     * it in {@code mr}.
     *
     * @param conf configuration handed to the {@link MiniMRCluster} constructor
     * @throws IOException if the cluster cannot be started
     */
    private void startCluster(JobConf conf) throws IOException {
        final String localFsUri = "file:///";
        mr = new MiniMRCluster(1, localFsUri, 1, null, null, conf);
    }

    /**
     * Releases whatever clusters the finished test started.
     * <p>
     * The MapReduce cluster is shut down only when its JobTracker reports the
     * {@code RUNNING} state; the DFS cluster is shut down whenever one exists.
     * Both fields are reset to {@code null} even if a shutdown step throws.
     */
    @After
    public void tearDown() {
        try {
            if (mr != null) {
                JobTracker tracker = mr.getJobTrackerRunner().getJobTracker();
                ClusterStatus clusterStatus = tracker.getClusterStatus(false);
                if (clusterStatus.getJobTrackerState() == JobTracker.State.RUNNING) {
                    mr.shutdown();
                }
            }
        } finally {
            mr = null;
            try {
                if (dfs != null) {
                    dfs.shutdown();
                }
            } finally {
                dfs = null;
            }
        }
    }

    /**
     * Checks that the JobTracker comes back up in the {@code RUNNING} state when
     * the "job-info" file of one submitted job has been replaced with a one-byte
     * file, and that a second, untouched job still completes successfully.
     *
     * @throws Exception on any cluster, file-system or job-client failure
     */
    @Test(timeout=120000L)
    public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
        LOG.info("Testing jobtracker restart with faulty job");
        startCluster();
        Path signalPath = new Path(TEST_DIR, "signal");
        String signalFile = signalPath.toString();

        // First job: its job-info file is overwritten while the JobTracker is down.
        JobConf job1 = mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob1 = new JobClient(job1).submitJob(job1);
        LOG.info("Submitted job " + rJob1.getID());
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }

        // Second job: left intact, expected to finish after the restart.
        JobConf job2 = mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info("Submitted job " + rJob2.getID());
        while (rJob2.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }

        LOG.info("Stopping jobtracker");
        // Read the system directory before stopping, while the tracker is still up.
        String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
        mr.stopJobTracker();

        // Replace job1's job-info file with a single byte.
        Path jobFile = new Path(sysDir, rJob1.getID().toString() + "/" + "job-info");
        LOG.info("Deleting job token file : " + jobFile.toString());
        Assert.assertTrue(fs.delete(jobFile, false));
        FSDataOutputStream out = fs.create(jobFile);
        try {
            out.write(1);
        } finally {
            // Close even if the write fails so the stream is never leaked.
            out.close();
        }

        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        LOG.info("Starting jobtracker");
        mr.startJobTracker();
        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
        ClusterStatus status = jobtracker.getClusterStatus(false);
        Assert.assertEquals("JobTracker crashed!", JobTracker.State.RUNNING, status.getJobTrackerState());

        JobInProgress jip = jobtracker.getJob(rJob2.getID());
        while (!jip.isComplete()) {
            LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
            // (Re)create the signal file; close the stream right away so repeated
            // iterations do not accumulate open file handles.
            fs.create(signalPath).close();
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
    }

    /**
     * Checks job resubmission across a JobTracker restart with recovery enabled.
     * <p>
     * Two jobs are submitted; the second sets "mapreduce.job.restart.recover" to
     * {@code false}. After the restart exactly one job is expected to be known
     * to the JobTracker, the first job must complete successfully, and the
     * "mapreduce.job.dir" directories of both jobs must no longer exist.
     *
     * @throws Exception on any cluster, file-system or job-client failure
     */
    @Test(timeout=120000L)
    public void testJobResubmission() throws Exception {
        LOG.info("Testing Job Resubmission");
        startCluster();
        Path signalPath = new Path(TEST_DIR, "signal");
        String signalFile = signalPath.toString();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();

        // First job: should be resubmitted after the restart.
        JobConf job1 = mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        JobClient jc1 = new JobClient(job1);
        RunningJob rJob1 = jc1.submitJob(job1);
        LOG.info("Submitted first job " + rJob1.getID());
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }

        // Second job: opts out of recovery and waits on a different signal file.
        JobConf job2 = mr.createJobConf();
        String signalFile1 = new Path(TEST_DIR, "signal1").toString();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, "test-recovery-manager", signalFile1, signalFile1);
        job2.setBoolean("mapreduce.job.restart.recover", false);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info("Submitted job " + rJob2.getID());
        JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
        while (!jip2.inited()) {
            LOG.info("Waiting for job " + jip2.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }

        LOG.info("Stopping jobtracker");
        mr.stopJobTracker();
        LOG.info("Starting jobtracker");
        mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc1);

        jobtracker = mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals("Resubmission failed ", 1L, jobtracker.getAllJobs().length);

        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.isComplete()) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
            // (Re)create the signal file; close the stream right away so repeated
            // iterations do not accumulate open file handles.
            fs.create(signalPath).close();
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
        Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job1.get("mapreduce.job.dir"))));
        Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job2.get("mapreduce.job.dir"))));
    }

    /**
     * Restarts the JobTracker at the point where job finalization is first
     * attempted and checks that the job is resubmitted and completes.
     * <p>
     * The cluster is configured with {@link TestJobTrackerInstrumentation}, whose
     * first {@code finalizeJob} call releases a latch and throws. The test waits
     * on that latch, restarts the JobTracker, and expects exactly one job to be
     * present and to finish successfully.
     * <p>
     * The timeout matches the other cluster tests in this class and bounds the
     * otherwise unbounded latch wait.
     *
     * @throws Exception on any cluster or job-client failure
     */
    @Test(timeout=120000L)
    public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
        LOG.info("Testing jobtracker restart before job finalization");
        JobConf conf = new JobConf();
        conf.setBoolean("mapred.jobtracker.restart.recover", true);
        conf.setClass("mapred.jobtracker.instrumentation", TestJobTrackerInstrumentation.class, JobTrackerInstrumentation.class);
        startCluster(conf);

        SleepJob job = new SleepJob();
        job.setConf(mr.createJobConf());
        JobConf job1 = job.setupJobConf(1, 0, 1L, 1, 1L, 1);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info("Submitted first job " + rJob1.getID());

        // Block until the instrumentation has seen (and failed) the first finalizeJob call.
        TestJobTrackerInstrumentation.finalizeCall.await();

        LOG.info("Stopping jobtracker");
        mr.stopJobTracker();
        LOG.info("Starting jobtracker");
        mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);

        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals("Resubmission failed ", 1L, jobtracker.getAllJobs().length);

        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.isComplete()) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
    }

    /**
     * Checks that recovery tolerates jobs that can no longer be accepted after a
     * JobTracker restart.
     * <p>
     * Three jobs are submitted before the restart:
     * <ul>
     *   <li>job1 with 30 maps; the restarted JobTracker is limited to 25 tasks per
     *       job, and the test asserts job1 has no status afterwards;</li>
     *   <li>job2 with 20 maps, asserted to be present, still running, and finally
     *       successful once its signal file is created;</li>
     *   <li>job3 submitted as test user "abc"; the restarted JobTracker enables ACLs
     *       with the default queue's submit ACL set to the login user, and the test
     *       asserts job3 has no status afterwards.</li>
     * </ul>
     *
     * @throws Exception on any cluster, file-system or job-client failure
     */
    @Test(timeout=120000L)
    public void testJobTrackerRestartWithBadJobs() throws Exception {
        LOG.info("Testing recovery-manager");
        startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();

        // job1: high priority, 30 maps.
        JobConf job1 = mr.createJobConf();
        job1.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0, "test-recovery-manager", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info("Submitted first job " + rJob1.getID());
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }

        // job2: 20 maps, waits on its own signal file.
        JobConf job2 = mr.createJobConf();
        Path signalPath1 = new Path(TEST_DIR, "signal1");
        String signalFile1 = signalPath1.toString();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0, "test-recovery-manager", signalFile1, signalFile1);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info("Submitted job " + rJob2.getID());
        JobInProgress jip = jobtracker.getJob(rJob2.getID());
        while (!jip.inited()) {
            LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }

        // job3: submitted on behalf of a different (test) user.
        final JobConf job3 = mr.createJobConf();
        UserGroupInformation ugi3 = UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
        UtilsForTests.configureWaitingJobConf(job3, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob3 = ugi3.doAs(new PrivilegedExceptionAction<RunningJob>() {
            @Override
            public RunningJob run() throws IOException {
                return new JobClient(job3).submitJob(job3);
            }
        });
        LOG.info("Submitted job " + rJob3.getID() + " with different user");
        jip = jobtracker.getJob(rJob3.getID());
        while (!jip.inited()) {
            LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }

        LOG.info("Stopping jobtracker");
        mr.stopJobTracker();

        // Reconfigure the JobTracker so that job1 and job3 become unacceptable.
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
        mr.getJobTrackerConf().setBoolean("mapred.acls.enabled", true);
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        mr.getJobTrackerConf().set(QueueManager.toFullPropertyName("default", QueueManager.QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());

        LOG.info("Starting jobtracker");
        mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);
        jobtracker = mr.getJobTrackerRunner().getJobTracker();

        Assert.assertEquals("Recovery manager failed to tolerate job failures", 1L, jobtracker.getAllJobs().length);
        JobStatus status = jobtracker.getJobStatus(rJob1.getID());
        Assert.assertNull("Faulty job should not be resubmitted", status);
        jip = jobtracker.getJob(rJob2.getID());
        Assert.assertFalse("Job should be running", jip.isComplete());
        status = jobtracker.getJobStatus(rJob3.getID());
        Assert.assertNull("Job should be missing because of ACL changed", status);

        while (!jip.isComplete()) {
            LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
            // (Re)create job2's signal file; close the stream right away so repeated
            // iterations do not accumulate open file handles.
            fs.create(signalPath1).close();
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
    }

    /**
     * Exercises restart-count handling across JobTracker restarts.
     * <p>
     * A job is submitted and the JobTracker is restarted twice, asserting a
     * restart count of 0 each time. A second job submitted after the restarts
     * must also report 0. Finally the restart-count file is overwritten with a
     * single boolean while the JobTracker is down, and the test asserts that the
     * JobTracker runner is not active after the next start.
     *
     * @throws Exception on any cluster, file-system or job-client failure
     */
    @Test(timeout=120000L)
    public void testRestartCount() throws Exception {
        LOG.info("Testing Job Restart Count");
        startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();

        JobConf job1 = mr.createJobConf();
        job1.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info("Submitted first job " + rJob1.getID());
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.inited()) {
            LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
            UtilsForTests.waitFor(100L);
        }

        for (int i = 1; i <= 2; ++i) {
            LOG.info("Stopping jobtracker for " + i + " time");
            mr.stopJobTracker();
            LOG.info("Starting jobtracker for " + i + " time");
            mr.startJobTracker();
            UtilsForTests.waitForJobTracker(jc);
            jobtracker = mr.getJobTrackerRunner().getJobTracker();
            // NOTE(review): jip was obtained from the JobTracker instance that existed
            // before the restart, so this may not observe the restarted tracker's view
            // of the job - confirm whether it should be re-fetched via jobtracker.getJob().
            Assert.assertEquals("Recovery manager failed to recover restart count", 0L, jip.getNumRestarts());
        }
        rJob1.killJob();

        // A job submitted after the restarts must start with a restart count of 0.
        JobConf job2 = mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager", signalFile, signalFile);
        RunningJob rJob2 = jc.submitJob(job2);
        LOG.info("Submitted first job after restart" + rJob2.getID());
        jip = jobtracker.getJob(rJob2.getID());
        Assert.assertEquals("Restart count for new job is incorrect", 0L, jip.getNumRestarts());

        LOG.info("Stopping jobtracker for testing the fs errors");
        mr.stopJobTracker();

        // Overwrite the restart-count file with a single boolean.
        Path rFile = jobtracker.recoveryManager.getRestartCountFile();
        fs.delete(rFile, false);
        FSDataOutputStream out = fs.create(rFile);
        try {
            out.writeBoolean(true);
        } finally {
            // Close even if the write fails so the stream is never leaked.
            out.close();
        }

        LOG.info("Starting jobtracker with fs errors");
        mr.startJobTracker();
        MiniMRCluster.JobTrackerRunner runner = mr.getJobTrackerRunner();
        Assert.assertFalse("JobTracker is still alive", runner.isActive());
    }

    /**
     * Checks how the recovery manager's restart-count update behaves with and
     * without datanodes.
     * <p>
     * With all datanodes shut down, {@code updateRestartCount()} must throw an
     * {@link IOException} and leave neither the restart-count file nor its
     * temporary counterpart behind. After a datanode is started again, the same
     * call must succeed.
     *
     * @throws Exception on any cluster or JobTracker failure
     */
    @Test(timeout=120000L)
    public void testJobTrackerInfoCreation() throws Exception {
        LOG.info("Testing jobtracker.info file");
        dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
        String namenode = dfs.getFileSystem().getUri().getHost() + ":" + dfs.getFileSystem().getUri().getPort();
        // Take the datanodes away before the JobTracker touches the file system.
        dfs.shutdownDataNodes();

        JobConf conf = new JobConf();
        FileSystem.setDefaultUri(conf, namenode);
        conf.set("mapred.job.tracker", "localhost:0");
        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
        JobTracker jobtracker = new JobTracker(conf);
        jobtracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_ENTER);
        jobtracker.initializeFilesystem();
        jobtracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
        jobtracker.initialize();

        boolean failed = false;
        try {
            jobtracker.recoveryManager.updateRestartCount();
        } catch (IOException expected) {
            failed = true;
        }
        Assert.assertTrue("JobTracker created info files without datanodes!!!", failed);

        Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
        Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
        // Distinct local name so the test's fs field is not shadowed.
        FileSystem dfsFileSystem = dfs.getFileSystem();
        Assert.assertFalse("Info file exists after update failure", dfsFileSystem.exists(restartFile));
        // Check the temporary file here; the original re-checked restartFile by mistake.
        Assert.assertFalse("Temporary restart-file exists after update failure", dfsFileSystem.exists(tmpRestartFile));

        // Bring a datanode back and retry: the update must now succeed.
        dfs.startDataNodes(conf, 1, true, null, null, null, null);
        dfs.waitActive();
        failed = false;
        try {
            jobtracker.recoveryManager.updateRestartCount();
        } catch (IOException unexpected) {
            failed = true;
        }
        Assert.assertFalse("JobTracker failed to create info files with datanodes!", failed);
    }

    /**
     * Creates the directory {@code dir} on {@code fs} and then sets its permission
     * to {@code mode}. The result of {@code mkdirs} is not checked.
     *
     * @param fs   file system to operate on
     * @param dir  path of the directory to create
     * @param mode permission bits passed to the {@link FsPermission} constructor
     * @throws IOException if either file-system call fails
     */
    static void mkdirWithPerms(FileSystem fs, String dir, short mode) throws IOException {
        final Path target = new Path(dir);
        final FsPermission permission = new FsPermission(mode);
        fs.mkdirs(target);
        fs.setPermission(target, permission);
    }

    /**
     * Checks that a job submitted by a user other than the one running the
     * JobTracker is resubmitted after a restart and completes successfully.
     * <p>
     * The test runs on a MiniDFSCluster, submits a waiting job as test user "bob",
     * restarts the JobTracker with "mapreduce.jobtracker.init.for.tests" set to
     * {@code false}, asserts that a status query throws
     * {@link JobTrackerNotYetInitializedException} until {@code setInitDone(true)}
     * is called, and then expects exactly one job that finishes once the signal
     * file exists.
     *
     * @throws Exception on any cluster, file-system or job-client failure
     */
    @Test(timeout=120000L)
    public void testJobResubmissionAsDifferentUser() throws Exception {
        LOG.info("Testing Job Resubmission as a different user to the jobtracker");
        final Path hdfsTestDir = new Path("/tmp");
        JobConf conf = new JobConf();
        dfs = new MiniDFSCluster(conf, 1, true, null);
        fs = dfs.getFileSystem();
        conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
        conf.set("mapred.system.dir", "/mapred");

        // Prepare the HDFS directories (permissions written in octal).
        String mapredSysDir = conf.get("mapred.system.dir");
        TestRecoveryManager.mkdirWithPerms(fs, mapredSysDir, (short) 0700);
        fs.setOwner(new Path(mapredSysDir), UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
        TestRecoveryManager.mkdirWithPerms(fs, "/user", (short) 0777);
        TestRecoveryManager.mkdirWithPerms(fs, "/mapred", (short) 0777);
        TestRecoveryManager.mkdirWithPerms(fs, "/tmp", (short) 0777);

        mr = new MiniMRCluster(1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
        Path signalPath = new Path(hdfsTestDir, "signal");
        String signalFile = signalPath.toString();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);

        final JobConf job1 = mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(hdfsTestDir, "input"), new Path(hdfsTestDir, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        UserGroupInformation ugi = UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
        job1.setUser(ugi.getUserName());
        // Client created outside doAs(); used later to wait for the tracker and look up the job.
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
            @Override
            public RunningJob run() throws IOException {
                JobClient submitter = new JobClient(job1);
                return submitter.submitJob(job1);
            }
        });
        LOG.info("Submitted first job " + rJob1.getID());
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
            UtilsForTests.waitFor(100L);
        }

        LOG.info("Stopping jobtracker");
        mr.stopJobTracker();
        mr.getJobTrackerConf().setBoolean("mapreduce.jobtracker.init.for.tests", false);
        LOG.info("Starting jobtracker");
        mr.startJobTracker(false);
        while (!mr.getJobTrackerRunner().isUp()) {
            Thread.sleep(100L);
        }
        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
        Assert.assertNotNull(jobtracker);

        // Until setInitDone(true) is called, status queries are expected to be rejected.
        boolean gotJTNYIException = false;
        try {
            jobtracker.getJobStatus(rJob1.getID());
        } catch (JobTrackerNotYetInitializedException jtnyie) {
            LOG.info("Caught JobTrackerNotYetInitializedException", jtnyie);
            gotJTNYIException = true;
        }
        Assert.assertTrue(gotJTNYIException);

        jobtracker.setInitDone(true);
        UtilsForTests.waitForJobTracker(jc);
        Assert.assertEquals("Resubmission failed ", 1L, jobtracker.getAllJobs().length);

        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        // Create the signal file and close the stream so it is not left open.
        fs.create(signalPath).close();
        while (!jip.isComplete()) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
            UtilsForTests.waitFor(100L);
        }
        rJob1 = jc.getJob(rJob1.getID());
        Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
    }

    /**
     * Checks resubmission of a job whose configuration sets
     * "mapreduce.job.init.throw.exception".
     * <p>
     * The test runs on a MiniDFSCluster, submits the job as test user "bob",
     * restarts the JobTracker with
     * "mapreduce.jt.job.init.throw.exception.override" set to {@code true}, and
     * then expects exactly one job that finishes successfully once the signal
     * file exists.
     *
     * @throws Exception on any cluster, file-system or job-client failure
     */
    @Test(timeout=120000L)
    public void testJobInitError() throws Exception {
        LOG.info("Testing error during Job submission");
        final Path hdfsTestDir = new Path("/tmp");
        JobConf conf = new JobConf();
        dfs = new MiniDFSCluster(conf, 1, true, null);
        fs = dfs.getFileSystem();
        conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
        conf.set("mapred.system.dir", "/mapred");

        // Prepare the HDFS directories (permissions written in octal).
        String mapredSysDir = conf.get("mapred.system.dir");
        TestRecoveryManager.mkdirWithPerms(fs, mapredSysDir, (short) 0700);
        fs.setOwner(new Path(mapredSysDir), UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
        TestRecoveryManager.mkdirWithPerms(fs, "/user", (short) 0777);
        TestRecoveryManager.mkdirWithPerms(fs, "/mapred", (short) 0777);
        TestRecoveryManager.mkdirWithPerms(fs, "/tmp", (short) 0777);

        mr = new MiniMRCluster(1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
        Path signalPath = new Path(hdfsTestDir, "signal");
        String signalFile = signalPath.toString();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);

        final JobConf job1 = mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(hdfsTestDir, "input"), new Path(hdfsTestDir, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        UserGroupInformation ugi = UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
        job1.setUser(ugi.getUserName());
        // Client created outside doAs(); used later to wait for the tracker and look up the job.
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
            @Override
            public RunningJob run() throws IOException {
                // Flag is set just before submission, inside the privileged action.
                job1.setBoolean("mapreduce.job.init.throw.exception", true);
                JobClient submitter = new JobClient(job1);
                return submitter.submitJob(job1);
            }
        });
        LOG.info("Submitted first job " + rJob1.getID());

        LOG.info("Stopping jobtracker");
        mr.stopJobTracker();
        LOG.info("Starting jobtracker");
        mr.getJobTrackerConf().setBoolean("mapreduce.jt.job.init.throw.exception.override", true);
        mr.startJobTracker(false);
        while (!mr.getJobTrackerRunner().isUp()) {
            Thread.sleep(100L);
        }
        JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
        Assert.assertNotNull(jobtracker);
        UtilsForTests.waitForJobTracker(jc);
        Assert.assertEquals("Resubmission failed ", 1L, jobtracker.getAllJobs().length);

        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        // Create the signal file and close the stream so it is not left open.
        fs.create(signalPath).close();
        while (!jip.isComplete()) {
            LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
            UtilsForTests.waitFor(100L);
        }
        rJob1 = jc.getJob(rJob1.getID());
        Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
    }

    /**
     * Instrumentation used by the restart-before-finalization test: while the
     * shared latch has not yet reached zero, {@code finalizeJob} counts it down
     * and throws; once the latch is at zero, {@code finalizeJob} does nothing.
     */
    public static class TestJobTrackerInstrumentation
    extends JobTrackerInstrumentation {
        /** Latch released by the first {@code finalizeJob} invocation; tests await it. */
        static CountDownLatch finalizeCall = new CountDownLatch(1);

        public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
            super(jt, conf);
        }

        /**
         * Fails with an {@link IllegalStateException} while the latch is still
         * pending, counting it down first; returns normally afterwards.
         *
         * @param conf job configuration (unused here)
         * @param id   job identifier (unused here)
         */
        public void finalizeJob(JobConf conf, JobID id) {
            boolean latchPending = finalizeCall.getCount() != 0L;
            if (latchPending) {
                finalizeCall.countDown();
                throw new IllegalStateException("Controlled error finalizing job");
            }
        }
    }
}

