/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2;

import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.TestSpeculativeExecution;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Runs small MapReduce jobs on a {@link MiniMRYarnCluster} with map-side and
 * reduce-side speculation toggled, and checks the launched/killed task
 * counters reported by each job.
 *
 * <p>NOTE(review): {@code SpeculativeMapper}, {@code SpeculativeReducer} and
 * {@code TestSpecEstimator} are referenced by {@link #runSpecTest} but are not
 * defined in this view of the file; presumably they are nested classes that
 * live in separate compiled units - confirm before moving this class.
 */
public class TestSpeculativeExecution {
    private static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
    /** Shared cluster; created by {@link #setup()} and released by {@link #tearDown()}. */
    protected static MiniMRYarnCluster mrCluster;
    private static Configuration initialConf;
    private static FileSystem localFs;
    private static Path TEST_ROOT_DIR;
    static Path APP_JAR;
    private static Path TEST_OUT_DIR;

    /**
     * Starts the mini cluster (once) and copies the MR application jar into
     * the test root directory with mode "700".
     *
     * <p>Does nothing except log a message when the application jar named by
     * {@code MiniMRYarnCluster.APPJAR} does not exist on local disk.
     *
     * @throws IOException if copying the jar or changing its permission fails
     */
    @BeforeClass
    public static void setup() throws IOException {
        if (appJarMissing()) {
            return;
        }
        if (mrCluster == null) {
            // The second constructor argument is presumably the NodeManager count - confirm.
            mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 4);
            Configuration conf = new Configuration();
            mrCluster.init(conf);
            mrCluster.start();
        }
        localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
        localFs.setPermission(APP_JAR, new FsPermission("700"));
    }

    /** Stops the mini cluster if {@link #setup()} started one. */
    @AfterClass
    public static void tearDown() {
        if (mrCluster != null) {
            mrCluster.stop();
            mrCluster = null;
        }
    }

    /**
     * Runs the job three times (no speculation, map speculation only, reduce
     * speculation only) and verifies the task counters of each run.
     *
     * @throws Exception if job submission, completion or counter retrieval fails
     */
    @Test
    public void testSpeculativeExecution() throws Exception {
        if (appJarMissing()) {
            return;
        }

        // Both speculation flags off: exactly 2 maps and 2 reduces are launched.
        Job job = this.runSpecTest(false, false);
        assertJobSucceeded(job);
        Counters counters = job.getCounters();
        Assert.assertEquals(2L, counterValue(counters, JobCounter.TOTAL_LAUNCHED_MAPS));
        Assert.assertEquals(2L, counterValue(counters, JobCounter.TOTAL_LAUNCHED_REDUCES));
        Assert.assertEquals(0L, counterValue(counters, JobCounter.NUM_FAILED_MAPS));

        // Map speculation on: one additional map is launched and one map is killed.
        job = this.runSpecTest(true, false);
        assertJobSucceeded(job);
        counters = job.getCounters();
        Assert.assertEquals(3L, counterValue(counters, JobCounter.TOTAL_LAUNCHED_MAPS));
        Assert.assertEquals(2L, counterValue(counters, JobCounter.TOTAL_LAUNCHED_REDUCES));
        Assert.assertEquals(0L, counterValue(counters, JobCounter.NUM_FAILED_MAPS));
        Assert.assertEquals(1L, counterValue(counters, JobCounter.NUM_KILLED_MAPS));

        // Reduce speculation on: one additional reduce is launched.
        job = this.runSpecTest(false, true);
        assertJobSucceeded(job);
        counters = job.getCounters();
        Assert.assertEquals(2L, counterValue(counters, JobCounter.TOTAL_LAUNCHED_MAPS));
        Assert.assertEquals(3L, counterValue(counters, JobCounter.TOTAL_LAUNCHED_REDUCES));
    }

    /**
     * Reports whether the MR application jar is absent, logging the skip
     * message when it is.
     *
     * @return {@code true} if {@code MiniMRYarnCluster.APPJAR} does not exist
     */
    private static boolean appJarMissing() {
        if (new File(MiniMRYarnCluster.APPJAR).exists()) {
            return false;
        }
        LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
        return true;
    }

    /**
     * Waits for {@code job} to finish and asserts that it completed in the
     * {@code SUCCEEDED} state.
     */
    private static void assertJobSucceeded(Job job) throws Exception {
        boolean succeeded = job.waitForCompletion(true);
        Assert.assertTrue(succeeded);
        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
    }

    /** Returns the current value of the given job counter. */
    private static long counterValue(Counters counters, JobCounter which) {
        return counters.findCounter(which).getValue();
    }

    /**
     * Creates (or overwrites) a file under the test root directory holding
     * {@code contents}, then sets its permission to "700".
     *
     * @param filename name of the file relative to the test root directory
     * @param contents text written to the file via {@code writeBytes}
     * @return the path of the created file
     * @throws IOException if the file cannot be created, written or closed
     */
    private Path createTempFile(String filename, String contents) throws IOException {
        Path path = new Path(TEST_ROOT_DIR, filename);
        FSDataOutputStream os = localFs.create(path);
        try {
            os.writeBytes(contents);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            os.close();
        }
        localFs.setPermission(path, new FsPermission("700"));
        return path;
    }

    /**
     * Configures and submits a two-input, two-reducer job on the mini cluster
     * without waiting for it to finish.
     *
     * @param mapspec value for {@code mapreduce.map.speculative}
     * @param redspec value for {@code mapreduce.reduce.speculative}
     * @return the submitted job; the caller is expected to wait for completion
     * @throws IOException if input files cannot be created or submission fails
     * @throws ClassNotFoundException propagated from {@code Job.submit()}
     * @throws InterruptedException propagated from {@code Job.submit()}
     */
    private Job runSpecTest(boolean mapspec, boolean redspec) throws IOException, ClassNotFoundException, InterruptedException {
        Path first = this.createTempFile("specexec_map_input1", "a\nz");
        Path secnd = this.createTempFile("specexec_map_input2", "a\nz");

        // Settings are applied to the Configuration object returned by the cluster.
        Configuration conf = mrCluster.getConfig();
        conf.setBoolean("mapreduce.map.speculative", mapspec);
        conf.setBoolean("mapreduce.reduce.speculative", redspec);
        conf.setClass("yarn.app.mapreduce.am.job.task.estimator.class", TestSpecEstimator.class, TaskRuntimeEstimator.class);

        Job job = Job.getInstance(conf);
        job.setJarByClass(TestSpeculativeExecution.class);
        job.setMapperClass(SpeculativeMapper.class);
        job.setReducerClass(SpeculativeReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(2);
        FileInputFormat.setInputPaths(job, new Path[]{first});
        FileInputFormat.addInputPath(job, secnd);
        FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);

        // Clear output left over from a previous run; this is best-effort, so a
        // failure is reported but does not abort the test.
        try {
            localFs.delete(TEST_OUT_DIR, true);
        }
        catch (IOException e) {
            LOG.warn("Could not delete " + TEST_OUT_DIR + " before submitting job", e);
        }

        job.addFileToClassPath(APP_JAR);
        job.createSymlink();
        job.setMaxMapAttempts(2);
        job.submit();
        return job;
    }

    // Resolves the local file system and derives every test path from
    // "target/<class name>-tmpDir", qualified against the local FS.
    static {
        initialConf = new Configuration();
        try {
            localFs = FileSystem.getLocal(initialConf);
        }
        catch (IOException io) {
            throw new RuntimeException("problem getting local fs", io);
        }
        TEST_ROOT_DIR = new Path("target", TestSpeculativeExecution.class.getName() + "-tmpDir").makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
        APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
        TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir");
    }
}

