/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.TestBadRecords;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Ignore;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * Runs a one-map / one-reduce job with record skipping enabled over the ten
 * input lines {@code hello01} .. {@code hello10}, then verifies the job
 * counters, the contents of the skip output directory and the job output.
 *
 * <p>The job uses {@code BadMapper} and {@code BadReducer}, which are not
 * defined in this part of the file. NOTE(review): presumably they fail on the
 * records listed in {@link #MAPPER_BAD_RECORDS} and
 * {@link #REDUCER_BAD_RECORDS} respectively - confirm against their source.
 */
@Ignore
public class TestBadRecords
extends ClusterMapReduceTestCase {
    private static final Log LOG = LogFactory.getLog(TestBadRecords.class);
    private static final List<String> MAPPER_BAD_RECORDS = Arrays.asList("hello01", "hello04", "hello05");
    private static final List<String> REDUCER_BAD_RECORDS = Arrays.asList("hello08", "hello10");

    /** Charset used for the text input written and the text output read by this test. */
    private static final String TEXT_ENCODING = "UTF-8";

    /** Input lines fed to the job, in the order they are written to the input file. */
    private final List<String> input = new ArrayList<String>();

    /**
     * Populates {@link #input} with {@code hello01} .. {@code hello10}; the
     * numeric suffix is left-padded with zeros to two digits.
     */
    public TestBadRecords() {
        for (int i = 1; i <= 10; ++i) {
            StringBuilder suffix = new StringBuilder(String.valueOf(i));
            while (suffix.length() < 2) {
                suffix.insert(0, '0');
            }
            input.add("hello" + suffix);
        }
    }

    /**
     * Writes the input file, configures {@code conf} for a skipping-enabled
     * single-map / single-reduce job, runs it and validates the result.
     *
     * @param conf             job configuration; mapper and reducer classes must already be set
     * @param mapperBadRecords records expected to be skipped on the map side
     * @param redBadRecords    records expected to be skipped on the reduce side
     * @throws Exception if input creation, job execution or validation fails
     */
    private void runMapReduce(JobConf conf, List<String> mapperBadRecords, List<String> redBadRecords) throws Exception {
        createInput();

        conf.setJobName("mr");
        conf.setNumMapTasks(1);
        conf.setNumReduceTasks(1);
        conf.setInt("mapreduce.task.timeout", 30000);

        SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
        SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
        SkipBadRecords.setAttemptsToStartSkipping(conf, 0);

        // Allow one attempt per expected bad record, plus the attempts made
        // before skipping starts, plus one more attempt.
        int attemptsBeforeSkipping = SkipBadRecords.getAttemptsToStartSkipping(conf);
        conf.setMaxMapAttempts(attemptsBeforeSkipping + 1 + mapperBadRecords.size());
        conf.setMaxReduceAttempts(attemptsBeforeSkipping + 1 + redBadRecords.size());

        FileInputFormat.setInputPaths(conf, new Path[]{getInputDir()});
        FileOutputFormat.setOutputPath(conf, getOutputDir());

        conf.setInputFormat(TextInputFormat.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        RunningJob runningJob = JobClient.runJob(conf);
        validateOutput(conf, runningJob, mapperBadRecords, redBadRecords);
    }

    /**
     * Writes every entry of {@link #input} as one line of {@code text.txt} in
     * the test input directory.
     *
     * @throws Exception if the file cannot be created or written
     */
    private void createInput() throws Exception {
        FSDataOutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
        Writer wr = new OutputStreamWriter(os, TEXT_ENCODING);
        try {
            for (String inp : input) {
                wr.write(inp + "\n");
            }
        } finally {
            // Closing the writer flushes it and closes the underlying stream,
            // also when a write fails part-way through.
            wr.close();
        }
    }

    /**
     * Checks the finished job against the expected bad records: the job must
     * have succeeded, the task counters must match, the skip output must
     * contain every expected bad record, and the first job output file (if
     * any) must contain exactly the records that survive both phases.
     *
     * @param conf             configuration the job was run with
     * @param runningJob       handle of the completed job
     * @param mapperBadRecords records expected to be skipped on the map side
     * @param redBadRecords    records expected to be skipped on the reduce side
     * @throws Exception if counters or files cannot be read
     */
    private void validateOutput(JobConf conf, RunningJob runningJob, List<String> mapperBadRecords, List<String> redBadRecords) throws Exception {
        LOG.info(runningJob.getCounters().toString());
        assertTrue(runningJob.isSuccessful());

        // --- counters ---
        Counters counters = runningJob.getCounters();
        int mapRecs = input.size() - mapperBadRecords.size();
        int redRecs = mapRecs - redBadRecords.size();
        assertCounterValue(counters, TaskCounter.MAP_SKIPPED_RECORDS, mapperBadRecords.size());
        assertCounterValue(counters, TaskCounter.MAP_INPUT_RECORDS, mapRecs);
        assertCounterValue(counters, TaskCounter.MAP_OUTPUT_RECORDS, mapRecs);
        assertCounterValue(counters, TaskCounter.REDUCE_SKIPPED_RECORDS, redBadRecords.size());
        assertCounterValue(counters, TaskCounter.REDUCE_SKIPPED_GROUPS, redBadRecords.size());
        assertCounterValue(counters, TaskCounter.REDUCE_INPUT_GROUPS, redRecs);
        assertCounterValue(counters, TaskCounter.REDUCE_INPUT_RECORDS, redRecs);
        assertCounterValue(counters, TaskCounter.REDUCE_OUTPUT_RECORDS, redRecs);

        // --- skip output ---
        Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
        assertNotNull(skipDir);
        Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir));
        List<String> mapSkipped = new ArrayList<String>();
        List<String> redSkipped = new ArrayList<String>();
        for (Path skipPath : skips) {
            LOG.info("skipPath: " + skipPath);
            // Files whose name contains "_r_" are counted as reduce-side skips,
            // everything else as map-side skips.
            List<String> skipped = skipPath.getName().contains("_r_") ? redSkipped : mapSkipped;
            SequenceFile.Reader skipReader = new SequenceFile.Reader(getFileSystem(), skipPath, conf);
            try {
                Object key = ReflectionUtils.newInstance(skipReader.getKeyClass(), conf);
                Object value = ReflectionUtils.newInstance(skipReader.getValueClass(), conf);
                // The loop ends when next(key) returns null.
                for (key = skipReader.next(key); key != null; key = skipReader.next(key)) {
                    value = skipReader.getCurrentValue(value);
                    LOG.debug("key:" + key + " value:" + value.toString());
                    skipped.add(value.toString());
                }
            } finally {
                skipReader.close();
            }
        }
        assertTrue(mapSkipped.containsAll(mapperBadRecords));
        assertTrue(redSkipped.containsAll(redBadRecords));

        // --- job output ---
        Path[] outputFiles = FileUtil.stat2Paths(getFileSystem().listStatus(getOutputDir(), new Utils.OutputFileUtils.OutputFilesFilter()));
        List<String> mapperOutput = getProcessed(input, mapperBadRecords);
        LOG.debug("mapperOutput " + mapperOutput.size());
        List<String> reducerOutput = getProcessed(mapperOutput, redBadRecords);
        LOG.debug("reducerOutput " + reducerOutput.size());
        // Only the first output file is inspected; nothing is checked here
        // when the job produced no output file.
        if (outputFiles.length > 0) {
            FSDataInputStream is = getFileSystem().open(outputFiles[0]);
            BufferedReader lineReader = new BufferedReader(new InputStreamReader(is, TEXT_ENCODING));
            int counter = 0;
            try {
                String line;
                while ((line = lineReader.readLine()) != null) {
                    ++counter;
                    // Each output line is split on the tab into key and value.
                    StringTokenizer tokeniz = new StringTokenizer(line, "\t");
                    String key = tokeniz.nextToken();
                    String value = tokeniz.nextToken();
                    LOG.debug("Output: key:" + key + "  value:" + value);
                    assertTrue(value.contains("hello"));
                    assertTrue(reducerOutput.contains(value));
                }
            } finally {
                lineReader.close();
            }
            assertEquals(reducerOutput.size(), counter);
        }
    }

    /**
     * Asserts that a task counter holds the expected value, using the counter
     * name as the failure message.
     *
     * @param counters counters of the finished job
     * @param counter  counter to look up
     * @param expected expected counter value
     */
    private static void assertCounterValue(Counters counters, TaskCounter counter, long expected) {
        assertEquals(counter.name(), expected, counters.findCounter(counter).getCounter());
    }

    /**
     * Returns the elements of {@code inputs} that are not contained in
     * {@code badRecs}, preserving their order.
     *
     * @param inputs  records entering a phase
     * @param badRecs records to leave out
     * @return a new list; neither argument is modified
     */
    private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
        List<String> processed = new ArrayList<String>();
        for (String record : inputs) {
            if (!badRecs.contains(record)) {
                processed.add(record);
            }
        }
        return processed;
    }

    /**
     * Runs the job with {@code BadMapper} and {@code BadReducer} and validates
     * it against {@link #MAPPER_BAD_RECORDS} and {@link #REDUCER_BAD_RECORDS}.
     *
     * @throws Exception if the job fails or validation fails
     */
    public void testBadMapRed() throws Exception {
        JobConf conf = createJobConf();
        conf.setMapperClass(BadMapper.class);
        conf.setReducerClass(BadReducer.class);
        runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
    }

    // Compiler-generated accessors for the private static fields above.
    // NOTE(review): presumably called by the nested BadMapper / BadReducer
    // classes, which are not visible here - keep until that is confirmed.
    static /* synthetic */ Log access$000() {
        return LOG;
    }

    static /* synthetic */ List access$100() {
        return MAPPER_BAD_RECORDS;
    }

    static /* synthetic */ List access$200() {
        return REDUCER_BAD_RECORDS;
    }
}

