/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.job.impl;

import com.google.common.collect.ImmutableList;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Locality;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestTaskAttempt {
    // Name string for a custom resource type. NOTE(review): its consumers are not visible in
    // this part of the file — presumably used by tests that register extra resource types.
    private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";

    /**
     * Runs once before the tests of this class: hands a freshly constructed
     * {@link Configuration} to {@code ResourceUtils.resetResourceTypes}.
     */
    @BeforeClass
    public static void setupBeforeClass() {
        Configuration freshConf = new Configuration();
        ResourceUtils.resetResourceTypes(freshConf);
    }

    /**
     * Runs after every test: hands a freshly constructed {@link Configuration} to
     * {@code ResourceUtils.resetResourceTypes}, mirroring {@code setupBeforeClass}.
     */
    @After
    public void tearDown() {
        Configuration freshConf = new Configuration();
        ResourceUtils.resetResourceTypes(freshConf);
    }

    /**
     * Runs the shared failing-attempt history check against a {@code FailingAttemptsMRApp}
     * built with arguments {@code (1, 0)} (presumably one map task and no reduce tasks).
     *
     * <p>The app is constructed in the try-with-resources header so that it is always closed,
     * including when an assertion inside the block fails. A resource variable is implicitly
     * final, so it must not be initialised to {@code null} and assigned later.
     *
     * @throws Exception if the app fails to run or to close
     */
    @Test
    public void testMRAppHistoryForMap() throws Exception {
        try (FailingAttemptsMRApp app = new FailingAttemptsMRApp(1, 0)) {
            this.testMRAppHistory(app);
        }
    }

    /**
     * Runs the shared failing-attempt history check against a {@code FailingAttemptsMRApp}
     * built with arguments {@code (0, 1)} (presumably no map tasks and one reduce task).
     *
     * <p>The app is constructed in the try-with-resources header so that it is always closed,
     * including when an assertion inside the block fails. A resource variable is implicitly
     * final, so it must not be initialised to {@code null} and assigned later.
     *
     * @throws Exception if the app fails to run or to close
     */
    @Test
    public void testMRAppHistoryForReduce() throws Exception {
        try (FailingAttemptsMRApp app = new FailingAttemptsMRApp(0, 1)) {
            this.testMRAppHistory(app);
        }
    }

    /**
     * Exercises the history checks for attempts that fail or are killed while a container
     * is assigned.
     *
     * <p>For each of TA_CONTAINER_LAUNCH_FAILED, TA_CONTAINER_COMPLETED, TA_FAILMSG and
     * TA_FAILMSG_BY_CLIENT the "failed" history check is run twice, first with constructor
     * arguments {@code (1, 0)} and then with {@code (0, 1)}. TA_KILL is then run through the
     * "killed" history check with the same two argument pairs. The scenario order matches the
     * original hand-unrolled sequence.
     *
     * <p>Every app lives in its own try-with-resources block, so each one is closed exactly
     * once even when an assertion fails part-way through the sequence.
     *
     * @throws Exception if any app fails to run or to close
     */
    @Test
    public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
        TaskAttemptEventType[] failingEvents = new TaskAttemptEventType[]{
            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
            TaskAttemptEventType.TA_CONTAINER_COMPLETED,
            TaskAttemptEventType.TA_FAILMSG,
            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT
        };
        // First two constructor arguments of the app, in the order the scenarios are run.
        int[][] taskCounts = new int[][]{{1, 0}, {0, 1}};

        for (TaskAttemptEventType eventType : failingEvents) {
            for (int[] counts : taskCounts) {
                try (FailingAttemptsDuringAssignedMRApp app =
                        new FailingAttemptsDuringAssignedMRApp(counts[0], counts[1], eventType)) {
                    this.testTaskAttemptAssignedFailHistory(app);
                }
            }
        }

        // TA_KILL leads to a killed (not failed) attempt, so it uses the killed-history check.
        for (int[] counts : taskCounts) {
            try (FailingAttemptsDuringAssignedMRApp app =
                    new FailingAttemptsDuringAssignedMRApp(counts[0], counts[1], TaskAttemptEventType.TA_KILL)) {
                this.testTaskAttemptAssignedKilledHistory(app);
            }
        }
    }

    /**
     * Applies a {@code RequestContainerTransition} to a map attempt whose split lists three
     * hosts and checks that the second event sent to the handler is a
     * {@link ContainerRequestEvent} carrying exactly one rack.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testSingleRackRequest() throws Exception {
        TaskAttemptImpl.RequestContainerTransition transition = new TaskAttemptImpl.RequestContainerTransition(false);
        EventHandler handler = Mockito.mock(EventHandler.class);
        String[] splitHosts = {"host1", "host2", "host3"};
        // 128 MiB split length (0x8000000 bytes).
        JobSplit.TaskSplitMetaInfo splitMeta = new JobSplit.TaskSplitMetaInfo(splitHosts, 0L, 128L * 1024L * 1024L);
        TaskAttemptImpl attempt = this.createMapTaskAttemptImplForTest(handler, splitMeta);
        TaskAttemptEvent triggerEvent = Mockito.mock(TaskAttemptEvent.class);

        transition.transition(attempt, triggerEvent);

        // The transition is expected to emit exactly two events; the second is the request.
        ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
        Mockito.verify(handler, Mockito.times(2)).handle(captor.capture());
        Object secondEvent = captor.getAllValues().get(1);
        Assert.assertTrue("Second Event not of type ContainerRequestEvent", secondEvent instanceof ContainerRequestEvent);

        String[] racks = ((ContainerRequestEvent)secondEvent).getRacks();
        Assert.assertEquals(1L, (long)racks.length);
    }

    /**
     * Checks that a split location given as an IP address goes through {@code resolveHost}
     * and that the resulting container request names the resolved hosts.
     *
     * <p>{@code resolveHost("192.168.1.1")} is stubbed on a spy to return {@code "host1"}.
     * After the transition, the second event must be a {@link ContainerRequestEvent} whose
     * hosts cover {@code host1}, {@code host2} and {@code host3}.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testHostResolveAttempt() throws Exception {
        TaskAttemptImpl.RequestContainerTransition transition = new TaskAttemptImpl.RequestContainerTransition(false);
        EventHandler handler = Mockito.mock(EventHandler.class);
        String[] splitHosts = {"192.168.1.1", "host2", "host3"};
        JobSplit.TaskSplitMetaInfo splitMeta = new JobSplit.TaskSplitMetaInfo(splitHosts, 0L, 128L * 1024L * 1024L);
        TaskAttemptImpl realAttempt = this.createMapTaskAttemptImplForTest(handler, splitMeta);
        TaskAttemptImpl spiedAttempt = Mockito.spy(realAttempt);
        Mockito.when(spiedAttempt.resolveHost(splitHosts[0])).thenReturn("host1");
        // Recompute the data-local hosts on the spy so the stubbed resolution is used.
        spiedAttempt.dataLocalHosts = spiedAttempt.resolveHosts(splitMeta.getLocations());

        TaskAttemptEvent triggerEvent = Mockito.mock(TaskAttemptEvent.class);
        transition.transition(spiedAttempt, triggerEvent);
        Mockito.verify(spiedAttempt).resolveHost(splitHosts[0]);

        ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
        Mockito.verify(handler, Mockito.times(2)).handle(captor.capture());
        Object secondEvent = captor.getAllValues().get(1);
        Assert.assertTrue("Second Event not of type ContainerRequestEvent", secondEvent instanceof ContainerRequestEvent);

        // Tick off every expected host that the request mentions; none may be left over.
        Map<String, Boolean> pendingHosts = new HashMap<String, Boolean>();
        pendingHosts.put("host1", Boolean.TRUE);
        pendingHosts.put("host2", Boolean.TRUE);
        pendingHosts.put("host3", Boolean.TRUE);
        ContainerRequestEvent request = (ContainerRequestEvent)secondEvent;
        for (String requestedHost : request.getHosts()) {
            pendingHosts.remove(requestedHost);
        }
        Assert.assertEquals(0L, (long)pendingHosts.size());
    }

    /**
     * Runs {@link #verifyMillisCounters(Resource, int)} for three container shapes.
     *
     * @throws Exception if any of the runs fails
     */
    @Test
    public void testMillisCountersUpdate() throws Exception {
        // Each row: {container memory, container vcores, minimum container size}.
        int[][] scenarios = {
            {1024, 1, 512},
            {2048, 4, 1024},
            {10240, 8, 2048}
        };
        for (int[] scenario : scenarios) {
            this.verifyMillisCounters(Resource.newInstance(scenario[0], scenario[1]), scenario[2]);
        }
    }

    /**
     * Runs a one-map/one-reduce {@link MRApp} under a controlled clock and checks the
     * millisecond-based job counters.
     *
     * <p>The clock reads 10 while the attempts launch and 11 when TA_DONE is delivered, so the
     * test expects each attempt to report a launch time of 10 and a finish time of 11, i.e. a
     * run time of one millisecond. The expected counter values therefore equal the per-attempt
     * resource amounts themselves.
     *
     * @param containerResource resource set as the app's allocated container resource
     * @param minContainerSize value written to {@code yarn.scheduler.minimum-allocation-mb};
     *        used as the divisor for the expected SLOTS_MILLIS counters
     * @throws Exception if the app fails to reach an expected state
     */
    public void verifyMillisCounters(Resource containerResource, int minContainerSize) throws Exception {
        ControlledClock clock = new ControlledClock((Clock)SystemClock.getInstance());
        clock.setTime(10L);
        MRApp app = new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, (Clock)clock);
        app.setAllocatedContainerResource(containerResource);
        Configuration conf = new Configuration();
        conf.setInt("yarn.scheduler.minimum-allocation-mb", minContainerSize);
        app.setClusterInfo(new ClusterInfo(Resource.newInstance(10240, 1)));
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);

        Map<TaskId, Task> tasks = job.getTasks();
        Assert.assertEquals("Num tasks is not correct", 2L, (long)tasks.size());
        // The first task returned is treated as the map task and the second as the reduce task.
        Iterator<Task> taskIter = tasks.values().iterator();
        Task mTask = taskIter.next();
        app.waitForState(mTask, TaskState.RUNNING);
        Task rTask = taskIter.next();
        app.waitForState(rTask, TaskState.RUNNING);

        Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
        Assert.assertEquals("Num attempts is not correct", 1L, (long)mAttempts.size());
        Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
        Assert.assertEquals("Num attempts is not correct", 1L, (long)rAttempts.size());
        TaskAttempt mta = mAttempts.values().iterator().next();
        TaskAttempt rta = rAttempts.values().iterator().next();
        app.waitForState(mta, TaskAttemptState.RUNNING);
        app.waitForState(rta, TaskAttemptState.RUNNING);

        // Advance the clock by one tick before finishing both attempts.
        clock.setTime(11L);
        app.getContext().getEventHandler().handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
        app.getContext().getEventHandler().handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(job, JobState.SUCCEEDED);

        // Expected value first, actual second, so failure messages read correctly.
        Assert.assertEquals(11L, mta.getFinishTime());
        Assert.assertEquals(10L, mta.getLaunchTime());
        Assert.assertEquals(11L, rta.getFinishTime());
        Assert.assertEquals(10L, rta.getLaunchTime());

        Counters counters = job.getAllCounters();
        int memoryMb = (int)containerResource.getMemorySize();
        int vcores = containerResource.getVirtualCores();
        int expectedSlotMillis = (int)Math.ceil((float)memoryMb / (float)minContainerSize);
        Assert.assertEquals((long)expectedSlotMillis, counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
        Assert.assertEquals((long)expectedSlotMillis, counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
        Assert.assertEquals(1L, counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
        Assert.assertEquals(1L, counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
        Assert.assertEquals((long)memoryMb, counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
        Assert.assertEquals((long)memoryMb, counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
        Assert.assertEquals((long)vcores, counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
        Assert.assertEquals((long)vcores, counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
    }

    /**
     * Convenience overload: builds a map task attempt using the {@link SystemClock}
     * singleton and a fresh {@link JobConf}.
     *
     * @param eventHandler handler passed through to the attempt
     * @param taskSplitMetaInfo split metadata passed through to the attempt
     * @return the attempt produced by the four-argument overload
     */
    private TaskAttemptImpl createMapTaskAttemptImplForTest(EventHandler eventHandler, JobSplit.TaskSplitMetaInfo taskSplitMetaInfo) {
        return this.createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, (Clock)SystemClock.getInstance(), new JobConf());
    }

    /**
     * Builds a {@link MapTaskAttemptImpl} for map task 1 of job 1 (application id
     * {@code (1, 1)}), with a mocked listener and a mocked job-file path. The job token,
     * credentials and app context arguments are passed as {@code null}.
     *
     * @param eventHandler handler the attempt sends its events to
     * @param taskSplitMetaInfo split metadata for the attempt
     * @param clock clock handed to the attempt
     * @param jobConf job configuration handed to the attempt
     * @return the newly constructed map attempt
     */
    private TaskAttemptImpl createMapTaskAttemptImplForTest(EventHandler eventHandler, JobSplit.TaskSplitMetaInfo taskSplitMetaInfo, Clock clock, JobConf jobConf) {
        ApplicationId applicationId = ApplicationId.newInstance(1L, 1);
        JobId job = MRBuilderUtils.newJobId(applicationId, 1);
        TaskId mapTask = MRBuilderUtils.newTaskId(job, 1, TaskType.MAP);
        TaskAttemptListener listener = Mockito.mock(TaskAttemptListener.class);
        Path jobFilePath = Mockito.mock(Path.class);
        return new MapTaskAttemptImpl(mapTask, 1, eventHandler, jobFilePath, 1, taskSplitMetaInfo, jobConf, listener, null, null, clock, null);
    }

    /**
     * Builds a {@link ReduceTaskAttemptImpl} for reduce task 1 of job 1 (application id
     * {@code (1, 1)}), with a mocked listener and a mocked job-file path. The job token,
     * credentials and app context arguments are passed as {@code null}.
     *
     * @param eventHandler handler the attempt sends its events to
     * @param clock clock handed to the attempt
     * @param jobConf job configuration handed to the attempt
     * @return the newly constructed reduce attempt
     */
    private TaskAttemptImpl createReduceTaskAttemptImplForTest(EventHandler eventHandler, Clock clock, JobConf jobConf) {
        ApplicationId applicationId = ApplicationId.newInstance(1L, 1);
        JobId job = MRBuilderUtils.newJobId(applicationId, 1);
        TaskId reduceTask = MRBuilderUtils.newTaskId(job, 1, TaskType.REDUCE);
        TaskAttemptListener listener = Mockito.mock(TaskAttemptListener.class);
        Path jobFilePath = Mockito.mock(Path.class);
        return new ReduceTaskAttemptImpl(reduceTask, 1, eventHandler, jobFilePath, 1, 1, jobConf, listener, null, null, clock, null);
    }

    /**
     * Submits a job on the given app, waits for it to reach FAILED, and checks the resulting
     * task and attempt reports: one FAILED task with four attempts, the first attempt FAILED
     * with diagnostic text {@code "Test Diagnostic Event"} and the second attempt FAILED.
     *
     * @param app the app to run the job on
     * @throws Exception if the job does not reach the expected state
     */
    private void testMRAppHistory(MRApp app) throws Exception {
        Job job = app.submit(new Configuration());
        app.waitForState(job, JobState.FAILED);

        Map<TaskId, Task> tasks = job.getTasks();
        Assert.assertEquals("Num tasks is not correct", 1L, (long)tasks.size());
        Task onlyTask = tasks.values().iterator().next();
        Assert.assertEquals("Task state not correct", TaskState.FAILED, onlyTask.getReport().getTaskState());

        Map<TaskAttemptId, TaskAttempt> attempts = onlyTask.getAttempts();
        Assert.assertEquals("Num attempts is not correct", 4L, (long)attempts.size());

        Iterator<TaskAttempt> attemptIter = attempts.values().iterator();
        TaskAttemptReport firstReport = attemptIter.next().getReport();
        Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, firstReport.getTaskAttemptState());
        Assert.assertEquals("Diagnostic Information is not Correct", "Test Diagnostic Event", firstReport.getDiagnosticInfo());

        TaskAttemptReport secondReport = attemptIter.next().getReport();
        Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, secondReport.getTaskAttemptState());
    }

    /**
     * Submits a job on the given app, waits for it to reach FAILED, and checks that the app
     * reports both the task-attempt "started" and "failed" job-history flags.
     *
     * @param app the app to run the job on
     * @throws Exception if the job does not reach the expected state
     */
    private void testTaskAttemptAssignedFailHistory(FailingAttemptsDuringAssignedMRApp app) throws Exception {
        Job job = app.submit(new Configuration());
        app.waitForState(job, JobState.FAILED);
        // Result is not inspected; the call is kept so the sequence of calls on the job is unchanged.
        job.getTasks();
        boolean sawStart = app.getTaStartJHEvent();
        Assert.assertTrue("No Ta Started JH Event", sawStart);
        boolean sawFailure = app.getTaFailedJHEvent();
        Assert.assertTrue("No Ta Failed JH Event", sawFailure);
    }

    /**
     * Submits a job on the given app, waits for the job to be RUNNING, its first task to be
     * SCHEDULED and that task's first attempt to be KILLED, then polls (every 100 ms, for up
     * to 800 ms each) until the app reports the task-attempt "started" and "killed"
     * job-history flags.
     *
     * @param app the app to run the job on
     * @throws Exception if a state or flag is not reached
     */
    private void testTaskAttemptAssignedKilledHistory(FailingAttemptsDuringAssignedMRApp app) throws Exception {
        Job job = app.submit(new Configuration());
        app.waitForState(job, JobState.RUNNING);

        Map<TaskId, Task> tasks = job.getTasks();
        Task firstTask = tasks.values().iterator().next();
        app.waitForState(firstTask, TaskState.SCHEDULED);

        Map<TaskAttemptId, TaskAttempt> attempts = firstTask.getAttempts();
        TaskAttempt firstAttempt = attempts.values().iterator().next();
        app.waitForState(firstAttempt, TaskAttemptState.KILLED);

        GenericTestUtils.waitFor(() -> app.getTaStartJHEvent(), 100, 800);
        GenericTestUtils.waitFor(() -> app.getTaKilledJHEvent(), 100, 800);
    }

    /**
     * Sends TA_SCHEDULE, a container assignment, TA_KILL, TA_CONTAINER_CLEANED and finally
     * TA_CONTAINER_LAUNCH_FAILED to a map attempt and checks that the handler recorded no
     * internal error and that the attempt's locality is NODE_LOCAL.
     *
     * <p>The split location and the container's node are both {@code 127.0.0.1}.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testLaunchFailedWhileKilling() throws Exception {
        // Identifiers for the job, task and attempt under test.
        ApplicationId applicationId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 0);
        JobId job = MRBuilderUtils.newJobId(applicationId, 1);
        TaskId mapTask = MRBuilderUtils.newTaskId(job, 1, TaskType.MAP);
        TaskAttemptId mapAttempt = MRBuilderUtils.newTaskAttemptId(mapTask, 0);
        Path jobFilePath = Mockito.mock(Path.class);

        MockEventHandler handler = new MockEventHandler();
        TaskAttemptListener listener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(listener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf conf = new JobConf();
        conf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        conf.setBoolean("fs.file.impl.disable.cache", true);
        conf.set("mapreduce.map.env", "");
        conf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splitMeta = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splitMeta.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        MapTaskAttemptImpl attempt = new MapTaskAttemptImpl(mapTask, 1, handler, jobFilePath, 1, splitMeta, conf, listener, new Token(), new Credentials(), (Clock)SystemClock.getInstance(), null);

        // Mocked container placed on the same address as the split location.
        NodeId node = NodeId.newInstance("127.0.0.1", 0);
        ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 3L);
        Container assigned = Mockito.mock(Container.class);
        Mockito.when(assigned.getId()).thenReturn(containerId);
        Mockito.when(assigned.getNodeId()).thenReturn(node);

        attempt.handle(new TaskAttemptEvent(mapAttempt, TaskAttemptEventType.TA_SCHEDULE));
        attempt.handle(new TaskAttemptContainerAssignedEvent(mapAttempt, assigned, Mockito.mock(Map.class)));
        attempt.handle(new TaskAttemptEvent(mapAttempt, TaskAttemptEventType.TA_KILL));
        attempt.handle(new TaskAttemptEvent(mapAttempt, TaskAttemptEventType.TA_CONTAINER_CLEANED));
        attempt.handle(new TaskAttemptEvent(mapAttempt, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));

        Assert.assertFalse(handler.internalError);
        Assert.assertEquals("Task attempt is not assigned on the local node", Locality.NODE_LOCAL, attempt.getLocality());
    }

    /**
     * Brings a map attempt to RUNNING (schedule, container assigned, container launched),
     * then delivers TA_CONTAINER_CLEANED and checks that the handler recorded no internal
     * error and that the attempt's locality is RACK_LOCAL.
     *
     * <p>The split location is {@code 127.0.0.1} while the container's node is
     * {@code 127.0.0.2}.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testContainerCleanedWhileRunning() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        // NOTE(review): this Resource mock is stubbed but not wired into any other object here.
        Resource resource = Mockito.mock(Resource.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        Mockito.when(resource.getMemorySize()).thenReturn(1024L);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(), (Clock)SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.2", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        // Expected value first, actual second, so a failure reports the right way round.
        Assert.assertEquals("Task attempt is not in running state", TaskAttemptState.RUNNING, taImpl.getState());

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_CLEANED));
        Assert.assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", eventHandler.internalError);
        Assert.assertEquals("Task attempt is not assigned on the local rack", Locality.RACK_LOCAL, taImpl.getLocality());
    }

    /**
     * Brings a map attempt to COMMIT_PENDING (schedule, container assigned, container
     * launched, commit pending), then delivers TA_CONTAINER_CLEANED and checks that the
     * handler recorded no internal error and that the attempt's locality is OFF_SWITCH.
     *
     * <p>The split reports no locations at all; the container's node is {@code 127.0.0.1}.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testContainerCleanedWhileCommitting() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[0]);

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        // NOTE(review): this Resource mock is stubbed but not wired into any other object here.
        Resource resource = Mockito.mock(Resource.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        Mockito.when(resource.getMemorySize()).thenReturn(1024L);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(), (Clock)SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.1", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_COMMIT_PENDING));
        // Expected value first, actual second, so a failure reports the right way round.
        Assert.assertEquals("Task attempt is not in commit pending state", TaskAttemptState.COMMIT_PENDING, taImpl.getState());

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_CLEANED));
        Assert.assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", eventHandler.internalError);
        Assert.assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH, taImpl.getLocality());
    }

    /**
     * Brings a map attempt to SUCCEEDED, then reports "too many fetch failures" twice and
     * checks that the attempt is FAILED after the first report, is still FAILED after the
     * second, and that the handler recorded no internal error.
     *
     * <p>The first report is a {@link TaskAttemptTooManyFetchFailureEvent} naming a reduce
     * attempt and the host {@code "Host"}; the second is a plain {@link TaskAttemptEvent} of
     * type TA_TOO_MANY_FETCH_FAILURE.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testDoubleTooManyFetchFailure() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
        TaskAttemptId reduceTAId = MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        // NOTE(review): this Resource mock is stubbed but not wired into any other object here.
        Resource resource = Mockito.mock(Resource.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        Mockito.when(resource.getMemorySize()).thenReturn(1024L);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(), (Clock)SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.1", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
        // Expected value first, actual second, so a failure reports the right way round.
        Assert.assertEquals("Task attempt is not in succeeded state", TaskAttemptState.SUCCEEDED, taImpl.getState());

        taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId, reduceTAId, "Host"));
        Assert.assertEquals("Task attempt is not in FAILED state", TaskAttemptState.FAILED, taImpl.getState());

        // A second fetch-failure report must leave the attempt in FAILED.
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
        Assert.assertEquals("Task attempt is not in FAILED state, still", TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertFalse("InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURE", eventHandler.internalError);
    }

    /**
     * Checks that a map attempt which has been scheduled, but has not been
     * handed a container, accepts a diagnostics update followed by a kill.
     *
     * <p>Sequence: {@code TA_SCHEDULE}, a diagnostics update event, an
     * assertion that the mock handler's {@code internalError} flag is still
     * unset, and finally {@code TA_KILL}, which must not throw.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testAppDiognosticEventOnUnassignedTask() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, new Token(), new Credentials(), SystemClock.getInstance(), appCtx);

        // No container-assigned event is sent in this test, so no container fixture is needed.
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId, "Task got killed"));
        Assert.assertFalse("InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on unassigned task",
                eventHandler.internalError);

        try {
            taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
        } catch (Exception e) {
            // Fail with the original message but keep the cause so the stack trace is not lost.
            throw new AssertionError("Exception not expected for UNASSIGNED STATE KILL event", e);
        }
    }

    /**
     * Checks that a {@code TA_TOO_MANY_FETCH_FAILURE} event delivered after a
     * succeeded map attempt has been killed leaves the attempt in
     * {@code KILLED} and does not trip the mock handler's
     * {@code internalError} flag.
     *
     * <p>Sequence: schedule, container assigned, container launched, done,
     * container completed (asserted {@code SUCCEEDED}), kill (asserted
     * {@code KILLED}), too-many-fetch-failure (asserted still {@code KILLED}).
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testTooManyFetchFailureAfterKill() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, Mockito.mock(Token.class), new Credentials(),
                SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.1", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        // Drive the attempt through a normal successful run.
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, (Map) Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
        Assert.assertEquals("Task attempt is not in succeeded state",
                TaskAttemptState.SUCCEEDED, taImpl.getState());

        // Kill after success, then report fetch failures against the killed attempt.
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
        Assert.assertEquals("Task attempt is not in KILLED state",
                TaskAttemptState.KILLED, taImpl.getState());
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
        Assert.assertEquals("Task attempt is not in KILLED state, still",
                TaskAttemptState.KILLED, taImpl.getState());
        Assert.assertFalse("InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURE",
                eventHandler.internalError);
    }

    /**
     * Checks that a diagnostics update can be delivered to a freshly
     * constructed map attempt, before any other event has been sent to it,
     * without the mock handler's {@code internalError} flag being set.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testAppDiognosticEventOnNewTask() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, new Token(), new Credentials(), SystemClock.getInstance(), appCtx);

        // The diagnostics update is the very first event the attempt sees.
        taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId, "Task got killed"));
        Assert.assertFalse("InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on new task",
                eventHandler.internalError);
    }

    /**
     * Checks that the finish time recorded when a map attempt succeeds is not
     * changed when the attempt later receives a too-many-fetch-failure event.
     *
     * <p>Sequence: schedule, container assigned, container launched, done,
     * container completed (asserted {@code SUCCEEDED} with a positive finish
     * time), a short pause, then a {@code TaskAttemptTooManyFetchFailureEvent}
     * (asserted {@code FAILED} with the same finish time).
     *
     * @throws Exception if building the fixture fails or the pause is interrupted
     */
    @Test
    public void testFetchFailureAttemptFinishTime() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        TaskId reducetaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
        TaskAttemptId reduceTAId = MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
        this.setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, Mockito.mock(Token.class), new Credentials(),
                SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.1", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, (Map) Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
        Assert.assertEquals("Task attempt is not in succeeded state",
                TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertTrue("Task Attempt finish time is not greater than 0", taImpl.getFinishTime() > 0L);

        final long finishTime = taImpl.getFinishTime();
        // Pause briefly so that, if the fetch-failure handling re-stamped the finish
        // time from the clock, the new value would presumably differ from the saved one.
        Thread.sleep(5L);

        taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId, reduceTAId, "Host"));
        Assert.assertEquals("Task attempt is not in Too Many Fetch Failure state",
                TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertEquals("After TA_TOO_MANY_FETCH_FAILURE, Task attempt finish time is not the same ",
                finishTime, taImpl.getFinishTime());
    }

    /**
     * Shared body for the tests that kill a map attempt before any container
     * has been assigned to it.
     *
     * <p>Builds an attempt from mocks, optionally sends {@code TA_SCHEDULE},
     * then sends a {@code TaskAttemptKillEvent} and asserts that the attempt is
     * {@code KILLED} (external and internal state), that the mock handler saw no
     * internal error, and that the last task event is {@code T_ATTEMPT_KILLED}
     * with {@code getRescheduleAttempt()} returning {@code false}.
     *
     * @param scheduleAttempt when {@code true}, {@code TA_SCHEDULE} is delivered
     *                        before the kill; when {@code false}, the kill is the
     *                        first event the attempt receives
     * @throws Exception if building the fixture fails
     */
    private void containerKillBeforeAssignment(boolean scheduleAttempt) throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
                Mockito.mock(Path.class), 1, Mockito.mock(JobSplit.TaskSplitMetaInfo.class),
                new JobConf(), Mockito.mock(TaskAttemptListener.class), Mockito.mock(Token.class),
                new Credentials(), SystemClock.getInstance(), Mockito.mock(AppContext.class));

        if (scheduleAttempt) {
            taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_SCHEDULE));
        }

        // The kill event is built with its boolean argument set to true, yet the
        // resulting task event is asserted below NOT to request a reschedule.
        taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(), "", true));
        Assert.assertEquals("Task attempt is not in KILLED state",
                TaskAttemptState.KILLED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not KILLED",
                TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);

        TaskEvent event = eventHandler.lastTaskEvent;
        Assert.assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
        Assert.assertFalse(((TaskTAttemptKilledEvent) event).getRescheduleAttempt());
    }

    /**
     * Kills an attempt that has received no prior event; delegates to
     * {@code containerKillBeforeAssignment} without scheduling first.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testContainerKillOnNew() throws Exception {
        final boolean scheduleFirst = false;
        containerKillBeforeAssignment(scheduleFirst);
    }

    /**
     * Kills an attempt that has been scheduled but not given a container;
     * delegates to {@code containerKillBeforeAssignment} with scheduling enabled.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testContainerKillOnUnassigned() throws Exception {
        final boolean scheduleFirst = true;
        containerKillBeforeAssignment(scheduleFirst);
    }

    /**
     * Checks that killing a map attempt right after a container has been
     * assigned to it moves the attempt's internal state to
     * {@code KILL_CONTAINER_CLEANUP}.
     *
     * <p>Sequence: schedule, container assigned (asserted internal state
     * {@code ASSIGNED}), kill (asserted internal state
     * {@code KILL_CONTAINER_CLEANUP}).
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testContainerKillAfterAssigned() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, new Token(), new Credentials(), SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.2", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, (Map) Mockito.mock(Map.class)));
        Assert.assertEquals("Task attempt is not in assinged state",
                TaskAttemptStateInternal.ASSIGNED, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
        Assert.assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
                TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState());
    }

    /**
     * Checks that killing a running map attempt moves its internal state to
     * {@code KILL_CONTAINER_CLEANUP} without the mock handler's
     * {@code internalError} flag being set.
     *
     * <p>Sequence: schedule, container assigned, container launched (asserted
     * {@code RUNNING}), kill.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testContainerKillWhileRunning() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, new Token(), new Credentials(), SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.2", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, (Map) Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        Assert.assertEquals("Task attempt is not in running state",
                TaskAttemptState.RUNNING, taImpl.getState());

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
        Assert.assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError);
        Assert.assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
                TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState());
    }

    /**
     * Checks that killing a map attempt whose internal state is
     * {@code COMMIT_PENDING} moves it to {@code KILL_CONTAINER_CLEANUP}
     * without the mock handler's {@code internalError} flag being set.
     *
     * <p>Sequence: schedule, container assigned, container launched (asserted
     * {@code RUNNING}), commit pending (asserted internal state
     * {@code COMMIT_PENDING}), kill.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testContainerKillWhileCommitPending() throws Exception {
        ApplicationId appId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
        JobId jobId = MRBuilderUtils.newJobId(appId, 1);
        TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
        Path jobFile = Mockito.mock(Path.class);

        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptListener taListener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

        JobConf jobConf = new JobConf();
        jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        jobConf.setBoolean("fs.file.impl.disable.cache", true);
        jobConf.set("mapreduce.map.env", "");
        jobConf.set("mapreduce.job.application.attempt.id", "10");

        JobSplit.TaskSplitMetaInfo splits = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splits.getLocations()).thenReturn(new String[]{"127.0.0.1"});

        AppContext appCtx = Mockito.mock(AppContext.class);
        ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
        Mockito.when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

        MapTaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits,
                jobConf, taListener, new Token(), new Credentials(), SystemClock.getInstance(), appCtx);

        NodeId nid = NodeId.newInstance("127.0.0.2", 0);
        ContainerId contId = ContainerId.newContainerId(appAttemptId, 3L);
        Container container = Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(contId);
        Mockito.when(container.getNodeId()).thenReturn(nid);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
        taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, (Map) Mockito.mock(Map.class)));
        taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
        Assert.assertEquals("Task attempt is not in running state",
                TaskAttemptState.RUNNING, taImpl.getState());

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_COMMIT_PENDING));
        Assert.assertEquals("Task should be in COMMIT_PENDING state",
                TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
        Assert.assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError);
        Assert.assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
                TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState());
    }

    /**
     * Checks the kill path for an attempt that has reported done but whose
     * container is still finishing.
     *
     * <p>Starting from the attempt returned by {@code createTaskAttemptImpl},
     * the test sends {@code TA_DONE} (asserted {@code SUCCEEDED} /
     * {@code SUCCESS_FINISHING_CONTAINER}), {@code TA_KILL} (asserted
     * {@code KILLED} / {@code KILL_CONTAINER_CLEANUP}),
     * {@code TA_CONTAINER_CLEANED} (asserted {@code KILL_TASK_CLEANUP}) and
     * {@code TA_CLEANUP_DONE} (asserted {@code KILLED}), and finally verifies
     * that the mock handler saw no internal error.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testKillMapTaskWhileSuccessFinishing() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in SUCCEEDED state",
                TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not SUCCESS_FINISHING_CONTAINER",
                TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_KILL));
        Assert.assertEquals("Task attempt is not in KILLED state",
                TaskAttemptState.KILLED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP",
                TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
        Assert.assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP",
                TaskAttemptStateInternal.KILL_TASK_CLEANUP, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CLEANUP_DONE));
        Assert.assertEquals("Task attempt is not in KILLED state",
                TaskAttemptState.KILLED, taImpl.getState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);
    }

    /**
     * Checks that a kill delivered after a successful attempt's container has
     * been cleaned leaves the attempt {@code KILLED} and emits a task event
     * that asks for the attempt to be rescheduled.
     *
     * <p>Starting from the attempt returned by {@code createTaskAttemptImpl},
     * the test sends {@code TA_DONE} (asserted {@code SUCCEEDED} /
     * {@code SUCCESS_FINISHING_CONTAINER}), {@code TA_CONTAINER_CLEANED}, and a
     * {@code TaskAttemptKillEvent}; it then asserts {@code KILLED} for both the
     * external and internal state, no internal error, and that the last task
     * event is {@code T_ATTEMPT_KILLED} with {@code getRescheduleAttempt()}
     * returning {@code true}.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testKillMapTaskAfterSuccess() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in SUCCEEDED state",
                TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not SUCCESS_FINISHING_CONTAINER",
                TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
        taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(), "", true));
        Assert.assertEquals("Task attempt is not in KILLED state",
                TaskAttemptState.KILLED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not KILLED",
                TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);

        TaskEvent event = eventHandler.lastTaskEvent;
        Assert.assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
        Assert.assertTrue(((TaskTAttemptKilledEvent) event).getRescheduleAttempt());
    }

    /**
     * Checks that a kill delivered while a failed attempt's container is still
     * finishing does not change the attempt's state, and that the attempt then
     * completes the failure path.
     *
     * <p>Starting from the attempt returned by {@code createTaskAttemptImpl},
     * the test sends a {@code TaskAttemptFailEvent} (asserted {@code FAILED} /
     * {@code FAIL_FINISHING_CONTAINER}), {@code TA_KILL} (asserted unchanged),
     * {@code TA_TIMED_OUT} (asserted {@code FAIL_CONTAINER_CLEANUP}),
     * {@code TA_CONTAINER_CLEANED} (asserted {@code FAIL_TASK_CLEANUP}) and
     * {@code TA_CLEANUP_DONE} (asserted {@code FAILED}), and finally verifies
     * that the mock handler saw no internal error.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testKillMapTaskWhileFailFinishing() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
        Assert.assertEquals("Task attempt is not in FAILED state",
                TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not FAIL_FINISHING_CONTAINER",
                TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, taImpl.getInternalState());

        // The kill must be a no-op here: both states are expected to stay as they were.
        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_KILL));
        Assert.assertEquals("Task attempt is not in FAILED state",
                TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not FAIL_FINISHING_CONTAINER",
                TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_TIMED_OUT));
        Assert.assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
                TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
        Assert.assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
                TaskAttemptStateInternal.FAIL_TASK_CLEANUP, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CLEANUP_DONE));
        Assert.assertEquals("Task attempt is not in FAILED state",
                TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);
    }

    /**
     * Checks the failure path triggered by a {@code TA_FAILMSG_BY_CLIENT}
     * event.
     *
     * <p>Starting from the attempt returned by {@code createTaskAttemptImpl},
     * the test sends {@code TA_FAILMSG_BY_CLIENT} (asserted {@code FAILED} /
     * {@code FAIL_CONTAINER_CLEANUP}), {@code TA_CONTAINER_CLEANED} (asserted
     * {@code FAIL_TASK_CLEANUP}) and {@code TA_CLEANUP_DONE} (asserted
     * {@code FAILED}), and finally verifies that the mock handler saw no
     * internal error.
     *
     * @throws Exception if building the fixture fails
     */
    @Test
    public void testFailMapTaskByClient() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
        Assert.assertEquals("Task attempt is not in FAILED state",
                TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
                TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
        Assert.assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
                TaskAttemptStateInternal.FAIL_TASK_CLEANUP, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_CLEANUP_DONE));
        Assert.assertEquals("Task attempt is not in FAILED state",
                TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);
    }

    /**
     * Verifies that a diagnostics update received after {@code TA_DONE} leaves the attempt in
     * {@code SUCCEEDED} with internal state {@code SUCCESS_FINISHING_CONTAINER}, and that the
     * event handler sees no internal error.
     */
    @Test
    public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE));
        // JUnit's contract is (message, expected, actual); messages name the state really asserted.
        Assert.assertEquals("Task attempt is not in SUCCEEDED state", TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not SUCCESS_FINISHING_CONTAINER", TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, taImpl.getInternalState());

        // The diagnostics update must not change either the external or the internal state.
        taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(), "Task got updated"));
        Assert.assertEquals("Task attempt is not in SUCCEEDED state", TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not SUCCESS_FINISHING_CONTAINER", TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, taImpl.getInternalState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);
    }

    /**
     * Verifies that a {@code TA_TIMED_OUT} event received while the attempt is in
     * {@code SUCCESS_FINISHING_CONTAINER} keeps the external state {@code SUCCEEDED} and moves
     * the internal state to {@code SUCCESS_CONTAINER_CLEANUP}, with no internal error.
     */
    @Test
    public void testTimeoutWhileSuccessFinishing() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE));
        // JUnit's contract is (message, expected, actual); messages name the state really asserted.
        Assert.assertEquals("Task attempt is not in SUCCEEDED state", TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not SUCCESS_FINISHING_CONTAINER", TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_TIMED_OUT));
        Assert.assertEquals("Task attempt is not in SUCCEEDED state", TaskAttemptState.SUCCEEDED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not SUCCESS_CONTAINER_CLEANUP", TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, taImpl.getInternalState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);
    }

    /**
     * Verifies that a {@code TA_TIMED_OUT} event received while the attempt is in
     * {@code FAIL_FINISHING_CONTAINER} moves the internal state to
     * {@code FAIL_CONTAINER_CLEANUP}, with no internal error.
     */
    @Test
    public void testTimeoutWhileFailFinishing() throws Exception {
        MockEventHandler eventHandler = new MockEventHandler();
        TaskAttemptImpl taImpl = this.createTaskAttemptImpl(eventHandler);

        taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
        // JUnit's contract is (message, expected, actual); messages name the state really asserted.
        Assert.assertEquals("Task attempt is not in FAILED state", TaskAttemptState.FAILED, taImpl.getState());
        Assert.assertEquals("Task attempt's internal state is not FAIL_FINISHING_CONTAINER", TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, taImpl.getInternalState());

        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_TIMED_OUT));
        Assert.assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP", TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, taImpl.getInternalState());
        Assert.assertFalse("InternalError occurred", eventHandler.internalError);
    }

    /**
     * Checks that a map attempt whose job configuration sets the custom resource to a
     * unit-less {@code 7} requests that resource with unit {@code "G"} and value {@code 7}.
     */
    @Test
    public void testMapperCustomResourceTypes() {
        this.initResourceTypes();

        EventHandler handler = Mockito.mock(EventHandler.class);
        JobConf conf = new JobConf();
        conf.setLong("mapreduce.map.resource.a-custom-resource", 7L);
        Clock sysClock = SystemClock.getInstance();
        TaskAttemptImpl attempt = this.createMapTaskAttemptImplForTest(handler, new JobSplit.TaskSplitMetaInfo(), sysClock, conf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        ResourceInformation customInfo = requested.getResourceInformation(CUSTOM_RESOURCE_NAME);
        Assert.assertEquals("Expecting the default unit (G)", "G", customInfo.getUnits());
        Assert.assertEquals(7L, customInfo.getValue());
    }

    /**
     * Checks that a reduce attempt whose job configuration sets the custom resource to
     * {@code "3m"} requests that resource with unit {@code "m"} and value {@code 3}.
     */
    @Test
    public void testReducerCustomResourceTypes() {
        this.initResourceTypes();

        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf conf = new JobConf();
        conf.set("mapreduce.reduce.resource.a-custom-resource", "3m");
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        ResourceInformation customInfo = requested.getResourceInformation(CUSTOM_RESOURCE_NAME);
        Assert.assertEquals("Expecting the specified unit (m)", "m", customInfo.getUnits());
        Assert.assertEquals(3L, customInfo.getValue());
    }

    /**
     * Checks that setting {@code mapreduce.reduce.memory.mb} to 2048 yields a container
     * request whose memory size is 2048.
     */
    @Test
    public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf conf = new JobConf();
        conf.setInt("mapreduce.reduce.memory.mb", 2048);
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        Assert.assertEquals(2048L, requested.getMemorySize());
    }

    /**
     * Checks that setting {@code mapreduce.reduce.resource.memory} to {@code "2 Gi"} yields a
     * container request whose memory size is 2048.
     */
    @Test
    public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf conf = new JobConf();
        conf.set("mapreduce.reduce.resource.memory", "2 Gi");
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        Assert.assertEquals(2048L, requested.getMemorySize());
    }

    /**
     * Checks that a reduce attempt created with an untouched {@link JobConf} requests a
     * memory size of 1024.
     */
    @Test
    public void testReducerMemoryRequestDefaultMemory() {
        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf defaultConf = new JobConf();
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, defaultConf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        Assert.assertEquals(1024L, requested.getMemorySize());
    }

    /**
     * Checks, for each of the resource names {@code memory} and {@code memory-mb}, that a
     * unit-less value of 2048 under {@code mapreduce.reduce.resource.<name>} yields a
     * container request whose memory size is 2048.
     */
    @Test
    public void testReducerMemoryRequestWithoutUnits() {
        Clock clock = SystemClock.getInstance();
        // The list must be typed as ImmutableList<String>; casting the elements to Object
        // makes it an ImmutableList<Object>, which cannot be iterated with a String variable.
        for (String memoryResourceName : ImmutableList.of("memory", "memory-mb")) {
            // A fresh mock per iteration keeps the handler invocation count independent.
            EventHandler eventHandler = Mockito.mock(EventHandler.class);
            JobConf jobConf = new JobConf();
            jobConf.setInt("mapreduce.reduce.resource." + memoryResourceName, 2048);
            TaskAttemptImpl taImpl = this.createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
            long memorySize = this.getResourceInfoFromContainerRequest(taImpl, eventHandler).getMemorySize();
            Assert.assertEquals(2048L, memorySize);
        }
    }

    /*
     * WARNING (decompiler note): a try block that caught itself was removed during
     * decompilation - possible behaviour change.
     */
    /**
     * Checks, for each of the resource names {@code memory} and {@code memory-mb}, that a
     * value of {@code 3Gi} under {@code mapreduce.reduce.resource.<name>} wins over
     * {@code mapreduce.reduce.memory.mb=2048} (the request carries 3072) and that a WARN
     * log event with the expected override message is captured on the
     * {@code TaskAttemptImpl} logger.
     */
    @Test
    public void testReducerMemoryRequestOverriding() {
        // The list must be typed as ImmutableList<String>; casting the elements to Object
        // makes it an ImmutableList<Object>, which cannot be iterated with a String variable.
        for (String memoryName : ImmutableList.of("memory", "memory-mb")) {
            TestAppender testAppender = new TestAppender();
            Logger logger = Logger.getLogger(TaskAttemptImpl.class);
            try {
                logger.addAppender(testAppender);
                EventHandler eventHandler = Mockito.mock(EventHandler.class);
                Clock clock = SystemClock.getInstance();
                JobConf jobConf = new JobConf();
                jobConf.set("mapreduce.reduce.resource." + memoryName, "3Gi");
                jobConf.setInt("mapreduce.reduce.memory.mb", 2048);
                TaskAttemptImpl taImpl = this.createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
                long memorySize = this.getResourceInfoFromContainerRequest(taImpl, eventHandler).getMemorySize();
                Assert.assertEquals(3072L, memorySize);

                final String expectedWarning = "Configuration mapreduce.reduce.resource." + memoryName + "=3Gi is overriding the mapreduce.reduce.memory.mb=2048 configuration";
                Assert.assertTrue(testAppender.getLogEvents().stream().anyMatch(e -> e.getLevel() == Level.WARN && expectedWarning.equals(e.getMessage())));
            }
            finally {
                // Always detach the appender so later tests do not accumulate log events in it.
                logger.removeAppender(testAppender);
            }
        }
    }

    /**
     * Expects an {@link IllegalArgumentException} when both
     * {@code mapreduce.reduce.resource.memory} and {@code mapreduce.reduce.resource.memory-mb}
     * are set in the same job configuration.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testReducerMemoryRequestMultipleName() {
        EventHandler eventHandler = Mockito.mock(EventHandler.class);
        Clock clock = SystemClock.getInstance();
        JobConf jobConf = new JobConf();
        // The list must be typed as ImmutableList<String>; casting the elements to Object
        // makes it an ImmutableList<Object>, which cannot be iterated with a String variable.
        for (String memoryName : ImmutableList.of("memory", "memory-mb")) {
            jobConf.set("mapreduce.reduce.resource." + memoryName, "3Gi");
        }
        this.createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
    }

    /**
     * Checks that setting {@code mapreduce.reduce.cpu.vcores} to 3 yields a container
     * request with 3 virtual cores.
     */
    @Test
    public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf conf = new JobConf();
        conf.setInt("mapreduce.reduce.cpu.vcores", 3);
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        Assert.assertEquals(3L, (long)requested.getVirtualCores());
    }

    /**
     * Checks that setting {@code mapreduce.reduce.resource.vcores} to {@code "5"} yields a
     * container request with 5 virtual cores.
     */
    @Test
    public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf conf = new JobConf();
        conf.set("mapreduce.reduce.resource.vcores", "5");
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        Assert.assertEquals(5L, (long)requested.getVirtualCores());
    }

    /**
     * Checks that a reduce attempt created with an untouched {@link JobConf} requests one
     * virtual core. (Despite the method name, the assertion is on vcores, not memory.)
     */
    @Test
    public void testReducerCpuRequestDefaultMemory() {
        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf defaultConf = new JobConf();
        TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, defaultConf);

        Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
        Assert.assertEquals(1L, (long)requested.getVirtualCores());
    }

    /*
     * WARNING (decompiler note): a try block that caught itself was removed during
     * decompilation - possible behaviour change.
     */
    /**
     * Checks that {@code mapreduce.reduce.resource.vcores=7} wins over
     * {@code mapreduce.reduce.cpu.vcores=9} (the request carries 7 vcores) and that a WARN
     * log event with the expected override message is captured on the
     * {@code TaskAttemptImpl} logger.
     */
    @Test
    public void testReducerCpuRequestOverriding() {
        final String expectedWarning = "Configuration mapreduce.reduce.resource.vcores=7 is overriding the mapreduce.reduce.cpu.vcores=9 configuration";
        TestAppender capturingAppender = new TestAppender();
        Logger attemptLogger = Logger.getLogger(TaskAttemptImpl.class);
        try {
            attemptLogger.addAppender(capturingAppender);

            EventHandler handler = Mockito.mock(EventHandler.class);
            Clock sysClock = SystemClock.getInstance();
            JobConf conf = new JobConf();
            conf.set("mapreduce.reduce.resource.vcores", "7");
            conf.setInt("mapreduce.reduce.cpu.vcores", 9);
            TaskAttemptImpl attempt = this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);

            Resource requested = this.getResourceInfoFromContainerRequest(attempt, handler);
            Assert.assertEquals(7L, (long)requested.getVirtualCores());

            boolean warned = capturingAppender.getLogEvents().stream()
                .anyMatch(logged -> logged.getLevel() == Level.WARN && expectedWarning.equals(logged.getMessage()));
            Assert.assertTrue(warned);
        }
        finally {
            // Detach the appender regardless of the outcome.
            attemptLogger.removeAppender(capturingAppender);
        }
    }

    /**
     * Schedules the given attempt and returns the capability carried by the single
     * {@link ContainerRequestEvent} that the mocked event handler received.
     *
     * <p>Asserts that the attempt is in {@code STARTING} after {@code TA_SCHEDULE}, that the
     * handler was invoked exactly twice, and that exactly one of the captured events is a
     * {@code ContainerRequestEvent}.
     *
     * @param taImpl the attempt to schedule
     * @param eventHandler the Mockito mock that was passed to the attempt as its event handler
     * @return the capability of the captured container request
     */
    private Resource getResourceInfoFromContainerRequest(TaskAttemptImpl taImpl, EventHandler eventHandler) {
        taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_SCHEDULE));
        // JUnit's contract is (message, expected, actual).
        Assert.assertEquals("Task attempt is not in STARTING state", TaskAttemptState.STARTING, taImpl.getState());

        // The captor must be parameterized: a raw ArgumentCaptor makes getAllValues() a raw
        // List, whose elements cannot be iterated as Event.
        ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
        Mockito.verify(eventHandler, Mockito.times(2)).handle(captor.capture());

        List<ContainerRequestEvent> containerRequestEvents = new ArrayList<ContainerRequestEvent>();
        for (Event e : captor.getAllValues()) {
            if (e instanceof ContainerRequestEvent) {
                containerRequestEvents.add((ContainerRequestEvent)e);
            }
        }
        Assert.assertEquals("Expected one ContainerRequestEvent after scheduling task attempt", 1L, (long)containerRequestEvents.size());
        return containerRequestEvents.get(0).getCapability();
    }

    /**
     * Expects an {@link IllegalArgumentException} when the custom resource is configured
     * with the value {@code "3z"}.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testReducerCustomResourceTypeWithInvalidUnit() {
        this.initResourceTypes();

        EventHandler handler = Mockito.mock(EventHandler.class);
        Clock sysClock = SystemClock.getInstance();
        JobConf conf = new JobConf();
        conf.set("mapreduce.reduce.resource.a-custom-resource", "3z");
        this.createReduceTaskAttemptImplForTest(handler, sysClock, conf);
    }

    /**
     * Resets the resource types using a configuration whose provider class is
     * {@code CustomResourceTypesConfigurationProvider}.
     */
    private void initResourceTypes() {
        Configuration providerConf = new Configuration();
        String providerClassName = CustomResourceTypesConfigurationProvider.class.getName();
        providerConf.set("yarn.resourcemanager.configuration.provider-class", providerClassName);
        ResourceUtils.resetResourceTypes(providerConf);
    }

    /**
     * Creates a {@link TaskAttemptFinishingMonitor} for the given handler, initialises it with
     * the job configuration and stubs the mocked context to return it.
     *
     * @param eventHandler handler passed to the monitor's constructor
     * @param jobConf configuration used to initialise the monitor
     * @param appCtx mocked application context to stub
     */
    private void setupTaskAttemptFinishingMonitor(EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
        TaskAttemptFinishingMonitor monitor = new TaskAttemptFinishingMonitor(eventHandler);
        monitor.init(jobConf);
        Mockito.when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(monitor);
    }

    /**
     * Builds a map task attempt wired to mocked collaborators and feeds it the schedule,
     * container-assigned and container-launched events before returning it.
     *
     * @param eventHandler handler given to the attempt and to the finishing monitor
     * @return the attempt after those three events have been handled
     */
    private TaskAttemptImpl createTaskAttemptImpl(MockEventHandler eventHandler) {
        // Identifiers for the application, job, task and attempt.
        ApplicationId applicationId = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 0);
        JobId job = MRBuilderUtils.newJobId(applicationId, 1);
        TaskId task = MRBuilderUtils.newTaskId(job, 1, TaskType.MAP);
        TaskAttemptId attempt = MRBuilderUtils.newTaskAttemptId(task, 0);

        // Mocked collaborators and the job configuration.
        Path mockJobFile = Mockito.mock(Path.class);
        TaskAttemptListener listener = Mockito.mock(TaskAttemptListener.class);
        Mockito.when(listener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
        JobConf conf = new JobConf();
        conf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        conf.setBoolean("fs.file.impl.disable.cache", true);
        conf.set("mapreduce.map.env", "");
        conf.set("mapreduce.job.application.attempt.id", "10");
        JobSplit.TaskSplitMetaInfo splitInfo = Mockito.mock(JobSplit.TaskSplitMetaInfo.class);
        Mockito.when(splitInfo.getLocations()).thenReturn(new String[]{"127.0.0.1"});
        AppContext context = Mockito.mock(AppContext.class);
        ClusterInfo cluster = Mockito.mock(ClusterInfo.class);
        Mockito.when(context.getClusterInfo()).thenReturn(cluster);
        this.setupTaskAttemptFinishingMonitor(eventHandler, conf, context);

        MapTaskAttemptImpl mapAttempt = new MapTaskAttemptImpl(task, 1, (EventHandler)eventHandler, mockJobFile, 1, splitInfo, conf, listener, (Token)Mockito.mock(Token.class), new Credentials(), (Clock)SystemClock.getInstance(), context);

        // Mocked container the attempt is assigned to.
        NodeId node = NodeId.newInstance("127.0.0.1", 0);
        ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 3);
        Container mockContainer = Mockito.mock(Container.class);
        Mockito.when(mockContainer.getId()).thenReturn(containerId);
        Mockito.when(mockContainer.getNodeId()).thenReturn(node);
        Mockito.when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:0");

        // Drive the attempt: schedule -> container assigned -> container launched.
        mapAttempt.handle(new TaskAttemptEvent(attempt, TaskAttemptEventType.TA_SCHEDULE));
        mapAttempt.handle(new TaskAttemptContainerAssignedEvent(attempt, mockContainer, (Map)Mockito.mock(Map.class)));
        mapAttempt.handle(new TaskAttemptContainerLaunchedEvent(attempt, 0));
        return mapAttempt;
    }

    public static class MockEventHandler
    implements EventHandler {
        public boolean internalError;
        public TaskEvent lastTaskEvent;

        public void handle(Event event) {
            JobEvent je;
            if (event instanceof TaskEvent) {
                this.lastTaskEvent = (TaskEvent)event;
            }
            if (event instanceof JobEvent && JobEventType.INTERNAL_ERROR == (je = (JobEvent)event).getType()) {
                this.internalError = true;
            }
        }
    }

    /**
     * {@link MRApp} variant that, when an attempt is launched, sends it an event of the type
     * supplied at construction, and records which kinds of attempt job-history events
     * (started / failed / killed, for map or reduce) its history handler has received.
     */
    static class FailingAttemptsDuringAssignedMRApp
    extends MRApp {
        /** Type of the event sent to each attempt from {@link #attemptLaunched}. */
        TaskAttemptEventType sendFailEvent;
        private boolean receiveTaStartJHEvent = false;
        private boolean receiveTaFailedJHEvent = false;
        private boolean receiveTaKilledJHEvent = false;

        FailingAttemptsDuringAssignedMRApp(int maps, int reduces, TaskAttemptEventType event) {
            super(maps, reduces, true, "FailingAttemptsMRApp", true);
            this.sendFailEvent = event;
        }

        /** Overridden to do nothing when a container is launched. */
        @Override
        protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
        }

        /** Sends the configured event to the attempt that was just launched. */
        @Override
        protected void attemptLaunched(TaskAttemptId attemptID) {
            TaskAttemptEvent configuredEvent = new TaskAttemptEvent(attemptID, this.sendFailEvent);
            this.getContext().getEventHandler().handle((Event)configuredEvent);
        }

        /** @return whether a map or reduce attempt-started history event was received */
        public boolean getTaStartJHEvent() {
            return this.receiveTaStartJHEvent;
        }

        /** @return whether a map or reduce attempt-failed history event was received */
        public boolean getTaFailedJHEvent() {
            return this.receiveTaFailedJHEvent;
        }

        /** @return whether a map or reduce attempt-killed history event was received */
        public boolean getTaKilledJHEvent() {
            return this.receiveTaKilledJHEvent;
        }

        /** Returns a history handler that only flips the three "received" flags. */
        @Override
        protected EventHandler<JobHistoryEvent> createJobHistoryHandler(AppContext context) {
            return new EventHandler<JobHistoryEvent>(){

                public void handle(JobHistoryEvent event) {
                    EventType type = event.getType();
                    if (type == EventType.MAP_ATTEMPT_FAILED || type == EventType.REDUCE_ATTEMPT_FAILED) {
                        receiveTaFailedJHEvent = true;
                    } else if (type == EventType.MAP_ATTEMPT_KILLED || type == EventType.REDUCE_ATTEMPT_KILLED) {
                        receiveTaKilledJHEvent = true;
                    } else if (type == EventType.MAP_ATTEMPT_STARTED || type == EventType.REDUCE_ATTEMPT_STARTED) {
                        receiveTaStartJHEvent = true;
                    }
                }
            };
        }
    }

    /**
     * {@link MRApp} variant that sends every launched attempt a diagnostics update followed
     * by a fail event, and whose history handler asserts that map-attempt-failed events
     * carry that diagnostic text.
     */
    static class FailingAttemptsMRApp
    extends MRApp {
        FailingAttemptsMRApp(int maps, int reduces) {
            super(maps, reduces, true, "FailingAttemptsMRApp", true);
        }

        /** Sends the diagnostics update first, then the fail event, to the launched attempt. */
        @Override
        protected void attemptLaunched(TaskAttemptId attemptID) {
            EventHandler dispatcher = this.getContext().getEventHandler();
            dispatcher.handle((Event)new TaskAttemptDiagnosticsUpdateEvent(attemptID, "Test Diagnostic Event"));
            dispatcher = this.getContext().getEventHandler();
            dispatcher.handle((Event)new TaskAttemptFailEvent(attemptID));
        }

        /** Returns a history handler that checks the diagnostics on map-attempt-failed events. */
        @Override
        protected EventHandler<JobHistoryEvent> createJobHistoryHandler(AppContext context) {
            return new EventHandler<JobHistoryEvent>(){

                public void handle(JobHistoryEvent event) {
                    if (event.getType() != EventType.MAP_ATTEMPT_FAILED) {
                        return;
                    }
                    TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion)event.getHistoryEvent().getDatum();
                    // Field 8 of the datum is compared against the diagnostic text sent above.
                    String recordedDiagnostics = datum.get(8).toString();
                    Assert.assertEquals("Diagnostic Information is not Correct", "Test Diagnostic Event", recordedDiagnostics);
                }
            };
        }
    }

    private static class TestAppender
    extends AppenderSkeleton {
        private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<LoggingEvent>();

        private TestAppender() {
        }

        public boolean requiresLayout() {
            return false;
        }

        public void close() {
        }

        protected void append(LoggingEvent arg0) {
            this.logEvents.add(arg0);
        }

        private List<LoggingEvent> getLogEvents() {
            return this.logEvents;
        }
    }

    private static class CustomResourceTypesConfigurationProvider
    extends LocalConfigurationProvider {
        private CustomResourceTypesConfigurationProvider() {
        }

        public InputStream getConfigurationInputStream(Configuration bootstrapConf, String name) throws YarnException, IOException {
            if ("resource-types.xml".equals(name)) {
                return new ByteArrayInputStream("<configuration>\n <property>\n   <name>yarn.resource-types</name>\n   <value>a-custom-resource</value>\n </property>\n <property>\n   <name>yarn.resource-types.a-custom-resource.units</name>\n   <value>G</value>\n </property>\n</configuration>\n".getBytes());
            }
            return super.getConfigurationInputStream(bootstrapConf, name);
        }
    }

    public static class StubbedFS
    extends RawLocalFileSystem {
        public FileStatus getFileStatus(Path f) throws IOException {
            return new FileStatus(1L, false, 1, 1L, 1L, f);
        }
    }
}

