/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.rm;

import com.google.common.base.Preconditions;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService;
import org.apache.tez.dag.app.rm.TaskSchedulerService;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;

/**
 * Service that sits between the rest of the application master and the
 * {@link TaskSchedulerService}.
 *
 * <p>It plays two roles:
 * <ul>
 *   <li>As an {@link EventHandler} it accepts {@link AMSchedulerEvent}s via
 *       {@link #handle(AMSchedulerEvent)}, queues them, and processes them one at a time on a
 *       dedicated thread started in {@link #serviceStart()}.</li>
 *   <li>As a {@link TaskSchedulerService.TaskSchedulerAppCallback} it receives callbacks from the
 *       scheduler and translates them into events sent to the handler supplied at
 *       construction.</li>
 * </ul>
 */
public class TaskSchedulerEventHandler
extends AbstractService
implements TaskSchedulerService.TaskSchedulerAppCallback,
EventHandler<AMSchedulerEvent> {
    static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class);

    /** The queue size is logged every time it is a non-zero multiple of this value. */
    private static final int QUEUE_SIZE_LOG_INTERVAL = 1000;
    /** A warning is logged when the queue's remaining capacity drops below this value. */
    private static final int LOW_REMAINING_CAPACITY_THRESHOLD = 1000;
    // NOTE(review): these two values appear to mirror YARN's ContainerExitStatus.PREEMPTED and
    // ContainerExitStatus.DISKS_FAILED - confirm against the Hadoop version in use.
    private static final int EXIT_STATUS_PREEMPTED = -102;
    private static final int EXIT_STATUS_DISKS_FAILED = -101;

    protected final AppContext appContext;
    private final EventHandler eventHandler;
    protected TaskSchedulerService taskScheduler;
    private DAGAppMaster dagAppMaster;
    private Map<ApplicationAccessType, String> appAcls = null;
    private Thread eventHandlingThread;
    private volatile boolean stopEventHandling;
    protected volatile boolean isSignalled = false;
    final DAGClientServer clientService;
    private final ContainerSignatureMatcher containerSignatureMatcher;
    /** Last node count reported by the scheduler; -1 until {@link #getProgress()} first runs. */
    private int cachedNodeCount = -1;
    private AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false);
    BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>();

    /**
     * Creates the handler. The scheduler itself is not created until {@link #serviceStart()}.
     *
     * @param appContext application context used to look up containers, nodes and the current DAG
     * @param clientService client server whose bind address is handed to the scheduler
     * @param eventHandler destination for every event produced by this class
     * @param containerSignatureMatcher matcher passed through to the created scheduler
     */
    public TaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher) {
        super(TaskSchedulerEventHandler.class.getName());
        this.appContext = appContext;
        this.eventHandler = eventHandler;
        this.clientService = clientService;
        this.containerSignatureMatcher = containerSignatureMatcher;
    }

    /**
     * @return the ACLs stored by {@link #setApplicationRegistrationData}, or {@code null} if that
     *         callback has not run yet
     */
    public Map<ApplicationAccessType, String> getApplicationAcls() {
        return appAcls;
    }

    /**
     * Records whether the application was signalled; the flag is consulted by
     * {@link #getFinalAppStatus()}.
     *
     * @param isSignalled new value of the flag
     */
    public void setSignalled(boolean isSignalled) {
        this.isSignalled = isSignalled;
        LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
    }

    /**
     * @return the node count cached by the last {@link #getProgress()} call, or -1 if none yet
     */
    public int getNumClusterNodes() {
        return cachedNodeCount;
    }

    /** @return the available resources as reported by the scheduler */
    public Resource getAvailableResources() {
        return taskScheduler.getAvailableResources();
    }

    /** @return the total resources as reported by the scheduler */
    public Resource getTotalResources() {
        return taskScheduler.getTotalResources();
    }

    /**
     * Dispatches a single scheduler event to the matching private handler. Event types without a
     * case below (and the node health events) are ignored.
     *
     * @param sEvent the event to process
     * @throws TezUncheckedException if a task-attempt-ended event carries a state other than
     *         FAILED, KILLED or SUCCEEDED
     */
    public synchronized void handleEvent(AMSchedulerEvent sEvent) {
        LOG.info("Processing the event " + sEvent.toString());
        switch ((AMSchedulerEventType) sEvent.getType()) {
            case S_TA_LAUNCH_REQUEST:
                handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
                break;
            case S_TA_ENDED: {
                AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded) sEvent;
                switch (event.getState()) {
                    case FAILED:
                    case KILLED:
                        handleTAUnsuccessfulEnd(event);
                        break;
                    case SUCCEEDED:
                        handleTASucceeded(event);
                        break;
                    default:
                        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
                }
                break;
            }
            case S_CONTAINER_DEALLOCATE:
                handleContainerDeallocate((AMSchedulerEventDeallocateContainer) sEvent);
                break;
            case S_NODE_UNBLACKLISTED:
            case S_NODE_BLACKLISTED:
                handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate) sEvent);
                break;
            case S_NODE_UNHEALTHY:
            case S_NODE_HEALTHY:
                // Node health changes require no scheduler action here.
                break;
            default:
                break;
        }
    }

    /**
     * Enqueues an event for asynchronous processing by the event-handling thread.
     *
     * @param event the event to enqueue
     * @throws TezUncheckedException if the calling thread is interrupted while waiting to enqueue;
     *         the thread's interrupt status is restored before the exception is thrown
     */
    @Override
    public void handle(AMSchedulerEvent event) {
        int qSize = eventQueue.size();
        if (qSize != 0 && qSize % QUEUE_SIZE_LOG_INTERVAL == 0) {
            LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < LOW_REMAINING_CAPACITY_THRESHOLD) {
            LOG.warn("Very low remaining capacity in the event-queue of RMContainerAllocator: " + remCapacity);
        }
        try {
            eventQueue.put(event);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; restore it so callers further up
            // the stack can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new TezUncheckedException(e);
        }
    }

    /** Forwards an event to the handler supplied at construction. */
    @SuppressWarnings("unchecked") // eventHandler is a raw EventHandler by construction
    private void sendEvent(Event<?> event) {
        eventHandler.handle(event);
    }

    /** Blacklists or un-blacklists the event's node in the scheduler, depending on event type. */
    private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
        if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
            taskScheduler.blacklistNode(event.getNodeId());
        } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
            taskScheduler.unblacklistNode(event.getNodeId());
        } else {
            throw new TezUncheckedException("Invalid event type: " + event.getType());
        }
    }

    /** Deallocates the container in the scheduler and then requests that the container stop. */
    private void handleContainerDeallocate(AMSchedulerEventDeallocateContainer event) {
        ContainerId containerId = event.getContainerId();
        taskScheduler.deallocateContainer(containerId);
        sendEvent(new AMContainerEventStopRequest(containerId));
    }

    /**
     * Handles a FAILED or KILLED attempt: deallocates the task in the scheduler and, if the
     * attempt reports a container, asks that container to stop and notifies the node tracker.
     */
    private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
        TaskAttempt attempt = event.getAttempt();
        boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false);
        ContainerId attemptContainerId = attempt.getAssignedContainerID();
        if (!wasContainerAllocated) {
            LOG.info("Task: " + attempt.getID() + " has no container assignment in the scheduler");
            if (attemptContainerId != null) {
                // The scheduler and the attempt disagree about the assignment.
                LOG.error("No container allocated to task: " + attempt.getID() + " according to scheduler. Task reported container id: " + attemptContainerId);
            }
        }
        if (attemptContainerId != null) {
            sendEvent(new AMContainerEventStopRequest(attemptContainerId));
            boolean failed = event.getState() == TaskAttemptState.FAILED;
            sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().get(attemptContainerId).getContainer().getNodeId(), attemptContainerId, attempt.getID(), failed));
        }
    }

    /**
     * Handles a SUCCEEDED attempt: notifies the used container and its node (when a container id
     * is present on the event) and then deallocates the task in the scheduler.
     */
    private void handleTASucceeded(AMSchedulerEventTAEnded event) {
        TaskAttempt attempt = event.getAttempt();
        ContainerId usedContainerId = event.getUsedContainerId();
        if (usedContainerId != null) {
            sendEvent(new AMContainerEventTASucceeded(usedContainerId, event.getAttemptID()));
            sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().get(usedContainerId).getContainer().getNodeId(), usedContainerId, event.getAttemptID()));
        }
        boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, true);
        if (!wasContainerAllocated) {
            LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task");
        }
    }

    /**
     * Asks the scheduler to allocate a container for a task attempt.
     *
     * <p>If the location hint names an affinitized task whose successful attempt exists, the
     * allocation is requested against that attempt's container. If the affinitized task has no
     * successful attempt, the hint is ignored and the request carries no hosts or racks.
     * Otherwise the hint's hosts and racks (either may be {@code null}) are passed through.
     */
    private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
        TaskAttempt taskAttempt = event.getTaskAttempt();
        TaskLocationHint locationHint = event.getLocationHint();
        String[] hosts = null;
        String[] racks = null;
        if (locationHint != null) {
            TaskLocationHint.TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
            if (taskAffinity != null) {
                Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
                Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity + " for attempt: " + taskAttempt.getID());
                int taskIndex = taskAffinity.getTaskIndex();
                Preconditions.checkState(taskIndex >= 0 && taskIndex < vertex.getTotalTasks(), "Invalid taskIndex in task based affinity " + taskAffinity + " for attempt: " + taskAttempt.getID());
                TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
                if (affinityAttempt != null) {
                    Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
                    taskScheduler.allocateTask(taskAttempt, event.getCapability(), affinityAttempt.getAssignedContainerID(), Priority.newInstance(event.getPriority()), event.getContainerContext(), event);
                    return;
                }
                LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity + " but no locality information exists for it. Ignoring hint.");
            } else {
                if (locationHint.getHosts() != null) {
                    hosts = locationHint.getHosts().toArray(new String[locationHint.getHosts().size()]);
                }
                if (locationHint.getRacks() != null) {
                    racks = locationHint.getRacks().toArray(new String[locationHint.getRacks().size()]);
                }
            }
        }
        taskScheduler.allocateTask(taskAttempt, event.getCapability(), hosts, racks, Priority.newInstance(event.getPriority()), event.getContainerContext(), event);
    }

    /**
     * Creates the scheduler implementation: a {@link LocalTaskSchedulerService} when the
     * {@code tez.local.mode} configuration flag is true, otherwise a
     * {@link YarnTaskSchedulerService}. Subclasses may override to supply a different scheduler.
     *
     * @param host host name passed to the scheduler
     * @param port port passed to the scheduler
     * @param trackingUrl tracking URL passed to the scheduler
     * @param appContext application context passed to the scheduler
     * @return the newly created, not yet initialised scheduler
     */
    protected TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext) {
        boolean isLocal = getConfig().getBoolean("tez.local.mode", false);
        if (isLocal) {
            return new LocalTaskSchedulerService(this, containerSignatureMatcher, host, port, trackingUrl, appContext);
        }
        return new YarnTaskSchedulerService(this, containerSignatureMatcher, host, port, trackingUrl, appContext);
    }

    /**
     * Creates, initialises and starts the scheduler, then starts the thread that drains the
     * event queue. If {@link #setShouldUnregisterFlag()} was called before the scheduler existed,
     * the request is forwarded to the scheduler here.
     */
    @Override
    public synchronized void serviceStart() {
        InetSocketAddress serviceAddr = clientService.getBindAddress();
        dagAppMaster = appContext.getAppMaster();
        taskScheduler = createTaskScheduler(serviceAddr.getHostName(), serviceAddr.getPort(), "", appContext);
        taskScheduler.init(getConfig());
        taskScheduler.start();
        if (shouldUnregisterFlag.get()) {
            taskScheduler.setShouldUnregister();
        }
        eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
            @Override
            public void run() {
                while (!TaskSchedulerEventHandler.this.stopEventHandling && !Thread.currentThread().isInterrupted()) {
                    AMSchedulerEvent event;
                    try {
                        if (TaskSchedulerEventHandler.this.eventQueue.peek() == null) {
                            TaskSchedulerEventHandler.this.notifyForTest();
                        }
                        event = TaskSchedulerEventHandler.this.eventQueue.take();
                    } catch (InterruptedException e) {
                        // An interrupt during shutdown is expected; the loop condition then
                        // ends the thread. Any other interrupt is logged and the loop resumes.
                        if (!TaskSchedulerEventHandler.this.stopEventHandling) {
                            LOG.warn("Continuing after interrupt : ", e);
                        }
                        continue;
                    }
                    try {
                        TaskSchedulerEventHandler.this.handleEvent(event);
                    } catch (Throwable t) {
                        // Any failure while handling an event is reported as an internal
                        // error and terminates this thread.
                        LOG.error("Error in handling event type " + event.getType() + " to the TaskScheduler", t);
                        TaskSchedulerEventHandler.this.sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
                        return;
                    } finally {
                        TaskSchedulerEventHandler.this.notifyForTest();
                    }
                }
            }
        };
        eventHandlingThread.start();
    }

    /**
     * Hook invoked by the event-handling thread when it finds the queue empty and after each
     * event is handled. Does nothing by default.
     */
    protected void notifyForTest() {
    }

    /**
     * Signals the event-handling thread to stop (flag plus interrupt) and then stops the
     * scheduler, if one was created.
     */
    @Override
    public void serviceStop() {
        synchronized (this) {
            stopEventHandling = true;
            if (eventHandlingThread != null) {
                eventHandlingThread.interrupt();
            }
        }
        if (taskScheduler != null) {
            taskScheduler.stop();
        }
    }

    /**
     * Scheduler callback: a container has been assigned to a task.
     *
     * <p>Registers the container (and its node) if it has not been seen before, requests a
     * launch when the container is still in the ALLOCATED state, and then announces the
     * assignment to both the DAG and the container.
     *
     * @param task the task the scheduler allocated for
     * @param appCookie the {@link AMSchedulerEventTALaunchRequest} passed at allocation time
     * @param container the container assigned to the task
     */
    @Override
    public synchronized void taskAllocated(Object task, Object appCookie, Container container) {
        ContainerId containerId = container.getId();
        if (appContext.getAllContainers().addContainerIfNew(container)) {
            appContext.getNodeTracker().nodeSeen(container.getNodeId());
            sendEvent(new AMNodeEventContainerAllocated(container.getNodeId(), container.getId()));
        }
        AMSchedulerEventTALaunchRequest event = (AMSchedulerEventTALaunchRequest) appCookie;
        TaskAttempt taskAttempt = event.getTaskAttempt();
        assert (task.equals(taskAttempt));
        if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
            sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(), event.getContainerContext()));
        }
        sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
        sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(), event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event.getContainerContext().getCredentials()));
    }

    /**
     * Scheduler callback: a container has completed. For containers known to the application
     * context, sends a completion event whose message is chosen from the exit status and has the
     * container's diagnostics (when present) appended. Unknown containers are ignored.
     *
     * @param task unused by this implementation
     * @param containerStatus status of the completed container
     */
    @Override
    public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
        AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
        if (amContainer == null) {
            return;
        }
        int exitStatus = containerStatus.getExitStatus();
        String message;
        if (exitStatus == EXIT_STATUS_PREEMPTED) {
            message = "Container preempted externally. ";
        } else if (exitStatus == EXIT_STATUS_DISKS_FAILED) {
            message = "Container disk failed. ";
        } else {
            message = "Container failed. ";
        }
        if (containerStatus.getDiagnostics() != null) {
            message = message + containerStatus.getDiagnostics();
        }
        sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message));
    }

    /**
     * Scheduler callback: a container is being released. Sends a stop request for containers
     * known to the application context; unknown containers are ignored.
     *
     * @param containerId id of the container being released
     */
    @Override
    public synchronized void containerBeingReleased(ContainerId containerId) {
        AMContainer amContainer = appContext.getAllContainers().get(containerId);
        if (amContainer != null) {
            sendEvent(new AMContainerEventStopRequest(containerId));
        }
    }

    /**
     * Scheduler callback: emits one node state-change event per updated node report.
     *
     * @param updatedNodes node reports received from the scheduler
     */
    @Override
    public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
        for (NodeReport nodeReport : updatedNodes) {
            sendEvent(new AMNodeEventStateChanged(nodeReport));
        }
    }

    /** Scheduler callback: the scheduler requested a shutdown; sends an AM_REBOOT event. */
    @Override
    public synchronized void appShutdownRequested() {
        LOG.info("App shutdown requested by scheduler");
        sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
    }

    /**
     * Scheduler callback delivering registration data: stores the maximum container capability
     * in the cluster info, remembers the ACLs, and passes the secret key to the client service.
     *
     * @param maxContainerCapability maximum capability of a single container
     * @param appAcls application ACLs, later returned by {@link #getApplicationAcls()}
     * @param clientAMSecretKey secret key handed to the client service
     */
    @Override
    public synchronized void setApplicationRegistrationData(Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, ByteBuffer clientAMSecretKey) {
        appContext.getClusterInfo().setMaxContainerCapability(maxContainerCapability);
        this.appAcls = appAcls;
        clientService.setClientAMSecretKey(clientAMSecretKey);
    }

    /**
     * Scheduler callback: derives the final application status from the app master state.
     *
     * <p>SUCCEEDED maps to SUCCEEDED; KILLED, or RUNNING while signalled, maps to KILLED; FAILED
     * or ERROR maps to FAILED; everything else (including an app master that has not been set
     * yet) maps to UNDEFINED. The diagnostics string is the app master's diagnostics, one per
     * line. The history URL is always empty.
     *
     * @return the final status, diagnostics and (empty) history URL
     */
    @Override
    public TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus getFinalAppStatus() {
        FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
        StringBuilder sb = new StringBuilder();
        if (dagAppMaster == null) {
            sb.append("App not yet initialized");
        } else {
            DAGAppMasterState appMasterState = dagAppMaster.getState();
            if (appMasterState == DAGAppMasterState.SUCCEEDED) {
                finishState = FinalApplicationStatus.SUCCEEDED;
            } else if (appMasterState == DAGAppMasterState.KILLED
                    || (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
                finishState = FinalApplicationStatus.KILLED;
            } else if (appMasterState == DAGAppMasterState.FAILED
                    || appMasterState == DAGAppMasterState.ERROR) {
                finishState = FinalApplicationStatus.FAILED;
            }
            List<String> diagnostics = dagAppMaster.getDiagnostics();
            if (diagnostics != null) {
                for (String s : diagnostics) {
                    sb.append(s).append("\n");
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting job diagnostics to " + sb.toString());
        }
        String historyUrl = "";
        return new TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus(finishState, sb.toString(), historyUrl);
    }

    /**
     * Scheduler callback: returns the app master's progress. As a side effect, refreshes the
     * cached cluster node count and sends a node-count-updated event when the count changed.
     *
     * @return progress reported by the app master
     */
    @Override
    public float getProgress() {
        int nodeCount = taskScheduler.getClusterNodeCount();
        if (nodeCount != cachedNodeCount) {
            cachedNodeCount = nodeCount;
            sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
        }
        return dagAppMaster.getProgress();
    }

    /**
     * Scheduler callback: reports a scheduler error by sending a scheduling-service-error event.
     *
     * @param t the error reported by the scheduler
     */
    @Override
    public void onError(Throwable t) {
        LOG.info("Error reported by scheduler", t);
        sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
    }

    /** Tells the scheduler to reset locality matching for all containers it currently holds. */
    public void dagCompleted() {
        taskScheduler.resetMatchLocalityForAllHeldContainers();
    }

    /**
     * Scheduler callback: deallocates the container in the scheduler and sends a completion
     * event marked as an internal preemption.
     *
     * @param containerId id of the container to preempt
     */
    @Override
    public void preemptContainer(ContainerId containerId) {
        taskScheduler.deallocateContainer(containerId);
        sendEvent(new AMContainerEventCompleted(containerId, EXIT_STATUS_PREEMPTED, "Container preempted internally"));
    }

    /**
     * Records that the scheduler should unregister, and forwards the request immediately if the
     * scheduler already exists; otherwise {@link #serviceStart()} forwards it later.
     */
    public void setShouldUnregisterFlag() {
        LOG.info("TaskScheduler notified that it should unregister from RM");
        shouldUnregisterFlag.set(true);
        if (taskScheduler != null) {
            taskScheduler.setShouldUnregister();
        }
    }

    /** @return whether the scheduler reports that it has unregistered */
    public boolean hasUnregistered() {
        return taskScheduler.hasUnregistered();
    }
}

