/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app;

import java.util.Arrays;
import java.util.Iterator;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TestFetchFailure;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.event.Event;
import org.junit.Test;

public class TestFetchFailure {
    @Test
    public void testFetchFailure() throws Exception {
        MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true);
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"Num tasks not correct", (int)2, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask = (Task)it.next();
        Task reduceTask = (Task)it.next();
        app.waitForState(mapTask, TaskState.RUNNING);
        TaskAttempt mapAttempt1 = (TaskAttempt)mapTask.getAttempts().values().iterator().next();
        app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(mapAttempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask, TaskState.SUCCEEDED);
        TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals((String)"Num completion events not correct", (int)1, (int)events.length);
        Assert.assertEquals((String)"Event status not correct", (Object)TaskAttemptCompletionEventStatus.SUCCEEDED, (Object)events[0].getStatus());
        app.waitForState(reduceTask, TaskState.RUNNING);
        TaskAttempt reduceAttempt = (TaskAttempt)reduceTask.getAttempts().values().iterator().next();
        app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
        this.sendFetchFailure(app, reduceAttempt, mapAttempt1);
        this.sendFetchFailure(app, reduceAttempt, mapAttempt1);
        this.sendFetchFailure(app, reduceAttempt, mapAttempt1);
        app.waitForState(mapTask, TaskState.RUNNING);
        Assert.assertEquals((String)"Map TaskAttempt state not correct", (Object)TaskAttemptState.FAILED, (Object)mapAttempt1.getState());
        Assert.assertEquals((String)"Num attempts in Map Task not correct", (int)2, (int)mapTask.getAttempts().size());
        Iterator atIt = mapTask.getAttempts().values().iterator();
        atIt.next();
        TaskAttempt mapAttempt2 = (TaskAttempt)atIt.next();
        app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(mapAttempt2.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask, TaskState.SUCCEEDED);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduceAttempt.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(job, JobState.SUCCEEDED);
        Assert.assertEquals((String)"Event status not correct", (Object)TaskAttemptCompletionEventStatus.OBSOLETE, (Object)events[0].getStatus());
        events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals((String)"Num completion events not correct", (int)4, (int)events.length);
        Assert.assertEquals((String)"Event map attempt id not correct", (Object)mapAttempt1.getID(), (Object)events[0].getAttemptId());
        Assert.assertEquals((String)"Event map attempt id not correct", (Object)mapAttempt1.getID(), (Object)events[1].getAttemptId());
        Assert.assertEquals((String)"Event map attempt id not correct", (Object)mapAttempt2.getID(), (Object)events[2].getAttemptId());
        Assert.assertEquals((String)"Event redude attempt id not correct", (Object)reduceAttempt.getID(), (Object)events[3].getAttemptId());
        Assert.assertEquals((String)"Event status not correct for map attempt1", (Object)TaskAttemptCompletionEventStatus.OBSOLETE, (Object)events[0].getStatus());
        Assert.assertEquals((String)"Event status not correct for map attempt1", (Object)TaskAttemptCompletionEventStatus.FAILED, (Object)events[1].getStatus());
        Assert.assertEquals((String)"Event status not correct for map attempt2", (Object)TaskAttemptCompletionEventStatus.SUCCEEDED, (Object)events[2].getStatus());
        Assert.assertEquals((String)"Event status not correct for reduce attempt1", (Object)TaskAttemptCompletionEventStatus.SUCCEEDED, (Object)events[3].getStatus());
    }

    @Test
    public void testFetchFailureWithRecovery() throws Exception {
        int runCount = 0;
        MRAppWithHistory app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        Job job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"Num tasks not correct", (int)2, (int)job.getTasks().size());
        Iterator it = job.getTasks().values().iterator();
        Task mapTask = (Task)it.next();
        Task reduceTask = (Task)it.next();
        app.waitForState(mapTask, TaskState.RUNNING);
        TaskAttempt mapAttempt1 = (TaskAttempt)mapTask.getAttempts().values().iterator().next();
        app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(mapAttempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask, TaskState.SUCCEEDED);
        TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals((String)"Num completion events not correct", (int)1, (int)events.length);
        Assert.assertEquals((String)"Event status not correct", (Object)TaskAttemptCompletionEventStatus.SUCCEEDED, (Object)events[0].getStatus());
        app.waitForState(reduceTask, TaskState.RUNNING);
        TaskAttempt reduceAttempt = (TaskAttempt)reduceTask.getAttempts().values().iterator().next();
        app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
        this.sendFetchFailure((MRApp)app, reduceAttempt, mapAttempt1);
        this.sendFetchFailure((MRApp)app, reduceAttempt, mapAttempt1);
        this.sendFetchFailure((MRApp)app, reduceAttempt, mapAttempt1);
        app.waitForState(mapTask, TaskState.RUNNING);
        app.stop();
        app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false, ++runCount);
        conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);
        conf.setBoolean("mapreduce.job.ubertask.enable", false);
        job = app.submit(conf);
        app.waitForState(job, JobState.RUNNING);
        Assert.assertEquals((String)"Num tasks not correct", (int)2, (int)job.getTasks().size());
        it = job.getTasks().values().iterator();
        mapTask = (Task)it.next();
        reduceTask = (Task)it.next();
        app.waitForState(mapTask, TaskState.RUNNING);
        mapAttempt1 = (TaskAttempt)mapTask.getAttempts().values().iterator().next();
        app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(mapAttempt1.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(mapTask, TaskState.SUCCEEDED);
        reduceAttempt = (TaskAttempt)reduceTask.getAttempts().values().iterator().next();
        app.getContext().getEventHandler().handle((Event)new TaskAttemptEvent(reduceAttempt.getID(), TaskAttemptEventType.TA_DONE));
        app.waitForState(job, JobState.SUCCEEDED);
        events = job.getTaskAttemptCompletionEvents(0, 100);
        Assert.assertEquals((String)"Num completion events not correct", (int)2, (int)events.length);
    }

    private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, TaskAttempt mapAttempt) {
        app.getContext().getEventHandler().handle((Event)new JobTaskAttemptFetchFailureEvent(reduceAttempt.getID(), Arrays.asList(mapAttempt.getID())));
    }
}

