/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestReporter {
    private static final Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
    private static final Path testRootTempDir = new Path(rootTempDir, "TestReporter");
    private static FileSystem fs = null;
    private static final String INPUT = "Hi\nHi\nHi\nHi\n";
    private static final int INPUT_LINES = "Hi\nHi\nHi\nHi\n".split("\n").length;

    @BeforeClass
    public static void setup() throws Exception {
        fs = FileSystem.getLocal((Configuration)new Configuration());
        fs.delete(testRootTempDir, true);
        fs.mkdirs(testRootTempDir);
    }

    @AfterClass
    public static void cleanup() throws Exception {
        fs.delete(testRootTempDir, true);
    }

    @Test
    public void testReporterProgressForMapOnlyJob() throws IOException {
        Path test = new Path(testRootTempDir, "testReporterProgressForMapOnlyJob");
        JobConf conf = new JobConf();
        conf.setMapperClass(ProgressTesterMapper.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMaxMapAttempts(1);
        conf.setMaxReduceAttempts(0);
        RunningJob job = UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"), 1, 0, INPUT);
        job.waitForCompletion();
        Assert.assertTrue((String)"Job failed", (boolean)job.isSuccessful());
    }

    @Test
    public void testReporterProgressForMRJob() throws IOException {
        Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
        JobConf conf = new JobConf();
        conf.setMapperClass(ProgressTesterMapper.class);
        conf.setReducerClass(ProgressTestingReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMaxMapAttempts(1);
        conf.setMaxReduceAttempts(1);
        RunningJob job = UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"), 1, 1, INPUT);
        job.waitForCompletion();
        Assert.assertTrue((String)"Job failed", (boolean)job.isSuccessful());
    }

    @Test
    public void testStatusLimit() throws IOException, InterruptedException, ClassNotFoundException {
        Path test = new Path(testRootTempDir, "testStatusLimit");
        Configuration conf = new Configuration();
        Path inDir = new Path(test, "in");
        Path outDir = new Path(test, "out");
        FileSystem fs = FileSystem.get((Configuration)conf);
        if (fs.exists(inDir)) {
            fs.delete(inDir, true);
        }
        fs.mkdirs(inDir);
        FSDataOutputStream file = fs.create(new Path(inDir, "part-0"));
        file.writeBytes("testStatusLimit");
        file.close();
        if (fs.exists(outDir)) {
            fs.delete(outDir, true);
        }
        Job job = Job.getInstance((Configuration)conf, (String)"testStatusLimit");
        job.setMapperClass(StatusLimitMapper.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath((Job)job, (Path)inDir);
        FileOutputFormat.setOutputPath((Job)job, (Path)outDir);
        job.waitForCompletion(true);
        Assert.assertTrue((String)"Job failed", (boolean)job.isSuccessful());
    }

    static class ProgressTestingReducer
    extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
        private int recordCount = 0;
        private Reporter reporter = null;
        private final float REDUCE_PROGRESS_RANGE = 0.33333334f;
        private final float SHUFFLE_PROGRESS_RANGE = 0.6666666f;

        ProgressTestingReducer() {
        }

        public void configure(JobConf job) {
            super.configure(job);
        }

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            float reducePhaseProgress = (float)(++this.recordCount) / (float)INPUT_LINES;
            float weightedReducePhaseProgress = reducePhaseProgress * 0.33333334f;
            Assert.assertEquals((String)"Invalid progress in reduce", (double)(0.6666666f + weightedReducePhaseProgress), (double)reporter.getProgress(), (double)0.02f);
            this.reporter = reporter;
        }

        public void close() throws IOException {
            super.close();
            Assert.assertEquals((String)"Invalid progress in reduce cleanup", (double)1.0, (double)this.reporter.getProgress(), (double)0.0);
        }
    }

    static class StatusLimitMapper
    extends Mapper<LongWritable, Text, Text, Text> {
        StatusLimitMapper() {
        }

        public void map(LongWritable key, Text value, Mapper.Context context) throws IOException {
            StringBuilder sb = new StringBuilder(512);
            for (int i = 0; i < 1000; ++i) {
                sb.append("a");
            }
            context.setStatus(sb.toString());
            int progressStatusLength = context.getConfiguration().getInt("mapreduce.task.max.status.length", 512);
            if (context.getStatus().length() > progressStatusLength) {
                throw new IOException("Status is not truncated");
            }
        }
    }

    static class ProgressTesterMapper
    extends MapReduceBase
    implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, Text> {
        private float progressRange = 0.0f;
        private int numRecords = 0;
        private Reporter reporter = null;

        ProgressTesterMapper() {
        }

        public void configure(JobConf job) {
            super.configure(job);
            this.progressRange = job.getNumReduceTasks() == 0 ? 1.0f : 0.667f;
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            this.reporter = reporter;
            float mapProgress = (float)(++this.numRecords) / (float)INPUT_LINES;
            float attemptProgress = this.progressRange * mapProgress;
            Assert.assertEquals((String)"Invalid progress in map", (double)attemptProgress, (double)reporter.getProgress(), (double)0.0);
            output.collect((Object)new Text(value.toString() + this.numRecords), (Object)value);
        }

        public void close() throws IOException {
            super.close();
            Assert.assertEquals((String)"Invalid progress in map cleanup", (double)this.progressRange, (double)this.reporter.getProgress(), (double)0.0);
        }
    }
}

