/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.File;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.HardwareDescriptionFactory;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.InsufficientResourcesException;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.taskmanager.AbstractTaskResult;
import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskCancelResult;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskInputSplitProvider;
import org.apache.flink.runtime.taskmanager.TaskKillResult;
import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils;

public class TaskManager
implements TaskOperationProtocol {
    /** Logger shared by all code in this class. */
    private static final Log LOG = LogFactory.getLog(TaskManager.class);
    // NOTE(review): not referenced by name in the visible code; the exit code -1 appears as a literal
    // in main() (presumably inlined by the compiler) — confirm.
    private static final int FAILURE_RETURN_CODE = -1;
    // NOTE(review): not referenced by name in the visible code; the constructor passes the literal 1
    // to RPC.getServer (presumably this constant, inlined) — confirm.
    private static final int IPC_HANDLER_COUNT = 1;
    /**
     * Name of the command line option that carries the temporary directory (see main()).
     * Despite the identifier, this is NOT the configuration directory option ("configDir").
     */
    public static final String ARG_CONF_DIR = "tempDir";
    /** RPC proxy to the JobManager; used for registration, heartbeats and task state updates. */
    private final JobManagerProtocol jobManager;
    /** RPC proxy to the JobManager's input split provider; handed to each task's TaskInputSplitProvider. */
    private final InputSplitProviderProtocol globalInputSplitProvider;
    /** RPC proxy for channel lookups; handed to the ChannelManager. */
    private final ChannelLookupProtocol lookupService;
    /** Thread pool used to run cancel/kill requests asynchronously. */
    private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
    /** RPC proxy for reporting accumulators; handed to each RuntimeEnvironment. */
    private final AccumulatorProtocol accumulatorProtocolProxy;
    /** RPC server through which this TaskManager receives TaskOperationProtocol calls. */
    private final Server taskManagerServer;
    /** Creates and deletes the temporary copies of distributed cache files for tasks. */
    private final FileCache fileCache = new FileCache();
    /** Tasks currently registered at this TaskManager, keyed by execution vertex ID. */
    private final Map<ExecutionVertexID, Task> runningTasks = new ConcurrentHashMap<ExecutionVertexID, Task>();
    /** Address, IPC port and data port under which this TaskManager announces itself. */
    private final InstanceConnectionInfo localInstanceConnectionInfo;
    /** Manages the network channels of the registered tasks. */
    private final ChannelManager channelManager;
    /** Profiler instance, or null when profiling is disabled or the profiler could not be loaded. */
    private final TaskManagerProfiler profiler;
    /** Memory manager handed to every task's RuntimeEnvironment. */
    private final MemoryManager memoryManager;
    /** I/O manager operating on the configured temporary directories. */
    private final IOManager ioManager;
    /** Hardware description extracted from the system at start-up; sent to the JobManager on registration. */
    private final HardwareDescription hardwareDescription;
    /** Number of task slots announced to the JobManager. */
    private final int numberOfSlots;
    /** Thread running the registration and heartbeat loop (runHeartbeatLoop()). */
    private final Thread heartbeatThread;
    /** Set to true exactly once when shutdown begins; checked by the background loops. */
    private final AtomicBoolean shutdownStarted = new AtomicBoolean(false);
    // NOTE(review): written outside the visible part of the file; presumably set at the end of shutdown().
    private volatile boolean shutdownComplete;

    /**
     * Creates and fully starts a task manager.
     *
     * <p>In order, the constructor: resolves the JobManager address from the global configuration,
     * opens the RPC proxies to the JobManager, picks the IPC and data ports, determines the local
     * address, starts the local RPC server, optionally loads the profiler, validates the temporary
     * directories, creates the channel manager, memory manager and I/O manager, and finally starts
     * the heartbeat thread (and, if configured, a memory usage logging thread).
     *
     * @param executionMode selects the network connection manager (LOCAL or CLUSTER); must not be null
     * @throws NullPointerException if {@code executionMode} is null
     * @throws RuntimeException if the local network address cannot be determined
     * @throws Exception if the JobManager address is missing or unresolvable, an RPC proxy or the RPC
     *         server cannot be set up, the slot count or memory size is invalid, or the channel/memory
     *         manager cannot be created
     */
    public TaskManager(ExecutionMode executionMode) throws Exception {
        long memorySize;
        InetAddress taskManagerAddress;
        InetSocketAddress jobManagerAddress;
        if (executionMode == null) {
            throw new NullPointerException("Execution mode must not be null.");
        }
        LOG.info((Object)("Execution mode: " + (Object)((Object)executionMode)));
        LOG.info((Object)"Reading location of job manager from configuration");
        // The JobManager host has no default and must be configured; the port defaults to 6123.
        String address = GlobalConfiguration.getString((String)"jobmanager.rpc.address", null);
        int port = GlobalConfiguration.getInteger((String)"jobmanager.rpc.port", (int)6123);
        if (address == null) {
            throw new Exception("Job manager address not configured in the GlobalConfiguration.");
        }
        try {
            InetAddress tmpAddress = InetAddress.getByName(address);
            jobManagerAddress = new InetSocketAddress(tmpAddress, port);
        }
        catch (UnknownHostException e) {
            LOG.fatal((Object)"Could not resolve JobManager host name.");
            throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
        }
        LOG.info((Object)("Connecting to JobManager at: " + jobManagerAddress));
        try {
            this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
        }
        catch (IOException e) {
            LOG.fatal((Object)("Could not connect to the JobManager: " + e.getMessage()), (Throwable)e);
            throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
        }
        // A configured value of -1 (also the default) means "pick any free port".
        int ipcPort = GlobalConfiguration.getInteger((String)"taskmanager.rpc.port", (int)-1);
        int dataPort = GlobalConfiguration.getInteger((String)"taskmanager.data.port", (int)-1);
        if (ipcPort == -1) {
            ipcPort = this.getAvailablePort();
        }
        if (dataPort == -1) {
            dataPort = this.getAvailablePort();
        }
        // Find the local address from which the JobManager is reachable.
        // Unlike the other failures here, this one surfaces as an unchecked RuntimeException.
        try {
            taskManagerAddress = this.getTaskManagerAddress(jobManagerAddress);
        }
        catch (Exception e) {
            throw new RuntimeException("The TaskManager failed to determine its own network address.", e);
        }
        this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
        LOG.info((Object)("TaskManager connection information:" + this.localInstanceConnectionInfo));
        // Start the RPC server that exposes this object's TaskOperationProtocol methods.
        try {
            this.taskManagerServer = RPC.getServer(this, taskManagerAddress.getHostAddress(), ipcPort, 1);
            this.taskManagerServer.start();
        }
        catch (IOException e) {
            LOG.fatal((Object)("Failed to start TaskManager server. " + e.getMessage()), (Throwable)e);
            throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
        }
        // The remaining proxies all point at the same JobManager address, one per protocol.
        try {
            this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
        }
        catch (IOException e) {
            LOG.fatal((Object)e.getMessage(), (Throwable)e);
            throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
        }
        try {
            this.lookupService = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
        }
        catch (IOException e) {
            LOG.fatal((Object)e.getMessage(), (Throwable)e);
            throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
        }
        try {
            this.accumulatorProtocolProxy = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
        }
        catch (IOException e) {
            LOG.fatal((Object)("Failed to initialize accumulator protocol: " + e.getMessage()), (Throwable)e);
            throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
        }
        // Profiling is switched on by a JobManager-level key; the implementation class is configurable.
        // A failed load leaves the profiler null and is only logged, it does not abort start-up.
        if (GlobalConfiguration.getBoolean((String)"jobmanager.profiling.enable", (boolean)false)) {
            String profilerClassName = GlobalConfiguration.getString((String)"taskmanager.profiling.classname", (String)"org.apache.flink.runtime.profiling.impl.TaskManagerProfilerImpl");
            this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(), this.localInstanceConnectionInfo);
            if (this.profiler == null) {
                LOG.error((Object)"Cannot find class name for the profiler.");
            } else {
                LOG.info((Object)"Profiling of jobs is enabled.");
            }
        } else {
            this.profiler = null;
            LOG.info((Object)"Profiling of jobs is disabled.");
        }
        // Temporary directories may be separated by ',' or by the platform's path separator.
        String[] tmpDirPaths = GlobalConfiguration.getString((String)"taskmanager.tmp.dirs", (String)ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
        TaskManager.checkTempDirs(tmpDirPaths);
        int numBuffers = GlobalConfiguration.getInteger((String)"taskmanager.network.numberOfBuffers", (int)2048);
        int bufferSize = GlobalConfiguration.getInteger((String)"taskmanager.network.bufferSizeInBytes", (int)32768);
        try {
            // NOTE(review): the switch has no default branch, so for any execution mode other than
            // LOCAL or CLUSTER the ChannelManager receives a null connection manager — confirm that
            // no other modes exist.
            NetworkConnectionManager networkConnectionManager = null;
            switch (executionMode) {
                case LOCAL: {
                    networkConnectionManager = new LocalConnectionManager();
                    break;
                }
                case CLUSTER: {
                    int numInThreads = GlobalConfiguration.getInteger((String)"taskmanager.net.numInThreads", (int)-1);
                    int numOutThreads = GlobalConfiguration.getInteger((String)"taskmanager.net.numOutThreads", (int)-1);
                    int closeAfterIdleForMs = GlobalConfiguration.getInteger((String)"taskmanager.net.closeAfterIdleForMs", (int)10000);
                    networkConnectionManager = new NettyConnectionManager(this.localInstanceConnectionInfo.address(), this.localInstanceConnectionInfo.dataPort(), bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
                }
            }
            this.channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
        }
        catch (IOException ioe) {
            LOG.error((Object)StringUtils.stringifyException((Throwable)ioe));
            throw new Exception("Failed to instantiate ChannelManager.", ioe);
        }
        // -1 means "not configured" and falls back to a single slot; any other non-positive value is an error.
        int slots = GlobalConfiguration.getInteger((String)"taskmanager.numberOfTaskSlots", (int)-1);
        if (slots == -1) {
            slots = 1;
            LOG.info((Object)"Number of task slots not configured. Creating one task slot.");
        } else {
            if (slots <= 0) {
                throw new Exception("Illegal value for the number of task slots: " + slots);
            }
            LOG.info((Object)("Creating " + slots + " task slot(s)."));
        }
        this.numberOfSlots = slots;
        this.hardwareDescription = HardwareDescriptionFactory.extractFromSystem();
        // Managed memory: either an explicit size in megabytes, or (when -1) a fraction of the
        // free memory reported by the hardware description.
        long configuredMemorySize = GlobalConfiguration.getInteger((String)"taskmanager.memory.size", (int)-1);
        if (configuredMemorySize == -1L) {
            float fraction = GlobalConfiguration.getFloat((String)"taskmanager.memory.fraction", (float)0.7f);
            memorySize = (long)((float)this.hardwareDescription.getSizeOfFreeMemory() * fraction);
            LOG.info((Object)("Using " + fraction + " of the free heap space for managed memory."));
        } else {
            if (configuredMemorySize <= 0L) {
                throw new Exception("Invalid value for Memory Manager memory size: " + configuredMemorySize);
            }
            // Convert megabytes to bytes.
            memorySize = configuredMemorySize << 20;
        }
        // The memory page size is read from the same key as the network buffer size above.
        int pageSize = GlobalConfiguration.getInteger((String)"taskmanager.network.bufferSizeInBytes", (int)32768);
        LOG.info((Object)("Initializing memory manager with " + (memorySize >>> 20) + " megabytes of memory. " + "Page size is " + pageSize + " bytes."));
        try {
            // NOTE(review): lazyAllocation is read but never passed on to the memory manager.
            boolean lazyAllocation = GlobalConfiguration.getBoolean((String)"taskmanager.memory.lazyalloc", (boolean)false);
            this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize);
        }
        catch (Throwable t) {
            LOG.fatal((Object)("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory."), t);
            throw new Exception("Unable to initialize memory manager.", t);
        }
        this.ioManager = new IOManager(tmpDirPaths);
        // The heartbeat thread is started from inside the constructor; it first registers this
        // TaskManager at the JobManager and then sends periodic heartbeats.
        this.heartbeatThread = new Thread(){

            @Override
            public void run() {
                TaskManager.this.runHeartbeatLoop();
            }
        };
        this.heartbeatThread.setName("Heartbeat Thread");
        this.heartbeatThread.start();
        final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
        final List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
        LOG.info((Object)this.getMemoryUsageStatsAsString(memoryMXBean));
        // The periodic memory/GC logger only starts if it is configured AND debug logging is
        // enabled at construction time; it runs until the TaskManager reports being shut down.
        boolean startMemoryUsageLogThread = GlobalConfiguration.getBoolean((String)"taskmanager.debug.memory.startLogThread", (boolean)false);
        if (startMemoryUsageLogThread && LOG.isDebugEnabled()) {
            final int logIntervalMs = GlobalConfiguration.getInteger((String)"taskmanager.debug.memory.logIntervalMs", (int)5000);
            new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        while (!TaskManager.this.isShutDown()) {
                            Thread.sleep(logIntervalMs);
                            LOG.debug((Object)TaskManager.this.getMemoryUsageStatsAsString(memoryMXBean));
                            LOG.debug((Object)TaskManager.this.getGarbageCollectorStatsAsString(gcMXBeans));
                        }
                    }
                    catch (InterruptedException e) {
                        // An interrupt ends the logger thread; it is not restarted.
                        LOG.warn((Object)"Unexpected interruption of memory usage logger thread.");
                    }
                }
            }).start();
        }
    }

    /**
     * Searches for a free local TCP port by binding a server socket to an ephemeral port
     * (port 0) and reading back the port the system assigned.
     *
     * <p>Up to 50 attempts are made. Every probe socket is closed before the method returns or
     * retries, so no socket is leaked and a failed attempt can no longer cause a
     * {@link NullPointerException} when cleaning up.
     *
     * @return a port number that was free at the time of the probe, or 0 if no port could be
     *         determined within 50 attempts
     */
    private int getAvailablePort() {
        for (int attempt = 0; attempt < 50; ++attempt) {
            ServerSocket probe = null;
            try {
                probe = new ServerSocket(0);
                int port = probe.getLocalPort();
                if (port != 0) {
                    return port;
                }
            }
            catch (IOException e) {
                LOG.debug((Object)("Unable to allocate port " + e.getMessage()), (Throwable)e);
            }
            finally {
                // Always release the probe socket; the port is only needed as a number.
                if (probe != null) {
                    try {
                        probe.close();
                    }
                    catch (IOException e) {
                        LOG.debug((Object)"error closing port", (Throwable)e);
                    }
                }
            }
        }
        // No attempt succeeded; 0 signals "no port found" exactly as the previous implementation did.
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Command line entry point: parses the options, loads the configuration, starts a
     * {@link TaskManager} in cluster mode and then blocks the main thread.
     *
     * <p>Options: {@code configDir} (required, configuration directory) and {@code tempDir}
     * (optional, temporary directory). The temporary directory from the command line is only
     * applied when the loaded configuration does not already define "taskmanager.tmp.dirs".
     * The process exits with code -1 if option parsing or the TaskManager start-up fails.
     *
     * @param args the command line arguments
     * @throws IOException declared by the signature; not thrown directly by the visible code
     */
    public static void main(String[] args) throws IOException {
        // OptionBuilder collects settings statically; each create(...) consumes what was set before it.
        OptionBuilder.withArgName("config directory");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify configuration directory.");
        final Option configDirOption = OptionBuilder.create("configDir");

        OptionBuilder.withArgName("temporary directory (overwrites configured option)");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify temporary directory.");
        final Option tempDirOption = OptionBuilder.create(ARG_CONF_DIR);

        configDirOption.setRequired(true);
        tempDirOption.setRequired(false);

        final Options cliOptions = new Options();
        cliOptions.addOption(configDirOption);
        cliOptions.addOption(tempDirOption);

        CommandLine commandLine = null;
        try {
            commandLine = new GnuParser().parse(cliOptions, args);
        }
        catch (ParseException e) {
            System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
            System.exit(-1);
        }

        final String configDirValue = commandLine.getOptionValue(configDirOption.getOpt(), null);
        final String tempDirValue = commandLine.getOptionValue(tempDirOption.getOpt(), null);

        GlobalConfiguration.loadConfiguration(configDirValue);

        // The command line value only fills a gap; a configured temp directory wins.
        final boolean tempDirGiven = tempDirValue != null;
        if (tempDirGiven && GlobalConfiguration.getString("taskmanager.tmp.dirs", null) == null) {
            Configuration overlay = GlobalConfiguration.getConfiguration();
            overlay.setString("taskmanager.tmp.dirs", tempDirValue);
            LOG.info((Object)("Setting temporary directory to " + tempDirValue));
            GlobalConfiguration.includeConfiguration(overlay);
        }

        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");

        try {
            new TaskManager(ExecutionMode.CLUSTER);
        }
        catch (Exception e) {
            LOG.fatal((Object)("Taskmanager startup failed: " + e.getMessage()), (Throwable)e);
            System.exit(-1);
        }

        // Park the main thread on a private monitor that nobody notifies.
        final Object parkingMonitor = new Object();
        synchronized (parkingMonitor) {
            try {
                parkingMonitor.wait();
            }
            catch (InterruptedException ex) {
                // An interrupt simply lets main() return.
            }
        }
    }

    /**
     * Body of the heartbeat thread. It runs in two phases until shutdown has started:
     * <ol>
     *   <li>Registration: repeatedly calls {@code registerTaskManager} on the JobManager, waiting
     *       50 ms between attempts, until the call reports SUCCESS. An {@link IOException} during
     *       this phase ends the whole loop.</li>
     *   <li>Heartbeats: sleeps for the configured "taskmanager.heartbeat-interval" (default 2000)
     *       and then sends a heartbeat. An {@link IOException} is logged and the loop continues,
     *       unless shutdown has started, in which case the loop ends.</li>
     * </ol>
     * Interrupts during either sleep are logged as errors unless shutdown has started.
     */
    private void runHeartbeatLoop() {
        final long heartbeatInterval = GlobalConfiguration.getInteger("taskmanager.heartbeat-interval", 2000);

        // Phase 1: register at the JobManager.
        try {
            while (!this.shutdownStarted.get()) {
                RegisterTaskManagerResult registration = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo, this.hardwareDescription, new IntegerRecord(this.numberOfSlots));
                if (registration.getReturnCode() == RegisterTaskManagerResult.ReturnCode.SUCCESS) {
                    break;
                }
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException e) {
                    if (!this.shutdownStarted.get()) {
                        LOG.error((Object)"TaskManager register task manager loop was interrupted without shutdown.");
                    }
                }
            }
        }
        catch (IOException e) {
            if (!this.shutdownStarted.get()) {
                LOG.error((Object)("Registering task manager caused an exception: " + e.getMessage()), (Throwable)e);
            }
            return;
        }

        // Phase 2: periodic heartbeats.
        while (!this.shutdownStarted.get()) {
            try {
                Thread.sleep(heartbeatInterval);
            }
            catch (InterruptedException e) {
                if (!this.shutdownStarted.get()) {
                    LOG.error((Object)"TaskManager heart beat loop was interrupted without shutdown.");
                }
            }
            // The heartbeat is still attempted after an interrupted sleep.
            try {
                this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo);
            }
            catch (IOException e) {
                if (this.shutdownStarted.get()) {
                    break;
                }
                LOG.error((Object)("Sending the heart beat caused an exception: " + e.getMessage()), (Throwable)e);
            }
        }
    }

    /**
     * Determines which local address this TaskManager should announce, by probing every address
     * of every local network interface against the JobManager.
     *
     * <p>Three strategies are tried in order, each over all addresses: ADDRESS (only addresses
     * sharing the first two bytes with the JobManager address are probed), then FAST_CONNECT and
     * SLOW_CONNECT (every address is probed). The connect timeout comes from the strategy.
     *
     * @param jobManagerAddress the JobManager socket address to connect to
     * @return the first local address from which a connection to the JobManager succeeded
     * @throws IOException if enumerating the interfaces or closing a probe socket fails
     * @throws RuntimeException if no address works under any strategy, or the strategy is unknown
     */
    private InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
        AddressDetectionState strategy = AddressDetectionState.ADDRESS;
        while (true) {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            while (interfaces.hasMoreElements()) {
                Enumeration<InetAddress> candidates = interfaces.nextElement().getInetAddresses();
                while (candidates.hasMoreElements()) {
                    InetAddress candidate = candidates.nextElement();
                    boolean reachable;
                    if (strategy == AddressDetectionState.ADDRESS) {
                        // Cheap prefix filter first; only matching addresses are actually probed.
                        reachable = TaskManager.hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), candidate.getAddress())
                                && TaskManager.tryToConnect(candidate, jobManagerAddress, strategy.getTimeout());
                    } else if (strategy == AddressDetectionState.FAST_CONNECT || strategy == AddressDetectionState.SLOW_CONNECT) {
                        reachable = TaskManager.tryToConnect(candidate, jobManagerAddress, strategy.getTimeout());
                    } else {
                        throw new RuntimeException("Unkown address detection strategy: " + strategy);
                    }
                    if (reachable) {
                        LOG.info((Object)("Determined " + candidate + " as the TaskTracker's own IP address"));
                        return candidate;
                    }
                }
            }

            // Nothing matched under the current strategy: escalate, or give up after the last one.
            if (strategy == AddressDetectionState.ADDRESS) {
                strategy = AddressDetectionState.FAST_CONNECT;
            } else if (strategy == AddressDetectionState.FAST_CONNECT) {
                strategy = AddressDetectionState.SLOW_CONNECT;
            } else if (strategy == AddressDetectionState.SLOW_CONNECT) {
                throw new RuntimeException("The TaskManager failed to detect its own IP address");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Defaulting to detection strategy " + strategy));
            }
        }
    }

    /**
     * Tells whether two raw addresses agree in their first two bytes.
     *
     * @param address the first raw address; must hold at least two bytes
     * @param address2 the second raw address; must hold at least two bytes
     * @return true if byte 0 and byte 1 of both arrays are equal, false otherwise
     */
    private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
        for (int pos = 0; pos < 2; pos++) {
            if (address[pos] != address2[pos]) {
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Probes whether a TCP connection to {@code toSocket} can be opened when the local end is
     * bound to {@code fromAddress} (on any free local port).
     *
     * @param fromAddress the local address to bind the probe socket to
     * @param toSocket the remote socket address to connect to
     * @param timeout the connect timeout passed to {@link Socket#connect(SocketAddress, int)}
     * @return true if bind and connect both succeeded, false if either threw an exception
     * @throws IOException if closing the probe socket fails
     */
    public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress + " with timeout " + timeout));
        }
        Socket probe = null;
        try {
            probe = new Socket();
            probe.bind(new InetSocketAddress(fromAddress, 0));
            probe.connect(toSocket, timeout);
            return true;
        }
        catch (Exception ex) {
            // Any failure simply means "not reachable from this address".
            LOG.info((Object)("Failed to determine own IP address from '" + fromAddress + "': " + ex.getMessage()));
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Failed with exception", (Throwable)ex);
            }
            return false;
        }
        finally {
            if (probe != null) {
                probe.close();
            }
        }
    }

    /**
     * Requests cancellation of a running task. The cancel call itself is handed to the executor
     * service, so this method returns without waiting for it to run.
     *
     * @param id the execution vertex ID of the task to cancel
     * @return a result with code SUCCESS if the task was found and the cancellation was scheduled,
     *         or TASK_NOT_FOUND (with a description) if no such task is registered
     * @throws IOException declared by the protocol; not thrown directly by the visible code
     */
    @Override
    public TaskCancelResult cancelTask(ExecutionVertexID id) throws IOException {
        final Task target = this.runningTasks.get(id);
        if (target != null) {
            this.executorService.execute(new Runnable(){

                @Override
                public void run() {
                    target.cancelExecution();
                }
            });
            return new TaskCancelResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
        }
        TaskCancelResult notFound = new TaskCancelResult(id, AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
        notFound.setDescription("No task with ID " + id + " is currently running");
        return notFound;
    }

    /**
     * Requests that a running task be killed. The kill call itself is handed to the executor
     * service, so this method returns without waiting for it to run.
     *
     * @param id the execution vertex ID of the task to kill
     * @return a result with code SUCCESS if the task was found and the kill was scheduled,
     *         or TASK_NOT_FOUND (with a description) if no such task is registered
     * @throws IOException declared by the protocol; not thrown directly by the visible code
     */
    @Override
    public TaskKillResult killTask(ExecutionVertexID id) throws IOException {
        final Task task = this.runningTasks.get(id);
        if (task == null) {
            TaskKillResult taskKillResult = new TaskKillResult(id, AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
            // Fixed: the literal used to contain a stray "+ " ("No task with ID + <id> ...").
            taskKillResult.setDescription("No task with ID " + id + " is currently running");
            return taskKillResult;
        }
        Runnable r = new Runnable(){

            @Override
            public void run() {
                task.killExecution();
            }
        };
        this.executorService.execute(r);
        return new TaskKillResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
    }

    /**
     * Deploys a batch of tasks. Each descriptor is processed independently: its distributed cache
     * files are requested from the file cache, a {@link RuntimeEnvironment} is built, and the task
     * is created and registered. Tasks are only started after the whole batch has been processed.
     *
     * <p>Per-task outcomes are reported in the returned list, in input order:
     * DEPLOYMENT_ERROR if building the environment threw, INSUFFICIENT_RESOURCES if registration
     * threw {@link InsufficientResourcesException}, TASK_NOT_FOUND if a task with the same vertex
     * ID was already registered, and SUCCESS otherwise.
     *
     * @param tasks the deployment descriptors of the tasks to submit
     * @return one submission result per descriptor
     * @throws IOException if registering a task fails with an I/O error
     */
    @Override
    public List<TaskSubmissionResult> submitTasks(List<TaskDeploymentDescriptor> tasks) throws IOException {
        final SerializableArrayList<TaskSubmissionResult> results = new SerializableArrayList<TaskSubmissionResult>();
        final ArrayList<Task> accepted = new ArrayList<Task>();

        for (TaskDeploymentDescriptor descriptor : tasks) {
            final JobID jobID = descriptor.getJobID();
            final ExecutionVertexID vertexID = descriptor.getVertexID();

            // Kick off the copy of every distributed cache file this job declares.
            final HashMap<String, FutureTask<Path>> cacheCopies = new HashMap<String, FutureTask<Path>>();
            for (Map.Entry cacheEntry : DistributedCache.readFileInfoFromConfig((Configuration)descriptor.getJobConfiguration())) {
                String cacheName = (String)cacheEntry.getKey();
                cacheCopies.put(cacheName, this.fileCache.createTmpFile(cacheName, (DistributedCache.DistributedCacheEntry)cacheEntry.getValue(), jobID));
            }

            RuntimeEnvironment environment;
            try {
                TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobID, vertexID, this.globalInputSplitProvider);
                environment = new RuntimeEnvironment(descriptor, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, cacheCopies);
            }
            catch (Throwable t) {
                TaskSubmissionResult failure = new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.DEPLOYMENT_ERROR);
                failure.setDescription(StringUtils.stringifyException(t));
                LOG.error((Object)failure.getDescription(), t);
                results.add(failure);
                continue;
            }

            Task registered;
            try {
                registered = this.createAndRegisterTask(vertexID, descriptor.getJobConfiguration(), environment);
            }
            catch (InsufficientResourcesException e) {
                TaskSubmissionResult failure = new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.INSUFFICIENT_RESOURCES);
                failure.setDescription(e.getMessage());
                LOG.error((Object)failure.getDescription(), (Throwable)e);
                results.add(failure);
                continue;
            }

            if (registered != null) {
                results.add(new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.SUCCESS));
                accepted.add(registered);
            } else {
                // A null task means a task with this vertex ID was already registered.
                TaskSubmissionResult failure = new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
                failure.setDescription("Task " + environment.getTaskNameWithIndex() + " (" + vertexID + ") was already running");
                LOG.error((Object)failure.getDescription());
                results.add(failure);
            }
        }

        // Start execution only once every descriptor of the batch has been handled.
        for (Task task : accepted) {
            task.startExecution();
        }
        return results;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a task for the given vertex and registers it with the channel manager, the profiler
     * (if one is loaded and the job enables profiling) and the table of running tasks.
     *
     * <p>The check for an existing task and the registration happen under this TaskManager's
     * monitor, the same monitor used when unregistering a task.
     *
     * @param id the execution vertex ID of the task; must not be null
     * @param jobConfiguration the job configuration; consulted for "job.profiling.enable" (default true)
     * @param environment the runtime environment of the task; must not be null
     * @return the newly registered task, or null if a task with this ID is already registered
     * @throws IllegalArgumentException if {@code id} or {@code environment} is null
     * @throws InsufficientResourcesException declared by the signature; presumably raised by the
     *         channel manager registration — confirm
     * @throws IOException declared by the signature; presumably raised by the channel manager
     *         registration — confirm
     */
    private Task createAndRegisterTask(ExecutionVertexID id, Configuration jobConfiguration, RuntimeEnvironment environment) throws InsufficientResourcesException, IOException {
        if (id == null) {
            throw new IllegalArgumentException("Argument id is null");
        }
        if (environment == null) {
            throw new IllegalArgumentException("Argument environment is null");
        }
        synchronized (this) {
            // A task with this ID is already registered: signal that to the caller with null.
            // (The old "reuse the running task" branch was unreachable, since any non-null
            // map value is necessarily a Task, and has been removed.)
            if (this.runningTasks.get(id) != null) {
                return null;
            }
            Task task = new Task(id, environment, this);
            this.channelManager.register(task);
            if (this.profiler != null && jobConfiguration.getBoolean("job.profiling.enable", true)) {
                task.registerProfiler(this.profiler, jobConfiguration);
            }
            // Publish the task only after all registrations above succeeded.
            this.runningTasks.put(id, task);
            return task;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a task from the table of running tasks and releases what was set up for it:
     * its distributed cache files, its channels, its profiler and memory manager registrations,
     * and the job's library cache registration.
     *
     * <p>Runs under this TaskManager's monitor. If no task with the given ID is registered, an
     * error is logged and nothing else happens. A failure to unregister from the library cache is
     * treated as non-fatal and only logged at debug level, now including the exception itself.
     *
     * @param id the execution vertex ID of the task to unregister
     */
    private void unregisterTask(ExecutionVertexID id) {
        synchronized (this) {
            Task task = this.runningTasks.remove(id);
            if (task == null) {
                LOG.error((Object)("Cannot find task with ID " + id + " to unregister"));
                return;
            }
            for (Map.Entry e : DistributedCache.readFileInfoFromConfig((Configuration)task.getEnvironment().getJobConfiguration())) {
                this.fileCache.deleteTmpFile((String)e.getKey(), (DistributedCache.DistributedCacheEntry)e.getValue(), task.getJobID());
            }
            this.channelManager.unregister(id, task);
            task.unregisterProfiler(this.profiler);
            task.unregisterMemoryManager(this.memoryManager);
            try {
                LibraryCacheManager.unregister(task.getJobID());
            }
            catch (IOException e) {
                if (LOG.isDebugEnabled()) {
                    // Keep the cause so the failure can actually be diagnosed from the log.
                    LOG.debug((Object)("Unregistering the job vertex ID " + id + " caused an IOException"), (Throwable)e);
                }
            }
        }
    }

    /**
     * Builds a response that states, for every library named in the request, whether
     * {@code LibraryCacheManager.contains} returns a non-null result for it.
     *
     * @param request the request listing the required libraries
     * @return a response created from the request, with the cached flag set at each
     *         library's index
     * @throws IOException declared by the signature
     */
    @Override
    public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException {
        final LibraryCacheProfileResponse profile = new LibraryCacheProfileResponse(request);
        final String[] libraries = request.getRequiredLibraries();

        int index = 0;
        for (String library : libraries) {
            // A library counts as cached when the cache manager returns a non-null result.
            profile.setCached(index, LibraryCacheManager.contains(library) != null);
            index++;
        }
        return profile;
    }

    /**
     * Handles a library cache update. The body of this implementation is empty, so the
     * given update is ignored.
     *
     * @param update the library cache update; not used by this implementation
     * @throws IOException declared by the signature; the empty body never throws it
     */
    @Override
    public void updateLibraryCache(LibraryCacheUpdate update) throws IOException {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reacts to an execution state change of a task and forwards it to the JobManager.
     *
     * <p>A change to {@code CANCELING} is ignored entirely. For {@code FINISHED},
     * {@code CANCELED} and {@code FAILED} the task is unregistered from this
     * TaskManager first. The new state is then sent to the JobManager while holding
     * the monitor of the JobManager proxy object.
     *
     * <p>An {@link IOException} during the report is not propagated. It is logged as an
     * error together with the affected job, task and state, and with the exception
     * attached so the stack trace is preserved.
     *
     * @param jobID the ID of the job the task belongs to
     * @param id the ID of the task whose state changed
     * @param newExecutionState the state the task switched to
     * @param optionalDescription a description passed along with the state update
     */
    public void executionStateChanged(JobID jobID, ExecutionVertexID id, ExecutionState newExecutionState, String optionalDescription) {
        if (newExecutionState == ExecutionState.CANCELING) {
            return;
        }
        if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FAILED) {
            this.unregisterTask(id);
        }
        synchronized (this.jobManager) {
            try {
                this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, id, newExecutionState, optionalDescription));
            }
            catch (IOException e) {
                // Pass the exception as the throwable argument so the stack trace is logged.
                LOG.error("Could not report execution state " + newExecutionState + " of task " + id + " (job " + jobID + ") to the JobManager: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Shuts this TaskManager down.
     *
     * <p>Only the first call performs the shutdown; later calls return immediately
     * (guarded by {@code shutdownStarted}). The sequence is: interrupt the heartbeat
     * thread and wait up to 1000 ms for it, stop the RPC proxies and the TaskManager
     * server, shut down the profiler (if any), the channel manager, the I/O manager
     * and memory manager (if present), the file cache, and finally the executor
     * service (if present), waiting up to 5000 ms for its termination. Afterwards
     * {@code shutdownComplete} is set.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining steps are
     * still executed and the thread's interrupt status is restored at the very end, so
     * the interruption is not lost to the caller.
     */
    public void shutdown() {
        // Only the first caller runs the shutdown sequence.
        if (!this.shutdownStarted.compareAndSet(false, true)) {
            return;
        }
        LOG.info("Shutting down TaskManager");

        // Remembered and restored at the end so the cleanup steps below are unaffected.
        boolean interrupted = false;

        this.heartbeatThread.interrupt();
        try {
            this.heartbeatThread.join(1000L);
        }
        catch (InterruptedException e) {
            interrupted = true;
        }

        RPC.stopProxy(this.jobManager);
        RPC.stopProxy(this.globalInputSplitProvider);
        RPC.stopProxy(this.lookupService);
        RPC.stopProxy(this.accumulatorProtocolProxy);
        this.taskManagerServer.stop();

        if (this.profiler != null) {
            this.profiler.shutdown();
        }
        try {
            this.channelManager.shutdown();
        }
        catch (IOException e) {
            LOG.warn("ChannelManager did not shutdown properly: " + e.getMessage(), e);
        }
        if (this.ioManager != null) {
            this.ioManager.shutdown();
        }
        if (this.memoryManager != null) {
            this.memoryManager.shutdown();
        }
        this.fileCache.shutdown();

        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                interrupted = true;
                if (LOG.isDebugEnabled()) {
                    LOG.debug(e);
                }
            }
        }

        this.shutdownComplete = true;

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the current value of the {@code shutdownComplete} flag, which
     * {@code shutdown()} sets to {@code true} at the end of its shutdown sequence.
     *
     * @return the value of {@code shutdownComplete}
     */
    public boolean isShutDown() {
        return this.shutdownComplete;
    }

    /**
     * Delegates to the channel manager's {@code logBufferUtilization()}.
     */
    @Override
    public void logBufferUtilization() {
        this.channelManager.logBufferUtilization();
    }

    /**
     * Terminates the JVM of this TaskManager shortly after the call returns.
     *
     * <p>A new {@link Timer} is created and a task calling {@code System.exit(0)} is
     * scheduled on it with a delay of 10 ms, so this method itself returns before the
     * process exits.
     *
     * @throws IOException declared by the signature
     */
    @Override
    public void killTaskManager() throws IOException {
        new Timer().schedule(new TimerTask() {

            @Override
            public void run() {
                System.exit(0);
            }
        }, 10L);
    }

    /**
     * Delegates to the channel manager's {@code invalidateLookupCacheEntries}, passing
     * the given channel IDs through unchanged.
     *
     * @param channelIDs the channel IDs handed to the channel manager
     * @throws IOException declared by the signature
     */
    @Override
    public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) throws IOException {
        this.channelManager.invalidateLookupCacheEntries(channelIDs);
    }

    /**
     * Verifies that every entry of the given array names an existing, writable
     * directory.
     *
     * <p>Entries are checked in array order and the first violation aborts the check.
     * A {@code null} entry is reported by its 1-based position; all other violations
     * are reported with the absolute path of the offending entry.
     *
     * @param tempDirs the directory paths to check
     * @throws Exception if an entry is {@code null}, does not exist, is not a
     *         directory, or is not writable
     */
    private static final void checkTempDirs(String[] tempDirs) throws Exception {
        int position = 0;
        for (String path : tempDirs) {
            position++;
            if (path == null) {
                throw new Exception("Temporary file directory #" + position + " is null.");
            }

            final File directory = new File(path);
            final String problem;
            if (!directory.exists()) {
                problem = "does not exist";
            } else if (!directory.isDirectory()) {
                problem = "is not a directory";
            } else if (!directory.canWrite()) {
                problem = "is not writable";
            } else {
                // Entry is fine, move on to the next one.
                continue;
            }
            throw new Exception("Temporary file directory '" + directory.getAbsolutePath() + "' " + problem + ".");
        }
    }

    /**
     * Formats the heap and non-heap memory usage reported by the given bean as a
     * single line.
     *
     * <p>For both areas the used, committed and max values are converted from bytes to
     * whole megabytes by integer division by 1048576 and narrowed to {@code int}.
     *
     * @param memoryMXBean the bean to read the heap and non-heap usage from
     * @return the formatted statistics line
     */
    private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
        final long bytesPerMegabyte = 1024L * 1024L;
        final MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
        final MemoryUsage nonHeapUsage = memoryMXBean.getNonHeapMemoryUsage();

        return String.format(
                "Memory usage stats: [HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/comitted/max)]",
                (int)(heapUsage.getUsed() / bytesPerMegabyte),
                (int)(heapUsage.getCommitted() / bytesPerMegabyte),
                (int)(heapUsage.getMax() / bytesPerMegabyte),
                (int)(nonHeapUsage.getUsed() / bytesPerMegabyte),
                (int)(nonHeapUsage.getCommitted() / bytesPerMegabyte),
                (int)(nonHeapUsage.getMax() / bytesPerMegabyte));
    }

    /**
     * Formats the name, accumulated collection time and collection count of every
     * given garbage collector bean as a single line.
     *
     * <p>The line starts with a fixed prefix, followed by one bracketed entry per bean
     * in list order, separated by {@code ", "}. For an empty list only the prefix is
     * returned.
     *
     * @param gcMXBeans the garbage collector beans to report
     * @return the formatted statistics line
     */
    private String getGarbageCollectorStatsAsString(List<GarbageCollectorMXBean> gcMXBeans) {
        final StringBuilder line = new StringBuilder("Garbage collector stats: ");

        // Empty before the first entry, ", " before every following one.
        String separator = "";
        for (GarbageCollectorMXBean collector : gcMXBeans) {
            line.append(separator);
            line.append(String.format("[%s, GC TIME (ms): %d, GC COUNT: %d]", collector.getName(), collector.getCollectionTime(), collector.getCollectionCount()));
            separator = ", ";
        }
        return line.toString();
    }

    /**
     * The states of the address detection, each carrying a timeout value.
     * NOTE(review): the unit of the timeout is not visible here; presumably
     * milliseconds, confirm against the code that calls {@code getTimeout()}.
     */
    private static enum AddressDetectionState {
        /** State with a timeout of 50. */
        ADDRESS(50),
        /** State with a timeout of 50. */
        FAST_CONNECT(50),
        /** State with a timeout of 1000. */
        SLOW_CONNECT(1000);

        /** The timeout associated with this state. */
        private int timeout;

        AddressDetectionState(int timeoutValue) {
            timeout = timeoutValue;
        }

        /**
         * Returns the timeout associated with this state.
         *
         * @return the timeout value given to the constant
         */
        public int getTimeout() {
            return timeout;
        }
    }
}

