/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Date;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import javax.security.auth.login.LoginException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobQueueInfo;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Queue;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TestMiniMRWithDFSWithDistinctUsers;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Tests for {@link QueueManager}: configured queue names, per-queue scheduler
 * info, job-submission ACLs on the default queue, and queue state refresh.
 *
 * <p>Tests that need a running cluster start one lazily through
 * {@code setUpCluster(JobConf)} and must stop it with {@code tearDownCluster()}.
 */
public class TestQueueManager
extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
    // ACL name for the SUBMIT_JOB queue operation, as reported by QueueManager.QueueACL.
    String submitAcl = QueueManager.QueueACL.SUBMIT_JOB.getAclName();
    // ACL name for the ADMINISTER_JOBS queue operation, as reported by QueueManager.QueueACL.
    String adminAcl = QueueManager.QueueACL.ADMINISTER_JOBS.getAclName();
    // Mini DFS cluster created by setUpCluster(); reset to null by tearDownCluster().
    MiniDFSCluster miniDFSCluster;
    // Mini MR cluster created by setUpCluster(); null means "no cluster is running".
    MiniMRCluster miniMRCluster = null;

    /**
     * Registers the test users this suite relies on.
     *
     * <p>The current user is registered as a member of {@code myGroup}, and a
     * second user named {@code Zork} is registered in {@code ZorkGroup}.
     *
     * @return the {@code Zork} test user
     * @throws IOException if the current user cannot be determined
     */
    UserGroupInformation createNecessaryUsers() throws IOException {
        String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
        String[] currentUserGroups = {"myGroup"};
        UserGroupInformation.createUserForTesting(currentUserName, currentUserGroups);

        String[] zorkGroups = {"ZorkGroup"};
        return UserGroupInformation.createUserForTesting("Zork", zorkGroups);
    }

    /**
     * With an untouched {@link JobConf}, the only queue is {@code default} and
     * ACLs are not enabled.
     */
    public void testDefaultQueueConfiguration() {
        JobConf jobConf = new JobConf();

        Set<String> expected = new TreeSet<String>();
        expected.add("default");

        QueueManager manager = new QueueManager(jobConf);
        verifyQueues(expected, manager.getQueues());

        // The lookup falls back to true, so a missing key fails the assertion.
        boolean aclsEnabled = jobConf.getBoolean("mapred.acls.enabled", true);
        assertFalse(aclsEnabled);
    }

    /**
     * A comma-separated {@code mapred.queue.names} value yields exactly those
     * queues, with case preserved.
     */
    public void testMultipleQueues() {
        String[] queueNames = {"q1", "q2", "Q3"};

        JobConf jobConf = new JobConf();
        jobConf.set("mapred.queue.names", "q1,q2,Q3");
        QueueManager manager = new QueueManager(jobConf);

        Set<String> expected = new TreeSet<String>();
        for (String queueName : queueNames) {
            expected.add(queueName);
        }
        verifyQueues(expected, manager.getQueues());
    }

    /**
     * Scheduler info stored for one queue is returned for that queue and does
     * not leak into another queue.
     */
    public void testSchedulerInfo() {
        JobConf conf = new JobConf();
        conf.set("mapred.queue.names", "qq1,qq2");
        QueueManager qMgr = new QueueManager(conf);
        qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
        qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
        // assertEquals takes (expected, actual); keeping that order makes a
        // failure report read the right way round.
        assertEquals("queueInfoForqq2", qMgr.getSchedulerInfo("qq2"));
        assertEquals("queueInfoForqq1", qMgr.getSchedulerInfo("qq1"));
    }

    /**
     * With the default queue's submit ACL set to {@code "*"}, a job submitted
     * by the {@code Zork} test user must succeed.
     *
     * @throws IOException if cluster setup or job submission fails
     * @throws InterruptedException if job submission is interrupted
     */
    public void testAllEnabledACLForJobSubmission() throws IOException, InterruptedException {
        try {
            String aclKey = QueueManager.toFullPropertyName("default", submitAcl);
            JobConf conf = setupConf(aclKey, "*");

            UserGroupInformation zork = createNecessaryUsers();
            String[] zorkGroups = zork.getGroupNames();
            // Submit as "<user>,<last group>" — the format submitSleepJob splits on ','.
            String submitter = zork.getShortUserName() + "," + zorkGroups[zorkGroups.length - 1];
            verifyJobSubmissionToDefaultQueue(conf, true, submitter);
        } finally {
            tearDownCluster();
        }
    }

    /**
     * With the default queue's submit ACL set to a single blank, an ordinary
     * user is rejected. Once {@code mapreduce.cluster.administrators} is set,
     * submissions from {@code user1,group1}, from {@code user2,group2} and from
     * the user running the test are all expected to succeed.
     *
     * @throws IOException if cluster setup or job submission fails unexpectedly
     * @throws InterruptedException if job submission is interrupted
     */
    public void testAllDisabledACLForJobSubmission() throws IOException, InterruptedException {
        try {
            createNecessaryUsers();
            String aclKey = QueueManager.toFullPropertyName("default", submitAcl);
            JobConf conf = setupConf(aclKey, " ");

            String plainUser = "user1";
            String plainGroup = "group1";
            verifyJobSubmissionToDefaultQueue(conf, false, plainUser + "," + plainGroup);

            // Configure cluster administrators: "user2 group1".
            // NOTE(review): presumably "<users> <groups>" — confirm against the ACL format.
            String adminUser = "user2";
            String adminGroup = "group2";
            conf.set("mapreduce.cluster.administrators", adminUser + " " + plainGroup);

            // setUpCluster() only starts a cluster when none is running, so the
            // old one has to go before the changed configuration can take effect.
            tearDownCluster();
            verifyJobSubmissionToDefaultQueue(conf, true, plainUser + "," + plainGroup);
            verifyJobSubmissionToDefaultQueue(conf, true, adminUser + "," + adminGroup);

            // Finally submit as the user running this test.
            UserGroupInformation mrOwner = UserGroupInformation.getCurrentUser();
            String ownerName = mrOwner.getShortUserName();
            String[] ownerGroups = mrOwner.getGroupNames();
            String ownerGroup = ownerGroups[ownerGroups.length - 1];
            verifyJobSubmissionToDefaultQueue(conf, true, ownerName + "," + ownerGroup);
        } finally {
            tearDownCluster();
        }
    }

    /**
     * When the default queue's submit ACL names only a user that does not
     * match the submitter, submission by {@code user1,group1} must be rejected.
     *
     * @throws IOException if cluster setup fails or the rejection is not the expected one
     * @throws InterruptedException if job submission is interrupted
     */
    public void testUserDisabledACLForJobSubmission() throws IOException, InterruptedException {
        try {
            String aclKey = QueueManager.toFullPropertyName("default", submitAcl);
            JobConf conf = setupConf(aclKey, "3698-non-existent-user");
            verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
        } finally {
            tearDownCluster();
        }
    }

    /**
     * Submitting to a queue that is not configured must fail with an
     * {@link IOException} whose message says the queue does not exist.
     *
     * @throws IOException if cluster setup fails
     * @throws InterruptedException if job submission is interrupted
     */
    public void testSubmissionToInvalidQueue() throws IOException, InterruptedException {
        try {
            JobConf conf = new JobConf();
            conf.set("mapred.queue.names", "default");
            setUpCluster(conf);
            String queueName = "q1";
            try {
                submitSleepJob(1, 1, 100L, 100L, true, null, queueName);
                // fail() raises an Error, so the catch below does not swallow it.
                fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
            } catch (IOException ioe) {
                // A null message becomes a readable assertion failure rather than an NPE.
                String message = ioe.getMessage();
                assertTrue("unexpected submission failure: " + ioe,
                        message != null && message.contains("Queue \"" + queueName + "\" does not exist"));
            }
        } finally {
            // One teardown covers every exit path.
            tearDownCluster();
        }
    }

    /**
     * When the default queue's submit ACL lists {@code user1} among its users,
     * a job submitted as {@code user1,group1} must succeed.
     *
     * @throws IOException if cluster setup or job submission fails
     * @throws LoginException declared by the original signature; nothing in this method throws it directly
     * @throws InterruptedException if job submission is interrupted
     */
    public void testUserEnabledACLForJobSubmission() throws IOException, LoginException, InterruptedException {
        try {
            String allowedUser = "user1";
            String aclKey = QueueManager.toFullPropertyName("default", submitAcl);
            String aclValue = "3698-junk-user," + allowedUser + " 3698-junk-group1,3698-junk-group2";
            JobConf conf = setupConf(aclKey, aclValue);
            verifyJobSubmissionToDefaultQueue(conf, true, allowedUser + ",group1");
        } finally {
            tearDownCluster();
        }
    }

    /**
     * Verifies that queue state is enforced at submission time and that the
     * states change after the queue configuration file is rewritten and
     * {@code QueueManager.refreshQueues} is called.
     *
     * <p>Initially {@code default} is RUNNING and {@code qu1} is STOPPED; after
     * the refresh the two states are swapped. In each phase a job is submitted
     * to both queues and the reported queue state is checked.
     *
     * @throws Exception if cluster setup fails or a submission that should succeed does not
     */
    public void testStateRefresh() throws Exception {
        String queueConfigPath = System.getProperty("test.build.extraconf", "build/test/extraconf");
        File queueConfigFile = new File(queueConfigPath, "mapred-queue-acls.xml");
        try {
            Properties queueConfProps = new Properties();
            queueConfProps.put("mapred.queue.names", "default,qu1");
            queueConfProps.put("mapred.acls.enabled", "true");
            queueConfProps.put("mapred.queue.default.state", "RUNNING");
            queueConfProps.put("mapred.queue.qu1.state", "STOPPED");
            UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
            JobConf conf = new JobConf();
            setUpCluster(conf);
            QueueManager queueManager = miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();

            // Phase 1: default is RUNNING, qu1 is STOPPED.
            RunningJob job = submitSleepJob(1, 1, 100L, 100L, true, null, "default");
            assertTrue(job.isSuccessful());
            try {
                submitSleepJob(1, 1, 100L, 100L, true, null, "qu1");
                // fail() raises an Error, so catch (Exception) does not swallow it.
                fail("submit job in qu1 queue should be failed ");
            } catch (Exception e) {
                String message = e.getMessage();
                assertTrue("unexpected submission failure: " + e,
                        message != null && message.contains("Queue \"qu1\" is not running"));
            }
            JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
            assertEquals(Queue.QueueState.RUNNING.getStateName(), queueInfo.getQueueState());
            queueInfo = queueManager.getJobQueueInfo("qu1");
            assertEquals(Queue.QueueState.STOPPED.getStateName(), queueInfo.getQueueState());

            // Phase 2: swap the states on disk and refresh.
            queueConfProps.put("mapred.queue.default.state", "STOPPED");
            queueConfProps.put("mapred.queue.qu1.state", "RUNNING");
            UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
            queueManager.refreshQueues(conf);

            // This submission must now succeed; any exception propagates so the
            // real cause and stack trace appear in the test report.
            submitSleepJob(1, 1, 100L, 100L, true, null, "qu1");
            try {
                submitSleepJob(1, 1, 100L, 100L, true, null, "default");
                fail("submit job in default queue should be failed ");
            } catch (Exception e) {
                String message = e.getMessage();
                assertTrue("unexpected submission failure: " + e,
                        message != null && message.contains("Queue \"default\" is not running"));
            }
            queueInfo = queueManager.getJobQueueInfo("default");
            assertEquals(Queue.QueueState.STOPPED.getStateName(), queueInfo.getQueueState());
            queueInfo = queueManager.getJobQueueInfo("qu1");
            assertEquals(Queue.QueueState.RUNNING.getStateName(), queueInfo.getQueueState());
        } finally {
            try {
                if (queueConfigFile.exists()) {
                    queueConfigFile.delete();
                }
            } finally {
                // Stop the cluster even if removing the config file throws.
                tearDownCluster();
            }
        }
    }

    /**
     * Builds a fresh {@link JobConf} with ACLs enabled and one ACL property set.
     *
     * @param aclName full property name of the ACL to set
     * @param aclValue value to store under {@code aclName}
     * @return the new configuration
     */
    JobConf setupConf(String aclName, String aclValue) {
        JobConf aclConf = new JobConf();
        aclConf.setBoolean("mapred.acls.enabled", true);
        aclConf.set(aclName, aclValue);
        return aclConf;
    }

    /**
     * Asserts that two queue-name sets have the same size and that every
     * expected name is present in the actual set.
     *
     * @param expectedQueues queue names that must be present
     * @param actualQueues queue names reported by the code under test
     */
    void verifyQueues(Set<String> expectedQueues, Set<String> actualQueues) {
        int expectedCount = expectedQueues.size();
        int actualCount = actualQueues.size();
        assertEquals(expectedCount, actualCount);
        // Same membership check as looking up each expected name one at a time.
        assertTrue(actualQueues.containsAll(expectedQueues));
    }

    /**
     * Shorthand for {@code verifyJobSubmission} against the {@code default} queue.
     *
     * @param conf cluster configuration, used if a cluster has to be started
     * @param shouldSucceed whether the submission is expected to be accepted
     * @param userInfo submitter as {@code "user,group[,group...]"}
     * @throws IOException if setup fails or a submission expected to succeed fails
     * @throws InterruptedException if job submission is interrupted
     */
    void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed, String userInfo) throws IOException, InterruptedException {
        final String defaultQueue = "default";
        verifyJobSubmission(conf, shouldSucceed, userInfo, defaultQueue);
    }

    /**
     * Makes sure a cluster is running, then submits a sleep job to
     * {@code queue} and checks that the outcome matches {@code shouldSucceed}.
     *
     * @param conf cluster configuration, used if a cluster has to be started
     * @param shouldSucceed whether the submission is expected to be accepted
     * @param userInfo submitter as {@code "user,group[,group...]"}
     * @param queue name of the target queue
     * @throws IOException if setup fails or a submission expected to succeed fails
     * @throws InterruptedException if job submission is interrupted
     */
    void verifyJobSubmission(JobConf conf, boolean shouldSucceed, String userInfo, String queue) throws IOException, InterruptedException {
        setUpCluster(conf);
        // Note the argument order: runAndVerifySubmission takes queue before userInfo.
        runAndVerifySubmission(conf, shouldSucceed, queue, userInfo);
    }

    /**
     * Submits a sleep job on the already-running cluster and verifies the outcome.
     *
     * <p>If the submission is expected to succeed, the job must report success
     * and any {@link IOException} is rethrown. If it is expected to fail, the
     * exception message must name the SUBMIT_JOB operation on {@code queue};
     * the method then polls the JobTracker system directory until at most one
     * entry remains.
     *
     * @param conf configuration used to resolve the system directory's file system
     * @param shouldSucceed whether the submission is expected to be accepted
     * @param queue name of the target queue
     * @param userInfo submitter as {@code "user,group[,group...]"}
     * @throws IOException if a submission expected to succeed fails, or on file system errors
     * @throws InterruptedException if job submission is interrupted
     */
    void runAndVerifySubmission(JobConf conf, boolean shouldSucceed, String queue, String userInfo) throws IOException, InterruptedException {
        RunningJob submitted;
        try {
            submitted = submitSleepJob(1, 1, 100L, 100L, true, userInfo, queue);
        } catch (IOException ioe) {
            if (shouldSucceed) {
                throw ioe;
            }
            LOG.info("exception while submitting job: " + ioe.getMessage());
            assertTrue(ioe.getMessage().contains("cannot perform operation SUBMIT_JOB on queue " + queue));

            // Wait for the rejected job's files to disappear from the system directory.
            // NOTE(review): this poll has no timeout — confirm the harness bounds the test run.
            JobTracker jobTracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
            Path systemDir = new Path(jobTracker.getSystemDir());
            FileSystem systemFs = systemDir.getFileSystem(conf);
            while (systemFs.listStatus(systemDir).length > 1) {
                System.out.println("Waiting for the job files in sys directory to be cleaned up");
                UtilsForTests.waitFor(100L);
            }
            return;
        }

        if (!shouldSucceed) {
            fail("Job submission should have failed.");
        }
        assertTrue(submitted.isSuccessful());
    }

    /**
     * Submits a sleep job as the current user, kills it as {@code ugi}, and
     * checks that the kill succeeds or is rejected as expected.
     *
     * <p>The method waits for map progress to become non-zero before issuing
     * the kill, and for cleanup progress to become non-zero afterwards. If the
     * thread is interrupted during either wait, the wait ends early and the
     * thread's interrupt status is restored.
     *
     * @param ugi user that issues the kill
     * @param conf cluster configuration; {@code mapred.job.tracker} is overwritten with the mini cluster's address
     * @param shouldSucceed whether the kill is expected to be permitted
     * @throws IOException if setup fails or a kill expected to succeed is rejected
     * @throws InterruptedException if submission or the privileged action is interrupted
     */
    void verifyJobKill(UserGroupInformation ugi, JobConf conf, boolean shouldSucceed) throws IOException, InterruptedException {
        setUpCluster(conf);
        try {
            RunningJob rjob = submitSleepJob(1, 1, 1000L, 1000L, false);
            assertFalse(rjob.isComplete());
            while (rjob.mapProgress() == 0.0f) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException ie) {
                    // Stop waiting, but keep the interrupt visible to callers.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            conf.set("mapred.job.tracker", "localhost:" + miniMRCluster.getJobTrackerPort());
            final String jobId = rjob.getJobID();
            ugi.doAs(new PrivilegedExceptionAction<Object>() {

                @Override
                public Object run() throws Exception {
                    RunningJob runningJob = new JobClient(miniMRCluster.createJobConf()).getJob(jobId);
                    runningJob.killJob();
                    return null;
                }
            });
            while (rjob.cleanupProgress() == 0.0f) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (!shouldSucceed) {
                fail("Job kill should have failed.");
            }
            assertTrue(rjob.isComplete());
        } catch (IOException ioe) {
            if (shouldSucceed) {
                throw ioe;
            }
            LOG.info("exception while submitting/killing job: " + ioe.getMessage());
            assertTrue(ioe.getMessage().contains(" cannot perform operation KILL_JOB on "));
        }
    }

    /**
     * Submits a sleep job as {@code otherUserInfo}, then tries to kill it from
     * the calling thread's own context (no {@code doAs}) and checks that the
     * kill succeeds or is rejected as expected. Afterwards it waits for the
     * job to complete.
     *
     * <p>If the thread is interrupted while waiting, the wait ends early and
     * the thread's interrupt status is restored.
     *
     * @param conf cluster configuration; {@code mapred.job.tracker} is overwritten with the mini cluster's address
     * @param shouldSucceed whether the kill is expected to be permitted
     * @param otherUserInfo job owner as {@code "user,group[,group...]"}
     * @throws IOException if setup fails or a kill expected to succeed is rejected
     * @throws InterruptedException if job submission is interrupted
     */
    void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed, String otherUserInfo) throws IOException, InterruptedException {
        setUpCluster(conf);
        RunningJob rjob = submitSleepJob(1, 1, 1000L, 1000L, false, otherUserInfo);
        assertFalse(rjob.isComplete());
        try {
            conf.set("mapred.job.tracker", "localhost:" + miniMRCluster.getJobTrackerPort());
            JobClient client = new JobClient(miniMRCluster.createJobConf());
            client.getJob(rjob.getID()).killJob();
            if (!shouldSucceed) {
                fail("should fail kill operation");
            }
        } catch (IOException ioe) {
            if (shouldSucceed) {
                throw ioe;
            }
            LOG.info("exception while killing job: " + ioe.getMessage());
            assertTrue(ioe.getMessage().contains("cannot perform operation KILL_JOB on queue default"));
        }
        // Let the job finish (killed or not) before returning.
        while (!rjob.isComplete()) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ie) {
                // Stop waiting, but keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Submits a sleep job as the current user, then tries to change its
     * priority to {@code VERY_LOW} as {@code otherUGI} and checks that the
     * change succeeds or is rejected as expected. Afterwards it waits for the
     * job to complete.
     *
     * <p>If the change is expected to succeed, an {@link IOException} from it
     * is rethrown and fails the test. If it is expected to fail, the message
     * must name the SET_JOB_PRIORITY operation.
     *
     * @param otherUGI user that attempts the priority change
     * @param conf cluster configuration; {@code mapred.job.tracker} is overwritten with the mini cluster's address
     * @param shouldSucceed whether the priority change is expected to be permitted
     * @throws IOException if setup fails or a change expected to succeed is rejected
     * @throws InterruptedException if submission or the privileged action is interrupted
     */
    void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI, JobConf conf, final boolean shouldSucceed) throws IOException, InterruptedException {
        setUpCluster(conf);
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        String[] groups = ugi.getGroupNames();
        String userInfo = ugi.getShortUserName() + "," + groups[groups.length - 1];
        final RunningJob rjob = submitSleepJob(1, 1, 1000L, 1000L, false, userInfo);
        assertFalse(rjob.isComplete());
        conf.set("mapred.job.tracker", "localhost:" + miniMRCluster.getJobTrackerPort());
        otherUGI.doAs(new PrivilegedExceptionAction<Object>() {

            @Override
            public Object run() throws Exception {
                try {
                    JobClient client = new JobClient(miniMRCluster.createJobConf());
                    client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
                    if (!shouldSucceed) {
                        Assert.fail("changing priority should fail.");
                    }
                } catch (IOException ioe) {
                    // A rejection is only acceptable when one was expected;
                    // otherwise an ACL denial would pass the positive case.
                    if (shouldSucceed) {
                        throw ioe;
                    }
                    LOG.info("exception while changing priority of job: " + ioe.getMessage());
                    Assert.assertTrue(ioe.getMessage().contains(" cannot perform operation SET_JOB_PRIORITY on "));
                }
                return null;
            }
        });
        // Let the job finish before returning.
        while (!rjob.isComplete()) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ie) {
                // Stop waiting, but keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Starts the mini DFS and MR clusters if no MR cluster is currently running;
     * otherwise does nothing.
     *
     * <p>Before the MR cluster starts, {@code /user} and the staging root
     * directory (taken from {@code mapreduce.jobtracker.staging.root.dir},
     * falling back to {@code /tmp/hadoop/mapred/staging}) are created on the
     * DFS.
     *
     * @param conf configuration passed to both clusters
     * @throws IOException if either cluster fails to start or a directory cannot be created
     */
    void setUpCluster(JobConf conf) throws IOException {
        if (miniMRCluster != null) {
            // A cluster is already up; reuse it.
            return;
        }

        miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
        FileSystem dfs = miniDFSCluster.getFileSystem();

        TestMiniMRWithDFSWithDistinctUsers.mkdir(dfs, "/user");
        String stagingRoot = conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging");
        TestMiniMRWithDFSWithDistinctUsers.mkdir(dfs, stagingRoot);

        String namenodeUri = dfs.getUri().toString();
        miniMRCluster = new MiniMRCluster(1, namenodeUri, 3, null, null, conf);
    }

    /**
     * Shuts down whichever mini clusters are running and clears both fields.
     * Does nothing if neither cluster exists, so repeated calls are harmless.
     *
     * <p>The DFS cluster is shut down even when the MR cluster never started
     * (for example because {@code setUpCluster} failed part-way) or when the
     * MR shutdown throws, so no cluster is left running. On normal completion
     * the time taken by each shutdown is printed to {@code System.err}.
     *
     * @throws IOException declared for callers; nothing in this method throws it directly
     */
    void tearDownCluster() throws IOException {
        if (miniMRCluster == null && miniDFSCluster == null) {
            return;
        }
        long mrTeardownStart = System.currentTimeMillis();
        long mrTeardownEnd = mrTeardownStart;
        try {
            if (miniMRCluster != null) {
                miniMRCluster.shutdown();
            }
        } finally {
            miniMRCluster = null;
            mrTeardownEnd = System.currentTimeMillis();
            try {
                if (miniDFSCluster != null) {
                    miniDFSCluster.shutdown();
                }
            } finally {
                miniDFSCluster = null;
            }
        }
        long dfsTeardownEnd = System.currentTimeMillis();
        System.err.println("An MR teardown took " + (mrTeardownEnd - mrTeardownStart) + " milliseconds.  A DFS teardown took " + (dfsTeardownEnd - mrTeardownEnd) + " milliseconds.");
    }

    /**
     * Submits a sleep job as the current user, with no explicit queue name.
     *
     * @param numMappers number of map tasks
     * @param numReducers number of reduce tasks
     * @param mapSleepTime map sleep time passed to the sleep job
     * @param reduceSleepTime reduce sleep time passed to the sleep job
     * @param shouldComplete {@code true} to run the job to completion, {@code false} to only submit it
     * @return handle to the job
     * @throws IOException if submission fails
     * @throws InterruptedException if submission is interrupted
     */
    RunningJob submitSleepJob(int numMappers, int numReducers, long mapSleepTime, long reduceSleepTime, boolean shouldComplete) throws IOException, InterruptedException {
        // A null userInfo means "submit as the current user".
        final String noUserInfo = null;
        return submitSleepJob(numMappers, numReducers, mapSleepTime, reduceSleepTime, shouldComplete, noUserInfo);
    }

    /**
     * Submits a sleep job as the given user, with no explicit queue name.
     *
     * @param numMappers number of map tasks
     * @param numReducers number of reduce tasks
     * @param mapSleepTime map sleep time passed to the sleep job
     * @param reduceSleepTime reduce sleep time passed to the sleep job
     * @param shouldComplete {@code true} to run the job to completion, {@code false} to only submit it
     * @param userInfo submitter as {@code "user,group[,group...]"}, or {@code null} for the current user
     * @return handle to the job
     * @throws IOException if submission fails
     * @throws InterruptedException if submission is interrupted
     */
    RunningJob submitSleepJob(int numMappers, int numReducers, long mapSleepTime, long reduceSleepTime, boolean shouldComplete, String userInfo) throws IOException, InterruptedException {
        // A null queue name leaves the job's queue setting untouched.
        final String noQueueName = null;
        return submitSleepJob(numMappers, numReducers, mapSleepTime, reduceSleepTime, shouldComplete, userInfo, noQueueName);
    }

    /**
     * Builds a sleep job against the running mini MR cluster and submits it as
     * the requested user.
     *
     * <p>The two count arguments passed to {@code SleepJob.setupJobConf} are
     * the corresponding sleep time divided by 100.
     *
     * @param numMappers number of map tasks
     * @param numReducers number of reduce tasks
     * @param mapSleepTime map sleep time passed to the sleep job
     * @param reduceSleepTime reduce sleep time passed to the sleep job
     * @param shouldComplete {@code true} to run via {@code JobClient.runJob}, {@code false} to only call {@code submitJob}
     * @param userInfo submitter as {@code "user,group[,group...]"}, or {@code null} for the current user
     * @param queueName queue to submit to, or {@code null} to leave the queue unset
     * @return handle to the job
     * @throws IOException if submission fails
     * @throws InterruptedException if the privileged action is interrupted
     */
    RunningJob submitSleepJob(int numMappers, int numReducers, long mapSleepTime, long reduceSleepTime, final boolean shouldComplete, String userInfo, String queueName) throws IOException, InterruptedException {
        JobConf clientConf = new JobConf();
        clientConf.set("mapred.job.tracker", "localhost:" + miniMRCluster.getJobTrackerPort());
        SleepJob job = new SleepJob();
        job.setConf(clientConf);
        // Divide before narrowing: "(int) t / 100" would truncate the long first.
        // NOTE(review): named "count" after SleepJob.setupJobConf's parameters — confirm.
        int mapSleepCount = (int) (mapSleepTime / 100);
        int reduceSleepCount = (int) (reduceSleepTime / 100);
        clientConf = job.setupJobConf(numMappers, numReducers, mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount);
        if (queueName != null) {
            clientConf.setQueueName(queueName);
        }
        final JobConf jc = new JobConf(clientConf);

        UserGroupInformation ugi;
        if (userInfo != null) {
            // First token is the user name; every remaining token is a group.
            String[] splits = userInfo.split(",");
            String[] groups = new String[splits.length - 1];
            System.arraycopy(splits, 1, groups, 0, splits.length - 1);
            ugi = UserGroupInformation.createUserForTesting(splits[0], groups);
        } else {
            ugi = UserGroupInformation.getCurrentUser();
        }
        return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {

            @Override
            public RunningJob run() throws IOException {
                if (shouldComplete) {
                    return JobClient.runJob(jc);
                }
                return new JobClient(jc).submitJob(jc);
            }
        });
    }
}

