/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

public class LocalContainerLauncher
extends AbstractService
implements ContainerLauncher {
    private static final File curDir = new File(".");
    private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
    private FileContext curFC = null;
    private final HashSet<File> localizedFiles;
    private final AppContext context;
    private final TaskUmbilicalProtocol umbilical;
    private ExecutorService taskRunner;
    private Thread eventHandler;
    private BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>();

    public LocalContainerLauncher(AppContext context, TaskUmbilicalProtocol umbilical) {
        super(LocalContainerLauncher.class.getName());
        this.context = context;
        this.umbilical = umbilical;
        try {
            this.curFC = FileContext.getFileContext((URI)curDir.toURI());
        }
        catch (UnsupportedFileSystemException ufse) {
            LOG.error((Object)("Local filesystem " + curDir.toURI().toString() + " is unsupported?? (should never happen)"));
        }
        File[] curLocalFiles = curDir.listFiles();
        this.localizedFiles = new HashSet(curLocalFiles.length);
        for (int j = 0; j < curLocalFiles.length; ++j) {
            this.localizedFiles.add(curLocalFiles[j]);
        }
    }

    public void serviceStart() throws Exception {
        this.taskRunner = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
        this.eventHandler = new Thread((Runnable)new EventHandler(), "uber-EventHandler");
        this.eventHandler.start();
        super.serviceStart();
    }

    public void serviceStop() throws Exception {
        if (this.eventHandler != null) {
            this.eventHandler.interrupt();
        }
        if (this.taskRunner != null) {
            this.taskRunner.shutdownNow();
        }
        super.serviceStop();
    }

    public void handle(ContainerLauncherEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnRuntimeException((Throwable)e);
        }
    }

    private static class RenamedMapOutputFile
    extends MapOutputFile {
        private Path path;

        public RenamedMapOutputFile(Path path) {
            this.path = path;
        }

        public Path getOutputFile() throws IOException {
            return this.path;
        }

        public Path getOutputFileForWrite(long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getOutputFileForWriteInVolume(Path existing) {
            throw new UnsupportedOperationException();
        }

        public Path getOutputIndexFile() throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getOutputIndexFileForWrite(long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getOutputIndexFileForWriteInVolume(Path existing) {
            throw new UnsupportedOperationException();
        }

        public Path getSpillFile(int spillNumber) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getSpillFileForWrite(int spillNumber, long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getSpillIndexFile(int spillNumber) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getInputFile(int mapId) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Path getInputFileForWrite(TaskID mapId, long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        public void removeAll() throws IOException {
            throw new UnsupportedOperationException();
        }
    }

    private class EventHandler
    implements Runnable {
        private volatile boolean doneWithMaps = false;
        private volatile int finishedSubMaps = 0;
        private final Map<TaskAttemptId, Future<?>> futures = new ConcurrentHashMap();

        EventHandler() {
        }

        @Override
        public void run() {
            ContainerLauncherEvent event = null;
            final HashMap localMapFiles = new HashMap();
            while (!Thread.currentThread().isInterrupted()) {
                Future<?> future;
                try {
                    event = (ContainerLauncherEvent)((Object)LocalContainerLauncher.this.eventQueue.take());
                }
                catch (InterruptedException e) {
                    LOG.error((Object)("Returning, interrupted : " + e));
                    break;
                }
                LOG.info((Object)("Processing the event " + event.toString()));
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
                    final ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent)event;
                    future = LocalContainerLauncher.this.taskRunner.submit(new Runnable(){

                        @Override
                        public void run() {
                            EventHandler.this.runTask(launchEv, localMapFiles);
                        }
                    });
                    this.futures.put(event.getTaskAttemptID(), future);
                    continue;
                }
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
                    TaskAttemptId taId = event.getTaskAttemptID();
                    future = this.futures.remove(taId);
                    if (future != null) {
                        LOG.info((Object)("canceling the task attempt " + taId));
                        future.cancel(true);
                    }
                    LocalContainerLauncher.this.context.getEventHandler().handle((Event)new TaskAttemptEvent(taId, TaskAttemptEventType.TA_CONTAINER_CLEANED));
                    continue;
                }
                LOG.warn((Object)("Ignoring unexpected event " + event.toString()));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void runTask(ContainerRemoteLaunchEvent launchEv, Map<TaskAttemptID, MapOutputFile> localMapFiles) {
            TaskAttemptId attemptID = launchEv.getTaskAttemptID();
            Job job = LocalContainerLauncher.this.context.getAllJobs().get(attemptID.getTaskId().getJobId());
            int numMapTasks = job.getTotalMaps();
            int numReduceTasks = job.getTotalReduces();
            org.apache.hadoop.mapreduce.v2.app.job.Task ytask = job.getTask(attemptID.getTaskId());
            Task remoteTask = launchEv.getRemoteTask();
            LocalContainerLauncher.this.context.getEventHandler().handle((Event)new TaskAttemptContainerLaunchedEvent(attemptID, -1));
            if (numMapTasks == 0) {
                this.doneWithMaps = true;
            }
            try {
                if (remoteTask.isMapOrReduce()) {
                    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
                    jce.addCounterUpdate((Enum<?>)JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1L);
                    if (remoteTask.isMapTask()) {
                        jce.addCounterUpdate((Enum<?>)JobCounter.NUM_UBER_SUBMAPS, 1L);
                    } else {
                        jce.addCounterUpdate((Enum<?>)JobCounter.NUM_UBER_SUBREDUCES, 1L);
                    }
                    LocalContainerLauncher.this.context.getEventHandler().handle((Event)jce);
                }
                this.runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, numReduceTasks > 0, localMapFiles);
            }
            catch (RuntimeException re) {
                JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
                jce.addCounterUpdate((Enum<?>)JobCounter.NUM_FAILED_UBERTASKS, 1L);
                LocalContainerLauncher.this.context.getEventHandler().handle((Event)jce);
                LocalContainerLauncher.this.context.getEventHandler().handle((Event)new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
            }
            catch (IOException ioe) {
                LOG.fatal((Object)("oopsie...  this can never happen: " + StringUtils.stringifyException((Throwable)ioe)));
                ExitUtil.terminate((int)-1);
            }
            finally {
                if (this.futures.remove(attemptID) != null) {
                    LOG.info((Object)("removed attempt " + attemptID + " from the futures to keep track of"));
                }
            }
        }

        private void runSubtask(Task task, TaskType taskType, TaskAttemptId attemptID, int numMapTasks, boolean renameOutputs, Map<TaskAttemptID, MapOutputFile> localMapFiles) throws RuntimeException, IOException {
            TaskAttemptID classicAttemptID = TypeConverter.fromYarn((TaskAttemptId)attemptID);
            try {
                JobConf conf = new JobConf(LocalContainerLauncher.this.getConfig());
                conf.set("mapreduce.task.id", task.getTaskID().toString());
                conf.set("mapreduce.task.attempt.id", classicAttemptID.toString());
                conf.setBoolean("mapreduce.task.ismap", taskType == TaskType.MAP);
                conf.setInt("mapreduce.task.partition", task.getPartition());
                conf.set("mapreduce.job.id", task.getJobID().toString());
                String[] localSysDirs = StringUtils.getTrimmedStrings((String)System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()));
                conf.setStrings("mapreduce.cluster.local.dir", localSysDirs);
                LOG.info((Object)("mapreduce.cluster.local.dir for uber task: " + conf.get("mapreduce.cluster.local.dir")));
                conf.setBoolean("mapreduce.task.uberized", true);
                if (taskType == TaskType.MAP) {
                    if (this.doneWithMaps) {
                        LOG.error((Object)("CONTAINER_REMOTE_LAUNCH contains a map task (" + attemptID + "), but should be finished with maps"));
                        throw new RuntimeException();
                    }
                    MapTask map = (MapTask)task;
                    map.setConf((Configuration)conf);
                    map.run(conf, LocalContainerLauncher.this.umbilical);
                    if (renameOutputs) {
                        MapOutputFile renamed = this.renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
                        localMapFiles.put(classicAttemptID, renamed);
                    }
                    this.relocalize();
                    if (++this.finishedSubMaps == numMapTasks) {
                        this.doneWithMaps = true;
                    }
                } else {
                    if (!this.doneWithMaps) {
                        LOG.error((Object)("CONTAINER_REMOTE_LAUNCH contains a reduce task (" + attemptID + "), but not yet finished with maps"));
                        throw new RuntimeException();
                    }
                    conf.set("mapreduce.framework.name", "local");
                    conf.set("mapreduce.jobtracker.address", "local");
                    ReduceTask reduce = (ReduceTask)task;
                    reduce.setLocalMapFiles(localMapFiles);
                    reduce.setConf((Configuration)conf);
                    reduce.run(conf, LocalContainerLauncher.this.umbilical);
                    this.relocalize();
                }
            }
            catch (FSError e) {
                LOG.fatal((Object)"FSError from child", (Throwable)e);
                LocalContainerLauncher.this.umbilical.fsError(classicAttemptID, e.getMessage());
                throw new RuntimeException();
            }
            catch (Exception exception) {
                LOG.warn((Object)("Exception running local (uberized) 'child' : " + StringUtils.stringifyException((Throwable)exception)));
                try {
                    if (task != null) {
                        task.taskCleanup(LocalContainerLauncher.this.umbilical);
                    }
                }
                catch (Exception e) {
                    LOG.info((Object)("Exception cleaning up: " + StringUtils.stringifyException((Throwable)e)));
                }
                LocalContainerLauncher.this.umbilical.reportDiagnosticInfo(classicAttemptID, StringUtils.stringifyException((Throwable)exception));
                throw new RuntimeException();
            }
            catch (Throwable throwable) {
                LOG.fatal((Object)("Error running local (uberized) 'child' : " + StringUtils.stringifyException((Throwable)throwable)));
                Throwable tCause = throwable.getCause();
                String cause = tCause == null ? throwable.getMessage() : StringUtils.stringifyException((Throwable)tCause);
                LocalContainerLauncher.this.umbilical.fatalError(classicAttemptID, cause);
                throw new RuntimeException();
            }
        }

        private MapOutputFile renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
            LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
            Path mapOut = subMapOutputFile.getOutputFile();
            FileStatus mStatus = localFs.getFileStatus(mapOut);
            Path reduceIn = subMapOutputFile.getInputFileForWrite((TaskID)TypeConverter.fromYarn((TaskAttemptId)mapId).getTaskID(), mStatus.getLen());
            Path mapOutIndex = new Path(mapOut.toString() + ".index");
            Path reduceInIndex = new Path(reduceIn.toString() + ".index");
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Renaming map output file for task attempt " + mapId.toString() + " from original location " + mapOut.toString() + " to destination " + reduceIn.toString()));
            }
            if (!localFs.mkdirs(reduceIn.getParent())) {
                throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString());
            }
            if (!localFs.rename(mapOut, reduceIn)) {
                throw new IOException("Couldn't rename " + mapOut);
            }
            if (!localFs.rename(mapOutIndex, reduceInIndex)) {
                throw new IOException("Couldn't rename " + mapOutIndex);
            }
            return new RenamedMapOutputFile(reduceIn);
        }

        private void relocalize() {
            File[] curLocalFiles = curDir.listFiles();
            for (int j = 0; j < curLocalFiles.length; ++j) {
                if (LocalContainerLauncher.this.localizedFiles.contains(curLocalFiles[j])) continue;
                boolean deleted = false;
                try {
                    if (LocalContainerLauncher.this.curFC != null) {
                        deleted = LocalContainerLauncher.this.curFC.delete(new Path(curLocalFiles[j].getName()), true);
                    }
                }
                catch (IOException e) {
                    deleted = false;
                }
                if (deleted) continue;
                LOG.warn((Object)("Unable to delete unexpected local file/dir " + curLocalFiles[j].getName() + ": insufficient permissions?"));
            }
        }
    }
}

