/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidate;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.examples.SimpleSessionExample;
import org.apache.tez.examples.SortMergeJoinExample;
import org.apache.tez.mapreduce.examples.CartesianProduct;
import org.apache.tez.mapreduce.examples.MultipleCommitsExample;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.test.MiniTezCluster;
import org.apache.tez.test.SimpleTestDAG;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestTezJobs {
    // Logger used by the tests and static helpers of this class.
    private static final Logger LOG = LoggerFactory.getLogger(TestTezJobs.class);
    // Mini Tez cluster shared by all tests; created and started in setup(), stopped in tearDown().
    protected static MiniTezCluster mrrTezCluster;
    // Mini DFS cluster backing remoteFs; built in setup(), shut down in tearDown().
    protected static MiniDFSCluster dfsCluster;
    // Base configuration used for the local file system, the mini DFS cluster and the "-local" tests.
    // NOTE(review): not assigned in the visible part of the file - presumably set in a static initializer; confirm.
    private static Configuration conf;
    // File system of the mini DFS cluster; assigned in setup().
    private static FileSystem remoteFs;
    // Local file system obtained from conf; assigned in setup().
    private static FileSystem localFs;
    // Root directory for local test data and for the mini DFS base dir.
    // NOTE(review): assigned outside the visible part of the file - confirm where.
    private static String TEST_ROOT_DIR;
    // Vertex / input names; not referenced in the visible part of the file (presumably used by tests further down).
    private static final String VERTEX_WITH_INITIALIZER_NAME = "VertexWithInitializer";
    private static final String EVENT_GENERATING_VERTEX_NAME = "EventGeneratingVertex";
    private static final String INPUT1_NAME = "Input1";

    /**
     * Prepares the shared test fixtures: resolves the local file system, starts a
     * two-datanode mini DFS cluster rooted at {@code TEST_ROOT_DIR}, and, if none is
     * running yet, starts a mini Tez cluster whose default file system is that DFS.
     *
     * @throws IOException if the local file system cannot be obtained
     * @throws RuntimeException if the mini DFS cluster fails to start
     */
    @BeforeClass
    public static void setup() throws IOException {
        localFs = FileSystem.getLocal(conf);
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            MiniDFSCluster.Builder dfsBuilder = new MiniDFSCluster.Builder(conf);
            dfsCluster = dfsBuilder.numDataNodes(2).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException startupFailure) {
            throw new RuntimeException("problem starting mini dfs cluster", startupFailure);
        }

        // The Tez cluster is only created once.
        if (mrrTezCluster != null) {
            return;
        }
        mrrTezCluster = new MiniTezCluster(TestTezJobs.class.getName(), 1, 1, 1);
        // Dedicated configuration for the Tez cluster (distinct from the static conf).
        Configuration tezClusterConf = new Configuration();
        tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString());
        tezClusterConf.setLong("tez.am.sleep.time.before.exit.millis", 500L);
        mrrTezCluster.init(tezClusterConf);
        mrrTezCluster.start();
    }

    /**
     * Stops the mini Tez cluster and then shuts down the mini DFS cluster, clearing
     * each static reference once the corresponding cluster has been stopped.
     */
    @AfterClass
    public static void tearDown() {
        MiniTezCluster tezCluster = mrrTezCluster;
        if (tezCluster != null) {
            tezCluster.stop();
            mrrTezCluster = null;
        }

        MiniDFSCluster dfs = dfsCluster;
        if (dfs != null) {
            dfs.shutdown();
            dfsCluster = null;
        }
    }

    /**
     * Runs {@link HashJoinExample} on the mini cluster against two generated inputs
     * on the remote file system and checks that the single output file contains
     * exactly the terms present in both inputs.
     *
     * Input 1 holds term0..term19; input 2 holds only the even-numbered terms, so
     * the expected join result is the even-numbered terms.
     *
     * @throws Exception if file system access or the example run fails
     */
    @Test(timeout=60000L)
    public void testHashJoinExample() throws Exception {
        HashJoinExample hashJoinExample = new HashJoinExample();
        hashJoinExample.setConf(mrrTezCluster.getConfig());
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        Path inPath1 = new Path("/tmp/hashJoin/inPath1");
        Path inPath2 = new Path("/tmp/hashJoin/inPath2");
        Path outPath = new Path("/tmp/hashJoin/outPath");
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        remoteFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<String>();
        // Closing the writers also closes the underlying file system streams; the
        // finally blocks make sure that happens even when a write fails.
        BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(remoteFs.create(new Path(inPath1, "file"))));
        try {
            BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(remoteFs.create(new Path(inPath2, "file"))));
            try {
                for (int i = 0; i < 20; ++i) {
                    String term = "term" + i;
                    writer1.write(term);
                    writer1.newLine();
                    if (i % 2 == 0) {
                        writer2.write(term);
                        writer2.newLine();
                        expectedResult.add(term);
                    }
                }
            }
            finally {
                writer2.close();
            }
        }
        finally {
            writer1.close();
        }

        String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), "-counter", inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
        Assert.assertEquals(0, hashJoinExample.run(args));

        // Ignore marker/hidden files such as _SUCCESS when looking for the result file.
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter(){

            @Override
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals(1, statuses.length);

        BufferedReader reader = new BufferedReader(new InputStreamReader(remoteFs.open(statuses[0].getPath())));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                Assert.assertTrue("Unexpected or duplicate line in join output: " + line, expectedResult.remove(line));
            }
        }
        finally {
            // Release the stream even when an assertion above fails.
            reader.close();
        }
        Assert.assertEquals("Terms missing from join output: " + expectedResult, 0, expectedResult.size());
    }

    /**
     * Runs {@link HashJoinExample} with {@code -local} and
     * {@code -disableSplitGrouping} against two generated inputs on the local file
     * system and checks that the single output file contains exactly the terms
     * present in both inputs.
     *
     * Input 1 holds term0..term19; input 2 holds only the even-numbered terms, so
     * the expected join result is the even-numbered terms.
     *
     * @throws Exception if file system access or the example run fails
     */
    @Test(timeout=60000L)
    public void testHashJoinExampleDisableSplitGrouping() throws Exception {
        HashJoinExample hashJoinExample = new HashJoinExample();
        hashJoinExample.setConf(conf);
        Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
        Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/inPath1");
        Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/inPath2");
        Path outPath = new Path(TEST_ROOT_DIR + "/tmp/hashJoin/outPath");
        // Remove output left behind by a previous run on the local disk.
        localFs.delete(outPath, true);
        localFs.mkdirs(inPath1);
        localFs.mkdirs(inPath2);
        localFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<String>();
        // Closing the writers also closes the underlying file system streams; the
        // finally blocks make sure that happens even when a write fails.
        BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(localFs.create(new Path(inPath1, "file"))));
        try {
            BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(localFs.create(new Path(inPath2, "file"))));
            try {
                for (int i = 0; i < 20; ++i) {
                    String term = "term" + i;
                    writer1.write(term);
                    writer1.newLine();
                    if (i % 2 == 0) {
                        writer2.write(term);
                        writer2.newLine();
                        expectedResult.add(term);
                    }
                }
            }
            finally {
                writer2.close();
            }
        }
        finally {
            writer1.close();
        }

        String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), "-counter", "-local", "-disableSplitGrouping", inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
        Assert.assertEquals(0, hashJoinExample.run(args));

        // Ignore marker/hidden files such as _SUCCESS when looking for the result file.
        FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter(){

            @Override
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals(1, statuses.length);

        BufferedReader reader = new BufferedReader(new InputStreamReader(localFs.open(statuses[0].getPath())));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                Assert.assertTrue("Unexpected or duplicate line in join output: " + line, expectedResult.remove(line));
            }
        }
        finally {
            // Release the stream even when an assertion above fails.
            reader.close();
        }
        Assert.assertEquals("Terms missing from join output: " + expectedResult, 0, expectedResult.size());
    }

    /**
     * Runs {@link SortMergeJoinExample} on the mini cluster (with an explicit AM
     * application priority) against two generated inputs on the remote file system
     * and checks that the single output file contains exactly the terms present in
     * both inputs.
     *
     * Input 1 holds term0..term19; input 2 holds only the even-numbered terms, so
     * the expected join result is the even-numbered terms.
     *
     * @throws Exception if file system access or the example run fails
     */
    @Test(timeout=60000L)
    public void testSortMergeJoinExample() throws Exception {
        SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
        // Work on a copy so the example cannot modify the cluster's configuration.
        sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        Path inPath1 = new Path("/tmp/sortMerge/inPath1");
        Path inPath2 = new Path("/tmp/sortMerge/inPath2");
        Path outPath = new Path("/tmp/sortMerge/outPath");
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        remoteFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<String>();
        // Closing the writers also closes the underlying file system streams; the
        // finally blocks make sure that happens even when a write fails.
        BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(remoteFs.create(new Path(inPath1, "file"))));
        try {
            BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(remoteFs.create(new Path(inPath2, "file"))));
            try {
                for (int i = 0; i < 20; ++i) {
                    String term = "term" + i;
                    writer1.write(term);
                    writer1.newLine();
                    if (i % 2 == 0) {
                        writer2.write(term);
                        writer2.newLine();
                        expectedResult.add(term);
                    }
                }
            }
            finally {
                writer2.close();
            }
        }
        finally {
            writer1.close();
        }

        String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), "-Dtez.am.application.priority=2", "-counter", inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
        Assert.assertEquals(0, sortMergeJoinExample.run(args));

        // Ignore marker/hidden files such as _SUCCESS when looking for the result file.
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter(){

            @Override
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals(1, statuses.length);

        BufferedReader reader = new BufferedReader(new InputStreamReader(remoteFs.open(statuses[0].getPath())));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                Assert.assertTrue("Unexpected or duplicate line in join output: " + line, expectedResult.remove(line));
            }
        }
        finally {
            // Release the stream even when an assertion above fails.
            reader.close();
        }
        Assert.assertEquals("Terms missing from join output: " + expectedResult, 0, expectedResult.size());
    }

    /**
     * Runs the sort-merge join with per-IO counters enabled and verifies that, for
     * the "joiner" vertex, each aggregated {@link TaskCounter} equals the sum of the
     * matching per-input counters, and that the aggregated OUTPUT_RECORDS counter
     * equals the per-output counter of "joinOutput".
     *
     * The Tez client started here is stopped in a finally block so it is released
     * even when an assertion fails.
     *
     * @throws Exception if input generation, the job run or status retrieval fails
     */
    @Test(timeout=60000L)
    public void testPerIOCounterAggregation() throws Exception {
        String baseDir = "/tmp/perIOCounterAgg/";
        Path inPath1 = new Path(baseDir + "inPath1");
        Path inPath2 = new Path(baseDir + "inPath2");
        Path outPath = new Path(baseDir + "outPath");
        Set<String> expectedResults = this.generateSortMergeJoinInput(inPath1, inPath2);
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);

        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.setBoolean("tez.task.generate.counters.per.io", true);
        TezClient tezClient = TezClient.create(SortMergeJoinHelper.class.getSimpleName(), tezConf);
        tezClient.start();
        try {
            SortMergeJoinHelper sortMergeJoinHelper = new SortMergeJoinHelper(tezClient);
            sortMergeJoinHelper.setConf(tezConf);
            String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), "-counter", inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
            Assert.assertEquals(0, sortMergeJoinHelper.run(tezConf, args, tezClient));
            this.verifySortMergeJoinInput(outPath, expectedResults);

            String joinerVertexName = "joiner";
            String input1Name = "input1";
            String input2Name = "input2";
            String joinOutputName = "joinOutput";
            Set<StatusGetOpts> statusOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
            VertexStatus joinerVertexStatus = sortMergeJoinHelper.dagClient.getVertexStatus(joinerVertexName, statusOpts);
            TezCounters joinerCounters = joinerVertexStatus.getVertexCounters();

            // Per-IO groups are named <TaskCounter simple name>_<vertex>_INPUT|OUTPUT_<io name>.
            String perIoPrefix = TaskCounter.class.getSimpleName() + "_" + joinerVertexName;
            CounterGroup aggregatedGroup = (CounterGroup)joinerCounters.getGroup(TaskCounter.class.getCanonicalName());
            CounterGroup input1Group = (CounterGroup)joinerCounters.getGroup(perIoPrefix + "_INPUT_" + input1Name);
            CounterGroup input2Group = (CounterGroup)joinerCounters.getGroup(perIoPrefix + "_INPUT_" + input2Name);
            Assert.assertTrue("aggregated counter group cannot be empty", aggregatedGroup.size() > 0);
            Assert.assertTrue("per io group for input1 cannot be empty", input1Group.size() > 0);
            Assert.assertTrue("per io group for input2 cannot be empty", input2Group.size() > 0);

            List<TaskCounter> countersToVerifyAgg = Arrays.asList(
                TaskCounter.ADDITIONAL_SPILLS_BYTES_READ,
                TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN,
                TaskCounter.COMBINE_INPUT_RECORDS,
                TaskCounter.MERGED_MAP_OUTPUTS,
                TaskCounter.NUM_DISK_TO_DISK_MERGES,
                TaskCounter.NUM_FAILED_SHUFFLE_INPUTS,
                TaskCounter.NUM_MEM_TO_DISK_MERGES,
                TaskCounter.NUM_SHUFFLED_INPUTS,
                TaskCounter.NUM_SKIPPED_INPUTS,
                TaskCounter.REDUCE_INPUT_GROUPS,
                TaskCounter.REDUCE_INPUT_RECORDS,
                TaskCounter.SHUFFLE_BYTES,
                TaskCounter.SHUFFLE_BYTES_DECOMPRESSED,
                TaskCounter.SHUFFLE_BYTES_DISK_DIRECT,
                TaskCounter.SHUFFLE_BYTES_TO_DISK,
                TaskCounter.SHUFFLE_BYTES_TO_MEM,
                TaskCounter.SPILLED_RECORDS);
            int nonZeroCounters = 0;
            for (TaskCounter c : countersToVerifyAgg) {
                TezCounter aggregatedCounter = aggregatedGroup.findCounter(c.name(), false);
                TezCounter input1Counter = input1Group.findCounter(c.name(), false);
                TezCounter input2Counter = input2Group.findCounter(c.name(), false);
                Assert.assertNotNull("aggregated counter cannot be null " + c.name(), aggregatedCounter);
                Assert.assertNotNull("input1 counter cannot be null " + c.name(), input1Counter);
                Assert.assertNotNull("input2 counter cannot be null " + c.name(), input2Counter);
                Assert.assertEquals("aggregated counter does not match sum of input counters " + c.name(), aggregatedCounter.getValue(), input1Counter.getValue() + input2Counter.getValue());
                if (aggregatedCounter.getValue() > 0L) {
                    ++nonZeroCounters;
                }
            }
            // If every counter were zero the equality checks above would be vacuous.
            Assert.assertTrue("At least one of the counter should be non-zero. invalid test ", nonZeroCounters > 0);

            CounterGroup joinerOutputGroup = (CounterGroup)joinerCounters.getGroup(perIoPrefix + "_OUTPUT_" + joinOutputName);
            String outputCounterName = TaskCounter.OUTPUT_RECORDS.name();
            TezCounter aggregateCounter = aggregatedGroup.findCounter(outputCounterName, false);
            TezCounter joinerOutputCounter = joinerOutputGroup.findCounter(outputCounterName, false);
            Assert.assertNotNull("aggregated counter cannot be null " + outputCounterName, aggregateCounter);
            Assert.assertNotNull("output counter cannot be null " + outputCounterName, joinerOutputCounter);
            Assert.assertTrue("counter value is zero. test is invalid", aggregateCounter.getValue() > 0L);
            Assert.assertEquals("aggregated counter does not match sum of output counters " + outputCounterName, aggregateCounter.getValue(), joinerOutputCounter.getValue());
        }
        finally {
            tezClient.stop();
        }
    }

    /**
     * Runs {@link SortMergeJoinExample} with {@code -local} and
     * {@code -disableSplitGrouping} against two generated inputs on the local file
     * system and checks that the single output file contains exactly the terms
     * present in both inputs.
     *
     * Input 1 holds term0..term19; input 2 holds only the even-numbered terms, so
     * the expected join result is the even-numbered terms.
     *
     * @throws Exception if file system access or the example run fails
     */
    @Test(timeout=60000L)
    public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
        SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
        sortMergeJoinExample.setConf(conf);
        Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/tez-staging-dir");
        Path inPath1 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath1");
        Path inPath2 = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/inPath2");
        Path outPath = new Path(TEST_ROOT_DIR + "/tmp/sortMerge/outPath");
        // Remove output left behind by a previous run on the local disk.
        localFs.delete(outPath, true);
        localFs.mkdirs(inPath1);
        localFs.mkdirs(inPath2);
        localFs.mkdirs(stagingDirPath);

        Set<String> expectedResult = new HashSet<String>();
        // Closing the writers also closes the underlying file system streams; the
        // finally blocks make sure that happens even when a write fails.
        BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(localFs.create(new Path(inPath1, "file"))));
        try {
            BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(localFs.create(new Path(inPath2, "file"))));
            try {
                for (int i = 0; i < 20; ++i) {
                    String term = "term" + i;
                    writer1.write(term);
                    writer1.newLine();
                    if (i % 2 == 0) {
                        writer2.write(term);
                        writer2.newLine();
                        expectedResult.add(term);
                    }
                }
            }
            finally {
                writer2.close();
            }
        }
        finally {
            writer1.close();
        }

        String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), "-counter", "-local", "-disableSplitGrouping", inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
        Assert.assertEquals(0, sortMergeJoinExample.run(args));

        // Ignore marker/hidden files such as _SUCCESS when looking for the result file.
        FileStatus[] statuses = localFs.listStatus(outPath, new PathFilter(){

            @Override
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals(1, statuses.length);

        BufferedReader reader = new BufferedReader(new InputStreamReader(localFs.open(statuses[0].getPath())));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                Assert.assertTrue("Unexpected or duplicate line in join output: " + line, expectedResult.remove(line));
            }
        }
        finally {
            // Release the stream even when an assertion above fails.
            reader.close();
        }
        Assert.assertEquals("Terms missing from join output: " + expectedResult, 0, expectedResult.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * End-to-end hash join pipeline in a single Tez session: generate the data with
     * {@link JoinDataGen}, join it with {@link HashJoinExample}, then compare the
     * result against the expected output with {@link JoinValidate}. Every stage
     * must return exit code 0. The session is stopped once it has been created,
     * whether or not the stages succeed.
     *
     * @throws Exception if file system access, the session or any stage fails
     */
    @Test(timeout=120000L)
    public void testHashJoinExamplePipeline() throws Exception {
        Path stagingDir = new Path("/tmp/tez-staging-dir");
        Path workDir = new Path("/tmp/testHashJoinExample");
        remoteFs.mkdirs(stagingDir);
        remoteFs.mkdirs(workDir);

        String streamInput = new Path(workDir, "inPath1").toString();
        String hashInput = new Path(workDir, "inPath2").toString();
        String expectedOutput = new Path(workDir, "expectedOutputPath").toString();
        String actualOutput = new Path(workDir, "outPath").toString();

        TezConfiguration sessionConf = new TezConfiguration(mrrTezCluster.getConfig());
        sessionConf.set("tez.staging-dir", stagingDir.toString());

        TezClient session = TezClient.create("HashJoinExampleSession", sessionConf, true);
        try {
            session.start();

            String[] genArgs = {"-counter", streamInput, "1048576", hashInput, "524288", expectedOutput, "2"};
            int genExit = new JoinDataGen().run(sessionConf, genArgs, session);
            Assert.assertEquals(0, genExit);

            String[] joinArgs = {streamInput, hashInput, "2", actualOutput};
            int joinExit = new HashJoinExample().run(sessionConf, joinArgs, session);
            Assert.assertEquals(0, joinExit);

            String[] checkArgs = {"-counter", expectedOutput, actualOutput, "3"};
            int checkExit = new JoinValidate().run(sessionConf, checkArgs, session);
            Assert.assertEquals(0, checkExit);
        }
        finally {
            session.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * End-to-end sort-merge join pipeline in a single Tez session: generate the
     * data with {@link JoinDataGen}, join it with {@link SortMergeJoinExample},
     * then compare the result against the expected output with
     * {@link JoinValidate}. Every stage must return exit code 0. The session is
     * stopped once it has been created, whether or not the stages succeed.
     *
     * @throws Exception if file system access, the session or any stage fails
     */
    @Test(timeout=120000L)
    public void testSortMergeJoinExamplePipeline() throws Exception {
        Path stagingDir = new Path("/tmp/tez-staging-dir");
        Path workDir = new Path("/tmp/testSortMergeExample");
        remoteFs.mkdirs(stagingDir);
        remoteFs.mkdirs(workDir);

        String firstInput = new Path(workDir, "inPath1").toString();
        String secondInput = new Path(workDir, "inPath2").toString();
        String expectedOutput = new Path(workDir, "expectedOutputPath").toString();
        String actualOutput = new Path(workDir, "outPath").toString();

        TezConfiguration sessionConf = new TezConfiguration(mrrTezCluster.getConfig());
        sessionConf.set("tez.staging-dir", stagingDir.toString());

        TezClient session = TezClient.create("SortMergeExampleSession", sessionConf, true);
        try {
            session.start();

            String[] genArgs = {firstInput, "1048576", secondInput, "524288", expectedOutput, "2"};
            int genExit = new JoinDataGen().run(sessionConf, genArgs, session);
            Assert.assertEquals(0, genExit);

            String[] joinArgs = {firstInput, secondInput, "2", actualOutput};
            int joinExit = new SortMergeJoinExample().run(sessionConf, joinArgs, session);
            Assert.assertEquals(0, joinExit);

            String[] checkArgs = {expectedOutput, actualOutput, "3"};
            int checkExit = new JoinValidate().run(sessionConf, checkArgs, session);
            Assert.assertEquals(0, checkExit);
        }
        finally {
            session.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes two identical word-count input files, {@code inPath1} and
     * {@code inPath2}, under {@code inputDir}. For each i in 1..10 the word
     * {@code a_i} is written (11 - i) times to each file, every occurrence followed
     * by a tab emitted through {@code writeChars}.
     *
     * @param inputDir directory in which the two input files are created
     * @param fs file system used to create the files
     * @throws IOException if creating, writing, syncing or closing a file fails
     */
    public static void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException {
        final String prefix = "a";
        Path firstFile = new Path(inputDir, "inPath1");
        Path secondFile = new Path(inputDir, "inPath2");
        FSDataOutputStream firstOut = null;
        FSDataOutputStream secondOut = null;
        try {
            firstOut = fs.create(firstFile);
            secondOut = fs.create(secondFile);
            for (int index = 1; index <= 10; ++index) {
                String word = prefix + "_" + index;
                // a_1 is written 10 times, a_2 9 times, ... a_10 once.
                int repetitions = 11 - index;
                for (int written = 0; written < repetitions; ++written) {
                    LOG.info("Writing " + word + " to input files");
                    firstOut.write(word.getBytes());
                    firstOut.writeChars("\t");
                    secondOut.write(word.getBytes());
                    secondOut.writeChars("\t");
                }
            }
            firstOut.hsync();
            secondOut.hsync();
        }
        finally {
            if (firstOut != null) {
                firstOut.close();
            }
            if (secondOut != null) {
                secondOut.close();
            }
        }
    }

    /**
     * Verifies the ordered word count result file: lines must list the words
     * {@code a_10} down to {@code a_1}, in that order, each followed by a tab and a
     * count of {@code (11 - i) * 2} for word {@code a_i}.
     *
     * The file is read line by line straight from the file system stream (rather
     * than from a single fixed-size read, which is not guaranteed to return the
     * whole file), and the stream is always closed.
     *
     * @param resultFile the part file produced by the job
     * @param fs file system holding {@code resultFile}
     * @throws IOException if the file cannot be opened or read
     */
    public static void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException {
        int currentCounter = 10;
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(resultFile)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                LOG.info("Line: " + line + ", counter=" + currentCounter);
                int pos = line.indexOf('\t');
                Assert.assertTrue("Malformed output line (no word before a tab): " + line, pos > 0);
                // The character right before the tab is dropped on purpose.
                // NOTE(review): presumably the 0x00 high byte that writeChars("\t") emits in
                // generateOrderedWordCountInput and that ends up attached to the word - confirm.
                String word = line.substring(0, pos - 1);
                Assert.assertEquals("a_" + currentCounter, word);
                String val = line.substring(pos + 1);
                Assert.assertEquals((long)(11 - currentCounter) * 2L, Long.parseLong(val));
                --currentCounter;
            }
        }
        finally {
            // Closing the reader also closes the underlying file system stream.
            reader.close();
        }
        // All ten words (a_10 .. a_1) must have been seen.
        Assert.assertEquals(0, currentCounter);
    }

    /**
     * Checks a job output directory: it must contain a {@code _SUCCESS} file and
     * exactly one file whose name starts with {@code part-}, whose content is then
     * validated by {@link #verifyOrderedWordCountOutput(Path, FileSystem)}.
     *
     * @param outputDir directory the job wrote its output to
     * @param fs file system holding {@code outputDir}
     * @throws IOException if listing the directory or reading the result fails
     */
    public static void verifyOutput(Path outputDir, FileSystem fs) throws IOException {
        Path resultFile = null;
        boolean sawSuccessMarker = false;
        for (FileStatus entry : fs.listStatus(outputDir)) {
            if (!entry.isFile()) {
                continue;
            }
            Path entryPath = entry.getPath();
            String entryName = entryPath.getName();
            if (entryName.equals("_SUCCESS")) {
                sawSuccessMarker = true;
            } else if (entryName.startsWith("part-")) {
                if (resultFile != null) {
                    Assert.fail("Found 2 part files instead of 1, paths=" + resultFile + "," + entry.getPath());
                }
                resultFile = entryPath;
                LOG.info("Found output at " + resultFile);
            }
        }
        boolean sawResult = resultFile != null;
        Assert.assertTrue(sawResult);
        Assert.assertTrue(resultFile != null);
        Assert.assertTrue(sawSuccessMarker);
        TestTezJobs.verifyOrderedWordCountOutput(resultFile, fs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Generates word-count input on the remote file system, runs
     * {@link OrderedWordCount} on the mini cluster without a pre-created Tez client,
     * and verifies the produced output. The staging directory is removed afterwards
     * regardless of the outcome.
     *
     * @throws Exception if file system access or the job run fails
     */
    @Test(timeout=60000L)
    public void testOrderedWordCount() throws Exception {
        String inputDirStr = "/tmp/owc-input/";
        Path inputDir = new Path(inputDirStr);
        Path stagingDirPath = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(inputDir);
        remoteFs.mkdirs(stagingDirPath);
        TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs);
        String outputDirStr = "/tmp/owc-output/";
        Path outputDir = new Path(outputDirStr);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        try {
            OrderedWordCount job = new OrderedWordCount();
            // A null client is passed to run(), so this test has no session of its own to stop.
            int exitCode = job.run(tezConf, new String[]{"-counter", inputDirStr, outputDirStr, "2"}, null);
            Assert.assertTrue("OrderedWordCount failed", exitCode == 0);
            TestTezJobs.verifyOutput(outputDir, remoteFs);
        }
        finally {
            remoteFs.delete(stagingDirPath, true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Generates word-count input on the local file system, runs
     * {@link OrderedWordCount} with {@code -local} and {@code -disableSplitGrouping}
     * without a pre-created Tez client, and verifies the produced output. The
     * staging directory is removed afterwards regardless of the outcome.
     *
     * @throws Exception if file system access or the job run fails
     */
    @Test(timeout=60000L)
    public void testOrderedWordCountDisableSplitGrouping() throws Exception {
        String inputDirStr = TEST_ROOT_DIR + "/tmp/owc-input/";
        Path inputDir = new Path(inputDirStr);
        Path stagingDirPath = new Path(TEST_ROOT_DIR + "/tmp/owc-staging-dir");
        localFs.mkdirs(inputDir);
        localFs.mkdirs(stagingDirPath);
        TestTezJobs.generateOrderedWordCountInput(inputDir, localFs);
        String outputDirStr = TEST_ROOT_DIR + "/tmp/owc-output/";
        Path outputDir = new Path(outputDirStr);
        // Remove output left behind by a previous run on the local disk.
        localFs.delete(outputDir, true);
        TezConfiguration tezConf = new TezConfiguration(conf);
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        try {
            OrderedWordCount job = new OrderedWordCount();
            // A null client is passed to run(), so this test has no session of its own to stop.
            int exitCode = job.run(tezConf, new String[]{"-counter", "-local", "-disableSplitGrouping", inputDirStr, outputDirStr, "2"}, null);
            Assert.assertTrue("OrderedWordCount failed", exitCode == 0);
            TestTezJobs.verifyOutput(outputDir, localFs);
        }
        finally {
            localFs.delete(stagingDirPath, true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@link SimpleSessionExample} over two generated input directories in
     * session mode, verifies each output directory, and checks through YARN that
     * the whole run added exactly one application. The staging directory is removed
     * and the YARN client stopped afterwards regardless of the outcome.
     *
     * @throws Exception if file system access, YARN access or the job run fails
     */
    @Test(timeout=60000L)
    public void testSimpleSessionExample() throws Exception {
        Path stagingDirPath = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(stagingDirPath);

        final int numIterations = 2;
        String[] inputPaths = new String[numIterations];
        String[] outputPaths = new String[numIterations];
        Path[] outputDirs = new Path[numIterations];
        for (int iteration = 0; iteration < numIterations; ++iteration) {
            String inputDirStr = "/tmp/owc-input-" + iteration + "/";
            inputPaths[iteration] = inputDirStr;
            Path inputDir = new Path(inputDirStr);
            remoteFs.mkdirs(inputDir);
            TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs);

            String outputDirStr = "/tmp/owc-output-" + iteration + "/";
            outputPaths[iteration] = outputDirStr;
            outputDirs[iteration] = new Path(outputDirStr);
        }

        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();

            List<ApplicationReport> appsBefore = yarnClient.getApplications();
            int appsBeforeCount = 0;
            if (appsBefore != null) {
                appsBeforeCount = appsBefore.size();
            }

            SimpleSessionExample job = new SimpleSessionExample();
            tezConf.setBoolean("tez.am.mode.session", true);
            String[] jobArgs = new String[]{StringUtils.join(",", inputPaths), StringUtils.join(",", outputPaths), "2"};
            int exitCode = job.run(tezConf, jobArgs, null);
            Assert.assertTrue("SimpleSessionExample failed", exitCode == 0);

            for (Path outputDir : outputDirs) {
                TestTezJobs.verifyOutput(outputDir, remoteFs);
            }

            // Both iterations must have run inside a single YARN application.
            List<ApplicationReport> appsAfter = yarnClient.getApplications();
            int appsAfterCount = 0;
            if (appsAfter != null) {
                appsAfterCount = appsAfter.size();
            }
            Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);
        }
        finally {
            remoteFs.delete(stagingDirPath, true);
            yarnClient.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code SimpleSessionExample} with "tez.am.mode.session" set to false against the
     * queue name "nonexistent". The test passes when the run returns a non-zero code, or when
     * a {@code TezException} whose message contains "Failed to submit application" is thrown.
     *
     * @throws Exception on any failure other than the expected {@code TezException}
     */
    @Test(timeout=60000L)
    public void testInvalidQueueSubmission() throws Exception {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            SimpleSessionExample job = new SimpleSessionExample();
            tezConf.setBoolean("tez.am.mode.session", false);
            tezConf.set("tez.queue.name", "nonexistent");

            String[] inputPaths = {"/tmp/owc-input"};
            String[] outputPaths = {"/tmp/owc-output"};
            remoteFs.mkdirs(new Path(inputPaths[0]));

            String[] jobArgs = {
                StringUtils.join(",", inputPaths),
                StringUtils.join(",", outputPaths),
                "2"
            };
            int exitCode = job.run(tezConf, jobArgs, null);
            Assert.assertTrue("Job should have failed", exitCode != 0);
        } catch (TezException e) {
            Assert.assertTrue(e.getMessage().contains("Failed to submit application"));
        } finally {
            yarnClient.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code SimpleSessionExample} with "tez.am.mode.session" set to true against the
     * queue name "nonexistent" and expects the run to throw. A {@code SessionNotRunning} is
     * logged and accepted; any other {@code TezException} must mention
     * "Failed to submit application". A normal return fails the test.
     *
     * @throws Exception on any failure other than the expected exceptions
     */
    @Test(timeout=60000L)
    public void testInvalidQueueSubmissionToSession() throws Exception {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            SimpleSessionExample job = new SimpleSessionExample();
            tezConf.setBoolean("tez.am.mode.session", true);
            tezConf.set("tez.queue.name", "nonexistent");

            String[] inputPaths = {"/tmp/owc-input"};
            String[] outputPaths = {"/tmp/owc-output"};
            remoteFs.mkdirs(new Path(inputPaths[0]));

            String[] jobArgs = {
                StringUtils.join(",", inputPaths),
                StringUtils.join(",", outputPaths),
                "2"
            };
            job.run(tezConf, jobArgs, null);
            Assert.fail("Job submission should have failed");
        } catch (SessionNotRunning e) {
            // Must precede the TezException handler: this is the more specific catch.
            LOG.info("Session not running", e);
        } catch (TezException e) {
            Assert.assertTrue(e.getMessage().contains("Failed to submit application"));
        } finally {
            yarnClient.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits the DAG built by {@code SimpleTestDAG.createDAGForVertexOrder}, polls until it
     * completes, and asserts that it succeeded and that the six vertex names in the reported
     * vertex-progress key set iterate as: v1/v2 (either order), v3, v4/v5 (either order), v6.
     *
     * @throws Exception if submission, polling, or any assertion fails
     */
    @Test(timeout=60000L)
    public void testVertexOrder() throws Exception {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        TezClient tezClient = TezClient.create("TestVertexOrder", tezConf);
        tezClient.start();
        try {
            DAG dag = SimpleTestDAG.createDAGForVertexOrder("dag1", conf);
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGStatus dagStatus = dagClient.getDAGStatus(null);
            while (!dagStatus.isCompleted()) {
                // The logged interval matches the actual sleep below.
                LOG.info("Waiting for dag to complete. Sleeping for 100ms. DAG name: " + dag.getName() + " DAG context: " + dagClient.getExecutionContext() + " Current state: " + dagStatus.getState());
                Thread.sleep(100L);
                dagStatus = dagClient.getDAGStatus(null);
            }
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());

            Set<String> resultVertices = dagStatus.getVertexProgress().keySet();
            Assert.assertEquals(6L, (long)resultVertices.size());

            // Position in iteration order decides which vertex names are acceptable.
            int position = 0;
            for (String vertexName : resultVertices) {
                String message = "Unexpected vertex at position " + position + ": " + vertexName;
                if (position <= 1) {
                    Assert.assertTrue(message, vertexName.equals("v1") || vertexName.equals("v2"));
                } else if (position == 2) {
                    Assert.assertTrue(message, vertexName.equals("v3"));
                } else if (position <= 4) {
                    Assert.assertTrue(message, vertexName.equals("v4") || vertexName.equals("v5"));
                } else {
                    Assert.assertTrue(message, vertexName.equals("v6"));
                }
                ++position;
            }
        } finally {
            tezClient.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds a two-vertex DAG: one single-task vertex running {@code SleepProcessor} with a
     * data source initialized by {@code InputInitializerForTest}, and one five-task vertex
     * running {@code InputInitializerEventGeneratingProcessor}. Submits it, waits for
     * completion, and asserts that the DAG succeeded.
     *
     * @throws TezException if the Tez client or DAG client reports a failure
     * @throws InterruptedException if the wait for completion is interrupted
     * @throws IOException on I/O failure while talking to the cluster
     */
    @Test(timeout=60000L)
    public void testInputInitializerEvents() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        TezClient tezClient = TezClient.create("TestInputInitializerEvents", tezConf);
        tezClient.start();
        try {
            DAG dag = DAG.create("TestInputInitializerEvents");

            // Vertex whose input initializer is driven by the other vertex's events.
            ProcessorDescriptor sleepProcessor = ProcessorDescriptor.create(SleepProcessor.class.getName())
                .setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload());
            InputDescriptor noOpInput = InputDescriptor.create(MultiAttemptDAG.NoOpInput.class.getName());
            InputInitializerDescriptor initializer = InputInitializerDescriptor.create(InputInitializerForTest.class.getName());
            DataSourceDescriptor dataSource = DataSourceDescriptor.create(noOpInput, initializer, null);
            Vertex vertex1 = Vertex.create(VERTEX_WITH_INITIALIZER_NAME, sleepProcessor, 1)
                .addDataSource(INPUT1_NAME, dataSource);

            // Vertex whose tasks emit the initializer events.
            ProcessorDescriptor eventProcessor = ProcessorDescriptor.create(InputInitializerEventGeneratingProcessor.class.getName());
            Vertex vertex2 = Vertex.create(EVENT_GENERATING_VERTEX_NAME, eventProcessor, 5);

            dag.addVertex(vertex1).addVertex(vertex2);
            DAGClient dagClient = tezClient.submitDAG(dag);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
        } finally {
            tezClient.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code MultipleCommitsExample} with a random number (1 to 10) of outputs for each of
     * the four output prefixes, then checks that every expected output directory contains a
     * {@code _SUCCESS} marker. No commit-mode argument is passed to the job.
     *
     * @throws Exception if the job run or any commit verification fails
     */
    @Test(timeout=60000L)
    public void testMultipleCommits_OnDAGSuccess() throws Exception {
        Path stagingDirPath = new Path("/tmp/commit-staging-dir");
        Random rand = new Random();
        String v1OutputPathPrefix = "/tmp/commit-output-v1";
        int v1OutputNum = rand.nextInt(10) + 1;
        String v2OutputPathPrefix = "/tmp/commit-output-v2";
        int v2OutputNum = rand.nextInt(10) + 1;
        String uv12OutputPathPrefix = "/tmp/commit-output-uv12";
        int uv12OutputNum = rand.nextInt(10) + 1;
        String v3OutputPathPrefix = "/tmp/commit-output-v3";
        int v3OutputNum = rand.nextInt(10) + 1;
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        try {
            MultipleCommitsExample job = new MultipleCommitsExample();
            String[] jobArgs = {
                v1OutputPathPrefix, String.valueOf(v1OutputNum),
                v2OutputPathPrefix, String.valueOf(v2OutputNum),
                uv12OutputPathPrefix, String.valueOf(uv12OutputNum),
                v3OutputPathPrefix, String.valueOf(v3OutputNum)
            };
            Assert.assertTrue("MultipleCommitsExample failed", job.run(tezConf, jobArgs, null) == 0);
            this.verifyCommits(v1OutputPathPrefix, v1OutputNum);
            this.verifyCommits(v2OutputPathPrefix, v2OutputNum);
            this.verifyCommits(uv12OutputPathPrefix, uv12OutputNum);
            this.verifyCommits(v3OutputPathPrefix, v3OutputNum);
        } finally {
            // The job is run with a null client, so the staging directory is the only cleanup.
            remoteFs.delete(stagingDirPath, true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code MultipleCommitsExample} with the extra argument "commitOnVertexSuccess" and a
     * random number (1 to 10) of outputs for each of the four output prefixes, then checks that
     * every expected output directory contains a {@code _SUCCESS} marker.
     *
     * @throws Exception if the job run or any commit verification fails
     */
    @Test(timeout=60000L)
    public void testMultipleCommits_OnVertexSuccess() throws Exception {
        Path stagingDirPath = new Path("/tmp/commit-staging-dir");
        Random rand = new Random();
        String v1OutputPathPrefix = "/tmp/commit-output-v1";
        int v1OutputNum = rand.nextInt(10) + 1;
        String v2OutputPathPrefix = "/tmp/commit-output-v2";
        int v2OutputNum = rand.nextInt(10) + 1;
        String uv12OutputPathPrefix = "/tmp/commit-output-uv12";
        int uv12OutputNum = rand.nextInt(10) + 1;
        String v3OutputPathPrefix = "/tmp/commit-output-v3";
        int v3OutputNum = rand.nextInt(10) + 1;
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        try {
            MultipleCommitsExample job = new MultipleCommitsExample();
            String[] jobArgs = {
                v1OutputPathPrefix, String.valueOf(v1OutputNum),
                v2OutputPathPrefix, String.valueOf(v2OutputNum),
                uv12OutputPathPrefix, String.valueOf(uv12OutputNum),
                v3OutputPathPrefix, String.valueOf(v3OutputNum),
                "commitOnVertexSuccess"
            };
            Assert.assertTrue("MultipleCommitsExample failed", job.run(tezConf, jobArgs, null) == 0);
            this.verifyCommits(v1OutputPathPrefix, v1OutputNum);
            this.verifyCommits(v2OutputPathPrefix, v2OutputNum);
            this.verifyCommits(uv12OutputPathPrefix, uv12OutputNum);
            this.verifyCommits(v3OutputPathPrefix, v3OutputNum);
        } finally {
            // The job is run with a null client, so the staging directory is the only cleanup.
            remoteFs.delete(stagingDirPath, true);
        }
    }

    /**
     * Asserts that a {@code _SUCCESS} file exists under each of the directories
     * {@code outputPrefix_0} through {@code outputPrefix_(outputNum - 1)} on the remote
     * file system.
     *
     * @param outputPrefix path prefix shared by the output directories
     * @param outputNum number of output directories to check
     * @throws IOException if the existence check fails
     */
    private void verifyCommits(String outputPrefix, int outputNum) throws IllegalArgumentException, IOException {
        int index = 0;
        while (index < outputNum) {
            String outputDir = outputPrefix + "_" + index;
            Path successMarker = new Path(outputDir + "/_SUCCESS");
            boolean committed = remoteFs.exists(successMarker);
            Assert.assertTrue("Output of " + outputDir + " is not succeeded", committed);
            index++;
        }
    }

    /**
     * Creates the two input directories and writes one file named "file" into each.
     * The first file receives the lines "term0" through "term19"; the second receives only the
     * even-numbered terms, which are also the terms returned.
     *
     * @param inPath1 directory for the file containing every term
     * @param inPath2 directory for the file containing the even-numbered terms
     * @return the set of terms written to both files
     * @throws IOException if directory creation or writing fails
     */
    private Set<String> generateSortMergeJoinInput(Path inPath1, Path inPath2) throws IOException {
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        Set<String> expectedResult = new HashSet<String>();
        // try-with-resources closes (and flushes) all four resources even if a write throws.
        try (FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
             FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
             BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
             BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2))) {
            for (int i = 0; i < 20; ++i) {
                String term = "term" + i;
                writer1.write(term);
                writer1.newLine();
                if (i % 2 == 0) {
                    writer2.write(term);
                    writer2.newLine();
                    expectedResult.add(term);
                }
            }
        }
        return expectedResult;
    }

    /**
     * Asserts that {@code outPath} holds exactly one file whose name starts with neither "_"
     * nor ".", and that the lines of that file are exactly the entries of
     * {@code expectedResult}, each appearing once.
     *
     * <p>The passed set is consumed: every matched line is removed from it.
     *
     * @param outPath directory containing the join output
     * @param expectedResult terms expected in the output; emptied as lines are matched
     * @throws IOException if listing or reading the output fails
     */
    private void verifySortMergeJoinInput(Path outPath, Set<String> expectedResult) throws IOException {
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter(){

            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals(1L, (long)statuses.length);
        // try-with-resources closes the reader and stream even when an assertion fails mid-read.
        try (FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
             BufferedReader reader = new BufferedReader(new InputStreamReader(inStream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                Assert.assertTrue("Unexpected or duplicate line in join output: " + line, expectedResult.remove(line));
            }
        }
        Assert.assertEquals("Expected terms missing from join output: " + expectedResult, 0L, (long)expectedResult.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a Tez session with "tez.am.client.heartbeat.timeout.secs" set to 5, cancels the
     * client's AM keep-alive, waits until YARN reports the application as FINISHED, FAILED or
     * KILLED, and asserts that the application diagnostics contain
     * "Client-to-AM Heartbeat timeout interval expired".
     *
     * @throws Exception if cluster access fails or an assertion does not hold
     */
    @Test(timeout=60000L)
    public void testAMClientHeartbeatTimeout() throws Exception {
        Path stagingDirPath = new Path("/tmp/timeout-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            List<ApplicationReport> apps = yarnClient.getApplications();
            int appsBeforeCount = apps != null ? apps.size() : 0;

            TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
            tezConf.set("tez.staging-dir", stagingDirPath.toString());
            tezConf.setInt("tez.am.client.heartbeat.timeout.secs", 5);
            // NOTE(review): this TezClient is never stopped; confirm whether stop() is safe
            // to call once the AM has already terminated before adding it to the cleanup.
            TezClient tezClient = TezClient.create("testAMClientHeartbeatTimeout", tezConf, true);
            tezClient.start();
            tezClient.cancelAMKeepAlive(true);
            ApplicationId appId = tezClient.getAppMasterApplicationId();

            apps = yarnClient.getApplications();
            int appsAfterCount = apps != null ? apps.size() : 0;
            Assert.assertEquals((long)(appsBeforeCount + 1), (long)appsAfterCount);

            // Poll once per second until the application reaches a terminal state.
            EnumSet<YarnApplicationState> terminalStates = EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED);
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            while (!terminalStates.contains(report.getYarnApplicationState())) {
                Thread.sleep(1000L);
                report = yarnClient.getApplicationReport(appId);
            }
            // Re-fetch after a short pause; presumably lets the final diagnostics settle — TODO confirm.
            Thread.sleep(2000L);
            report = yarnClient.getApplicationReport(appId);
            LOG.info("App Report for appId=" + appId + ", report=" + report);
            // Null diagnostics fail the assertion with a message rather than throwing an NPE.
            String diagnostics = report.getDiagnostics();
            Assert.assertTrue("Actual diagnostics: " + diagnostics, diagnostics != null && diagnostics.contains("Client-to-AM Heartbeat timeout interval expired"));
        } finally {
            // Stop the YARN client even when deleting the staging directory throws.
            try {
                remoteFs.delete(stagingDirPath, true);
            } finally {
                yarnClient.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a Tez session with "tez.session.am.dag.submit.timeout.secs" set to 5, submits no
     * DAG, waits until YARN reports the application as FINISHED, FAILED or KILLED, and asserts
     * that the application diagnostics contain "Session timed out".
     *
     * @throws Exception if cluster access fails or an assertion does not hold
     */
    @Test(timeout=60000L)
    public void testSessionTimeout() throws Exception {
        Path stagingDirPath = new Path("/tmp/sessiontimeout-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            List<ApplicationReport> apps = yarnClient.getApplications();
            int appsBeforeCount = apps != null ? apps.size() : 0;

            TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
            tezConf.set("tez.staging-dir", stagingDirPath.toString());
            tezConf.setInt("tez.session.am.dag.submit.timeout.secs", 5);
            // NOTE(review): this TezClient is never stopped; confirm whether stop() is safe
            // to call once the AM has already terminated before adding it to the cleanup.
            TezClient tezClient = TezClient.create("testSessionTimeout", tezConf, true);
            tezClient.start();
            ApplicationId appId = tezClient.getAppMasterApplicationId();

            apps = yarnClient.getApplications();
            int appsAfterCount = apps != null ? apps.size() : 0;
            Assert.assertEquals((long)(appsBeforeCount + 1), (long)appsAfterCount);

            // Poll once per second until the application reaches a terminal state.
            EnumSet<YarnApplicationState> terminalStates = EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED);
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            while (!terminalStates.contains(report.getYarnApplicationState())) {
                Thread.sleep(1000L);
                report = yarnClient.getApplicationReport(appId);
            }
            // Re-fetch after a short pause; presumably lets the final diagnostics settle — TODO confirm.
            Thread.sleep(2000L);
            report = yarnClient.getApplicationReport(appId);
            LOG.info("App Report for appId=" + appId + ", report=" + report);
            // Null diagnostics fail the assertion with a message rather than throwing an NPE.
            String diagnostics = report.getDiagnostics();
            Assert.assertTrue("Actual diagnostics: " + diagnostics, diagnostics != null && diagnostics.contains("Session timed out"));
        } finally {
            // Stop the YARN client even when deleting the staging directory throws.
            try {
                remoteFs.delete(stagingDirPath, true);
            } finally {
                yarnClient.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a "Parent" -> "Child" DAG in which both two-task vertices use
     * {@code FailingAttemptProcessor}, with "tez.vertex.failures.maxpercent" set to "50.0f" and
     * "tez.am.task.max.failed.attempts" set to 1, and asserts that the DAG still succeeds.
     *
     * @throws TezException if the Tez client or DAG client reports a failure
     * @throws InterruptedException if the wait for completion is interrupted
     * @throws IOException on I/O failure while talking to the cluster
     */
    @Test(timeout=60000L)
    public void testVertexFailuresMaxPercent() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.vertex.failures.maxpercent", "50.0f");
        tezConf.setInt("tez.am.task.max.failed.attempts", 1);
        TezClient tezClient = TezClient.create("TestVertexFailuresMaxPercent", tezConf);
        tezClient.start();
        try {
            DAG dag = DAG.create("TestVertexFailuresMaxPercent");

            String failingProcessorName = FailingAttemptProcessor.class.getName();
            Vertex vertex1 = Vertex.create("Parent", ProcessorDescriptor.create(failingProcessorName), 2);
            Vertex vertex2 = Vertex.create("Child", ProcessorDescriptor.create(failingProcessorName), 2);

            OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig
                .newBuilder(Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName())
                .setFromConfiguration(tezConf)
                .build();
            Edge parentToChild = Edge.create(vertex1, vertex2, edgeConfig.createDefaultEdgeProperty());

            dag.addVertex(vertex1).addVertex(vertex2).addEdge(parentToChild);
            DAGClient dagClient = tezClient.submitDAG(dag);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
        } finally {
            tezClient.stop();
        }
    }

    /**
     * Runs the {@code CartesianProduct} example with
     * "tez.cartesian-product.max-parallelism" set to 10 and
     * "tez.cartesian-product.min-ops-per-worker" set to 25, and asserts that it returns 0.
     *
     * @throws Exception if the job run fails
     */
    @Test(timeout=60000L)
    public void testCartesianProduct() throws Exception {
        LOG.info("Running CartesianProduct Test");
        CartesianProduct job = new CartesianProduct();
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.setInt("tez.cartesian-product.max-parallelism", 10);
        tezConf.setInt("tez.cartesian-product.min-ops-per-worker", 25);
        int exitCode = job.run(tezConf, null, null);
        // JUnit order is (message, expected, actual); 0 is the expected exit code.
        Assert.assertEquals("CartesianProduct failed", 0L, (long)exitCode);
    }

    // Static initializer: assigns two class-level fields that are declared elsewhere in this class.
    static {
        // A new Hadoop Configuration instance.
        conf = new Configuration();
        // A path string under "target/" built from this class's fully-qualified name.
        TEST_ROOT_DIR = "target/" + TestTezJobs.class.getName() + "-tmpDir";
    }

    /**
     * {@code SortMergeJoinExample} variant that submits its DAG through a caller-supplied
     * {@code TezClient} and keeps the resulting {@code DAGClient} in a field.
     */
    private static class SortMergeJoinHelper
    extends SortMergeJoinExample {
        private final TezClient tezClientInternal;
        private DAGClient dagClient;

        /**
         * @param tezClient client used to submit DAGs in {@link #runDag}
         */
        public SortMergeJoinHelper(TezClient tezClient) {
            tezClientInternal = tezClient;
        }

        /**
         * Waits for the client to be ready, submits the DAG, and blocks until it completes.
         *
         * @param dag the DAG to submit
         * @param printCounters whether to request counters with the status updates
         * @param logger receives the DAG diagnostics when the DAG does not succeed
         * @return 0 if the final state is SUCCEEDED, otherwise -1
         */
        public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException, InterruptedException, IOException {
            tezClientInternal.waitTillReady();
            dagClient = tezClientInternal.submitDAG(dag);

            Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>();
            if (printCounters) {
                statusOptions.add(StatusGetOpts.GET_COUNTERS);
            }

            DAGStatus finalStatus = dagClient.waitForCompletionWithStatusUpdates(statusOptions);
            if (finalStatus.getState() == DAGStatus.State.SUCCEEDED) {
                return 0;
            }
            logger.info("DAG diagnostics: " + finalStatus.getDiagnostics());
            return -1;
        }
    }

    /**
     * Input initializer used by the test. It registers for SUCCEEDED state updates of
     * {@code EVENT_GENERATING_VERTEX_NAME}, records the index carried by each received
     * {@code InputInitializerEvent}, and keeps {@link #initialize()} blocked until the vertex
     * state update arrives with every task index seen.
     *
     * <p>All mutable state is guarded by {@code lock}.
     */
    public static class InputInitializerForTest
    extends InputInitializer {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = this.lock.newCondition();
        private final BitSet eventsSeen = new BitSet();
        // Set under the lock once the vertex update is accepted. This is the predicate that
        // initialize() waits on, so an early signal or a spurious wakeup cannot break the wait.
        private boolean vertexSucceeded = false;

        public InputInitializerForTest(InputInitializerContext initializerContext) {
            super(initializerContext);
            this.getContext().registerForVertexStateUpdates(TestTezJobs.EVENT_GENERATING_VERTEX_NAME, EnumSet.of(VertexState.SUCCEEDED));
        }

        /**
         * Blocks until {@link #onVertexStateUpdated} has accepted the SUCCEEDED update.
         *
         * @return always {@code null}
         * @throws Exception if the waiting thread is interrupted
         */
        public List<Event> initialize() throws Exception {
            this.lock.lock();
            try {
                // Re-check the predicate after every wakeup, as Condition.await() requires.
                while (!this.vertexSucceeded) {
                    this.condition.await();
                }
            }
            finally {
                this.lock.unlock();
            }
            return null;
        }

        /**
         * Records the int at offset 0 of each event's payload as a seen index.
         *
         * @param events events to record; each must come from
         *     {@code EVENT_GENERATING_VERTEX_NAME}
         * @throws IllegalArgumentException if an event comes from another vertex
         * @throws IllegalStateException if an index has already been seen
         */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
            this.lock.lock();
            try {
                for (InputInitializerEvent event : events) {
                    Preconditions.checkArgument(event.getSourceVertexName().equals(TestTezJobs.EVENT_GENERATING_VERTEX_NAME));
                    int index = event.getUserPayload().getInt(0);
                    Preconditions.checkState(!this.eventsSeen.get(index));
                    this.eventsSeen.set(index);
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        /**
         * Releases {@link #initialize()} when the number of distinct indices seen equals the
         * task count of {@code EVENT_GENERATING_VERTEX_NAME}.
         *
         * @param stateUpdate the update; its state must be SUCCEEDED
         * @throws IllegalArgumentException if the state is not SUCCEEDED
         * @throws IllegalStateException if not every task's event was received beforehand
         */
        public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
            this.lock.lock();
            try {
                Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.SUCCEEDED);
                if (this.eventsSeen.cardinality() != this.getContext().getVertexNumTasks(TestTezJobs.EVENT_GENERATING_VERTEX_NAME)) {
                    throw new IllegalStateException("Received VertexState SUCCEEDED before receiving all InputInitializerEvents");
                }
                this.vertexSucceeded = true;
                this.condition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Processor whose task with index 0 always fails with an {@code IOException};
     * tasks with any other index complete without doing anything.
     */
    public static class FailingAttemptProcessor
    extends SimpleProcessor {
        public FailingAttemptProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * @throws IOException when this task's index is 0
         */
        public void run() throws Exception {
            int taskIndex = this.getContext().getTaskIndex();
            if (taskIndex != 0) {
                return;
            }
            String failure = "Failing task " + taskIndex + ", attempt " + this.getContext().getTaskAttemptNumber();
            LOG.info(failure);
            throw new IOException(failure);
        }
    }

    /**
     * Processor that sends one {@code InputInitializerEvent} carrying its task index to
     * {@code INPUT1_NAME} on {@code VERTEX_WITH_INITIALIZER_NAME}. The first attempt
     * (attempt number 0) of the task with index 1 fails instead of sending.
     */
    public static class InputInitializerEventGeneratingProcessor
    extends SimpleProcessor {
        public InputInitializerEventGeneratingProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * @throws IOException for attempt 0 of the task with index 1
         */
        public void run() throws Exception {
            int taskIndex = this.getContext().getTaskIndex();
            if (taskIndex == 1 && this.getContext().getTaskAttemptNumber() == 0) {
                throw new IOException("Failing task 2, attempt 0");
            }

            // Four-byte payload with the task index stored at offset 0.
            ByteBuffer payload = ByteBuffer.allocate(4);
            payload.putInt(0, taskIndex);
            InputInitializerEvent initializerEvent = InputInitializerEvent.create(TestTezJobs.VERTEX_WITH_INITIALIZER_NAME, TestTezJobs.INPUT1_NAME, payload);

            // NOTE(review): raw list kept from the decompiled form; a typed list would be cleaner.
            ArrayList outgoing = Lists.newArrayList();
            outgoing.add(initializerEvent);
            this.getContext().sendEvents((List)outgoing);
        }
    }
}

