/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.launcher;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.common.EnvironmentUpdateUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.launcher.ContainerLauncher;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.runtime.task.TezChild;

public class LocalContainerLauncher
extends AbstractService
implements ContainerLauncher {
    private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
    private final AppContext context;
    private final TaskAttemptListener taskAttemptListener;
    private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
    private final String workingDirectory;
    private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>> runningContainers = new ConcurrentHashMap();
    private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
    private BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
    private Thread eventHandlingThread;
    private ListeningExecutorService taskExecutorService;

    public LocalContainerLauncher(AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory) throws UnknownHostException {
        super(LocalContainerLauncher.class.getName());
        this.context = context;
        this.taskAttemptListener = taskAttemptListener;
        this.workingDirectory = workingDirectory;
        EnvironmentUpdateUtils.put((String)"NM_AUX_SERVICE_mapreduce_shuffle", (String)Base64.encodeBase64String((byte[])ByteBuffer.allocate(4).putInt(0).array()));
        EnvironmentUpdateUtils.put((String)ApplicationConstants.Environment.NM_HOST.toString(), (String)InetAddress.getLocalHost().getHostName());
        if (Shell.WINDOWS) {
            EnvironmentUpdateUtils.put((String)ApplicationConstants.Environment.USER.name(), (String)System.getenv("USERNAME"));
        }
    }

    public synchronized void serviceInit(Configuration conf) {
        int numExecutors = conf.getInt("tez.am.inline.task.execution.max-tasks", 1);
        Preconditions.checkState((numExecutors >= 1 ? 1 : 0) != 0, (Object)"Must have at least 1 executor");
        ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread [%d]").build());
        this.taskExecutorService = MoreExecutors.listeningDecorator((ExecutorService)rawExecutor);
    }

    public void serviceStart() throws Exception {
        this.eventHandlingThread = new Thread((Runnable)new TezSubTaskRunner(), "LocalContainerLauncher-SubTaskRunner");
        this.eventHandlingThread.start();
    }

    public void serviceStop() throws Exception {
        if (!this.serviceStopped.compareAndSet(false, true)) {
            LOG.info((Object)"Service Already stopped. Ignoring additional stop");
        }
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
            this.eventHandlingThread.join(2000L);
        }
        if (this.taskExecutorService != null) {
            this.taskExecutorService.shutdownNow();
        }
        this.callbackExecutor.shutdownNow();
    }

    void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {
        this.context.getEventHandler().handle((Event)new AMContainerEventLaunchFailed(containerId, message));
    }

    private void launch(NMCommunicatorLaunchRequestEvent event) {
        String tokenIdentifier = this.context.getApplicationID().toString();
        String[] localDirs = TezCommonUtils.getTrimmedStrings((String)System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()));
        try {
            ListenableFuture runningTaskFuture = this.taskExecutorService.submit(this.createSubTask(this.context.getAMConf(), event.getContainerId(), tokenIdentifier, this.context.getApplicationAttemptId().getAttemptId(), localDirs, (TezTaskUmbilicalProtocol)this.taskAttemptListener));
            this.runningContainers.put(event.getContainerId(), (ListenableFuture<TezChild.ContainerExecutionResult>)runningTaskFuture);
            Futures.addCallback((ListenableFuture)runningTaskFuture, (FutureCallback)new RunningTaskCallback(this.context, event.getContainerId()), (Executor)this.callbackExecutor);
        }
        catch (RejectedExecutionException e) {
            String message = "Failed to queue container launch for container Id: " + event.getContainerId();
            LOG.error((Object)message, (Throwable)e);
            this.sendContainerLaunchFailedMsg(event.getContainerId(), message);
        }
    }

    private void stop(NMCommunicatorStopRequestEvent event) {
        ListenableFuture<TezChild.ContainerExecutionResult> future = this.runningContainers.get(event.getContainerId());
        if (future == null) {
            LOG.info((Object)("Ignoring stop request for containerId: " + event.getContainerId()));
        } else {
            LOG.info((Object)("Interrupting running/queued container with id: " + event.getContainerId()));
            future.cancel(true);
        }
        this.context.getEventHandler().handle((Event)new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
    }

    private synchronized Callable<TezChild.ContainerExecutionResult> createSubTask(final Configuration defaultConf, final ContainerId containerId, final String tokenIdentifier, final int attemptNumber, final String[] localDirs, final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
        return new Callable<TezChild.ContainerExecutionResult>(){

            @Override
            public TezChild.ContainerExecutionResult call() throws InterruptedException, TezException, IOException {
                LocalContainerLauncher.this.context.getEventHandler().handle((Event)new AMContainerEventLaunched(containerId));
                ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(containerId, LocalContainerLauncher.this.context.getClock().getTime(), LocalContainerLauncher.this.context.getApplicationAttemptId());
                LocalContainerLauncher.this.context.getHistoryHandler().handle(new DAGHistoryEvent(LocalContainerLauncher.this.context.getCurrentDAGID(), lEvt));
                TezChild tezChild = TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, attemptNumber, localDirs, LocalContainerLauncher.this.workingDirectory);
                tezChild.setUmbilical(tezTaskUmbilicalProtocol);
                return tezChild.run();
            }
        };
    }

    public void handle(NMCommunicatorEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new TezUncheckedException((Throwable)e);
        }
    }

    private class RunningTaskCallback
    implements FutureCallback<TezChild.ContainerExecutionResult> {
        private final AppContext appContext;
        private final ContainerId containerId;

        RunningTaskCallback(AppContext appContext, ContainerId containerId) {
            this.appContext = appContext;
            this.containerId = containerId;
        }

        public void onSuccess(TezChild.ContainerExecutionResult result) {
            LocalContainerLauncher.this.runningContainers.remove(this.containerId);
            if (result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.SUCCESS || result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
                LOG.info((Object)("Container: " + this.containerId + " completed successfully"));
                this.appContext.getEventHandler().handle((Event)new AMContainerEventCompleted(this.containerId, result.getExitStatus().getExitCode(), null));
            } else {
                LOG.info((Object)("Container: " + this.containerId + " completed but with errors"));
                this.appContext.getEventHandler().handle((Event)new AMContainerEventCompleted(this.containerId, result.getExitStatus().getExitCode(), (String)(result.getErrorMessage() == null ? (result.getThrowable() == null ? null : result.getThrowable().getMessage()) : result.getErrorMessage())));
            }
        }

        public void onFailure(Throwable t) {
            LocalContainerLauncher.this.runningContainers.remove(this.containerId);
            if (!(t instanceof CancellationException)) {
                LOG.info((Object)("Container: " + this.containerId + ": Execution Failed: "), t);
                this.appContext.getEventHandler().handle((Event)new AMContainerEventCompleted(this.containerId, TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(), t.getMessage()));
            } else {
                LOG.info((Object)"Ignoring CancellationException - triggered by LocalContainerLauncher");
                this.appContext.getEventHandler().handle((Event)new AMContainerEventCompleted(this.containerId, TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), "CancellationException"));
            }
        }
    }

    private class TezSubTaskRunner
    implements Runnable {
        private TezSubTaskRunner() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted() && !LocalContainerLauncher.this.serviceStopped.get()) {
                try {
                    NMCommunicatorEvent event = (NMCommunicatorEvent)((Object)LocalContainerLauncher.this.eventQueue.take());
                    switch ((NMCommunicatorEventType)event.getType()) {
                        case CONTAINER_LAUNCH_REQUEST: {
                            LocalContainerLauncher.this.launch((NMCommunicatorLaunchRequestEvent)event);
                            break;
                        }
                        case CONTAINER_STOP_REQUEST: {
                            LocalContainerLauncher.this.stop((NMCommunicatorStopRequestEvent)event);
                        }
                    }
                }
                catch (InterruptedException e) {
                    if (!LocalContainerLauncher.this.serviceStopped.get()) {
                        LOG.error((Object)"TezSubTaskRunner interrupted ", (Throwable)e);
                    }
                    return;
                }
            }
        }
    }
}

