/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that drive an {@link MRApp} through scenarios in which a reduce
 * attempt reports that it failed to fetch the output of an already
 * succeeded map attempt.
 *
 * <p>Each test submits a job with uber mode disabled, completes the single
 * map attempt by hand, reports fetch failures against it and then checks the
 * resulting task states and task-attempt completion events.
 */
public class TestFetchFailure {
    /**
     * One map, one reduce: after three fetch-failure reports the test expects
     * the first map attempt to be FAILED, a second map attempt to be created,
     * and four completion events once the job has succeeded.
     */
    @Test
    public void testFetchFailure() throws Exception {
        MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true);
        Job job = app.submit(nonUberConf());
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
        Iterator<Task> tasks = job.getTasks().values().iterator();
        Task mapTask = tasks.next();
        Task reduceTask = tasks.next();

        TaskAttempt mapAttempt1 = completeFirstMapAttempt(app, mapTask);
        TaskAttemptCompletionEvent firstMapEvent = assertSingleSucceededEvent(job);

        TaskAttempt reduceAttempt = awaitFirstRunningAttempt(app, reduceTask);

        // Three reports from the only reducer are sent before the map is expected to run again.
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);

        TaskAttempt mapAttempt2 = completeSecondMapAttempt(app, mapTask, mapAttempt1);

        sendTaskAttemptDone(app, reduceAttempt);
        app.waitForState(job, JobState.SUCCEEDED);

        assertEventsAfterMapRerun(job, firstMapEvent, mapAttempt1, mapAttempt2, reduceAttempt, 4);
    }

    /**
     * Same set-up as {@link #testFetchFailure()}, but the application is
     * stopped once the map task is running again and a second application is
     * started with job recovery enabled. The recovered job is then completed
     * and is expected to expose exactly two completion events.
     */
    @Test
    public void testFetchFailureWithRecovery() throws Exception {
        int runCount = 0;
        MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
        Job job = app.submit(nonUberConf());
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
        Iterator<Task> tasks = job.getTasks().values().iterator();
        Task mapTask = tasks.next();
        Task reduceTask = tasks.next();

        TaskAttempt mapAttempt1 = completeFirstMapAttempt(app, mapTask);
        assertSingleSucceededEvent(job);

        TaskAttempt reduceAttempt = awaitFirstRunningAttempt(app, reduceTask);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        app.waitForState(mapTask, TaskState.RUNNING);

        // Stop the first application and start a second one that does not clean on start.
        app.stop();
        app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false, ++runCount);
        Configuration conf = nonUberConf();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
        tasks = job.getTasks().values().iterator();
        mapTask = tasks.next();
        reduceTask = tasks.next();

        completeFirstMapAttempt(app, mapTask);

        // Wait for the reduce task and its attempt to be RUNNING before
        // completing it, exactly as the first run and the sibling tests do,
        // so TA_DONE is never sent to an attempt that has not started yet.
        reduceAttempt = awaitFirstRunningAttempt(app, reduceTask);
        sendTaskAttemptDone(app, reduceAttempt);
        app.waitForState(job, JobState.SUCCEEDED);

        TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals("Num completion events not correct", 2, events.length);
    }

    /**
     * One map, three reduces. Only the first reducer reports fetch failures.
     * After two reports, while all reducers are in the SHUFFLE phase, the map
     * task is asserted to still be SUCCEEDED; the third report is sent after
     * the other two reducers have been moved to the REDUCE phase, and the map
     * task is then expected to run again.
     */
    @Test
    public void testFetchFailureMultipleReduces() throws Exception {
        MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
        Job job = app.submit(nonUberConf());
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals("Num tasks not correct", 4, job.getTasks().size());
        Iterator<Task> tasks = job.getTasks().values().iterator();
        Task mapTask = tasks.next();
        Task reduceTask = tasks.next();
        Task reduceTask2 = tasks.next();
        Task reduceTask3 = tasks.next();

        TaskAttempt mapAttempt1 = completeFirstMapAttempt(app, mapTask);
        TaskAttemptCompletionEvent firstMapEvent = assertSingleSucceededEvent(job);

        app.waitForState(reduceTask, TaskState.RUNNING);
        app.waitForState(reduceTask2, TaskState.RUNNING);
        app.waitForState(reduceTask3, TaskState.RUNNING);

        TaskAttempt reduceAttempt = firstAttempt(reduceTask);
        app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
        updateStatus(app, reduceAttempt, Phase.SHUFFLE);

        TaskAttempt reduceAttempt2 = firstAttempt(reduceTask2);
        app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
        updateStatus(app, reduceAttempt2, Phase.SHUFFLE);

        TaskAttempt reduceAttempt3 = firstAttempt(reduceTask3);
        app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
        updateStatus(app, reduceAttempt3, Phase.SHUFFLE);

        // Two reports with all three reducers shuffling: the map must stay SUCCEEDED.
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);
        Assert.assertEquals(TaskState.SUCCEEDED, mapTask.getState());

        // Move the other two reducers past the shuffle, then send the third report.
        updateStatus(app, reduceAttempt2, Phase.REDUCE);
        updateStatus(app, reduceAttempt3, Phase.REDUCE);
        sendFetchFailure(app, reduceAttempt, mapAttempt1);

        TaskAttempt mapAttempt2 = completeSecondMapAttempt(app, mapTask, mapAttempt1);

        sendTaskAttemptDone(app, reduceAttempt);
        sendTaskAttemptDone(app, reduceAttempt2);
        sendTaskAttemptDone(app, reduceAttempt3);
        app.waitForState(job, JobState.SUCCEEDED);

        assertEventsAfterMapRerun(job, firstMapEvent, mapAttempt1, mapAttempt2, reduceAttempt, 6);
    }

    /** Builds a fresh configuration with uber-task execution switched off. */
    private Configuration nonUberConf() {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        return conf;
    }

    /** Returns the first attempt in iteration order of the task's attempt map. */
    private TaskAttempt firstAttempt(Task task) {
        return task.getAttempts().values().iterator().next();
    }

    /**
     * Waits for the task to be RUNNING, then for its first attempt to be
     * RUNNING, and returns that attempt.
     */
    private TaskAttempt awaitFirstRunningAttempt(MRApp app, Task task) throws Exception {
        app.waitForState(task, TaskState.RUNNING);
        TaskAttempt attempt = firstAttempt(task);
        app.waitForState(attempt, TaskAttemptState.RUNNING);
        return attempt;
    }

    /** Posts a TA_DONE event for the given attempt to the application's event handler. */
    private void sendTaskAttemptDone(MRApp app, TaskAttempt attempt) {
        app.getContext().getEventHandler().handle(
                new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
    }

    /**
     * Waits for the map task's first attempt to run, marks it done and waits
     * for the map task to reach SUCCEEDED.
     *
     * @return the first map attempt
     */
    private TaskAttempt completeFirstMapAttempt(MRApp app, Task mapTask) throws Exception {
        TaskAttempt mapAttempt = awaitFirstRunningAttempt(app, mapTask);
        sendTaskAttemptDone(app, mapAttempt);
        app.waitForState(mapTask, TaskState.SUCCEEDED);
        return mapAttempt;
    }

    /**
     * Asserts that the job currently exposes exactly one completion event and
     * that its status is SUCCEEDED.
     *
     * @return that single event, so callers can later check the status of the same object
     */
    private TaskAttemptCompletionEvent assertSingleSucceededEvent(Job job) {
        TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals("Num completion events not correct", 1, events.length);
        Assert.assertEquals("Event status not correct",
                TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
        return events[0];
    }

    /**
     * Waits for the map task to be RUNNING again, asserts that the first
     * attempt is FAILED and that the task now has two attempts, then runs the
     * second attempt to completion and waits for the task to be SUCCEEDED.
     *
     * @param mapAttempt1 the attempt that the fetch failures were reported against
     * @return the second map attempt
     */
    private TaskAttempt completeSecondMapAttempt(MRApp app, Task mapTask, TaskAttempt mapAttempt1)
            throws Exception {
        app.waitForState(mapTask, TaskState.RUNNING);
        Assert.assertEquals("Map TaskAttempt state not correct",
                TaskAttemptState.FAILED, mapAttempt1.getState());
        Assert.assertEquals("Num attempts in Map Task not correct", 2, mapTask.getAttempts().size());

        Iterator<TaskAttempt> attempts = mapTask.getAttempts().values().iterator();
        attempts.next(); // skip the first attempt
        TaskAttempt mapAttempt2 = attempts.next();

        app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
        sendTaskAttemptDone(app, mapAttempt2);
        app.waitForState(mapTask, TaskState.SUCCEEDED);
        return mapAttempt2;
    }

    /**
     * Verifies the completion events of a succeeded job whose map was re-run.
     *
     * <p>Expected order of the first four events: first map attempt (OBSOLETE),
     * first map attempt (FAILED), second map attempt (SUCCEEDED), first reduce
     * attempt (SUCCEEDED). The map-only view returned by
     * {@code getMapAttemptCompletionEvents} must match the first three events
     * after conversion with {@link TypeConverter#fromYarn}.
     *
     * @param firstMapEvent the event object captured right after the first map
     *        attempt succeeded; it is expected to report OBSOLETE by now
     * @param expectedEventCount total number of completion events expected
     */
    private void assertEventsAfterMapRerun(Job job, TaskAttemptCompletionEvent firstMapEvent,
            TaskAttempt mapAttempt1, TaskAttempt mapAttempt2, TaskAttempt reduceAttempt,
            int expectedEventCount) {
        Assert.assertEquals("Event status not correct",
                TaskAttemptCompletionEventStatus.OBSOLETE, firstMapEvent.getStatus());

        TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals("Num completion events not correct", expectedEventCount, events.length);

        Assert.assertEquals("Event map attempt id not correct",
                mapAttempt1.getID(), events[0].getAttemptId());
        Assert.assertEquals("Event map attempt id not correct",
                mapAttempt1.getID(), events[1].getAttemptId());
        Assert.assertEquals("Event map attempt id not correct",
                mapAttempt2.getID(), events[2].getAttemptId());
        Assert.assertEquals("Event reduce attempt id not correct",
                reduceAttempt.getID(), events[3].getAttemptId());

        Assert.assertEquals("Event status not correct for map attempt1",
                TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
        Assert.assertEquals("Event status not correct for map attempt1",
                TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
        Assert.assertEquals("Event status not correct for map attempt2",
                TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
        Assert.assertEquals("Event status not correct for reduce attempt1",
                TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());

        TaskCompletionEvent[] convertedEvents = TypeConverter.fromYarn(events);
        TaskCompletionEvent[] mapEvents = job.getMapAttemptCompletionEvents(0, 2);
        Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
        Assert.assertArrayEquals("Unexpected map events",
                Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);

        mapEvents = job.getMapAttemptCompletionEvents(2, 200);
        Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
        Assert.assertEquals("Unexpected map event", convertedEvents[2], mapEvents[0]);
    }

    /**
     * Sends a status update for the attempt that reports the given phase,
     * a progress of 0.5, empty counters and no fetch-failed maps.
     */
    private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
        TaskAttemptStatusUpdateEvent.TaskAttemptStatus status =
                new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
        status.counters = new Counters();
        status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
        status.id = attempt.getID();
        status.mapFinishTime = 0L;
        status.phase = phase;
        status.progress = 0.5f;
        status.shuffleFinishTime = 0L;
        status.sortFinishTime = 0L;
        status.stateString = "OK";
        status.taskState = attempt.getState();
        app.getContext().getEventHandler().handle(
                new TaskAttemptStatusUpdateEvent(attempt.getID(), status));
    }

    /** Posts one fetch-failure report from {@code reduceAttempt} naming {@code mapAttempt}. */
    private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, TaskAttempt mapAttempt) {
        app.getContext().getEventHandler().handle(
                new JobTaskAttemptFetchFailureEvent(
                        reduceAttempt.getID(), Arrays.asList(mapAttempt.getID())));
    }

    /**
     * {@link MRApp} variant that installs a {@link JobHistoryEventHandler}
     * as its job-history handler; used by the recovery test.
     */
    static class MRAppWithHistory
    extends MRApp {
        public MRAppWithHistory(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
            super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
        }

        /** Creates a {@link JobHistoryEventHandler} for this application's start count. */
        @Override
        protected EventHandler<JobHistoryEvent> createJobHistoryHandler(AppContext context) {
            return new JobHistoryEventHandler(context, this.getStartCount());
        }
    }
}

