/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Tool;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.test.MiniTezCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestPipelinedShuffle {
    private static MiniDFSCluster miniDFSCluster;
    private static MiniTezCluster miniTezCluster;
    private static Configuration conf;
    private static FileSystem fs;
    private static String TEST_ROOT_DIR;
    private static final int KEYS_PER_MAPPER = 5000;

    @BeforeClass
    public static void setupDFSCluster() throws Exception {
        conf = new Configuration();
        conf.setBoolean("dfs.namenode.edits.noeditlogchannelflush", false);
        EditLogFileOutputStream.setShouldSkipFsyncForTesting((boolean)true);
        conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
        miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
        fs = miniDFSCluster.getFileSystem();
        conf.set("fs.defaultFS", fs.getUri().toString());
        conf.setBoolean("tez.runtime.optimize.local.fetch", false);
    }

    @AfterClass
    public static void shutdownDFSCluster() {
        if (miniDFSCluster != null) {
            miniDFSCluster.shutdown();
        }
    }

    @Before
    public void setupTezCluster() throws Exception {
        conf.setInt("tez.runtime.io.sort.mb", 1);
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        conf.setBoolean("tez.runtime.optimize.local.fetch", true);
        conf.setInt("tez.runtime.shuffle.connect.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.read.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.fetch.failures.limit", 2);
        miniTezCluster = new MiniTezCluster(TestPipelinedShuffle.class.getName(), 1, 1, 1);
        miniTezCluster.init(conf);
        miniTezCluster.start();
    }

    @After
    public void shutdownTezCluster() throws IOException {
        if (miniTezCluster != null) {
            miniTezCluster.stop();
        }
    }

    @Test
    public void baseTest() throws Exception {
        Configuration conf = new Configuration(miniTezCluster.getConfig());
        conf.setBoolean("tez.runtime.shuffle.use.async.http", false);
        this.test(conf);
        conf = new Configuration(miniTezCluster.getConfig());
        conf.setBoolean("tez.runtime.shuffle.use.async.http", true);
        this.test(conf);
    }

    private void test(Configuration conf) throws Exception {
        PipelinedShuffleJob pipelinedShuffle = new PipelinedShuffleJob();
        pipelinedShuffle.setConf(conf);
        String[] args = new String[]{};
        Assert.assertEquals((long)0L, (long)pipelinedShuffle.run(args));
    }

    static {
        conf = new Configuration();
        TEST_ROOT_DIR = "target/" + TestPipelinedShuffle.class.getName() + "-tmpDir";
    }

    public static class PipelinedShuffleJob
    extends Configured
    implements Tool {
        private TezConfiguration tezConf;

        public int run(String[] args) throws Exception {
            this.tezConf = new TezConfiguration(this.getConf());
            String dagName = "pipelinedShuffleTest";
            DAG dag = DAG.create((String)dagName);
            Vertex m1_Vertex = Vertex.create((String)"mapper1", (ProcessorDescriptor)ProcessorDescriptor.create((String)DataGenerator.class.getName()), (int)1);
            Vertex m2_Vertex = Vertex.create((String)"mapper2", (ProcessorDescriptor)ProcessorDescriptor.create((String)DataGenerator.class.getName()), (int)1);
            Vertex reducerVertex = Vertex.create((String)"reducer", (ProcessorDescriptor)ProcessorDescriptor.create((String)SimpleReduceProcessor.class.getName()), (int)1);
            Edge mapper1_to_reducer = Edge.create((Vertex)m1_Vertex, (Vertex)reducerVertex, (EdgeProperty)OrderedPartitionedKVEdgeConfig.newBuilder((String)Text.class.getName(), (String)Text.class.getName(), (String)HashPartitioner.class.getName()).setFromConfiguration((Configuration)this.tezConf).build().createDefaultEdgeProperty());
            Edge mapper2_to_reducer = Edge.create((Vertex)m2_Vertex, (Vertex)reducerVertex, (EdgeProperty)OrderedPartitionedKVEdgeConfig.newBuilder((String)Text.class.getName(), (String)Text.class.getName(), (String)HashPartitioner.class.getName()).setFromConfiguration((Configuration)this.tezConf).build().createDefaultEdgeProperty());
            dag.addVertex(m1_Vertex);
            dag.addVertex(m2_Vertex);
            dag.addVertex(reducerVertex);
            dag.addEdge(mapper1_to_reducer).addEdge(mapper2_to_reducer);
            TezClient client = TezClient.create((String)dagName, (TezConfiguration)this.tezConf);
            client.start();
            client.waitTillReady();
            DAGClient dagClient = client.submitDAG(dag);
            HashSet getOpts = Sets.newHashSet();
            getOpts.add(StatusGetOpts.GET_COUNTERS);
            DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates((Set)getOpts);
            System.out.println(dagStatus.getDAGCounters());
            TezCounters counters = dagStatus.getDAGCounters();
            Assert.assertTrue((counters.findCounter((Enum)TaskCounter.SHUFFLE_CHUNK_COUNT).getValue() > 10L ? 1 : 0) != 0);
            if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
                System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
                return -1;
            }
            return 0;
        }

        public static class SimpleReduceProcessor
        extends SimpleMRProcessor {
            public SimpleReduceProcessor(ProcessorContext context) {
                super(context);
            }

            private long readData(KeyValuesReader reader) throws IOException {
                long records = 0L;
                while (reader.next()) {
                    reader.getCurrentKey();
                    for (Object val : reader.getCurrentValues()) {
                        ++records;
                    }
                }
                return records;
            }

            public void run() throws Exception {
                Preconditions.checkArgument((this.getInputs().size() == 2 ? 1 : 0) != 0);
                long totalRecords = 0L;
                KeyValuesReader reader1 = (KeyValuesReader)((LogicalInput)this.getInputs().get("mapper1")).getReader();
                totalRecords += this.readData(reader1);
                KeyValuesReader reader2 = (KeyValuesReader)((LogicalInput)this.getInputs().get("mapper2")).getReader();
                Assert.assertEquals((long)10000L, (long)(totalRecords += this.readData(reader2)));
            }
        }

        public static class DataGenerator
        extends SimpleMRProcessor {
            public DataGenerator(ProcessorContext context) {
                super(context);
            }

            public void run() throws Exception {
                Preconditions.checkArgument((this.getInputs().size() == 0 ? 1 : 0) != 0);
                Preconditions.checkArgument((this.getOutputs().size() == 1 ? 1 : 0) != 0);
                KeyValueWriter writer = (KeyValueWriter)((LogicalOutput)this.getOutputs().get("reducer")).getWriter();
                for (int i = 0; i < 5000; ++i) {
                    writer.write((Object)new Text(RandomStringUtils.randomAlphanumeric((int)1000)), (Object)new Text(RandomStringUtils.randomAlphanumeric((int)1000)));
                }
            }
        }
    }
}

