/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.RecoveryParser;
import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.test.MiniTezCluster;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestAMRecovery {
    private static final Logger LOG = LoggerFactory.getLogger(TestAMRecovery.class);
    private static Configuration conf = new Configuration();
    private static TezConfiguration tezConf;
    private static int MAX_AM_ATTEMPT;
    private static MiniTezCluster miniTezCluster;
    private static String TEST_ROOT_DIR;
    private static MiniDFSCluster dfsCluster;
    private static TezClient tezSession;
    private static FileSystem remoteFs;
    private static String FAIL_ON_PARTIAL_FINISHED;
    private static String FAIL_ON_ATTEMPT;

    @BeforeClass
    public static void beforeClass() throws Exception {
        LOG.info("Starting mini clusters");
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException io) {
            throw new RuntimeException("problem starting mini dfs cluster", io);
        }
        if (miniTezCluster == null) {
            miniTezCluster = new MiniTezCluster(TestAMRecovery.class.getName(), 1, 1, 1);
            Configuration miniTezconf = new Configuration(conf);
            miniTezconf.setInt("yarn.resourcemanager.am.max-attempts", MAX_AM_ATTEMPT);
            miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString());
            conf.setLong("tez.am.sleep.time.before.exit.millis", 500L);
            miniTezCluster.init(miniTezconf);
            miniTezCluster.start();
        }
    }

    @AfterClass
    public static void afterClass() throws InterruptedException {
        if (tezSession != null) {
            try {
                LOG.info("Stopping Tez Session");
                tezSession.stop();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (miniTezCluster != null) {
            try {
                LOG.info("Stopping MiniTezCluster");
                miniTezCluster.stop();
                miniTezCluster = null;
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (dfsCluster != null) {
            try {
                LOG.info("Stopping DFSCluster");
                dfsCluster.shutdown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Before
    public void setup() throws Exception {
        LOG.info("Starting session");
        Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String.valueOf(new Random().nextInt(100000))));
        TezClientUtils.ensureStagingDirExists((Configuration)conf, (Path)remoteStagingDir);
        tezConf = new TezConfiguration(miniTezCluster.getConfig());
        tezConf.setInt("tez.dag.recovery.max.unflushed.events", 0);
        tezConf.set("tez.am.log.level", "INFO");
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        tezConf.setBoolean("tez.am.node-blacklisting.enabled", false);
        tezConf.setInt("tez.am.max.app.attempts", MAX_AM_ATTEMPT);
        tezConf.setInt("tez.am.resource.memory.mb", 500);
        tezConf.set("tez.am.launch.cmd-opts", " -Xmx256m");
        tezConf.setBoolean("tez.am.mode.session", true);
        tezConf.setBoolean("tez.am.staging.scratch-data.auto-delete", false);
        tezConf.setBoolean("tez.test.recovery.drain_event", true);
        tezConf.setInt("ipc.client.connect.max.retries", 0);
        tezConf.setInt("ipc.client.connect.max.retries.on.timeouts", 0);
        tezConf.setInt("ipc.client.connect.timeout", 1000);
        tezSession = TezClient.create((String)"TestDAGRecovery", (TezConfiguration)tezConf);
        tezSession.start();
    }

    @After
    public void teardown() throws InterruptedException {
        if (tezSession != null) {
            try {
                LOG.info("Stopping Tez Session");
                tezSession.stop();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        tezSession = null;
    }

    @Test(timeout=120000L)
    public void testVertexPartiallyFinished_Broadcast() throws Exception {
        DAG dag = this.createDAG("VertexPartiallyFinished_Broadcast", ControlledImmediateStartVertexManager.class, EdgeProperty.DataMovementType.BROADCAST, true);
        TezCounters counters = this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
        Assert.assertEquals((long)4L, (long)counters.findCounter((Enum)DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
        Assert.assertEquals((long)2L, (long)counters.findCounter((Enum)TestCounter.Counter_1).getValue());
        List<HistoryEvent> historyEvents1 = this.readRecoveryLog(1);
        List<HistoryEvent> historyEvents2 = this.readRecoveryLog(2);
        this.printHistoryEvents(historyEvents1, 1);
        this.printHistoryEvents(historyEvents1, 2);
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
        Assert.assertEquals((long)0L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
    }

    @Test(timeout=120000L)
    public void testVertexCompletelyFinished_Broadcast() throws Exception {
        DAG dag = this.createDAG("VertexCompletelyFinished_Broadcast", ControlledImmediateStartVertexManager.class, EdgeProperty.DataMovementType.BROADCAST, false);
        TezCounters counters = this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
        Assert.assertEquals((long)4L, (long)counters.findCounter((Enum)DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
        Assert.assertEquals((long)2L, (long)counters.findCounter((Enum)TestCounter.Counter_1).getValue());
        List<HistoryEvent> historyEvents1 = this.readRecoveryLog(1);
        List<HistoryEvent> historyEvents2 = this.readRecoveryLog(2);
        this.printHistoryEvents(historyEvents1, 1);
        this.printHistoryEvents(historyEvents1, 2);
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
    }

    @Test(timeout=120000L)
    public void testVertexPartialFinished_One2One() throws Exception {
        DAG dag = this.createDAG("VertexPartialFinished_One2One", ControlledInputReadyVertexManager.class, EdgeProperty.DataMovementType.ONE_TO_ONE, true);
        TezCounters counters = this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
        Assert.assertEquals((long)4L, (long)counters.findCounter((Enum)DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
        Assert.assertEquals((long)2L, (long)counters.findCounter((Enum)TestCounter.Counter_1).getValue());
        List<HistoryEvent> historyEvents1 = this.readRecoveryLog(1);
        List<HistoryEvent> historyEvents2 = this.readRecoveryLog(2);
        this.printHistoryEvents(historyEvents1, 1);
        this.printHistoryEvents(historyEvents1, 2);
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
        Assert.assertEquals((long)0L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
    }

    @Test(timeout=120000L)
    public void testVertexCompletelyFinished_One2One() throws Exception {
        DAG dag = this.createDAG("VertexCompletelyFinished_One2One", ControlledInputReadyVertexManager.class, EdgeProperty.DataMovementType.ONE_TO_ONE, false);
        TezCounters counters = this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
        Assert.assertEquals((long)4L, (long)counters.findCounter((Enum)DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
        Assert.assertEquals((long)2L, (long)counters.findCounter((Enum)TestCounter.Counter_1).getValue());
        List<HistoryEvent> historyEvents1 = this.readRecoveryLog(1);
        List<HistoryEvent> historyEvents2 = this.readRecoveryLog(2);
        this.printHistoryEvents(historyEvents1, 1);
        this.printHistoryEvents(historyEvents1, 2);
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
    }

    @Test(timeout=120000L)
    public void testVertexPartiallyFinished_ScatterGather() throws Exception {
        DAG dag = this.createDAG("VertexPartiallyFinished_ScatterGather", ControlledShuffleVertexManager.class, EdgeProperty.DataMovementType.SCATTER_GATHER, true);
        TezCounters counters = this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
        Assert.assertEquals((long)4L, (long)counters.findCounter((Enum)DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
        Assert.assertEquals((long)2L, (long)counters.findCounter((Enum)TestCounter.Counter_1).getValue());
        List<HistoryEvent> historyEvents1 = this.readRecoveryLog(1);
        List<HistoryEvent> historyEvents2 = this.readRecoveryLog(2);
        this.printHistoryEvents(historyEvents1, 1);
        this.printHistoryEvents(historyEvents1, 2);
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
        Assert.assertEquals((long)0L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
    }

    @Test(timeout=120000L)
    public void testVertexCompletelyFinished_ScatterGather() throws Exception {
        DAG dag = this.createDAG("VertexCompletelyFinished_ScatterGather", ControlledShuffleVertexManager.class, EdgeProperty.DataMovementType.SCATTER_GATHER, false);
        TezCounters counters = this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
        Assert.assertEquals((long)4L, (long)counters.findCounter((Enum)DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
        Assert.assertEquals((long)2L, (long)counters.findCounter((Enum)TestCounter.Counter_1).getValue());
        TezCounter outputCounter = counters.findCounter("TestOutput", "TestOutput");
        TezCounter inputCounter = counters.findCounter("TestInput", "TestInput");
        Assert.assertTrue((outputCounter.getValue() > 0L ? 1 : 0) != 0);
        Assert.assertTrue((inputCounter.getValue() > 0L ? 1 : 0) != 0);
        List<HistoryEvent> historyEvents1 = this.readRecoveryLog(1);
        List<HistoryEvent> historyEvents2 = this.readRecoveryLog(2);
        this.printHistoryEvents(historyEvents1, 1);
        this.printHistoryEvents(historyEvents1, 2);
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
        Assert.assertEquals((long)1L, (long)this.findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
    }

    @Test(timeout=600000L)
    public void testHighMaxAttempt() throws Exception {
        Random rand = new Random();
        tezConf.set(FAIL_ON_ATTEMPT, rand.nextInt(MAX_AM_ATTEMPT) + "");
        LOG.info("Set FAIL_ON_ATTEMPT=" + tezConf.get(FAIL_ON_ATTEMPT));
        DAG dag = this.createDAG("HighMaxAttempt", FailOnAttemptVertexManager.class, EdgeProperty.DataMovementType.SCATTER_GATHER, false);
        this.runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
    }

    TezCounters runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
        tezSession.waitTillReady();
        DAGClient dagClient = tezSession.submitDAG(dag);
        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
        Assert.assertEquals((Object)finalState, (Object)dagStatus.getState());
        return dagStatus.getDAGCounters();
    }

    private DAG createDAG(String dagName, Class vertexManagerClass, EdgeProperty.DataMovementType dmType, boolean failOnParitialCompleted) throws IOException {
        if (failOnParitialCompleted) {
            tezConf.set(FAIL_ON_PARTIAL_FINISHED, "true");
        } else {
            tezConf.set(FAIL_ON_PARTIAL_FINISHED, "false");
        }
        DAG dag = DAG.create((String)dagName);
        UserPayload payload = UserPayload.create(null);
        Vertex v1 = Vertex.create((String)"v1", (ProcessorDescriptor)MyProcessor.getProcDesc(), (int)2);
        v1.setVertexManagerPlugin((VertexManagerPluginDescriptor)VertexManagerPluginDescriptor.create((String)ScheduleControlledVertexManager.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)tezConf)));
        Vertex v2 = Vertex.create((String)"v2", (ProcessorDescriptor)DoNothingProcessor.getProcDesc(), (int)2);
        v2.setVertexManagerPlugin((VertexManagerPluginDescriptor)VertexManagerPluginDescriptor.create((String)vertexManagerClass.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)tezConf)));
        dag.addVertex(v1).addVertex(v2);
        dag.addEdge(Edge.create((Vertex)v1, (Vertex)v2, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)dmType, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)TestOutput.getOutputDesc(payload), (InputDescriptor)TestInput.getInputDesc(payload))));
        return dag;
    }

    private List<TaskAttemptFinishedEvent> findTaskAttemptFinishedEvent(List<HistoryEvent> historyEvents, int vertexId, int taskId) {
        ArrayList<TaskAttemptFinishedEvent> resultEvents = new ArrayList<TaskAttemptFinishedEvent>();
        for (HistoryEvent historyEvent : historyEvents) {
            TaskAttemptFinishedEvent taFinishedEvent;
            if (historyEvent.getEventType() != HistoryEventType.TASK_ATTEMPT_FINISHED || (taFinishedEvent = (TaskAttemptFinishedEvent)historyEvent).getTaskAttemptID().getTaskID().getVertexID().getId() != vertexId || taFinishedEvent.getTaskAttemptID().getTaskID().getId() != taskId) continue;
            resultEvents.add(taFinishedEvent);
        }
        return resultEvents;
    }

    private List<HistoryEvent> readRecoveryLog(int attemptNum) throws IOException {
        ApplicationId appId = tezSession.getAppMasterApplicationId();
        Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath((Configuration)tezConf, (String)appId.toString());
        Path recoveryDataDir = TezCommonUtils.getRecoveryPath((Path)tezSystemStagingDir, (Configuration)tezConf);
        FileSystem fs = tezSystemStagingDir.getFileSystem((Configuration)tezConf);
        ArrayList<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
        for (int i = 1; i <= attemptNum; ++i) {
            Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath((Path)recoveryDataDir, (int)i);
            Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir, appId.toString().replace("application", "dag") + "_1" + ".recovery");
            if (!fs.exists(recoveryFilePath)) continue;
            LOG.info("Read recovery file:" + recoveryFilePath);
            historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile((FSDataInputStream)fs.open(recoveryFilePath)));
        }
        return historyEvents;
    }

    private void printHistoryEvents(List<HistoryEvent> historyEvents, int attemptId) {
        LOG.info("RecoveryLogs from attempt:" + attemptId);
        for (HistoryEvent historyEvent : historyEvents) {
            LOG.info("Parsed event from recovery stream, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent);
        }
        LOG.info("");
    }

    static {
        MAX_AM_ATTEMPT = 10;
        miniTezCluster = null;
        TEST_ROOT_DIR = "target/" + TestAMRecovery.class.getName() + "-tmpDir";
        dfsCluster = null;
        tezSession = null;
        remoteFs = null;
        FAIL_ON_PARTIAL_FINISHED = "FAIL_ON_PARTIAL_COMPLETED";
        FAIL_ON_ATTEMPT = "FAIL_ON_ATTEMPT";
    }

    public static class DoNothingProcessor
    extends SimpleProcessor {
        public DoNothingProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            Thread.sleep(3000L);
        }

        public static ProcessorDescriptor getProcDesc() {
            return ProcessorDescriptor.create((String)DoNothingProcessor.class.getName());
        }
    }

    public static class MyProcessor
    extends SimpleProcessor {
        public MyProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            this.getContext().getCounters().findCounter((Enum)TestCounter.Counter_1).increment(1L);
        }

        public static ProcessorDescriptor getProcDesc() {
            return ProcessorDescriptor.create((String)MyProcessor.class.getName());
        }
    }

    public static enum TestCounter {
        Counter_1;

    }

    public static class FailOnAttemptVertexManager
    extends ShuffleVertexManager {
        private Configuration conf;

        public FailOnAttemptVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            super.initialize();
            try {
                this.conf = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            int curAttempt = this.getContext().getDAGAttemptNumber();
            super.onSourceTaskCompleted(attempt);
            int failOnAttempt = this.conf.getInt(FAIL_ON_ATTEMPT, 1);
            LOG.info("failOnAttempt:" + failOnAttempt);
            LOG.info("curAttempt:" + curAttempt);
            if (curAttempt < failOnAttempt) {
                System.exit(-1);
            }
        }
    }

    public static class ScheduleControlledVertexManager
    extends VertexManagerPlugin {
        private Configuration conf;

        public ScheduleControlledVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            try {
                this.conf = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
            if (this.getContext().getDAGAttemptNumber() == 1 && this.conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
                this.getContext().scheduleTasks((List)Lists.newArrayList((Object[])new VertexManagerPluginContext.ScheduleTaskRequest[]{VertexManagerPluginContext.ScheduleTaskRequest.create((int)0, null)}));
                return;
            }
            int taskNum = this.getContext().getVertexNumTasks(this.getContext().getVertexName());
            ArrayList<VertexManagerPluginContext.ScheduleTaskRequest> taskWithLocationHints = new ArrayList<VertexManagerPluginContext.ScheduleTaskRequest>();
            for (int i = 0; i < taskNum; ++i) {
                taskWithLocationHints.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i, null));
            }
            this.getContext().scheduleTasks(taskWithLocationHints);
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
        }

        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) throws Exception {
        }
    }

    public static class ControlledImmediateStartVertexManager
    extends ImmediateStartVertexManager {
        private Configuration conf;
        private int completedTaskNum = 0;

        public ControlledImmediateStartVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            super.initialize();
            try {
                this.conf = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            super.onSourceTaskCompleted(attempt);
            ++this.completedTaskNum;
            if (this.getContext().getDAGAttemptNumber() == 1) {
                if (this.conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
                    if (this.completedTaskNum == 1) {
                        System.exit(-1);
                    }
                } else if (this.completedTaskNum == this.getContext().getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
                    System.exit(-1);
                }
            }
        }
    }

    public static class ControlledShuffleVertexManager
    extends ShuffleVertexManager {
        private Configuration conf;
        private int completedTaskNum = 0;

        public ControlledShuffleVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            super.initialize();
            try {
                this.conf = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            super.onSourceTaskCompleted(attempt);
            ++this.completedTaskNum;
            if (this.getContext().getDAGAttemptNumber() == 1) {
                if (this.conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
                    if (this.completedTaskNum == 1) {
                        System.exit(-1);
                    }
                } else if (this.completedTaskNum == this.getContext().getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
                    System.exit(-1);
                }
            }
        }
    }

    public static class ControlledInputReadyVertexManager
    extends InputReadyVertexManager {
        private Configuration conf;
        private int completedTaskNum = 0;

        public ControlledInputReadyVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            super.initialize();
            try {
                this.conf = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            super.onSourceTaskCompleted(attempt);
            ++this.completedTaskNum;
            if (this.getContext().getDAGAttemptNumber() == 1) {
                if (this.conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
                    if (this.completedTaskNum == 1) {
                        System.exit(-1);
                    }
                } else if (this.completedTaskNum == this.getContext().getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
                    System.exit(-1);
                }
            }
        }
    }
}

