/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManagerOnDemand;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.MiniTezCluster;
import org.apache.tez.test.TestAMRecovery;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestExceptionPropagation {
    // Shared logger; the nested plugin classes in this file log through it as well.
    private static final Logger LOG = LoggerFactory.getLogger(TestExceptionPropagation.class);
    // Client-side Tez configuration; replaced by startSessionClient() and startNonSessionClient().
    private static TezConfiguration tezConf;
    // Base Hadoop configuration used to build the mini DFS cluster and copied for the mini Tez cluster.
    private static Configuration conf;
    // Mini Tez cluster used by the non-session test; null until startMiniTezCluster() runs.
    private static MiniTezCluster miniTezCluster;
    // Directory handed to the mini DFS cluster as "hdfs.minidfs.basedir".
    private static String TEST_ROOT_DIR;
    // Mini DFS cluster backing the mini Tez cluster; null until startMiniTezCluster() runs.
    private static MiniDFSCluster dfsCluster;
    // File system of dfsCluster; its URI becomes "fs.defaultFS" for the mini Tez cluster.
    private static FileSystem remoteFs;
    // Client created by startSessionClient(); reset to null by stopSessionClient().
    private static TezClient tezSession;
    // Client created by startNonSessionClient(); reset to null by stopNonSessionClient().
    private static TezClient tezClient;

    /**
     * Brings up a three-datanode mini DFS cluster rooted at {@code TEST_ROOT_DIR} and then a
     * mini Tez cluster whose default file system points at that DFS instance.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the DFS cluster cannot start
     */
    private void startMiniTezCluster() {
        LOG.info("Starting mini clusters");
        conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
        try {
            dfsCluster = new MiniDFSCluster.Builder(conf)
                    .numDataNodes(3)
                    .format(true)
                    .racks(null)
                    .build();
            remoteFs = dfsCluster.getFileSystem();
        } catch (IOException dfsFailure) {
            throw new RuntimeException("problem starting mini dfs cluster", dfsFailure);
        }

        miniTezCluster = new MiniTezCluster(TestExceptionPropagation.class.getName(), 1, 1, 1);
        // Work on a copy so the shared base configuration is not polluted with cluster settings.
        Configuration tezClusterConf = new Configuration(conf);
        tezClusterConf.setInt("yarn.resourcemanager.am.max-attempts", 4);
        tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString());
        miniTezCluster.init(tezClusterConf);
        miniTezCluster.start();
    }

    /**
     * Best-effort shutdown of the mini Tez cluster followed by the mini DFS cluster.
     * A failure while stopping one cluster is logged and does not prevent the attempt
     * to stop the other; nothing is thrown to the caller.
     */
    private void stopTezMiniCluster() {
        if (miniTezCluster != null) {
            try {
                LOG.info("Stopping MiniTezCluster");
                miniTezCluster.stop();
            } catch (Exception e) {
                // Route the failure through the test log instead of raw stderr.
                LOG.warn("Failed to stop MiniTezCluster", e);
            }
        }
        if (dfsCluster != null) {
            try {
                LOG.info("Stopping DFSCluster");
                dfsCluster.shutdown();
            } catch (Exception e) {
                LOG.warn("Failed to stop DFSCluster", e);
            }
        }
    }

    /**
     * Builds a fresh {@code tezConf} for session mode (local mode, local file system,
     * on-demand one-to-one routing) and starts {@code tezSession} with it.
     *
     * @throws Exception if the client cannot be created or started
     */
    private void startSessionClient() throws Exception {
        LOG.info("Starting session");
        TezConfiguration sessionConf = new TezConfiguration();
        tezConf = sessionConf;

        sessionConf.setInt("tez.dag.recovery.max.unflushed.events", 0);
        sessionConf.setBoolean("tez.am.node-blacklisting.enabled", false);
        sessionConf.setInt("tez.am.max.app.attempts", 4);
        sessionConf.setBoolean("tez.am.one-to-one.routing.use.on-demand-routing", true);
        sessionConf.setInt("tez.am.resource.memory.mb", 500);
        sessionConf.set("tez.am.launch.cmd-opts", " -Xmx256m");
        sessionConf.setBoolean("tez.am.mode.session", true);
        sessionConf.setBoolean("tez.local.mode", true);
        sessionConf.set("fs.defaultFS", "file:///");
        sessionConf.setBoolean("tez.runtime.optimize.local.fetch", true);

        tezSession = TezClient.create("TestExceptionPropagation", sessionConf);
        tezSession.start();
    }

    /**
     * Best-effort stop of the session client. A failure while stopping is logged and
     * swallowed; {@code tezSession} is always reset to {@code null} afterwards.
     */
    private void stopSessionClient() {
        if (tezSession != null) {
            try {
                LOG.info("Stopping Tez Session");
                tezSession.stop();
            } catch (Exception e) {
                // Route the failure through the test log instead of raw stderr.
                LOG.warn("Failed to stop Tez Session", e);
            } finally {
                tezSession = null;
            }
        }
        tezSession = null;
    }

    /**
     * Builds {@code tezConf} from the running mini Tez cluster's configuration, switches
     * session mode off, and starts {@code tezClient} with it.
     *
     * @throws Exception if the client cannot be created or started
     */
    private void startNonSessionClient() throws Exception {
        LOG.info("Starting Client");
        TezConfiguration clientConf = new TezConfiguration(miniTezCluster.getConfig());
        tezConf = clientConf;

        clientConf.setInt("tez.dag.recovery.max.unflushed.events", 0);
        clientConf.setBoolean("tez.am.node-blacklisting.enabled", false);
        clientConf.setInt("tez.am.max.app.attempts", 4);
        clientConf.setInt("tez.am.resource.memory.mb", 500);
        clientConf.set("tez.am.launch.cmd-opts", " -Xmx256m");
        clientConf.setBoolean("tez.am.mode.session", false);

        tezClient = TezClient.create("TestExceptionPropagation", clientConf);
        tezClient.start();
    }

    /**
     * Best-effort stop of the non-session client. A failure while stopping is logged and
     * swallowed; {@code tezClient} is always reset to {@code null} afterwards.
     */
    private void stopNonSessionClient() {
        if (tezClient != null) {
            try {
                LOG.info("Stopping Tez Client");
                tezClient.stop();
            } catch (Exception e) {
                // Route the failure through the test log instead of raw stderr.
                LOG.warn("Failed to stop Tez Client", e);
            } finally {
                tezClient = null;
            }
        }
        tezClient = null;
    }

    /**
     * Submits one DAG per {@link ExceptionLocation} through the session client and checks
     * that the DAG diagnostics mention the injected failure. For
     * {@code PROCESSOR_COUNTER_EXCEEDED} the expected text is "Too many counters"; for
     * every other location it is the enum constant's name.
     */
    @Test(timeout=600000L)
    public void testExceptionPropagationSession() throws Exception {
        try {
            this.startSessionClient();
            for (ExceptionLocation location : ExceptionLocation.values()) {
                LOG.info("Session mode, Test for Exception from:" + location.name());
                DAG failingDag = this.createDAG(location);
                DAGClient client = tezSession.submitDAG(failingDag);
                DAGStatus finalStatus = client.waitForCompletion();
                String diagnostics = StringUtils.join(finalStatus.getDiagnostics(), ",");
                LOG.info("Diagnostics:" + diagnostics);

                String expectedFragment;
                if (location == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
                    expectedFragment = "Too many counters";
                } else {
                    expectedFragment = location.name();
                }
                Assert.assertTrue(diagnostics.contains(expectedFragment));
            }
        } finally {
            this.stopSessionClient();
        }
    }

    /**
     * Runs a single failing DAG ({@code EM_GetNumSourceTaskPhysicalOutputs}) in non-session
     * mode on the mini cluster and checks that the failure name appears both in the DAG
     * diagnostics and in the YARN application report, and that the two diagnostics agree.
     *
     * @throws Exception on any cluster, client or assertion failure
     */
    @Test(timeout=120000L)
    public void testExceptionPropagationNonSession() throws Exception {
        try {
            this.startMiniTezCluster();
            this.startNonSessionClient();
            ExceptionLocation exLocation = ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs;
            LOG.info("NonSession mode, Test for Exception from:" + exLocation.name());
            DAG dag = this.createDAG(exLocation);
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGStatus dagStatus = dagClient.waitForCompletion();
            String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
            LOG.info("Diagnostics:" + diagnostics);
            Assert.assertTrue(diagnostics.contains(exLocation.name()));

            ApplicationId appId = tezClient.getAppMasterApplicationId();
            YarnClient yarnClient = YarnClient.createYarnClient();
            try {
                yarnClient.init(tezConf);
                yarnClient.start();
                EnumSet<YarnApplicationState> finalStates = EnumSet.of(
                        YarnApplicationState.KILLED,
                        YarnApplicationState.FAILED,
                        YarnApplicationState.FINISHED);
                // Poll once per second until YARN reports a terminal application state.
                ApplicationReport appReport;
                do {
                    appReport = yarnClient.getApplicationReport(appId);
                    Thread.sleep(1000L);
                    LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
                    LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());
                } while (!finalStates.contains(appReport.getYarnApplicationState()));
                // Fetch the report once more after a short pause before asserting on it.
                Thread.sleep(1000L);
                appReport = yarnClient.getApplicationReport(appId);
                LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
                LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());
                Assert.assertTrue(appReport.getDiagnostics().contains(exLocation.name()));
                Assert.assertEquals(
                        StringUtils.join(dagStatus.getDiagnostics(), "\n").trim(),
                        appReport.getDiagnostics().trim());
            } finally {
                // The YARN client was previously left running after the test.
                yarnClient.stop();
            }
        } finally {
            try {
                this.stopNonSessionClient();
                Thread.sleep(10000L);
            } finally {
                // Must run even if the sleep above is interrupted, or the mini clusters leak.
                this.stopTezMiniCluster();
            }
        }
    }

    /**
     * Builds the two-vertex DAG (v1 -> v2) used to inject a failure at {@code exLocation}.
     * The location's name is the user payload handed to every fault-injecting plugin.
     * Locations whose name starts with "EM_" connect the vertices through
     * {@link CustomEdgeManager}; all others use a ONE_TO_ONE edge and install
     * {@link InputReadyVertexManagerWithException} on v2.
     *
     * @param exLocation the place where the failure should be raised
     * @return the assembled DAG
     * @throws IOException if the vertex manager payload for v2 cannot be created
     */
    private DAG createDAG(ExceptionLocation exLocation) throws IOException {
        DAG dag = DAG.create("dag_" + exLocation.name());
        UserPayload payload = UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes()));

        // v1: fault-injecting processor, root input, input initializer and vertex manager.
        Vertex v1 = Vertex.create("v1", ProcessorWithException.getProcDesc(payload), 1);
        InputDescriptor rootInputDesc = InputWithException.getInputDesc(payload);
        InputInitializerDescriptor rootInitializerDesc = InputInitializerWithException.getIIDesc(payload);
        v1.addDataSource("input", DataSourceDescriptor.create(rootInputDesc, rootInitializerDesc, null));
        v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));

        // v2: no-op processor and input; only its input initializer can inject failures.
        Vertex v2 = Vertex.create("v2", TestAMRecovery.DoNothingProcessor.getProcDesc(), 1);
        InputDescriptor noOpInputDesc = InputDescriptor.create(MultiAttemptDAG.NoOpInput.class.getName());
        v2.addDataSource("input2", DataSourceDescriptor.create(noOpInputDesc, InputInitializerWithException2.getIIDesc(payload), null));

        dag.addVertex(v1).addVertex(v2);

        EdgeProperty edgeProperty;
        if (exLocation.name().startsWith("EM_")) {
            EdgeManagerPluginDescriptor edgeManagerDesc = (EdgeManagerPluginDescriptor) EdgeManagerPluginDescriptor
                    .create(CustomEdgeManager.class.getName())
                    .setUserPayload(payload);
            edgeProperty = EdgeProperty.create(
                    edgeManagerDesc,
                    EdgeProperty.DataSourceType.PERSISTED,
                    EdgeProperty.SchedulingType.SEQUENTIAL,
                    OutputWithException.getOutputDesc(payload),
                    InputWithException.getInputDesc(payload));
        } else {
            v2.setVertexManagerPlugin(InputReadyVertexManagerWithException.getVMDesc(exLocation));
            edgeProperty = EdgeProperty.create(
                    EdgeProperty.DataMovementType.ONE_TO_ONE,
                    EdgeProperty.DataSourceType.PERSISTED,
                    EdgeProperty.SchedulingType.SEQUENTIAL,
                    OutputWithException.getOutputDesc(payload),
                    InputWithException.getInputDesc(payload));
        }
        dag.addEdge(Edge.create(v1, v2, edgeProperty));
        return dag;
    }

    // Static defaults. Only conf and TEST_ROOT_DIR need an explicit value; the cluster,
    // file system and client references keep their implicit null until a start method sets them.
    static {
        conf = new Configuration();
        TEST_ROOT_DIR = "target/" + TestExceptionPropagation.class.getName() + "-tmpDir";
    }

    /**
     * One-to-one edge manager that throws a {@link RuntimeException}, whose message is the
     * configured {@link ExceptionLocation} name, from the callback selected by its user payload.
     * Callbacks that are not selected delegate to the superclass.
     */
    public static class CustomEdgeManager
    extends OneToOneEdgeManagerOnDemand {
        /** Message of the only {@link TezUncheckedException} that {@link #initialize()} tolerates. */
        private static final String TOLERATED_INIT_MESSAGE = "Atleast 1 bipartite source should exist";

        private final ExceptionLocation exLocation;

        /**
         * @param context plugin context whose user payload holds an {@link ExceptionLocation} name
         * @throws IllegalArgumentException if the payload is not a valid constant name
         */
        public CustomEdgeManager(EdgeManagerPluginContext context) {
            super(context);
            this.exLocation = ExceptionLocation.valueOf(new String(context.getUserPayload().deepCopyAsArray()));
        }

        /**
         * Fails for {@code EM_Initialize}; otherwise delegates to the superclass. A
         * {@link TezUncheckedException} carrying exactly the tolerated message is swallowed,
         * any other one is rethrown unchanged.
         */
        public void initialize() {
            if (this.exLocation == ExceptionLocation.EM_Initialize) {
                throw new RuntimeException(this.exLocation.name());
            }
            try {
                super.initialize();
            } catch (TezUncheckedException e) {
                // Constant-first comparison: a null message must rethrow e, not raise an NPE.
                if (!TOLERATED_INIT_MESSAGE.equals(e.getMessage())) {
                    throw e;
                }
            }
        }

        /** Fails for {@code EM_GetNumDestinationConsumerTasks}; otherwise delegates. */
        public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
            if (this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
                throw new RuntimeException(this.exLocation.name());
            }
            return super.getNumDestinationConsumerTasks(sourceTaskIndex);
        }

        /** Fails for {@code EM_GetNumSourceTaskPhysicalOutputs}; otherwise logs the location and delegates. */
        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
            if (this.exLocation == ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs) {
                throw new RuntimeException(this.exLocation.name());
            }
            LOG.info("ExLocation:" + this.exLocation);
            return super.getNumSourceTaskPhysicalOutputs(sourceTaskIndex);
        }

        /** Fails for {@code EM_GetNumDestinationTaskPhysicalInputs}; otherwise delegates. */
        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
            if (this.exLocation == ExceptionLocation.EM_GetNumDestinationTaskPhysicalInputs) {
                throw new RuntimeException(this.exLocation.name());
            }
            return super.getNumDestinationTaskPhysicalInputs(destinationTaskIndex);
        }

        /** Fails for {@code EM_RouteDataMovementEventToDestination}; otherwise delegates. */
        public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            if (this.exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.routeDataMovementEventToDestination(event, sourceTaskIndex, sourceOutputIndex, destinationTaskAndInputIndices);
        }

        /** Fails for {@code EM_PrepareForRouting}; otherwise delegates. */
        public void prepareForRouting() throws Exception {
            if (this.exLocation == ExceptionLocation.EM_PrepareForRouting) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.prepareForRouting();
        }

        /** On-demand variant; fails for {@code EM_RouteDataMovementEventToDestination}, otherwise delegates. */
        public EdgeManagerPluginOnDemand.EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
                throw new RuntimeException(this.exLocation.name());
            }
            return super.routeDataMovementEventToDestination(sourceTaskIndex, sourceOutputIndex, destinationTaskIndex);
        }

        /** Composite variant; shares the {@code EM_RouteDataMovementEventToDestination} trigger. */
        public EdgeManagerPluginOnDemand.EventRouteMetadata routeCompositeDataMovementEventToDestination(int sourceTaskIndex, int destinationTaskIndex) throws Exception {
            if (this.exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
                throw new RuntimeException(this.exLocation.name());
            }
            return super.routeCompositeDataMovementEventToDestination(sourceTaskIndex, destinationTaskIndex);
        }

        /** Fails for {@code EM_RouteInputErrorEventToSource}; otherwise delegates. */
        public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) {
            if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource) {
                throw new RuntimeException(this.exLocation.name());
            }
            return super.routeInputErrorEventToSource(event, destinationTaskIndex, destinationFailedInputIndex);
        }

        /** Index-only variant; fails for {@code EM_RouteInputErrorEventToSource}, otherwise delegates. */
        public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
            if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource) {
                throw new RuntimeException(this.exLocation.name());
            }
            return super.routeInputErrorEventToSource(destinationTaskIndex, destinationFailedInputIndex);
        }

        /** Never injects a failure; always delegates to the superclass. */
        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            super.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, destinationTaskAndInputIndices);
        }
    }

    /**
     * {@link InputReadyVertexManager} that throws a {@link RuntimeException} from
     * {@code onSourceTaskCompleted} or {@code onVertexManagerEventReceived} when the
     * {@link ExceptionLocation} stored in its configuration payload selects that callback.
     */
    public static class InputReadyVertexManagerWithException
    extends InputReadyVertexManager {
        private ExceptionLocation exLocation;
        /** Configuration key under which the {@link ExceptionLocation} name travels in the payload. */
        private static final String Test_ExceptionLocation = "Test.ExceptionLocation";
        /** Message of the only {@link TezUncheckedException} that {@link #initialize()} tolerates. */
        private static final String TOLERATED_INIT_MESSAGE = "Atleast 1 bipartite source should exist";

        public InputReadyVertexManagerWithException(VertexManagerPluginContext context) {
            super(context);
        }

        /**
         * Delegates to the superclass, swallowing only a {@link TezUncheckedException} with
         * the tolerated message, then reads the failure location from the payload.
         *
         * @throws TezUncheckedException if the superclass fails for any other reason, or if
         *         the payload cannot be decoded into a configuration
         */
        public void initialize() {
            try {
                super.initialize();
            } catch (TezUncheckedException e) {
                // Constant-first comparison: a null message must rethrow e, not raise an NPE.
                if (!TOLERATED_INIT_MESSAGE.equals(e.getMessage())) {
                    throw e;
                }
            }
            try {
                // Named payloadConf so it does not shadow the enclosing class's static conf.
                Configuration payloadConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
                this.exLocation = ExceptionLocation.valueOf(payloadConf.get(Test_ExceptionLocation));
            } catch (IOException e) {
                throw new TezUncheckedException(e);
            }
        }

        /** Fails for {@code VM_ON_SOURCETASK_COMPLETED}; otherwise delegates. */
        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onSourceTaskCompleted(attempt);
        }

        /** Fails for {@code VM_ON_VERTEXMANAGEREVENT_RECEIVED}; otherwise delegates. */
        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
            if (this.exLocation == ExceptionLocation.VM_ON_VERTEXMANAGEREVENT_RECEIVED) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onVertexManagerEventReceived(vmEvent);
        }

        /**
         * Creates a descriptor for this plugin whose payload is a configuration holding
         * {@code exLocation}'s name under {@code Test.ExceptionLocation}.
         *
         * @param exLocation the failure location to encode
         * @return descriptor naming this class and carrying the encoded payload
         * @throws IOException if the configuration cannot be serialized into a payload
         */
        public static VertexManagerPluginDescriptor getVMDesc(ExceptionLocation exLocation) throws IOException {
            Configuration payloadConf = new Configuration();
            payloadConf.set(Test_ExceptionLocation, exLocation.name());
            UserPayload payload = TezUtils.createUserPayloadFromConf(payloadConf);
            return (VertexManagerPluginDescriptor)VertexManagerPluginDescriptor.create(InputReadyVertexManagerWithException.class.getName()).setUserPayload(payload);
        }
    }

    /**
     * {@link RootInputVertexManager} that raises a {@link RuntimeException} named after the
     * configured {@link ExceptionLocation} from {@code initialize},
     * {@code onRootVertexInitialized} or {@code onVertexStarted}.
     */
    public static class RootInputVertexManagerWithException
    extends RootInputVertexManager {
        private ExceptionLocation exLocation;

        public RootInputVertexManagerWithException(VertexManagerPluginContext context) {
            super(context);
        }

        /** Initializes the superclass, decodes the payload, then fails for {@code VM_INITIALIZE}. */
        public void initialize() {
            super.initialize();
            byte[] payloadBytes = this.getContext().getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes));
            if (ExceptionLocation.VM_INITIALIZE == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        /** Fails for {@code VM_ON_ROOTVERTEX_INITIALIZE}; otherwise delegates. */
        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
            if (ExceptionLocation.VM_ON_ROOTVERTEX_INITIALIZE == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onRootVertexInitialized(inputName, inputDescriptor, events);
        }

        /** Fails for {@code VM_ON_VERTEX_STARTED}; otherwise delegates. */
        public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
            if (ExceptionLocation.VM_ON_VERTEX_STARTED == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onVertexStarted(completions);
        }

        /**
         * @param payload payload holding an {@link ExceptionLocation} name
         * @return descriptor naming this class and carrying {@code payload}
         */
        public static VertexManagerPluginDescriptor getVMDesc(UserPayload payload) {
            String pluginClassName = RootInputVertexManagerWithException.class.getName();
            return (VertexManagerPluginDescriptor) VertexManagerPluginDescriptor.create(pluginClassName).setUserPayload(payload);
        }
    }

    /**
     * Processor that throws an {@link Error} or {@link Exception} from {@code initialize},
     * {@code run} or {@code close}, or creates 1201 counters in {@code run}, depending on
     * the {@link ExceptionLocation} name in its user payload.
     */
    public static class ProcessorWithException
    extends AbstractLogicalIOProcessor {
        private ExceptionLocation exLocation;

        public ProcessorWithException(ProcessorContext context) {
            super(context);
            byte[] payloadBytes = this.getContext().getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes));
        }

        /**
         * Starts the "input" input and the "v2" output, fetches their reader and writer,
         * sleeps three seconds, and then raises the configured failure, if any.
         */
        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            InputWithException faultyInput = (InputWithException) inputs.get("input");
            faultyInput.start();
            faultyInput.getReader();
            OutputWithException faultyOutput = (OutputWithException) outputs.get("v2");
            faultyOutput.start();
            faultyOutput.getWriter();
            Thread.sleep(3000L);
            switch (this.exLocation) {
                case PROCESSOR_RUN_ERROR:
                    throw new Error(this.exLocation.name());
                case PROCESSOR_RUN_EXCEPTION:
                    throw new Exception(this.exLocation.name());
                case PROCESSOR_COUNTER_EXCEEDED: {
                    // Touch 1201 distinct counters in the "mycounter" group.
                    int counterIndex = 0;
                    while (counterIndex < 1201) {
                        this.getContext().getCounters().findCounter("mycounter", "counter_" + counterIndex).increment(1L);
                        counterIndex++;
                    }
                    break;
                }
                default:
                    break;
            }
        }

        /** Processor events are ignored. */
        public void handleEvents(List<Event> processorEvents) {
        }

        /** Raises the configured close-time failure, if any. */
        public void close() throws Exception {
            switch (this.exLocation) {
                case PROCESSOR_CLOSE_ERROR:
                    throw new Error(this.exLocation.name());
                case PROCESSOR_CLOSE_EXCEPTION:
                    throw new Exception(this.exLocation.name());
                default:
                    break;
            }
        }

        /** Raises the configured initialize-time failure, if any. */
        public void initialize() throws Exception {
            switch (this.exLocation) {
                case PROCESSOR_INITIALIZE_ERROR:
                    throw new Error(this.exLocation.name());
                case PROCESSOR_INITIALIZE_EXCEPTION:
                    throw new Exception(this.exLocation.name());
                default:
                    break;
            }
        }

        /**
         * @param payload payload holding an {@link ExceptionLocation} name
         * @return descriptor naming this class and carrying {@code payload}
         */
        public static ProcessorDescriptor getProcDesc(UserPayload payload) {
            String processorClassName = ProcessorWithException.class.getName();
            return (ProcessorDescriptor) ProcessorDescriptor.create(processorClassName).setUserPayload(payload);
        }
    }

    /**
     * Logical output that throws from {@code initialize}, {@code start}, {@code getWriter}
     * or {@code close} according to its configured {@link ExceptionLocation}. For some
     * locations {@code close} instead returns a single event that triggers the failure
     * in another component.
     */
    public static class OutputWithException
    extends AbstractLogicalOutput {
        private ExceptionLocation exLocation;

        public OutputWithException(OutputContext outputContext, int numPhysicalOutputs) {
            super(outputContext, numPhysicalOutputs);
            byte[] payloadBytes = this.getContext().getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes));
        }

        /** Fails with an {@link Exception} for {@code OUTPUT_START}. */
        public void start() throws Exception {
            if (ExceptionLocation.OUTPUT_START == this.exLocation) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** Fails with an {@link Exception} for {@code OUTPUT_GET_WRITER}; otherwise returns {@code null}. */
        public Writer getWriter() throws Exception {
            if (ExceptionLocation.OUTPUT_GET_WRITER == this.exLocation) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        /** Output events are ignored. */
        public void handleEvents(List<Event> outputEvents) {
        }

        /**
         * Fails with a {@link RuntimeException} for {@code OUTPUT_CLOSE}. For
         * {@code VM_ON_VERTEXMANAGEREVENT_RECEIVED},
         * {@code EM_RouteDataMovementEventToDestination} and
         * {@code II_HandleInputInitializerEvents} it returns a one-element list holding an
         * empty-payload event of the matching kind; for every other location it returns
         * {@code null}.
         */
        public List<Event> close() throws Exception {
            final Event trailingEvent;
            switch (this.exLocation) {
                case OUTPUT_CLOSE:
                    throw new RuntimeException(this.exLocation.name());
                case VM_ON_VERTEXMANAGEREVENT_RECEIVED:
                    trailingEvent = VertexManagerEvent.create("v2", ByteBuffer.wrap(new byte[0]));
                    break;
                case EM_RouteDataMovementEventToDestination:
                    trailingEvent = DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0]));
                    break;
                case II_HandleInputInitializerEvents:
                    trailingEvent = InputInitializerEvent.create("v2", "input2", ByteBuffer.wrap(new byte[0]));
                    break;
                default:
                    return null;
            }
            ArrayList<Event> events = new ArrayList<Event>();
            events.add(trailingEvent);
            return events;
        }

        /** Requests zero initial memory, then fails with a {@link RuntimeException} for {@code OUTPUT_INITIALIZE}. */
        public List<Event> initialize() throws Exception {
            this.getContext().requestInitialMemory(0L, null);
            if (ExceptionLocation.OUTPUT_INITIALIZE == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
            return null;
        }

        /**
         * @param payload payload holding an {@link ExceptionLocation} name
         * @return descriptor naming this class and carrying {@code payload}
         */
        public static OutputDescriptor getOutputDesc(UserPayload payload) {
            String outputClassName = OutputWithException.class.getName();
            return (OutputDescriptor) OutputDescriptor.create(outputClassName).setUserPayload(payload);
        }
    }

    public static class InputWithException
    extends AbstractLogicalInput {
        private ExceptionLocation exLocation;
        private Object condition = new Object();

        public InputWithException(InputContext inputContext, int numPhysicalInputs) {
            super(inputContext, numPhysicalInputs);
            this.exLocation = ExceptionLocation.valueOf(new String(this.getContext().getUserPayload().deepCopyAsArray()));
        }

        public void start() throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_START) {
                throw new Exception(this.exLocation.name());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Reader getReader() throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
                Object object = this.condition;
                synchronized (object) {
                    this.condition.wait();
                }
            }
            if (this.exLocation == ExceptionLocation.INPUT_GET_READER) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        public void handleEvents(List<Event> inputEvents) throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
                throw new Exception(this.exLocation.name());
            }
        }

        public List<Event> close() throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_CLOSE) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        public List<Event> initialize() throws Exception {
            this.getContext().requestInitialMemory(0L, null);
            if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) {
                throw new Exception(this.exLocation.name());
            }
            if (this.getContext().getSourceVertexName().equals("v1") && (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks)) {
                InputReadErrorEvent errorEvent = InputReadErrorEvent.create((String)"read error", (int)0, (int)0);
                return Lists.newArrayList((Object[])new Event[]{errorEvent});
            }
            return null;
        }

        public static InputDescriptor getInputDesc(UserPayload payload) {
            return (InputDescriptor)InputDescriptor.create((String)InputWithException.class.getName()).setUserPayload(payload);
        }
    }

    /**
     * Input initializer that throws from {@code initialize},
     * {@code handleInputInitializerEvent} or {@code onVertexStateUpdated} according to
     * the {@link ExceptionLocation} name in its user payload.
     */
    public static class InputInitializerWithException2
    extends InputInitializer {
        private final ExceptionLocation exLocation;
        /** Private monitor that nothing in this class ever notifies; used only to park a thread. */
        private final Object condition = new Object();

        public InputInitializerWithException2(InputInitializerContext initializerContext) {
            super(initializerContext);
            this.exLocation = ExceptionLocation.valueOf(new String(this.getContext().getUserPayload().deepCopyAsArray()));
        }

        /**
         * Fails for {@code II_Initialize}. For {@code II_OnVertexStateUpdated} it first
         * registers for state updates of vertex "v1". For that location and for
         * {@code II_HandleInputInitializerEvents} the calling thread is then parked on a
         * monitor that is never notified, so the call only ends when the thread is
         * interrupted.
         *
         * @return always {@code null} when it returns normally
         * @throws InterruptedException if the parked thread is interrupted
         */
        public List<Event> initialize() throws Exception {
            if (this.exLocation == ExceptionLocation.II_Initialize) {
                throw new Exception(this.exLocation.name());
            }
            if (this.exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
                this.getContext().registerForVertexStateUpdates("v1", null);
            }
            if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents || this.exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
                synchronized (this.condition) {
                    // Re-wait after every wakeup: a spurious wakeup must not let
                    // initialization finish before the failing callback is invoked.
                    while (true) {
                        this.condition.wait();
                    }
                }
            }
            return null;
        }

        /** Fails with a {@link RuntimeException} for {@code II_HandleInputInitializerEvents}. */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
            if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        /** Fails for {@code II_OnVertexStateUpdated}; otherwise delegates to the superclass. */
        public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
            if (this.exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
                throw new Exception(this.exLocation.name());
            }
            super.onVertexStateUpdated(stateUpdate);
        }

        /**
         * @param payload payload holding an {@link ExceptionLocation} name
         * @return descriptor naming this class and carrying {@code payload}
         */
        public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
            return (InputInitializerDescriptor)InputInitializerDescriptor.create(InputInitializerWithException2.class.getName()).setUserPayload(payload);
        }
    }

    /**
     * Input initializer that never injects a failure: {@code initialize} returns one
     * {@link InputDataInformationEvent}. The payload is still decoded on construction, so
     * an invalid {@link ExceptionLocation} name is rejected there.
     */
    public static class InputInitializerWithException
    extends InputInitializer {
        private ExceptionLocation exLocation;

        public InputInitializerWithException(InputInitializerContext initializerContext) {
            super(initializerContext);
            byte[] payloadBytes = this.getContext().getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes));
        }

        /** @return a one-element list with an event for index 0 and a null object payload */
        public List<Event> initialize() throws Exception {
            Event dataInfoEvent = InputDataInformationEvent.createWithObjectPayload(0, null);
            ArrayList<Event> initialEvents = new ArrayList<Event>();
            initialEvents.add(dataInfoEvent);
            return initialEvents;
        }

        /** Initializer events are ignored. */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
        }

        /**
         * @param payload payload holding an {@link ExceptionLocation} name
         * @return descriptor naming this class and carrying {@code payload}
         */
        public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
            String initializerClassName = InputInitializerWithException.class.getName();
            return (InputInitializerDescriptor) InputInitializerDescriptor.create(initializerClassName).setUserPayload(payload);
        }
    }

    /**
     * Places where a failure can be injected. The constant's name is shipped to the
     * plugins as their payload, used as the exception message, and (except for
     * PROCESSOR_COUNTER_EXCEEDED) searched for in the DAG diagnostics by the session test,
     * so names must not be changed casually. The session test iterates the constants in
     * declaration order.
     */
    public static enum ExceptionLocation {
        // Raised by InputWithException.
        INPUT_START,
        INPUT_GET_READER,
        INPUT_HANDLE_EVENTS,
        INPUT_CLOSE,
        INPUT_INITIALIZE,
        // Raised by OutputWithException.
        OUTPUT_START,
        OUTPUT_GET_WRITER,
        OUTPUT_CLOSE,
        OUTPUT_INITIALIZE,
        // Raised by ProcessorWithException (*_ERROR throws Error, *_EXCEPTION throws Exception).
        PROCESSOR_RUN_ERROR,
        PROCESSOR_CLOSE_ERROR,
        PROCESSOR_INITIALIZE_ERROR,
        PROCESSOR_RUN_EXCEPTION,
        PROCESSOR_CLOSE_EXCEPTION,
        PROCESSOR_INITIALIZE_EXCEPTION,
        // ProcessorWithException creates 1201 counters; the session test expects "Too many counters".
        PROCESSOR_COUNTER_EXCEEDED,
        // Raised by RootInputVertexManagerWithException or InputReadyVertexManagerWithException.
        VM_INITIALIZE,
        VM_ON_ROOTVERTEX_INITIALIZE,
        VM_ON_SOURCETASK_COMPLETED,
        VM_ON_VERTEX_STARTED,
        VM_ON_VERTEXMANAGEREVENT_RECEIVED,
        // Raised by CustomEdgeManager; createDAG selects the custom edge by the "EM_" prefix.
        EM_Initialize,
        EM_GetNumDestinationTaskPhysicalInputs,
        EM_GetNumSourceTaskPhysicalOutputs,
        EM_RouteDataMovementEventToDestination,
        EM_GetNumDestinationConsumerTasks,
        EM_RouteInputErrorEventToSource,
        EM_PrepareForRouting,
        // Raised by InputInitializerWithException2.
        II_Initialize,
        II_HandleInputInitializerEvents,
        II_OnVertexStateUpdated;

    }
}

