/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.AbstractCounters;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.OutputCommitter;

public class DAGImpl
implements DAG,
EventHandler<DAGEvent> {
    // Class-wide logger.
    private static final Log LOG = LogFactory.getLog(DAGImpl.class);
    // Platform line separator, read once from the "line.separator" system property.
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    // Identifier of this DAG; returned by getID() and stamped on history events.
    private final TezDAGID dagId;
    // Time source used for finish time and commit-started timestamps.
    private final Clock clock;
    // Read/write halves of a single ReentrantReadWriteLock created in the constructor.
    // handle() holds the write lock while a transition runs; query methods hold the read lock.
    private final Lock readLock;
    private final Lock writeLock;
    // Taken from the DAG plan's name in the constructor.
    private final String dagName;
    private final TaskAttemptListener taskAttemptListener;
    private final TaskHeartbeatHandler taskHeartbeatHandler;
    // Monitor held by getVertices() while it wraps the vertex map.
    private final Object tasksSyncHandle = new Object();
    // Set to true the first time commitOrAbortOutputs() runs; later calls return early.
    private volatile boolean committedOrAborted = false;
    // Outcome recorded by commitOrAbortOutputs(): true unless a commit failed.
    private volatile boolean allOutputsCommitted = false;
    // When false, commitOrAbortOutputs() skips its commit phase and its abort phase
    // leaves outputs of SUCCEEDED vertices alone.
    boolean commitAllOutputsOnSuccess = true;
    // Not assigned in the constructor; presumably set during DAG initialization — confirm.
    @VisibleForTesting
    DAGScheduler dagScheduler;
    // Raw-typed dispatcher used to send vertex events and INTERNAL_ERROR DAG events.
    private final EventHandler eventHandler;
    // User name supplied to the constructor; used to create the remote-user UGI.
    private final String userName;
    private final AppContext appContext;
    // Identity under which output commit/abort actions run (via doAs).
    private final UserGroupInformation dagUGI;
    private final ACLManager aclManager;
    // Vertices of this DAG keyed by vertex id.
    volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
    // Edges keyed by a String — presumably the edge id; verify against the code that fills it.
    private Map<String, Edge> edges = new HashMap<String, Edge>();
    // DAG-level counters; merged with per-vertex counters in getAllCounters().
    private TezCounters dagCounters = new TezCounters();
    // NOTE(review): presumably guards construction of fullCounters — confirm in
    // mayBeConstructFinalFullCounters().
    private Object fullCountersLock = new Object();
    // Returned by getAllCounters() once the DAG is in ERROR, FAILED, KILLED or SUCCEEDED.
    private TezCounters fullCounters = null;
    private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
    public final Configuration conf;
    private final DAGProtos.DAGPlan jobPlan;
    // Accumulated diagnostic messages for this DAG.
    private final List<String> diagnostics = new ArrayList<String>();
    // Recovery bookkeeping set by restoreFromEvent(): a DAG_STARTED event is rejected
    // unless a DAG_INITIALIZED event was replayed first.
    boolean recoveryInitEventSeen = false;
    boolean recoveryStartEventSeen = false;
    // Built from the configuration in the constructor.
    private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
    // Shared transition instances referenced from several rows of the state machine table.
    private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
    private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
    private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION = new CounterUpdateTransition();
    private static final DAGSchedulerUpdateTransition DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
    /*
     * Transition table shared by every DAGImpl instance; each instance obtains its own
     * state machine from this factory in the constructor. The initial state is NEW.
     *
     * Summary of what the table registers:
     *  - DAG_DIAGNOSTIC_UPDATE and DAG_COUNTER_UPDATE are self-loops in NEW, INITED, RUNNING,
     *    TERMINATING, SUCCEEDED, FAILED and KILLED.
     *  - INTERNAL_ERROR moves each of those seven states to ERROR.
     *  - NEW: DAG_INIT -> INITED or FAILED; DAG_KILL -> KILLED; DAG_RECOVER -> any of NEW,
     *    INITED, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR, TERMINATING.
     *  - INITED: DAG_START -> RUNNING; DAG_KILL -> KILLED.
     *  - RUNNING: DAG_VERTEX_COMPLETED -> RUNNING, SUCCEEDED, TERMINATING or FAILED;
     *    DAG_VERTEX_RERUNNING -> RUNNING or TERMINATING; DAG_KILL -> TERMINATING;
     *    DAG_SCHEDULER_UPDATE is a self-loop.
     *  - TERMINATING: DAG_VERTEX_COMPLETED -> TERMINATING, KILLED, FAILED or ERROR.
     *  - Rows that pass only an EnumSet of event types (no transition object) keep the state
     *    unchanged; presumably these events are ignored — confirm against StateMachineFactory.
     *
     * NOTE(review): for state ERROR, DAG_COUNTER_UPDATE appears both in the no-transition
     * EnumSet row and in a later row with COUNTER_UPDATE_TRANSITION; which registration takes
     * effect depends on StateMachineFactory — confirm.
     */
    protected static final StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent> stateMachineFactory = new StateMachineFactory((Enum)DAGState.NEW).addTransition((Enum)DAGState.NEW, (Enum)DAGState.NEW, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.NEW, EnumSet.of(DAGState.NEW, new DAGState[]{DAGState.INITED, DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED, DAGState.ERROR, DAGState.TERMINATING}), (Enum)DAGEventType.DAG_RECOVER, (MultipleArcTransition)new RecoverTransition()).addTransition((Enum)DAGState.NEW, (Enum)DAGState.NEW, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.NEW, EnumSet.of(DAGState.INITED, DAGState.FAILED), (Enum)DAGEventType.DAG_INIT, (MultipleArcTransition)new InitTransition()).addTransition((Enum)DAGState.NEW, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new KillNewJobTransition()).addTransition((Enum)DAGState.NEW, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.INITED, (Enum)DAGState.INITED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.INITED, (Enum)DAGState.INITED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.INITED, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_START, (SingleArcTransition)new StartTransition()).addTransition((Enum)DAGState.INITED, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new KillInitedJobTransition()).addTransition((Enum)DAGState.INITED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING, DAGState.FAILED), 
(Enum)DAGEventType.DAG_VERTEX_COMPLETED, (MultipleArcTransition)new VertexCompletedTransition()).addTransition((Enum)DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING), (Enum)DAGEventType.DAG_VERTEX_RERUNNING, (MultipleArcTransition)new VertexReRunningTransition()).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_KILL, (SingleArcTransition)new DAGKilledTransition()).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.RUNNING, (Enum)DAGEventType.DAG_SCHEDULER_UPDATE, (SingleArcTransition)DAG_SCHEDULER_UPDATE_TRANSITION).addTransition((Enum)DAGState.RUNNING, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.TERMINATING, EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED, DAGState.ERROR), (Enum)DAGEventType.DAG_VERTEX_COMPLETED, (MultipleArcTransition)new VertexCompletedTransition()).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.TERMINATING, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.TERMINATING, (Enum)DAGState.TERMINATING, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE)).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.SUCCEEDED, 
(Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.SUCCEEDED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.SUCCEEDED, (Enum)DAGState.SUCCEEDED, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.FAILED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.FAILED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.FAILED, (Enum)DAGState.FAILED, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_DIAGNOSTIC_UPDATE, (SingleArcTransition)DIAGNOSTIC_UPDATE_TRANSITION).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.KILLED, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.ERROR, (Enum)DAGEventType.INTERNAL_ERROR, (SingleArcTransition)INTERNAL_ERROR_TRANSITION).addTransition((Enum)DAGState.KILLED, (Enum)DAGState.KILLED, EnumSet.of(DAGEventType.DAG_KILL, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)).addTransition((Enum)DAGState.ERROR, (Enum)DAGState.ERROR, EnumSet.of(DAGEventType.DAG_KILL, new 
DAGEventType[]{DAGEventType.DAG_INIT, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_COMPLETED, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_COMPLETED, DAGEventType.DAG_DIAGNOSTIC_UPDATE, DAGEventType.INTERNAL_ERROR, DAGEventType.DAG_COUNTER_UPDATE})).addTransition((Enum)DAGState.ERROR, (Enum)DAGState.ERROR, (Enum)DAGEventType.DAG_COUNTER_UPDATE, (SingleArcTransition)COUNTER_UPDATE_TRANSITION).installTopology();
    // Per-instance state machine produced by stateMachineFactory in the constructor.
    private final StateMachine<DAGState, DAGEventType, DAGEvent> stateMachine;
    // Vertex bookkeeping counters; they are not updated anywhere in the visible part of
    // this class — presumably maintained by the vertex-completed transition (confirm).
    @VisibleForTesting
    int numCompletedVertices = 0;
    private int numVertices;
    private int numSuccessfulVertices = 0;
    private int numFailedVertices = 0;
    private int numKilledVertices = 0;
    // Value returned by isUber(); defaults to false.
    private boolean isUber = false;
    private DAGTerminationCause terminationCause;
    // Credentials handed to the constructor (may be null); added to dagUGI when non-null.
    private Credentials credentials;
    // Timestamps reported in the DAG history events. finishTime is set from the clock by
    // setFinishTime() or restored from a replayed DAG_FINISHED event.
    private long initTime;
    private long startTime;
    private long finishTime;
    // Vertex groups keyed by a String; commitOrAbortOutputs() walks the values to commit
    // group outputs. Presumably keyed by group name — verify against the code that fills it.
    Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
    Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
    // State reconstructed by restoreFromEvent() while replaying history; starts at NEW.
    private DAGState recoveredState = DAGState.NEW;
    // True after a replayed DAG_COMMIT_STARTED event, cleared by a replayed DAG_FINISHED event.
    private boolean recoveryCommitInProgress = false;
    // Vertex group name -> false once its commit-started event is replayed, true once the
    // matching commit-finished event is replayed.
    Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
    // Vertices keyed by a String used as the per-vertex label in getDAGStatus() and as the
    // lookup key in getVertexStatus(); iteration follows insertion order (LinkedHashMap).
    LinkedHashMap<String, Vertex> vertexMap = new LinkedHashMap();
    /**
     * Creates a DAG from its plan and wires it to the application master's services.
     *
     * <p>When {@code dagCredentials} is {@code null} the DAG runs as the current user;
     * otherwise a remote-user UGI is created for {@code appUserName} and the credentials
     * are attached to it.
     *
     * @param dagId               identifier of this DAG
     * @param conf                configuration for this DAG
     * @param jobPlan             the DAG plan; its name becomes the DAG name
     * @param eventHandler        dispatcher used to send events
     * @param taskAttemptListener listener stored for later use
     * @param dagCredentials      credentials for the DAG user, or {@code null}
     * @param clock               time source
     * @param appUserName         name of the submitting user
     * @param thh                 task heartbeat handler stored for later use
     * @param appContext          application context (ACL manager, history handler)
     * @throws TezUncheckedException if the current user cannot be determined
     */
    public DAGImpl(TezDAGID dagId, Configuration conf, DAGProtos.DAGPlan jobPlan, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Credentials dagCredentials, Clock clock, String appUserName, TaskHeartbeatHandler thh, AppContext appContext) {
        // Identity, plan and collaborators.
        this.dagId = dagId;
        this.jobPlan = jobPlan;
        this.conf = conf;
        this.dagName = jobPlan.getName() == null ? "<missing app name>" : jobPlan.getName();
        this.userName = appUserName;
        this.clock = clock;
        this.appContext = appContext;
        this.taskAttemptListener = taskAttemptListener;
        this.taskHeartbeatHandler = thh;
        this.eventHandler = eventHandler;

        // Both locks come from the same reentrant read/write lock.
        final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
        this.readLock = stateLock.readLock();
        this.writeLock = stateLock.writeLock();

        // Pick the identity the DAG runs under.
        this.credentials = dagCredentials;
        if (dagCredentials != null) {
            this.dagUGI = UserGroupInformation.createRemoteUser(this.userName);
            this.dagUGI.addCredentials(dagCredentials);
        } else {
            try {
                this.dagUGI = UserGroupInformation.getCurrentUser();
            } catch (IOException e) {
                throw new TezUncheckedException("Failed to set UGI for dag based on currentUser", e);
            }
        }

        this.aclManager = new ACLManager(appContext.getAMACLManager(), this.dagUGI.getShortUserName(), this.conf);
        this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(conf);
        this.stateMachine = stateMachineFactory.make(this);
    }

    /**
     * @return the state machine instance that drives this DAG
     */
    protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
        return stateMachine;
    }

    /**
     * @return the identifier this DAG was constructed with
     */
    @Override
    public TezDAGID getID() {
        return dagId;
    }

    /**
     * @return the configuration this DAG was constructed with
     */
    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * @return the DAG plan this DAG was constructed from
     */
    @Override
    public DAGProtos.DAGPlan getJobPlan() {
        return jobPlan;
    }

    /**
     * @return the (raw-typed) event dispatcher supplied at construction
     */
    EventHandler getEventHandler() {
        return eventHandler;
    }

    /**
     * Looks up a vertex by id while holding the read lock.
     *
     * @param vertexID id of the vertex to find
     * @return the vertex mapped to {@code vertexID}, or {@code null} if the vertex map
     *         has no mapping for it
     */
    @Override
    public Vertex getVertex(TezVertexID vertexID) {
        readLock.lock();
        try {
            return vertices.get(vertexID);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the current value of the uber flag
     */
    @Override
    public boolean isUber() {
        return isUber;
    }

    /**
     * @return the credentials passed to the constructor; may be {@code null}
     */
    @Override
    public Credentials getCredentials() {
        return credentials;
    }

    /**
     * @return the user identity chosen for this DAG in the constructor
     */
    @Override
    public UserGroupInformation getDagUGI() {
        return dagUGI;
    }

    /**
     * Replays one history event to rebuild this DAG's recovered state.
     *
     * <p>Each supported event type updates the recovery bookkeeping and the method then
     * returns the (possibly updated) recovered state.
     *
     * @param historyEvent the event being replayed
     * @return the recovered state after applying the event
     * @throws RuntimeException if a DAG_STARTED event arrives before any DAG_INITIALIZED
     *         event, or if the event type is not one handled here
     */
    @Override
    public DAGState restoreFromEvent(HistoryEvent historyEvent) {
        switch (historyEvent.getEventType()) {
            case DAG_INITIALIZED: {
                recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
                recoveryInitEventSeen = true;
                break;
            }
            case DAG_STARTED: {
                if (!recoveryInitEventSeen) {
                    throw new RuntimeException("Started Event seen but no Init Event was encountered earlier");
                }
                recoveryStartEventSeen = true;
                startTime = ((DAGStartedEvent) historyEvent).getStartTime();
                recoveredState = DAGState.RUNNING;
                break;
            }
            case DAG_COMMIT_STARTED: {
                recoveryCommitInProgress = true;
                break;
            }
            case VERTEX_GROUP_COMMIT_STARTED: {
                // Group commit began but is not known to have finished.
                VertexGroupCommitStartedEvent started = (VertexGroupCommitStartedEvent) historyEvent;
                recoveredGroupCommits.put(started.getVertexGroupName(), Boolean.FALSE);
                break;
            }
            case VERTEX_GROUP_COMMIT_FINISHED: {
                VertexGroupCommitFinishedEvent finished = (VertexGroupCommitFinishedEvent) historyEvent;
                recoveredGroupCommits.put(finished.getVertexGroupName(), Boolean.TRUE);
                break;
            }
            case DAG_FINISHED: {
                recoveryCommitInProgress = false;
                DAGFinishedEvent dagFinished = (DAGFinishedEvent) historyEvent;
                finishTime = dagFinished.getFinishTime();
                recoveredState = dagFinished.getState();
                break;
            }
            default: {
                throw new RuntimeException("Unexpected event received for restoring state, eventType=" + historyEvent.getEventType());
            }
        }
        return recoveredState;
    }

    /**
     * @return the ACL manager built for this DAG in the constructor
     */
    @Override
    public ACLManager getACLManager() {
        return aclManager;
    }

    /**
     * Returns the counters for this DAG, computed under the read lock.
     *
     * <p>Once the DAG is in ERROR, FAILED, KILLED or SUCCEEDED the cached full counters
     * are returned (after giving {@code mayBeConstructFinalFullCounters()} the chance to
     * build them). In any other state a fresh counters object is built from the DAG-level
     * counters plus the counters of every vertex.
     *
     * @return the cached final counters, or a newly aggregated snapshot
     */
    @Override
    public TezCounters getAllCounters() {
        readLock.lock();
        try {
            final DAGState current = getInternalState();
            final boolean finished = current == DAGState.ERROR
                    || current == DAGState.FAILED
                    || current == DAGState.KILLED
                    || current == DAGState.SUCCEEDED;
            if (!finished) {
                TezCounters snapshot = new TezCounters();
                snapshot.incrAllCounters((AbstractCounters) dagCounters);
                return DAGImpl.incrTaskCounters(snapshot, vertices.values());
            }
            mayBeConstructFinalFullCounters();
            return fullCounters;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Adds the counters of every given vertex into {@code counters}.
     *
     * @param counters accumulator that is updated in place
     * @param vertices vertices whose counters are added
     * @return the same {@code counters} instance that was passed in
     */
    public static TezCounters incrTaskCounters(TezCounters counters, Collection<Vertex> vertices) {
        Iterator<Vertex> remaining = vertices.iterator();
        while (remaining.hasNext()) {
            Vertex current = remaining.next();
            counters.incrAllCounters((AbstractCounters) current.getAllCounters());
        }
        return counters;
    }

    /**
     * Returns a snapshot of the diagnostic messages recorded so far.
     *
     * <p>The copy is taken while holding the read lock. Returning the internal list itself
     * would let callers iterate it after the lock is released while the DAG may still be
     * appending to it, so a detached copy is handed out instead.
     *
     * @return a new list containing the current diagnostics, in recorded order
     */
    @Override
    public List<String> getDiagnostics() {
        readLock.lock();
        try {
            return new ArrayList<String>(diagnostics);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns a report for this DAG.
     *
     * <p>The report is currently an empty one from {@code TezBuilderUtils.newDAGReport()}
     * regardless of the DAG's state. The diagnostics string that used to be assembled here
     * was never passed anywhere, and both state branches returned the same value, so that
     * dead work has been removed.
     *
     * @return a newly created DAG report
     */
    @Override
    public DAGReport getReport() {
        readLock.lock();
        try {
            // TODO: populate the report from DAG state and diagnostics once DAGReport supports it.
            return TezBuilderUtils.newDAGReport();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Computes overall progress as the mean of the per-vertex progress values.
     *
     * <p>When the DAG reports no vertices the result is {@code 0.0f}; dividing by a zero
     * vertex count would otherwise yield NaN.
     *
     * @return the sum of each vertex's progress divided by the total vertex count, or
     *         {@code 0.0f} if the total vertex count is not positive
     */
    @Override
    public float getProgress() {
        readLock.lock();
        try {
            final float totalVertices = (float) getTotalVertices();
            if (totalVertices <= 0.0f) {
                // Nothing to average over yet.
                return 0.0f;
            }
            float progress = 0.0f;
            for (Vertex v : getVertices().values()) {
                progress += v.getProgress();
            }
            return progress / totalVertices;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns a read-only view of the vertex map.
     *
     * <p>The view is created while holding the {@code tasksSyncHandle} monitor; it wraps
     * the live map rather than copying it.
     *
     * @return an unmodifiable view of the vertices keyed by vertex id
     */
    @Override
    public Map<TezVertexID, Vertex> getVertices() {
        synchronized (tasksSyncHandle) {
            Map<TezVertexID, Vertex> readOnlyView = Collections.unmodifiableMap(vertices);
            return readOnlyView;
        }
    }

    /**
     * Reads the state machine's current state while holding the read lock.
     *
     * @return the current DAG state
     */
    @Override
    public DAGState getState() {
        readLock.lock();
        try {
            return getStateMachine().getCurrentState();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Builds a status snapshot of this DAG under the read lock.
     *
     * <p>Per-vertex progress is added for every entry of the vertex-name map, the task
     * counts are summed into a DAG-level progress, and the current state and diagnostics
     * are attached. Counters are included only when {@code statusOptions} contains
     * {@link StatusGetOpts#GET_COUNTERS}.
     *
     * @param statusOptions options selecting optional parts of the status
     * @return the populated status builder
     */
    @Override
    public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) {
        final DAGStatusBuilder result = new DAGStatusBuilder();
        int tasksTotal = 0;
        int tasksSucceeded = 0;
        int tasksRunning = 0;
        int tasksFailed = 0;
        int tasksKilled = 0;
        readLock.lock();
        try {
            for (Map.Entry<String, Vertex> namedVertex : vertexMap.entrySet()) {
                final ProgressBuilder vertexProgress = namedVertex.getValue().getVertexProgress();
                result.addVertexProgress(namedVertex.getKey(), vertexProgress);
                tasksTotal += vertexProgress.getTotalTaskCount();
                tasksSucceeded += vertexProgress.getSucceededTaskCount();
                tasksRunning += vertexProgress.getRunningTaskCount();
                tasksFailed += vertexProgress.getFailedTaskCount();
                tasksKilled += vertexProgress.getKilledTaskCount();
            }

            // Roll the per-vertex totals up into a single DAG-level progress record.
            final ProgressBuilder overall = new ProgressBuilder();
            overall.setTotalTaskCount(tasksTotal);
            overall.setSucceededTaskCount(tasksSucceeded);
            overall.setRunningTaskCount(tasksRunning);
            overall.setFailedTaskCount(tasksFailed);
            overall.setKilledTaskCount(tasksKilled);

            result.setState(getState());
            result.setDiagnostics(diagnostics);
            result.setDAGProgress(overall);
            if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
                result.setDAGCounters(getAllCounters());
            }
            return result;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the status of the vertex registered under the given name.
     *
     * @param vertexName    key to look up in the vertex-name map
     * @param statusOptions options forwarded to the vertex
     * @return the vertex's status, or {@code null} if no vertex is mapped to that name
     */
    @Override
    public VertexStatusBuilder getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) {
        final Vertex target = vertexMap.get(vertexName);
        return target == null ? null : target.getVertexStatus(statusOptions);
    }

    /**
     * Sends init and then start events to every root vertex.
     *
     * <p>A root vertex is one whose input vertex count is zero. All {@code V_INIT} events
     * are dispatched in a first pass before any {@code V_START} event is dispatched in a
     * second pass.
     */
    protected void initializeVerticesAndStart() {
        // Pass 1: initialise every root vertex.
        for (Vertex candidate : vertices.values()) {
            if (candidate.getInputVerticesCount() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Initing root vertex " + candidate.getName());
                }
                eventHandler.handle(new VertexEvent(candidate.getVertexId(), VertexEventType.V_INIT));
            }
        }
        // Pass 2: start every root vertex.
        for (Vertex candidate : vertices.values()) {
            if (candidate.getInputVerticesCount() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Starting root vertex " + candidate.getName());
                }
                eventHandler.handle(new VertexEvent(candidate.getVertexId(), VertexEventType.V_START));
            }
        }
    }

    /**
     * Commits a single output as the DAG user.
     *
     * <p>Any exception raised by the commit is logged and reported through the return
     * value rather than propagated.
     *
     * @param outputName      name of the output, used only in the log message
     * @param outputCommitter committer whose {@code commitOutput()} is invoked
     * @return {@code true} if the commit completed, {@code false} if it threw
     */
    private boolean commitOutput(String outputName, OutputCommitter outputCommitter) {
        final OutputCommitter target = outputCommitter;
        boolean committed;
        try {
            getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    target.commitOutput();
                    return null;
                }
            });
            committed = true;
        } catch (Exception e) {
            LOG.info("Exception in committing output: " + outputName, e);
            committed = false;
        }
        return committed;
    }

    /**
     * Commits all DAG outputs on success, or aborts them on failure. Runs at most once.
     *
     * <p>Flow:
     * <ol>
     *   <li>If commit/abort already ran, return the previously recorded outcome.</li>
     *   <li>If the DAG succeeded and outputs are committed at DAG end
     *       ({@code commitAllOutputsOnSuccess}), record a commit-started history event,
     *       then commit vertex-group outputs followed by each vertex's non-shared outputs.
     *       The first failed commit stops all further commits.</li>
     *   <li>If the DAG did not succeed, or a commit failed, abort the outputs of every
     *       vertex (skipping SUCCEEDED vertices when outputs are not committed at DAG
     *       end). Abort failures are logged and otherwise ignored.</li>
     * </ol>
     *
     * <p>The committer captured by the abort action is a fresh {@code final} local per
     * iteration: anonymous classes can only capture locals that are never reassigned.
     *
     * @param dagSucceeded whether the DAG completed successfully
     * @return {@code true} if no commit failed; {@code false} if a commit failed or the
     *         commit-started event could not be recorded
     * @throws TezUncheckedException if a vertex with outputs to commit is not SUCCEEDED
     */
    private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
        if (this.committedOrAborted) {
            LOG.info("Ignoring multiple output commit/abort");
            return this.allOutputsCommitted;
        }
        LOG.info("Calling DAG commit/abort for dag: " + this.getID());
        this.committedOrAborted = true;

        boolean successfulOutputsAlreadyCommitted = !this.commitAllOutputsOnSuccess;
        boolean failedWhileCommitting = false;
        if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
            // Record that the commit is starting before touching any output.
            try {
                this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getID(), new DAGCommitStartedEvent(this.getID(), this.clock.getTime())));
            } catch (IOException e) {
                LOG.error("Failed to send commit event to history/recovery handler", e);
                this.trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
                return false;
            }

            // Outputs shared by a vertex group are committed once, through the committers
            // of the group's first member.
            for (VertexGroupInfo groupInfo : this.vertexGroups.values()) {
                if (failedWhileCommitting) {
                    break;
                }
                if (groupInfo.outputs.isEmpty()) {
                    continue;
                }
                groupInfo.committed = true;
                Vertex v = this.getVertex(groupInfo.groupMembers.iterator().next());
                for (String outputName : groupInfo.outputs) {
                    OutputCommitter groupCommitter = v.getOutputCommitters().get(outputName);
                    LOG.info("Committing output: " + outputName + " for group: " + groupInfo.groupName);
                    if (!this.commitOutput(outputName, groupCommitter)) {
                        failedWhileCommitting = true;
                        break;
                    }
                }
            }

            // Outputs owned by exactly one vertex.
            for (Vertex vertex : this.vertices.values()) {
                if (failedWhileCommitting) {
                    break;
                }
                if (vertex.getOutputCommitters() == null) {
                    LOG.info("No output committers for vertex: " + vertex.getName());
                    continue;
                }
                // Work on a copy so the shared outputs can be dropped without touching the vertex.
                Map<String, OutputCommitter> exclusiveCommitters = new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
                Set<String> sharedOutputs = vertex.getSharedOutputs();
                if (sharedOutputs != null) {
                    Iterator<Map.Entry<String, OutputCommitter>> iter = exclusiveCommitters.entrySet().iterator();
                    while (iter.hasNext()) {
                        if (sharedOutputs.contains(iter.next().getKey())) {
                            iter.remove();
                        }
                    }
                }
                if (exclusiveCommitters.isEmpty()) {
                    LOG.info("No exclusive output committers for vertex: " + vertex.getName());
                    continue;
                }
                for (Map.Entry<String, OutputCommitter> entry : exclusiveCommitters.entrySet()) {
                    LOG.info("Committing output: " + entry.getKey() + " for vertex: " + vertex.getVertexId());
                    if (vertex.getState() != VertexState.SUCCEEDED) {
                        throw new TezUncheckedException("Vertex: " + vertex.getName() + " not in SUCCEEDED state. State= " + vertex.getState());
                    }
                    if (!this.commitOutput(entry.getKey(), entry.getValue())) {
                        failedWhileCommitting = true;
                        break;
                    }
                }
            }
        }

        if (failedWhileCommitting) {
            LOG.info("DAG: " + this.getID() + " failed while committing");
        }

        if (!dagSucceeded || failedWhileCommitting) {
            for (Vertex vertex : this.vertices.values()) {
                Map<String, OutputCommitter> vertexCommitters = vertex.getOutputCommitters();
                if (vertexCommitters == null || vertexCommitters.isEmpty()) {
                    LOG.info("No output committers for vertex: " + vertex.getName());
                    continue;
                }
                for (Map.Entry<String, OutputCommitter> entry : vertexCommitters.entrySet()) {
                    // Must be final: it is captured by the anonymous action below.
                    final OutputCommitter abortTarget = entry.getValue();
                    if (!this.commitAllOutputsOnSuccess && vertex.getState() == VertexState.SUCCEEDED) {
                        // Outputs of succeeded vertices are left alone in this mode.
                        continue;
                    }
                    LOG.info("Aborting output: " + entry.getKey() + " for vertex: " + vertex.getVertexId());
                    try {
                        this.getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
                            @Override
                            public Void run() throws Exception {
                                abortTarget.abortOutput(VertexStatus.State.FAILED);
                                return null;
                            }
                        });
                    } catch (Exception e) {
                        // Best effort: keep aborting the remaining outputs.
                        LOG.info("Exception in aborting output: " + entry.getKey() + " for vertex: " + vertex.getVertexId(), e);
                    }
                }
            }
        }

        this.allOutputsCommitted = !failedWhileCommitting;
        return this.allOutputsCommitted;
    }

    /**
     * Feeds one event to the DAG state machine while holding the write lock.
     *
     * <p>If the state machine rejects the event for the current state, the failure is
     * logged, a diagnostic is recorded and an {@code INTERNAL_ERROR} event is dispatched
     * for this DAG. A state change caused by the event is logged at info level.
     *
     * <p>The lock is acquired before entering the {@code try} block so that the
     * {@code finally} clause only ever releases a lock that is actually held.
     *
     * @param event the DAG event to process
     */
    public void handle(DAGEvent event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing DAGEvent " + event.getDAGId() + " of type " + event.getType() + " while in state " + this.getInternalState() + ". Event: " + event);
        }
        this.writeLock.lock();
        try {
            DAGState oldState = this.getInternalState();
            try {
                this.getStateMachine().doTransition(event.getType(), event);
            } catch (InvalidStateTransitonException e) {
                LOG.error("Can't handle this event at current state", e);
                this.addDiagnostic("Invalid event " + event.getType() + " on Job " + this.dagId);
                this.eventHandler.handle(new DAGEvent(this.dagId, DAGEventType.INTERNAL_ERROR));
            }
            if (oldState != this.getInternalState()) {
                LOG.info(this.dagId + " transitioned from " + oldState + " to " + this.getInternalState());
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the state machine's current state while holding the read lock.
     *
     * @return the current internal {@link DAGState}
     */
    @InterfaceAudience.Private
    public DAGState getInternalState() {
        this.readLock.lock();
        try {
            return (DAGState)this.getStateMachine().getCurrentState();
        }
        finally {
            this.readLock.unlock();
        }
    }

    /** Records the clock's current time as the DAG finish time. */
    void setFinishTime() {
        final long now = this.clock.getTime();
        this.finishTime = now;
    }

    /**
     * Stamps the finish time and hands a {@code SUCCEEDED} {@link DAGFinishedEvent}
     * (with empty diagnostics) to the history handler as a critical event.
     *
     * @throws IOException if the history handler fails to handle the critical event
     */
    void logJobHistoryFinishedEvent() throws IOException {
        this.setFinishTime();
        final DAGFinishedEvent successEvent = new DAGFinishedEvent(
            this.dagId, this.startTime, this.finishTime, DAGState.SUCCEEDED, "",
            this.getAllCounters(), this.userName, this.dagName);
        final DAGHistoryEvent historyEvent = new DAGHistoryEvent(this.dagId, successEvent);
        this.appContext.getHistoryHandler().handleCriticalEvent(historyEvent);
    }

    /** Sends a {@link DAGInitializedEvent} carrying the init time to the history handler. */
    void logJobHistoryInitedEvent() {
        final DAGInitializedEvent initializedEvent =
            new DAGInitializedEvent(this.dagId, this.initTime, this.userName, this.dagName);
        final DAGHistoryEvent historyEvent = new DAGHistoryEvent(this.dagId, initializedEvent);
        this.appContext.getHistoryHandler().handle(historyEvent);
    }

    /** Sends a {@link DAGStartedEvent} carrying the start time to the history handler. */
    void logJobHistoryStartedEvent() {
        final DAGStartedEvent startedEvent =
            new DAGStartedEvent(this.dagId, this.startTime, this.userName, this.dagName);
        final DAGHistoryEvent historyEvent = new DAGHistoryEvent(this.dagId, startedEvent);
        this.appContext.getHistoryHandler().handle(historyEvent);
    }

    /**
     * Hands a {@link DAGFinishedEvent} for a non-successful outcome to the history
     * handler as a critical event. The event's finish time is the current clock
     * reading and its diagnostics are the DAG diagnostics joined by the line separator.
     *
     * @param state the final state to record in the event
     * @throws IOException if the history handler fails to handle the critical event
     */
    void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
        final long endTime = this.clock.getTime();
        final String joinedDiagnostics = StringUtils.join(this.getDiagnostics(), (String)LINE_SEPARATOR);
        final DAGFinishedEvent failureEvent = new DAGFinishedEvent(
            this.dagId, this.startTime, endTime, state, joinedDiagnostics,
            this.getAllCounters(), this.userName, this.dagName);
        this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.dagId, failureEvent));
    }

    /**
     * Decides whether the DAG has reached a final state based on its vertex counters
     * and termination cause, and finishes the DAG if so.
     *
     * <p>While fewer vertices have completed than exist, the current internal state is
     * returned unchanged. Once all vertices have completed, the DAG is finished as
     * {@code SUCCEEDED} (all vertices successful and no termination cause),
     * {@code KILLED} (cause {@code DAG_KILL}), {@code FAILED} (cause
     * {@code VERTEX_FAILURE}, {@code COMMIT_FAILURE} or {@code RECOVERY_FAILURE}),
     * or {@code ERROR} when none of those apply.
     *
     * @param dag the DAG to inspect
     * @return the state returned by {@code dag.finished(...)}, or the DAG's current
     *         internal state if it is not yet complete
     */
    static DAGState checkDAGForCompletion(DAGImpl dag) {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Checking dag completion, numCompletedVertices=" + dag.numCompletedVertices + ", numSuccessfulVertices=" + dag.numSuccessfulVertices + ", numFailedVertices=" + dag.numFailedVertices + ", numKilledVertices=" + dag.numKilledVertices + ", numVertices=" + dag.numVertices + ", terminationCause=" + dag.terminationCause));
        }
        if (dag.numCompletedVertices > dag.numVertices) {
            LOG.error((Object)("vertex completion accounting issue: numCompletedVertices > numVertices, numCompletedVertices=" + dag.numCompletedVertices + ", numVertices=" + dag.numVertices));
        }

        // Not every vertex has reported completion yet: nothing to decide.
        if (dag.numCompletedVertices != dag.numVertices) {
            return dag.getInternalState();
        }

        dag.setFinishTime();
        final DAGTerminationCause cause = dag.terminationCause;
        if (cause == null && dag.numSuccessfulVertices == dag.numVertices) {
            return dag.finished(DAGState.SUCCEEDED);
        }

        // Map the known termination causes onto a message prefix and a final state.
        String summary = null;
        DAGState endState = null;
        if (cause == DAGTerminationCause.DAG_KILL) {
            summary = "DAG killed due to user-initiated kill.";
            endState = DAGState.KILLED;
        } else if (cause == DAGTerminationCause.VERTEX_FAILURE) {
            summary = "DAG failed due to vertex failure.";
            endState = DAGState.FAILED;
        } else if (cause == DAGTerminationCause.COMMIT_FAILURE) {
            summary = "DAG failed due to commit failure.";
            endState = DAGState.FAILED;
        } else if (cause == DAGTerminationCause.RECOVERY_FAILURE) {
            summary = "DAG failed due to failure in recovery handling.";
            endState = DAGState.FAILED;
        }

        if (summary != null) {
            final String knownCauseMsg = summary + " failedVertices:" + dag.numFailedVertices + " killedVertices:" + dag.numKilledVertices;
            LOG.info((Object)knownCauseMsg);
            dag.addDiagnostic(knownCauseMsg);
            return dag.finished(endState);
        }

        // Everything completed, yet no rule above explains the outcome.
        final String unknownCauseMsg = "All vertices complete, but cannot determine final state of DAG, numCompletedVertices=" + dag.numCompletedVertices + ", numSuccessfulVertices=" + dag.numSuccessfulVertices + ", numFailedVertices=" + dag.numFailedVertices + ", numKilledVertices=" + dag.numKilledVertices + ", numVertices=" + dag.numVertices + ", terminationCause=" + cause;
        LOG.error((Object)unknownCauseMsg);
        dag.addDiagnostic(unknownCauseMsg);
        return dag.finished(DAGState.ERROR);
    }

    /**
     * Completes the DAG: commits or aborts outputs, writes the final history event
     * and notifies the app master that the DAG has finished.
     *
     * <p>If {@code finalState} is {@code SUCCEEDED} but not all outputs were
     * committed, the state is downgraded to {@code FAILED} and the termination cause
     * is set to {@code COMMIT_FAILURE} (if no cause was set before). If writing the
     * history event fails with an {@link IOException}, the app master is notified
     * with {@code ERROR} instead of the final state; the value returned from this
     * method is still the (possibly downgraded) final state.
     *
     * @param finalState the state the caller wants the DAG to finish in
     * @return the final state after the commit outcome has been taken into account
     */
    private synchronized DAGState finished(DAGState finalState) {
        // Only stamp the finish time if no caller has already done so.
        if (this.finishTime == 0L) {
            this.setFinishTime();
        }
        boolean allOutputsCommitted = this.commitOrAbortOutputs(finalState == DAGState.SUCCEEDED);
        if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) {
            finalState = DAGState.FAILED;
            this.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
        }
        boolean recoveryError = false;
        try {
            if (finalState == DAGState.SUCCEEDED) {
                this.logJobHistoryFinishedEvent();
            } else {
                this.logJobHistoryUnsuccesfulEvent(finalState);
            }
        }
        catch (IOException e) {
            // Keep the exception in the log: without it the reason the recovery event
            // could not be persisted is lost.
            LOG.warn((Object)("Failed to persist recovery event for DAG completion, dagId=" + this.dagId + ", finalState=" + finalState), (Throwable)e);
            recoveryError = true;
        }
        if (recoveryError) {
            this.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(this.getID(), DAGState.ERROR));
        } else {
            this.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(this.getID(), finalState));
        }
        LOG.info((Object)("DAG: " + this.getID() + " finished with state: " + finalState));
        return finalState;
    }

    /**
     * Translates an internal {@link DAGState} into the client-facing
     * {@link DAGStatus.State}.
     *
     * <p>{@code NEW} and {@code INITED} both map to {@code INITING};
     * {@code TERMINATING} is reported as {@code KILLED}.
     *
     * @param finalState the internal state to translate
     * @return the corresponding client-visible state
     * @throws TezUncheckedException if the state has no mapping
     */
    private DAGStatus.State getDAGStatusFromState(DAGState finalState) {
        switch (finalState) {
            case NEW:
            case INITED:
                return DAGStatus.State.INITING;
            case RUNNING:
                return DAGStatus.State.RUNNING;
            case SUCCEEDED:
                return DAGStatus.State.SUCCEEDED;
            case FAILED:
                return DAGStatus.State.FAILED;
            case ERROR:
                return DAGStatus.State.ERROR;
            case KILLED:
            case TERMINATING:
                return DAGStatus.State.KILLED;
            default:
                throw new TezUncheckedException("Unknown DAGState: " + finalState);
        }
    }

    /**
     * @return the user name stored on this DAG
     */
    @Override
    public String getUserName() {
        return userName;
    }

    /**
     * @return the DAG name stored on this DAG
     */
    @Override
    public String getName() {
        return dagName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the vertex count under the read lock.
     *
     * @return the value of {@code numVertices}
     */
    @Override
    public int getTotalVertices() {
        this.readLock.lock();
        try {
            return this.numVertices;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the successful-vertex counter under the read lock.
     *
     * @return the value of {@code numSuccessfulVertices}
     */
    @Override
    public int getSuccessfulVertices() {
        this.readLock.lock();
        try {
            return this.numSuccessfulVertices;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Records the termination cause unless one has already been recorded.
     *
     * @param trigger the cause to record
     * @return {@code true} if {@code trigger} was stored, {@code false} if a cause
     *         was already present and was left untouched
     */
    boolean trySetTerminationCause(DAGTerminationCause trigger) {
        if (this.terminationCause != null) {
            return false;
        }
        this.terminationCause = trigger;
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the termination cause under the read lock.
     *
     * @return the recorded termination cause, or {@code null} if none has been set
     */
    DAGTerminationCause getTerminationCause() {
        this.readLock.lock();
        try {
            return this.terminationCause;
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Initializes the DAG without a {@link DAGInitializedEvent}; delegates to
     * {@link #initializeDAG(DAGInitializedEvent)} with {@code null}.
     *
     * @return the state returned by the delegate
     */
    public DAGState initializeDAG() {
        final DAGInitializedEvent noInitEvent = null;
        return this.initializeDAG(noInitEvent);
    }

    /**
     * Builds the in-memory DAG structure (vertex groups, vertices, edges, scheduler)
     * from the job plan.
     *
     * @param event an initialization event to take the init time from, or
     *        {@code null} to use the current clock time
     * @return {@code DAGState.INITED} on success; for a plan with zero vertices,
     *         {@code DAGState.FAILED} when {@code event} is non-null, otherwise the
     *         result of {@code finished(DAGState.FAILED)}
     */
    DAGState initializeDAG(DAGInitializedEvent event) {
        // Take the init time from the supplied event when there is one, else from the clock.
        this.initTime = event != null ? event.getInitTime() : this.clock.getTime();
        // Defaults to true when the configuration key is absent.
        this.commitAllOutputsOnSuccess = this.conf.getBoolean("tez.am.commit-all-outputs-on-dag-success", true);
        this.numVertices = this.getJobPlan().getVertexCount();
        if (this.numVertices == 0) {
            this.addDiagnostic("No vertices for dag");
            this.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
            // With an event supplied, only the state is returned and finished() is not invoked.
            if (event != null) {
                return DAGState.FAILED;
            }
            return this.finished(DAGState.FAILED);
        }
        // Vertex groups are registered before vertices are created because
        // createVertex() passes dag.vertexGroups to each new vertex.
        if (this.jobPlan.getVertexGroupsCount() > 0) {
            for (DAGProtos.PlanVertexGroupInfo planVertexGroupInfo : this.jobPlan.getVertexGroupsList()) {
                this.vertexGroups.put(planVertexGroupInfo.getGroupName(), new VertexGroupInfo(planVertexGroupInfo));
            }
            // Build the reverse index: member vertex name -> groups it belongs to.
            // Note the loop variable shares its name with the field this.vertexGroupInfo.
            for (VertexGroupInfo vertexGroupInfo : this.vertexGroups.values()) {
                for (String vertexName : vertexGroupInfo.groupMembers) {
                    LinkedList groupList = this.vertexGroupInfo.get(vertexName);
                    if (groupList == null) {
                        groupList = Lists.newLinkedList();
                        this.vertexGroupInfo.put(vertexName, groupList);
                    }
                    groupList.add(vertexGroupInfo);
                }
            }
        }
        // Create one vertex per plan entry; the plan index is used as the vertex id.
        for (int i = 0; i < this.numVertices; ++i) {
            String string = this.getJobPlan().getVertex(i).getName();
            VertexImpl vertexImpl = DAGImpl.createVertex(this, string, i);
            this.addVertex(vertexImpl);
        }
        // Edges must exist before parseVertexEdges() looks them up by id.
        this.createDAGEdges(this);
        Map edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan((List)this.getJobPlan().getEdgeList());
        for (Vertex vertex : this.vertices.values()) {
            DAGImpl.parseVertexEdges(this, edgePlans, vertex);
        }
        // Edges are initialized only after every vertex has wired its source/destination.
        for (Edge edge : this.edges.values()) {
            edge.initialize();
        }
        DAGImpl.assignDAGScheduler(this);
        // Attach each group's outputs to all of its member vertices; groups
        // without outputs are skipped.
        for (Map.Entry<String, VertexGroupInfo> entry : this.vertexGroups.entrySet()) {
            String groupName = entry.getKey();
            VertexGroupInfo groupInfo = entry.getValue();
            if (groupInfo.outputs.isEmpty()) continue;
            for (String vertexName : groupInfo.groupMembers) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Setting shared outputs for group: " + groupName + " on vertex: " + vertexName));
                }
                // NOTE(review): getVertex() returns null for an unknown name, which would
                // fail on the next line — presumably the plan guarantees members exist; confirm.
                Vertex v = this.getVertex(vertexName);
                v.addSharedOutputs(groupInfo.outputs);
            }
        }
        return DAGState.INITED;
    }

    /**
     * Creates an {@link Edge} for every edge plan in the DAG's job plan and stores
     * it in {@code dag.edges} keyed by the edge plan id.
     *
     * @param dag the DAG whose edge map is populated
     */
    private void createDAGEdges(DAGImpl dag) {
        for (DAGProtos.EdgePlan plan : dag.getJobPlan().getEdgeList()) {
            final EdgeProperty property = DagTypeConverters.createEdgePropertyMapFromDAGPlan((DAGProtos.EdgePlan)plan);
            final Edge edge = new Edge(property, dag.getEventHandler());
            dag.edges.put(plan.getId(), edge);
        }
    }

    /**
     * Installs a {@link DAGSchedulerNaturalOrder} as the DAG's scheduler.
     *
     * @param dag the DAG that receives the scheduler
     */
    private static void assignDAGScheduler(DAGImpl dag) {
        LOG.info((Object)"Using Natural order dag scheduler");
        final DAGSchedulerNaturalOrder naturalOrder = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
        dag.dagScheduler = naturalOrder;
    }

    /**
     * Constructs the {@link VertexImpl} for the plan entry at index {@code vId}.
     *
     * @param dag the owning DAG, which supplies configuration and shared services
     * @param vertexName the name given to the vertex
     * @param vId index of the vertex in the job plan; also used to build its id
     * @return the newly created vertex
     */
    private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
        final DAGProtos.VertexPlan plan = dag.getJobPlan().getVertex(vId);
        final TezVertexID id = TezBuilderUtils.newVertexID(dag.getID(), vId);
        final VertexLocationHint locationHint = DagTypeConverters.convertFromDAGPlan((List)plan.getTaskLocationHintList());
        return new VertexImpl(
            id,
            plan,
            vertexName,
            dag.conf,
            dag.eventHandler,
            dag.taskAttemptListener,
            dag.clock,
            dag.taskHeartbeatHandler,
            // The vertex receives the negation of the DAG-level commit-all flag.
            !dag.commitAllOutputsOnSuccess,
            dag.appContext,
            locationHint,
            dag.vertexGroups,
            dag.taskSpecificLaunchCmdOption);
    }

    /**
     * Wires a vertex to its neighbours using the edge ids listed in its vertex plan.
     *
     * <p>For each incoming edge id the edge's source is set to the plan's input
     * vertex and its destination to {@code vertex}; for each outgoing edge id the
     * source is {@code vertex} and the destination the plan's output vertex. The
     * resulting neighbour-to-edge maps are then handed to the vertex.
     *
     * @param dag the DAG holding the vertex-by-name and edge-by-id maps
     * @param edgePlans edge plans keyed by edge id
     * @param vertex the vertex being wired
     */
    private static void parseVertexEdges(DAGImpl dag, Map<String, DAGProtos.EdgePlan> edgePlans, Vertex vertex) {
        final DAGProtos.VertexPlan plan = vertex.getVertexPlan();

        final HashMap<Vertex, Edge> sources = new HashMap<Vertex, Edge>();
        for (String edgeId : plan.getInEdgeIdList()) {
            final DAGProtos.EdgePlan incomingPlan = edgePlans.get(edgeId);
            final Vertex upstream = dag.vertexMap.get(incomingPlan.getInputVertexName());
            final Edge incoming = dag.edges.get(edgeId);
            incoming.setSourceVertex(upstream);
            incoming.setDestinationVertex(vertex);
            sources.put(upstream, incoming);
        }

        final HashMap<Vertex, Edge> destinations = new HashMap<Vertex, Edge>();
        for (String edgeId : plan.getOutEdgeIdList()) {
            final DAGProtos.EdgePlan outgoingPlan = edgePlans.get(edgeId);
            final Vertex downstream = dag.vertexMap.get(outgoingPlan.getOutputVertexName());
            final Edge outgoing = dag.edges.get(edgeId);
            outgoing.setSourceVertex(vertex);
            outgoing.setDestinationVertex(downstream);
            destinations.put(downstream, outgoing);
        }

        vertex.setInputVertices(sources);
        vertex.setOutputVertices(destinations);
    }

    /**
     * Registers a vertex in both lookup maps: by vertex id and by name.
     *
     * @param v the vertex to register
     */
    void addVertex(Vertex v) {
        vertices.put(v.getVertexId(), v);
        vertexMap.put(v.getName(), v);
    }

    /**
     * Looks up a vertex by name.
     *
     * @param vertexName the name to look up
     * @return the value mapped to {@code vertexName} in the name map
     */
    @Override
    public Vertex getVertex(String vertexName) {
        return vertexMap.get(vertexName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds the aggregated counters if they have not been built yet, synchronizing
     * on {@code fullCountersLock} for the check and the construction.
     */
    private void mayBeConstructFinalFullCounters() {
        synchronized (this.fullCountersLock) {
            if (this.fullCounters == null) {
                this.constructFinalFullcounters();
            }
        }
    }

    /**
     * Replaces {@code fullCounters} with a fresh {@link TezCounters} and folds in
     * the DAG-level counters followed by the counters of every vertex.
     */
    @InterfaceAudience.Private
    public void constructFinalFullcounters() {
        fullCounters = new TezCounters();
        fullCounters.incrAllCounters((AbstractCounters)dagCounters);
        for (Vertex vertex : vertices.values()) {
            fullCounters.incrAllCounters((AbstractCounters)vertex.getAllCounters());
        }
    }

    /**
     * Records the DAG termination cause and, only if this call was the one that set
     * it, sends a {@link VertexEventTermination} to every vertex.
     *
     * @param dagTerminationCause the cause to record on the DAG
     * @param vertexTerminationCause the cause carried by each vertex termination event
     */
    void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
        // A cause was already recorded earlier: do not send termination events again.
        if (!this.trySetTerminationCause(dagTerminationCause)) {
            return;
        }
        for (Vertex vertex : this.vertices.values()) {
            final VertexEventTermination termination = new VertexEventTermination(vertex.getVertexId(), vertexTerminationCause);
            this.eventHandler.handle((Event)termination);
        }
    }

    /**
     * Accounts for a successfully completed vertex and, when outputs are not all
     * committed at DAG success, commits the outputs of every vertex group whose
     * members have now all succeeded.
     *
     * <p>A failure while writing a commit history event or while committing an
     * output triggers {@code enactKill} with {@code RECOVERY_FAILURE} or
     * {@code COMMIT_FAILURE} respectively.
     *
     * @param vertex the vertex that succeeded
     * @return {@code true} if no commit failure occurred, {@code false} otherwise
     */
    private boolean vertexSucceeded(Vertex vertex) {
        List<VertexGroupInfo> groupsList;
        ++this.numSuccessfulVertices;
        boolean failedCommit = false;
        boolean recoveryFailed = false;
        // Group commits happen here only when outputs are NOT all committed at DAG
        // success and the vertex belongs to at least one group.
        if (!this.commitAllOutputsOnSuccess && (groupsList = this.vertexGroupInfo.get(vertex.getName())) != null) {
            ArrayList commitList = Lists.newArrayListWithCapacity((int)groupsList.size());
            // Pass 1: bump each group's success count and collect the groups that are
            // now fully succeeded and actually have outputs.
            for (VertexGroupInfo groupInfo : groupsList) {
                ++groupInfo.successfulMembers;
                if (groupInfo.groupMembers.size() != groupInfo.successfulMembers || groupInfo.outputs.isEmpty()) continue;
                LOG.info((Object)("All members of group: " + groupInfo.groupName + " are succeeded. Commiting outputs"));
                commitList.add(groupInfo);
            }
            // Pass 2: commit the collected groups, bracketing each commit with
            // started/finished history events.
            for (VertexGroupInfo groupInfo : commitList) {
                if (this.recoveredGroupCommits.containsKey(groupInfo.groupName)) {
                    LOG.info((Object)("VertexGroup was already committed as per recovery data, groupName=" + groupInfo.groupName));
                    continue;
                }
                // The flag is set before the commit is attempted, not after it succeeds.
                groupInfo.committed = true;
                // Committers are looked up on the first member returned by the group's iterator.
                Vertex v = this.getVertex(groupInfo.groupMembers.iterator().next());
                try {
                    this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getID(), new VertexGroupCommitStartedEvent(this.dagId, groupInfo.groupName, this.clock.getTime())));
                }
                catch (IOException e) {
                    LOG.error((Object)"Failed to send commit recovery event to handler", (Throwable)e);
                    recoveryFailed = true;
                    failedCommit = true;
                }
                if (!failedCommit) {
                    // Commit outputs one by one, stopping at the first failure.
                    for (String outputName : groupInfo.outputs) {
                        OutputCommitter committer = v.getOutputCommitters().get(outputName);
                        LOG.info((Object)("Committing output: " + outputName));
                        if (this.commitOutput(outputName, committer)) continue;
                        failedCommit = true;
                        break;
                    }
                }
                // Any failure so far aborts the remaining groups and skips the finished event.
                if (failedCommit) break;
                try {
                    this.appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(this.getID(), new VertexGroupCommitFinishedEvent(this.dagId, groupInfo.groupName, this.clock.getTime())));
                }
                catch (IOException e) {
                    LOG.error((Object)"Failed to send commit recovery event to handler", (Throwable)e);
                    recoveryFailed = true;
                    failedCommit = true;
                }
            }
        }
        if (failedCommit) {
            LOG.info((Object)"Aborting job due to failure in commit.");
            // A history-event failure is reported as a recovery failure at DAG level;
            // the vertex-level cause is COMMIT_FAILURE in both branches.
            if (!recoveryFailed) {
                this.enactKill(DAGTerminationCause.COMMIT_FAILURE, VertexTerminationCause.COMMIT_FAILURE);
            } else {
                LOG.info((Object)"Recovery failure occurred during commit");
                this.enactKill(DAGTerminationCause.RECOVERY_FAILURE, VertexTerminationCause.COMMIT_FAILURE);
            }
        }
        return !failedCommit;
    }

    /**
     * Accounts for a vertex that has started running again after succeeding.
     *
     * <p>The vertex is added to the re-running set, the successful-vertex counter is
     * decremented and a diagnostic is recorded. If outputs are not all committed at
     * DAG success and the vertex belongs to a group that is flagged as committed,
     * the DAG is killed with {@code COMMIT_FAILURE}.
     *
     * @param vertex the vertex that is re-running
     * @return {@code true} if the DAG kill was enacted, {@code false} otherwise
     */
    private boolean vertexReRunning(Vertex vertex) {
        this.reRunningVertices.add(vertex.getVertexId());
        --this.numSuccessfulVertices;
        this.addDiagnostic("Vertex re-running, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId());

        if (this.commitAllOutputsOnSuccess) {
            return false;
        }
        final List<VertexGroupInfo> memberOf = this.vertexGroupInfo.get(vertex.getName());
        if (memberOf == null) {
            return false;
        }
        for (VertexGroupInfo group : memberOf) {
            if (group.committed) {
                LOG.info((Object)("Aborting job as committed vertex: " + vertex.getVertexId() + " is re-running"));
                this.enactKill(DAGTerminationCause.COMMIT_FAILURE, VertexTerminationCause.COMMIT_FAILURE);
                return true;
            }
        }
        return false;
    }

    /**
     * Increments the failed-vertex counter and records a diagnostic for the vertex.
     *
     * @param vertex the vertex that failed
     */
    private void vertexFailed(Vertex vertex) {
        ++this.numFailedVertices;
        final String diagnostic = "Vertex failed, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId() + ", diagnostics=" + vertex.getDiagnostics();
        this.addDiagnostic(diagnostic);
    }

    /**
     * Increments the killed-vertex counter and records a diagnostic for the vertex.
     *
     * @param vertex the vertex that was killed
     */
    private void vertexKilled(Vertex vertex) {
        ++this.numKilledVertices;
        final String diagnostic = "Vertex killed, vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId() + ", diagnostics=" + vertex.getDiagnostics();
        this.addDiagnostic(diagnostic);
    }

    /**
     * Appends a message to the DAG's diagnostics collection.
     *
     * @param diag the message to append
     */
    private void addDiagnostic(String diag) {
        diagnostics.add(diag);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the DAG is in one of the states {@code SUCCEEDED},
     * {@code FAILED}, {@code KILLED} or {@code ERROR}; evaluated under the read lock.
     *
     * @return {@code true} if the state returned by {@code getState()} is one of
     *         those four, {@code false} otherwise
     */
    @Override
    public boolean isComplete() {
        this.readLock.lock();
        try {
            final DAGState current = this.getState();
            return current.equals((Object)DAGState.SUCCEEDED)
                || current.equals((Object)DAGState.FAILED)
                || current.equals((Object)DAGState.KILLED)
                || current.equals((Object)DAGState.ERROR);
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Transition that kills all vertices with {@code INTERNAL_ERROR}, stamps the
     * finish time and finishes the DAG in {@code DAGState.ERROR}.
     */
    private static class InternalErrorTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private InternalErrorTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            final String notice = job.getID() + " terminating due to internal error";
            LOG.info((Object)notice);
            job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
            job.setFinishTime();
            job.finished(DAGState.ERROR);
        }
    }

    /**
     * Transition that forwards scheduler update events to the DAG scheduler,
     * dispatching on the update type.
     */
    private static class DAGSchedulerUpdateTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DAGSchedulerUpdateTransition() {
        }

        /**
         * @throws TezUncheckedException if the update type is not one of
         *         {@code TA_SCHEDULE}, {@code TA_SCHEDULED} or {@code TA_SUCCEEDED}
         */
        public void transition(DAGImpl dag, DAGEvent event) {
            final DAGEventSchedulerUpdate update = (DAGEventSchedulerUpdate)event;
            switch (update.getUpdateType()) {
                case TA_SCHEDULE:
                    dag.dagScheduler.scheduleTask(update);
                    return;
                case TA_SCHEDULED:
                    final DAGEventSchedulerUpdateTAAssigned assigned = (DAGEventSchedulerUpdateTAAssigned)update;
                    dag.dagScheduler.taskScheduled(assigned);
                    return;
                case TA_SUCCEEDED:
                    dag.dagScheduler.taskSucceeded(update);
                    return;
                default:
                    throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:" + update.getUpdateType());
            }
        }
    }

    /**
     * Transition that applies every incremental counter update carried by the
     * event to the DAG-level counters.
     */
    private static class CounterUpdateTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private CounterUpdateTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            final DAGEventCounterUpdate counterEvent = (DAGEventCounterUpdate)event;
            for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterEvent.getCounterUpdates()) {
                job.dagCounters.findCounter(update.getCounterKey()).increment(update.getIncrementValue());
            }
        }
    }

    /**
     * Transition that appends the diagnostic message carried by the event to the
     * DAG's diagnostics.
     */
    private static class DiagnosticsUpdateTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DiagnosticsUpdateTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            final DAGEventDiagnosticsUpdate update = (DAGEventDiagnosticsUpdate)event;
            job.addDiagnostic(update.getDiagnosticUpdate());
        }
    }

    /**
     * Transition for a vertex that starts re-running: decrements the completed
     * vertex counter and moves the DAG to {@code TERMINATING} if the re-run forced
     * a kill, otherwise to {@code RUNNING}.
     */
    private static class VertexReRunningTransition implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private VertexReRunningTransition() {
        }

        public DAGState transition(DAGImpl job, DAGEvent event) {
            final DAGEventVertexReRunning reRunEvent = (DAGEventVertexReRunning)event;
            final Vertex vertex = job.vertices.get(reRunEvent.getVertexId());
            --job.numCompletedVertices;
            final boolean killEnacted = job.vertexReRunning(vertex);
            LOG.info((Object)("Vertex " + vertex.getVertexId() + " re-running." + ", numCompletedVertices=" + job.numCompletedVertices + ", numSuccessfulVertices=" + job.numSuccessfulVertices + ", numFailedVertices=" + job.numFailedVertices + ", numKilledVertices=" + job.numKilledVertices + ", numVertices=" + job.numVertices));
            return killEnacted ? DAGState.TERMINATING : DAGState.RUNNING;
        }
    }

    /**
     * Transition for a vertex completion event: updates the per-outcome counters,
     * enacts a kill on vertex failure, and then checks whether the whole DAG is
     * complete.
     */
    private static class VertexCompletedTransition implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private VertexCompletedTransition() {
        }

        /**
         * @return the state from {@code checkDAGForCompletion}, except that
         *         {@code RUNNING} is replaced by {@code TERMINATING} when the
         *         vertex failed, was killed, or its success handling reported a
         *         commit failure
         */
        public DAGState transition(DAGImpl job, DAGEvent event) {
            final DAGEventVertexCompleted completion = (DAGEventVertexCompleted)event;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Received a vertex completion event, vertexId=" + completion.getVertexId() + ", vertexState=" + completion.getVertexState()));
            }
            final Vertex vertex = job.vertices.get(completion.getVertexId());
            ++job.numCompletedVertices;

            boolean mustTerminate = false;
            final VertexState outcome = completion.getVertexState();
            if (outcome == VertexState.SUCCEEDED) {
                // The scheduler is not told again about a vertex that is completing a re-run.
                if (!job.reRunningVertices.contains(vertex.getVertexId())) {
                    job.dagScheduler.vertexCompleted(vertex);
                }
                mustTerminate = !job.vertexSucceeded(vertex);
            } else if (outcome == VertexState.FAILED) {
                final VertexTerminationCause cause = completion.getVertexTerminationCause() == null
                    ? VertexTerminationCause.OTHER_VERTEX_FAILURE
                    : completion.getVertexTerminationCause();
                job.enactKill(DAGTerminationCause.VERTEX_FAILURE, cause);
                job.vertexFailed(vertex);
                mustTerminate = true;
            } else if (outcome == VertexState.KILLED) {
                job.vertexKilled(vertex);
                mustTerminate = true;
            }

            job.reRunningVertices.remove(vertex.getVertexId());
            LOG.info((Object)("Vertex " + vertex.getVertexId() + " completed." + ", numCompletedVertices=" + job.numCompletedVertices + ", numSuccessfulVertices=" + job.numSuccessfulVertices + ", numFailedVertices=" + job.numFailedVertices + ", numKilledVertices=" + job.numKilledVertices + ", numVertices=" + job.numVertices));

            final DAGState resolved = DAGImpl.checkDAGForCompletion(job);
            return (resolved == DAGState.RUNNING && mustTerminate) ? DAGState.TERMINATING : resolved;
        }
    }

    /**
     * Transition that records a kill diagnostic and enacts a {@code DAG_KILL} on
     * the DAG and its vertices.
     */
    private static class DAGKilledTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private DAGKilledTransition() {
        }

        public void transition(DAGImpl job, DAGEvent event) {
            final String notice = "Job received Kill while in RUNNING state.";
            job.addDiagnostic(notice);
            job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
        }
    }

    /**
     * Transition that sets the termination cause to {@code DAG_KILL}, records a
     * diagnostic and finishes the DAG in {@code DAGState.KILLED}.
     */
    private static class KillInitedJobTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private KillInitedJobTransition() {
        }

        public void transition(DAGImpl dag, DAGEvent dagEvent) {
            dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
            final String notice = "Job received Kill in INITED state.";
            dag.addDiagnostic(notice);
            dag.finished(DAGState.KILLED);
        }
    }

    /**
     * Transition that stamps the finish time, sets the termination cause to
     * {@code DAG_KILL} and finishes the DAG in {@code DAGState.KILLED}.
     */
    private static class KillNewJobTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        private KillNewJobTransition() {
        }

        public void transition(DAGImpl dag, DAGEvent dagEvent) {
            dag.setFinishTime();
            final DAGTerminationCause killCause = DAGTerminationCause.DAG_KILL;
            dag.trySetTerminationCause(killCause);
            dag.finished(DAGState.KILLED);
        }
    }

    /**
     * Transition that starts the DAG: records the start time, logs the started
     * history event, adds any additional classpath URLs carried by the event and
     * then initializes and starts the vertices.
     */
    public static class StartTransition implements SingleArcTransition<DAGImpl, DAGEvent> {
        public void transition(DAGImpl dag, DAGEvent event) {
            final DAGEventStartDag start = (DAGEventStartDag)event;
            dag.startTime = dag.clock.getTime();
            dag.logJobHistoryStartedEvent();

            final List<URL> extraUrls = start.getAdditionalUrlsForClasspath();
            if (extraUrls != null) {
                LOG.info((Object)("Added additional resources : [" + extraUrls + "] to classpath"));
                RelocalizationUtils.addUrlsToClassPath(extraUrls);
            }

            dag.initializeVerticesAndStart();
        }
    }

    /**
     * Transition that initializes the DAG and, only when initialization yields
     * {@code DAGState.INITED}, logs the initialized history event.
     */
    private static class InitTransition implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private InitTransition() {
        }

        /**
         * @return the state produced by {@code dag.initializeDAG()}
         */
        public DAGState transition(DAGImpl dag, DAGEvent event) {
            final DAGState outcome = dag.initializeDAG();
            if (outcome == DAGState.INITED) {
                dag.logJobHistoryInitedEvent();
            }
            return outcome;
        }
    }

    private static class RecoverTransition
    implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
        private RecoverTransition() {
        }

        public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
            DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent)dagEvent;
            if (recoverEvent.hasDesiredState()) {
                dag.recoveredState = recoverEvent.getDesiredState();
            }
            if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
                LOG.info((Object)("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath() + "] to classpath"));
                RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
            }
            switch (dag.recoveredState) {
                case NEW: {
                    dag.eventHandler.handle((Event)new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
                    dag.eventHandler.handle((Event)new DAGEventStartDag(dag.getID(), null));
                    return DAGState.NEW;
                }
                case INITED: {
                    for (Vertex v : dag.vertices.values()) {
                        if (v.getInputVerticesCount() != 0) continue;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Sending Running Recovery event to root vertex " + v.getName()));
                        }
                        dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), VertexState.RUNNING));
                    }
                    return DAGState.RUNNING;
                }
                case RUNNING: {
                    boolean groupCommitInProgress = false;
                    if (!dag.recoveredGroupCommits.isEmpty()) {
                        for (Map.Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
                            if (entry.getValue().booleanValue()) continue;
                            LOG.info((Object)("Found a pending Vertex Group commit, vertexGroup=" + entry.getKey()));
                            groupCommitInProgress = true;
                            break;
                        }
                    }
                    if (groupCommitInProgress || dag.recoveryCommitInProgress) {
                        dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
                        dag.setFinishTime();
                        for (Vertex v : dag.vertices.values()) {
                            VertexState desiredState = VertexState.SUCCEEDED;
                            if (dag.recoveredState.equals((Object)DAGState.KILLED)) {
                                desiredState = VertexState.KILLED;
                            } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains((Object)dag.recoveredState)) {
                                desiredState = VertexState.FAILED;
                            }
                            dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), desiredState));
                        }
                        DAGState endState = DAGState.FAILED;
                        try {
                            dag.logJobHistoryUnsuccesfulEvent(endState);
                        }
                        catch (IOException e) {
                            LOG.warn((Object)("Failed to persist recovery event for DAG completion, dagId=" + dag.dagId + ", finalState=" + (Object)((Object)endState)));
                        }
                        dag.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(dag.getID(), endState));
                        return endState;
                    }
                    for (Vertex v : dag.vertices.values()) {
                        if (v.getInputVerticesCount() != 0) continue;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Sending Running Recovery event to root vertex " + v.getName()));
                        }
                        dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), VertexState.RUNNING));
                    }
                    return DAGState.RUNNING;
                }
                case SUCCEEDED: 
                case FAILED: 
                case KILLED: 
                case ERROR: {
                    for (Vertex v : dag.vertices.values()) {
                        VertexState desiredState = VertexState.SUCCEEDED;
                        if (dag.recoveredState.equals((Object)DAGState.KILLED)) {
                            desiredState = VertexState.KILLED;
                        } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains((Object)dag.recoveredState)) {
                            desiredState = VertexState.FAILED;
                        }
                        dag.eventHandler.handle((Event)new VertexEventRecoverVertex(v.getVertexId(), desiredState));
                    }
                    dag.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(dag.getID(), dag.recoveredState));
                    LOG.info((Object)("Recovered DAG: " + dag.getID() + " finished with state: " + (Object)((Object)dag.recoveredState)));
                    return dag.recoveredState;
                }
            }
            LOG.warn((Object)("Trying to recover DAG, failed to recover from non-handled state" + (Object)((Object)dag.recoveredState)));
            dag.eventHandler.handle((Event)new DAGAppMasterEventDAGFinished(dag.getID(), DAGState.ERROR));
            return DAGState.FAILED;
        }
    }

    /**
     * Mutable holder for one vertex group, populated from its
     * {@code DAGProtos.PlanVertexGroupInfo} plan entry.
     */
    static class VertexGroupInfo {
        /** Group name as reported by the plan entry. */
        String groupName;
        /** Copy of the plan entry's group-members list. */
        Set<String> groupMembers;
        /** Copy of the plan entry's outputs list. */
        Set<String> outputs;
        /** Merged-input descriptor for each edge, keyed by destination vertex name. */
        Map<String, InputDescriptor> edgeMergedInputs;
        /** Starts at zero; presumably counts group members that have succeeded (updated outside this class). */
        int successfulMembers;
        /** Starts false; presumably set once the group has been committed (updated outside this class). */
        boolean committed;

        /**
         * Copies the name, members, outputs and per-edge merged inputs out of the
         * given plan entry and resets the progress fields.
         *
         * @param groupInfo plan entry describing the vertex group
         */
        VertexGroupInfo(DAGProtos.PlanVertexGroupInfo groupInfo) {
            // Progress fields always start from a clean slate.
            this.committed = false;
            this.successfulMembers = 0;

            this.groupName = groupInfo.getGroupName();
            this.groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList());
            this.outputs = Sets.newHashSet(groupInfo.getOutputsList());

            // Size the map up front from the number of merged-input edges in the plan.
            this.edgeMergedInputs = Maps.newHashMapWithExpectedSize(groupInfo.getEdgeMergedInputsCount());
            for (DAGProtos.PlanGroupInputEdgeInfo mergedEdge : groupInfo.getEdgeMergedInputsList()) {
                InputDescriptor mergedInput =
                        DagTypeConverters.convertInputDescriptorFromDAGPlan(mergedEdge.getMergedInput());
                this.edgeMergedInputs.put(mergedEdge.getDestVertexName(), mergedInput);
            }
        }
    }
}

