/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.File;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskInputSplitProvider;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskManager
implements TaskOperationProtocol {
    /** Logger shared by all TaskManager instances. */
    private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
    // Not referenced in the visible part of this file; presumably the process exit code on startup failure -- TODO confirm.
    private static final int STARTUP_FAILURE_RETURN_CODE = 1;
    // Number of consecutive failed heartbeats tolerated. The constructor passes the literal 3 to the
    // heartbeat loop, so this constant appears to have been inlined by the compiler.
    private static final int MAX_LOST_HEART_BEATS = 3;
    // Matches the 10000 ms sleep the heartbeat loop performs after the JobManager connection is declared lost.
    private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
    // NOTE(review): the name says "conf dir" but the value is "tempDir"; its use is not visible here -- verify against callers.
    public static final String ARG_CONF_DIR = "tempDir";
    /** Fixed-size pool (2 x CPU cores) used for asynchronous work such as task cancellation. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
    /** Bind address, RPC port and data port of this TaskManager. */
    private final InstanceConnectionInfo localInstanceConnectionInfo;
    /** Hardware description sent to the JobManager during registration. */
    private final HardwareDescription hardwareDescription;
    private final ExecutionMode executionMode;
    /** Proxy used for registration, heartbeats, task state updates and the BLOB server port lookup. */
    private final JobManagerProtocol jobManager;
    /** Proxy handed to each task's TaskInputSplitProvider. */
    private final InputSplitProviderProtocol globalInputSplitProvider;
    /** Proxy handed to the ChannelManager. */
    private final ChannelLookupProtocol lookupService;
    /** Proxy handed to each task's RuntimeEnvironment. */
    private final AccumulatorProtocol accumulatorProtocolProxy;
    /** Registers required jar files per job and supplies the user-code class loader. */
    private final LibraryCacheManager libraryCacheManager;
    /** RPC server that exposes this TaskManager. */
    private final Server taskManagerServer;
    /** Creates and deletes the local temp files for distributed-cache entries. */
    private final FileCache fileCache = new FileCache();
    /** Tasks currently registered at this TaskManager, keyed by execution attempt ID. */
    private final ConcurrentHashMap<ExecutionAttemptID, Task> runningTasks = new ConcurrentHashMap();
    private final ChannelManager channelManager;
    /** Task profiler; null when profiling is disabled or the profiler could not be loaded. */
    private final TaskManagerProfiler profiler;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    /** Number of task slots reported to the JobManager. */
    private final int numberOfSlots;
    /** Thread running the registration / heartbeat loop; started by the constructor. */
    private final Thread heartbeatThread;
    /** Set to true exactly once when shutdown() begins. */
    private final AtomicBoolean shutdownStarted = new AtomicBoolean(false);
    /** ID assigned by the JobManager on registration; null while not registered. */
    private volatile InstanceID registeredId;
    /** Set to true at the very end of shutdown(). */
    private volatile boolean shutdownComplete;

    /**
     * Creates a TaskManager and starts all of its services: the RPC server, the optional
     * profiler, the channel manager, the memory manager, the library cache, the I/O manager,
     * the heartbeat thread and (optionally) a memory usage logging thread.
     *
     * <p>All settings are read from {@link GlobalConfiguration}.
     *
     * @param executionMode selects the network connection manager (LOCAL or CLUSTER)
     * @param jobManager proxy to the JobManager
     * @param splitProvider proxy to the global input split provider
     * @param channelLookup proxy to the channel lookup service
     * @param accumulators proxy to the accumulator protocol
     * @param jobManagerAddress address of the JobManager; its host is used for the profiler and the BLOB server
     * @param taskManagerBindAddress address the RPC server binds to
     * @throws NullPointerException if any of the first five arguments is null
     *         (the two address arguments are not checked here)
     * @throws Exception if the slot count or memory size is invalid, or if the RPC server,
     *         the channel manager or the memory manager cannot be set up
     */
    public TaskManager(ExecutionMode executionMode, JobManagerProtocol jobManager, InputSplitProviderProtocol splitProvider, ChannelLookupProtocol channelLookup, AccumulatorProtocol accumulators, InetSocketAddress jobManagerAddress, InetAddress taskManagerBindAddress) throws Exception {
        long memorySize;
        if (executionMode == null || jobManager == null || splitProvider == null || channelLookup == null || accumulators == null) {
            throw new NullPointerException();
        }
        LOG.info("TaskManager execution mode: " + (Object)((Object)executionMode));
        this.executionMode = executionMode;
        this.jobManager = jobManager;
        this.lookupService = channelLookup;
        this.globalInputSplitProvider = splitProvider;
        this.accumulatorProtocolProxy = accumulators;
        // Task slots: an unset value (-1) falls back to a single slot; any other non-positive value is rejected.
        int slots = GlobalConfiguration.getInteger((String)"taskmanager.numberOfTaskSlots", (int)-1);
        if (slots == -1) {
            slots = 1;
            LOG.info("Number of task slots not configured. Creating one task slot.");
        } else {
            if (slots <= 0) {
                throw new Exception("Illegal value for the number of task slots: " + slots);
            }
            LOG.info("Creating " + slots + " task slot(s).");
        }
        this.numberOfSlots = slots;
        // RPC and data ports: when not configured (-1), an available port is picked for each.
        int ipcPort = GlobalConfiguration.getInteger((String)"taskmanager.rpc.port", (int)-1);
        int dataPort = GlobalConfiguration.getInteger((String)"taskmanager.data.port", (int)-1);
        if (ipcPort == -1) {
            ipcPort = TaskManager.getAvailablePort();
        }
        if (dataPort == -1) {
            dataPort = TaskManager.getAvailablePort();
        }
        this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerBindAddress, ipcPort, dataPort);
        LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
        // Start the RPC server; the handler count is the slot count, capped at 2 x CPU cores.
        try {
            int numHandlers = Math.min(this.numberOfSlots, 2 * Hardware.getNumberCPUCores());
            this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numHandlers);
            this.taskManagerServer.start();
        }
        catch (IOException e) {
            LOG.error("Failed to start TaskManager server. " + e.getMessage(), (Throwable)e);
            throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
        }
        // Profiler is optional; a failed load leaves this.profiler null and only logs an error.
        if (GlobalConfiguration.getBoolean((String)"jobmanager.profiling.enable", (boolean)false)) {
            String profilerClassName = GlobalConfiguration.getString((String)"taskmanager.profiling.classname", (String)"org.apache.flink.runtime.profiling.impl.TaskManagerProfilerImpl");
            this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(), this.localInstanceConnectionInfo);
            if (this.profiler == null) {
                LOG.error("Cannot find class name for the profiler.");
            } else {
                LOG.info("Profiling of jobs is enabled.");
            }
        } else {
            this.profiler = null;
            LOG.info("Profiling of jobs is disabled.");
        }
        // Temp directories may be separated by a comma or by the platform path separator.
        String[] tmpDirPaths = GlobalConfiguration.getString((String)"taskmanager.tmp.dirs", (String)ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
        TaskManager.checkTempDirs(tmpDirPaths);
        int numBuffers = GlobalConfiguration.getInteger((String)"taskmanager.network.numberOfBuffers", (int)2048);
        int bufferSize = GlobalConfiguration.getInteger((String)"taskmanager.network.bufferSizeInBytes", (int)32768);
        // Network stack: LOCAL mode uses an in-process connection manager, CLUSTER mode uses Netty.
        try {
            // NOTE(review): stays null if executionMode is neither LOCAL nor CLUSTER -- confirm the enum has no other values.
            NetworkConnectionManager networkConnectionManager = null;
            switch (executionMode) {
                case LOCAL: {
                    networkConnectionManager = new LocalConnectionManager();
                    break;
                }
                case CLUSTER: {
                    int numInThreads = GlobalConfiguration.getInteger((String)"taskmanager.net.numInThreads", (int)-1);
                    int numOutThreads = GlobalConfiguration.getInteger((String)"taskmanager.net.numOutThreads", (int)-1);
                    int lowWaterMark = GlobalConfiguration.getInteger((String)"taskmanager.net.nettyLowWaterMark", (int)-1);
                    int highWaterMark = GlobalConfiguration.getInteger((String)"taskmanager.net.nettyHighWaterMark", (int)-1);
                    networkConnectionManager = new NettyConnectionManager(this.localInstanceConnectionInfo.address(), this.localInstanceConnectionInfo.dataPort(), bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
                }
            }
            this.channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
        }
        catch (IOException ioe) {
            LOG.error(StringUtils.stringifyException((Throwable)ioe));
            throw new Exception("Failed to instantiate ChannelManager.", ioe);
        }
        // Managed memory: either an explicit size in megabytes, or a fraction of the free heap.
        long configuredMemorySize = GlobalConfiguration.getInteger((String)"taskmanager.memory.size", (int)-1);
        if (configuredMemorySize == -1L) {
            float fraction = GlobalConfiguration.getFloat((String)"taskmanager.memory.fraction", (float)0.7f);
            memorySize = (long)((float)EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
            LOG.info("Using " + fraction + " of the free heap space for managed memory.");
        } else {
            if (configuredMemorySize <= 0L) {
                throw new Exception("Invalid value for Memory Manager memory size: " + configuredMemorySize);
            }
            // Convert megabytes to bytes.
            memorySize = configuredMemorySize << 20;
        }
        // The memory page size is read from the same key as the network buffer size.
        int pageSize = GlobalConfiguration.getInteger((String)"taskmanager.network.bufferSizeInBytes", (int)32768);
        LOG.info("Initializing memory manager with " + (memorySize >>> 20) + " megabytes of memory. " + "Page size is " + pageSize + " bytes.");
        try {
            // NOTE(review): lazyAllocation is read but never passed to the memory manager.
            boolean lazyAllocation = GlobalConfiguration.getBoolean((String)"taskmanager.memory.lazyalloc", (boolean)false);
            this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize);
        }
        catch (Throwable t) {
            LOG.error("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t);
            throw new Exception("Unable to initialize memory manager.", t);
        }
        this.hardwareDescription = HardwareDescription.extractFromSystem(this.memoryManager.getMemorySize());
        // Library cache: backed by the JobManager's BLOB server when a port is reported, otherwise a fallback.
        int blobPort = this.jobManager.getBlobServerPort();
        if (blobPort == -1) {
            LOG.warn("Unable to determine BLOB server address: User library download will not be available");
            this.libraryCacheManager = new FallbackLibraryCacheManager();
        } else {
            InetSocketAddress blobServerAddress = new InetSocketAddress(jobManagerAddress.getAddress(), blobPort);
            LOG.info("Determined BLOB server address to be " + blobServerAddress);
            this.libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(blobServerAddress), GlobalConfiguration.getConfiguration());
        }
        this.ioManager = new IOManager(tmpDirPaths);
        // Start the registration / heartbeat thread. It begins running while the constructor is still executing.
        final long interval = GlobalConfiguration.getInteger((String)"taskmanager.heartbeat-interval", (int)5000);
        this.heartbeatThread = new Thread(){

            @Override
            public void run() {
                // Tolerate up to 3 consecutive failed heartbeats before treating the connection as lost.
                TaskManager.this.registerAndRunHeartbeatLoop(interval, 3);
            }
        };
        this.heartbeatThread.setName("Heartbeat Thread");
        this.heartbeatThread.start();
        final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
        final List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
        LOG.info(this.getMemoryUsageStatsAsString(memoryMXBean));
        // Optional debug thread that logs memory and GC statistics periodically until shutdown completes.
        boolean startMemoryUsageLogThread = GlobalConfiguration.getBoolean((String)"taskmanager.debug.memory.startLogThread", (boolean)false);
        if (startMemoryUsageLogThread && LOG.isDebugEnabled()) {
            final int logIntervalMs = GlobalConfiguration.getInteger((String)"taskmanager.debug.memory.logIntervalMs", (int)5000);
            new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        while (!TaskManager.this.isShutDown()) {
                            Thread.sleep(logIntervalMs);
                            if (!LOG.isDebugEnabled()) continue;
                            LOG.debug(TaskManager.this.getMemoryUsageStatsAsString(memoryMXBean));
                            LOG.debug(TaskManager.this.getGarbageCollectorStatsAsString(gcMXBeans));
                        }
                    }
                    catch (InterruptedException e) {
                        // An interrupt ends the logging thread.
                        LOG.warn("Unexpected interruption of memory usage logger thread.");
                    }
                }
            }).start();
        }
    }

    /**
     * Shuts down this TaskManager and all of its services.
     *
     * <p>Only the first call has an effect; later calls return immediately. The steps are:
     * fail all running tasks, stop the heartbeat thread, stop the RPC proxies and the RPC
     * server, then shut down the profiler, channel manager, I/O manager, memory manager,
     * library cache, file cache and executor pool. {@link #isShutDown()} reports true once
     * the method has run to completion.
     *
     * <p>If the calling thread is interrupted while waiting for the heartbeat thread or the
     * executor pool, the remaining cleanup still runs and the thread's interrupt status is
     * restored before returning, so that callers can observe the interruption.
     */
    public void shutdown() {
        if (!this.shutdownStarted.compareAndSet(false, true)) {
            return;
        }
        LOG.info("Shutting down TaskManager");
        // Record interruptions instead of swallowing them. The flag is re-asserted only at the
        // end so that the later blocking waits are not cut short by an early re-interrupt.
        boolean interrupted = false;
        this.cancelAndClearEverything(new Exception("Task Manager is shutting down"));
        this.heartbeatThread.interrupt();
        try {
            this.heartbeatThread.join(1000L);
        }
        catch (InterruptedException e) {
            interrupted = true;
        }
        TaskManager.stopProxy(this.jobManager);
        TaskManager.stopProxy(this.globalInputSplitProvider);
        TaskManager.stopProxy(this.lookupService);
        TaskManager.stopProxy(this.accumulatorProtocolProxy);
        try {
            this.taskManagerServer.stop();
        }
        catch (Throwable t) {
            LOG.warn("TaskManager RPC server did not shut down properly.", t);
        }
        if (this.profiler != null) {
            this.profiler.shutdown();
        }
        try {
            this.channelManager.shutdown();
        }
        catch (Throwable t) {
            LOG.warn("ChannelManager did not shutdown properly: " + t.getMessage(), t);
        }
        if (this.ioManager != null) {
            this.ioManager.shutdown();
        }
        if (this.memoryManager != null) {
            this.memoryManager.shutdown();
        }
        if (this.libraryCacheManager != null) {
            try {
                this.libraryCacheManager.shutdown();
            }
            catch (IOException e) {
                LOG.warn("Could not properly shutdown the library cache manager.", (Throwable)e);
            }
        }
        this.fileCache.shutdown();
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.debug("Shutdown of executor thread pool interrupted", (Throwable)e);
                interrupted = true;
            }
        }
        this.shutdownComplete = true;
        if (interrupted) {
            // Hand the interruption back to the caller now that cleanup is done.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Tells whether {@link #shutdown()} has run to completion.
     *
     * @return true once the shutdown sequence has finished, false before that
     */
    public boolean isShutDown() {
        return shutdownComplete;
    }

    /**
     * Returns the connection information (bind address, RPC port, data port) of this TaskManager.
     *
     * @return the connection information created in the constructor
     */
    public InstanceConnectionInfo getConnectionInfo() {
        return localInstanceConnectionInfo;
    }

    /**
     * Returns the execution mode this TaskManager was created with.
     *
     * @return the execution mode passed to the constructor
     */
    public ExecutionMode getExecutionMode() {
        return executionMode;
    }

    /**
     * Returns the instance ID assigned by the JobManager.
     *
     * @return the ID from the latest successful registration, or null while not registered
     */
    public InstanceID getRegisteredId() {
        return registeredId;
    }

    /**
     * Tells whether this TaskManager currently holds a registration at the JobManager.
     *
     * @return true if an instance ID is set, false otherwise
     */
    public boolean isRegistered() {
        return null != registeredId;
    }

    /**
     * Returns a read-only view of the tasks registered at this TaskManager.
     *
     * <p>The result wraps the live map, so later registrations and removals show through.
     *
     * @return an unmodifiable view keyed by execution attempt ID
     */
    public Map<ExecutionAttemptID, Task> getAllRunningTasks() {
        Map<ExecutionAttemptID, Task> readOnlyView = Collections.unmodifiableMap(runningTasks);
        return readOnlyView;
    }

    /**
     * Returns the channel manager of this TaskManager.
     *
     * @return the channel manager created in the constructor
     */
    public ChannelManager getChannelManager() {
        return channelManager;
    }

    /**
     * Requests cancellation of the task with the given execution attempt ID.
     *
     * <p>The cancellation itself is handed to the executor pool, so this method returns
     * without waiting for it to take effect.
     *
     * @param executionId ID of the execution attempt to cancel
     * @return a successful result if the task is known, otherwise a failed result with a message
     * @throws IOException declared by the protocol; not thrown by this implementation
     */
    @Override
    public TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException {
        final Task target = runningTasks.get(executionId);
        if (target != null) {
            executorService.execute(new Runnable(){

                @Override
                public void run() {
                    target.cancelExecution();
                }
            });
            return new TaskOperationResult(executionId, true);
        }
        return new TaskOperationResult(executionId, false, "No task with that execution ID was found.");
    }

    /**
     * Deploys and starts the task described by the given deployment descriptor.
     *
     * <p>The steps are: register the job's jar files and obtain the user-code class loader,
     * create the task and register it under its execution attempt ID, build its runtime
     * environment, register it at the channel manager and (if enabled) the profiler,
     * schedule the distributed-cache file copies, and finally start execution.
     *
     * <p>On any failure, everything this call registered is undone again. A task that was
     * already registered under the same execution attempt ID is left untouched: the
     * duplicate submission is rejected without evicting or unregistering the existing task.
     *
     * @param tdd the deployment descriptor of the task to run
     * @return a successful result if the task was started; otherwise a failed result whose
     *         message is "Task was canceled" or the stringified error
     */
    @Override
    public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) {
        JobID jobID = tdd.getJobID();
        JobVertexID vertexId = tdd.getVertexID();
        ExecutionAttemptID executionId = tdd.getExecutionId();
        int taskIndex = tdd.getIndexInSubtaskGroup();
        int numSubtasks = tdd.getCurrentNumberOfSubtasks();
        Task task = null;
        boolean jarsRegistered = false;
        // True only once THIS call has put its task into runningTasks; controls what cleanup may undo.
        boolean taskRegistered = false;
        try {
            this.libraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
            jarsRegistered = true;
            ClassLoader userCodeClassLoader = this.libraryCacheManager.getClassLoader(jobID);
            if (userCodeClassLoader == null) {
                throw new Exception("No user code ClassLoader available.");
            }
            task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this);
            if (this.runningTasks.putIfAbsent(executionId, task) != null) {
                throw new Exception("TaskManager contains already a task with executionId " + executionId);
            }
            taskRegistered = true;
            TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId);
            RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
            task.setEnvironment(env);
            this.channelManager.register(task);
            Configuration jobConfig = tdd.getJobConfiguration();
            // Profiling needs both a loaded profiler and the per-job switch (which defaults to on).
            boolean enableProfiling = this.profiler != null && jobConfig.getBoolean("job.profiling.enable", true);
            if (enableProfiling) {
                task.registerProfiler(this.profiler, jobConfig);
            }
            // Schedule one copy task per distributed-cache entry and hand the futures to the environment.
            HashMap<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
            for (Map.Entry<?, ?> e : DistributedCache.readFileInfoFromConfig(jobConfig)) {
                FutureTask<Path> cp = this.fileCache.createTmpFile((String)e.getKey(), (DistributedCache.DistributedCacheEntry)e.getValue(), jobID);
                cpTasks.put((String)e.getKey(), cp);
            }
            env.addCopyTasksForCacheFile(cpTasks);
            if (!task.startExecution()) {
                throw new CancelTaskException();
            }
            // Shutdown may have begun while the task was being set up; undo the deployment in that case.
            if (this.shutdownStarted.get()) {
                throw new Exception("Task Manager is shut down.");
            }
            return new TaskOperationResult(executionId, true);
        }
        catch (Throwable t) {
            String message;
            if (t instanceof CancelTaskException) {
                message = "Task was canceled";
            } else {
                LOG.error("Could not instantiate task", t);
                message = ExceptionUtils.stringifyException(t);
            }
            try {
                // Undo only what this call registered. For a rejected duplicate the map entry
                // and the channel registration belong to the task that is already running.
                if (taskRegistered) {
                    this.runningTasks.remove(executionId);
                    this.removeAllTaskResources(task);
                }
                if (jarsRegistered) {
                    this.libraryCacheManager.unregister(jobID);
                }
            }
            catch (Throwable t2) {
                LOG.error("Error during cleanup of task deployment", t2);
            }
            return new TaskOperationResult(executionId, false, message);
        }
    }

    /**
     * Removes the task with the given ID from the set of running tasks and releases
     * its resources, including its registration at the library cache.
     *
     * <p>An unknown ID is ignored, apart from a debug log message.
     *
     * @param executionId ID of the execution attempt to unregister
     */
    private void unregisterTask(ExecutionAttemptID executionId) {
        Task removed = runningTasks.remove(executionId);
        if (removed != null) {
            removeAllTaskResources(removed);
            libraryCacheManager.unregister(removed.getJobID());
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cannot find task with ID " + executionId + " to unregister");
        }
    }

    /**
     * Releases what a task holds at this TaskManager: its channel manager registration,
     * its profiler and memory manager registrations, and the local temp files of its
     * distributed-cache entries.
     *
     * <p>Errors while deleting the cache files are logged and not propagated.
     *
     * @param task the task whose resources are released
     */
    private void removeAllTaskResources(Task task) {
        channelManager.unregister(task.getExecutionId(), task);
        task.unregisterProfiler(profiler);
        task.unregisterMemoryManager(memoryManager);
        try {
            RuntimeEnvironment environment = task.getEnvironment();
            if (environment == null) {
                // No environment was ever attached, so there are no cache files to delete.
                return;
            }
            for (Map.Entry<?, ?> cached : DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
                String name = (String)cached.getKey();
                DistributedCache.DistributedCacheEntry entry = (DistributedCache.DistributedCacheEntry)cached.getValue();
                fileCache.deleteTmpFile(name, entry, task.getJobID());
            }
        }
        catch (Throwable t) {
            LOG.error("Error cleaning up local files from the distributed cache.", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports a task's new execution state to the JobManager.
     *
     * <p>The task is unregistered afterwards if the update was not delivered successfully
     * or if the new state is FINISHED, CANCELED or FAILED.
     *
     * @param jobID ID of the job the task belongs to
     * @param executionId ID of the execution attempt whose state changed
     * @param newExecutionState the state the task has entered
     * @param optionalError the error attached to the state update, if any
     */
    public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable optionalError) {
        boolean delivered = false;
        try {
            TaskExecutionState stateUpdate = new TaskExecutionState(jobID, executionId, newExecutionState, optionalError);
            delivered = jobManager.updateTaskExecutionState(stateUpdate);
        }
        catch (Throwable t) {
            String msg = "Error sending task state update to JobManager.";
            LOG.error(msg, t);
            ExceptionUtils.rethrow(t, msg);
        }
        finally {
            boolean terminalState = newExecutionState == ExecutionState.FINISHED
                    || newExecutionState == ExecutionState.CANCELED
                    || newExecutionState == ExecutionState.FAILED;
            if (terminalState || !delivered) {
                unregisterTask(executionId);
            }
        }
    }

    /**
     * Fails every running task with the given cause and removes it from the set of
     * running tasks. Does nothing when no task is registered.
     *
     * @param cause the error each task is failed with
     */
    public void cancelAndClearEverything(Throwable cause) {
        if (runningTasks.isEmpty()) {
            return;
        }
        LOG.info("Cancelling all computations and discarding all cached data.");
        for (Task running : runningTasks.values()) {
            running.failExternally(cause);
            runningTasks.remove(running.getExecutionId());
        }
    }

    /**
     * Body of the heartbeat thread: registers at the JobManager and then sends heartbeats
     * until shutdown starts.
     *
     * <p>Each pass of the outer loop has two phases. The registration phase retries until
     * the JobManager returns an instance ID, waiting between attempts with a delay that
     * starts at 100 ms and doubles up to 10000 ms. The heartbeat phase sleeps for
     * {@code interval}, sends a heartbeat and counts consecutive failures. When the count
     * reaches {@code maxNonSuccessfulHeatbeats}, the registration is dropped, all tasks are
     * cancelled, and after a 10000 ms pause the outer loop starts over with registration.
     *
     * @param interval time in milliseconds between two heartbeats
     * @param maxNonSuccessfulHeatbeats consecutive failed heartbeats after which the
     *        connection is treated as lost
     */
    private void registerAndRunHeartbeatLoop(long interval, int maxNonSuccessfulHeatbeats) {
        // Outer loop: one pass per registration plus the heartbeat phase that follows it.
        block10: while (!this.shutdownStarted.get()) {
            InstanceID resultId = null;
            // maxDelay and reportingDelay are not read below; the code uses the literals 10000L and 5000L.
            long maxDelay = 10000L;
            long reportingDelay = 5000L;
            long currentDelay = 100L;
            // Registration phase: retry until an instance ID is obtained or shutdown starts.
            while (!this.shutdownStarted.get()) {
                block20: {
                    block19: {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Trying to register at Jobmanager...");
                        }
                        try {
                            resultId = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo, this.hardwareDescription, this.numberOfSlots);
                            if (resultId == null) {
                                throw new Exception("Registration attempt refused by JobManager.");
                            }
                        }
                        catch (Exception e) {
                            // Failures are logged at error level only once the retry delay has grown to 5 s.
                            if (currentDelay >= 5000L) {
                                LOG.error("Connection to JobManager failed.", (Throwable)e);
                            }
                            if (!LOG.isDebugEnabled()) break block19;
                            LOG.debug("Could not connect to JobManager.", (Throwable)e);
                        }
                    }
                    if (resultId != null) {
                        // Registered: publish the ID and move on to the heartbeat phase.
                        this.registeredId = resultId;
                        break;
                    }
                    try {
                        Thread.sleep(currentDelay);
                    }
                    catch (InterruptedException e) {
                        // An interrupt during shutdown is expected; otherwise it is logged as an error.
                        if (this.shutdownStarted.get()) break block20;
                        LOG.error("TaskManager's registration loop was interrupted without shutdown.");
                    }
                }
                // Double the delay before the next attempt, capped at 10 s.
                currentDelay = Math.min(2L * currentDelay, 10000L);
            }
            int successiveUnsuccessfulHeartbeats = 0;
            // Heartbeat phase: runs until shutdown starts or the connection is treated as lost.
            while (!this.shutdownStarted.get()) {
                block22: {
                    block21: {
                        try {
                            Thread.sleep(interval);
                        }
                        catch (InterruptedException e) {
                            if (this.shutdownStarted.get()) break block21;
                            LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
                        }
                    }
                    try {
                        boolean accepted = this.jobManager.sendHeartbeat(resultId);
                        if (accepted) {
                            successiveUnsuccessfulHeartbeats = 0;
                        } else {
                            ++successiveUnsuccessfulHeartbeats;
                            LOG.error("JobManager rejected heart beat.");
                        }
                    }
                    catch (IOException e) {
                        // An I/O error during shutdown is not counted as a lost heartbeat.
                        if (this.shutdownStarted.get()) break block22;
                        ++successiveUnsuccessfulHeartbeats;
                        LOG.error("Sending the heart beat failed on I/O error: " + e.getMessage(), (Throwable)e);
                    }
                }
                // Keep sending heartbeats until the failure count reaches the limit.
                if (successiveUnsuccessfulHeartbeats != maxNonSuccessfulHeatbeats) continue;
                LOG.error("TaskManager has lost connection to JobManager.");
                this.registeredId = null;
                this.cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager"));
                // Pause before attempting to register again.
                try {
                    Thread.sleep(10000L);
                }
                catch (InterruptedException e) {
                    if (this.shutdownStarted.get()) continue block10;
                    LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
                }
                continue block10;
            }
        }
    }

    /**
     * Formats the current heap and non-heap memory usage as a single log line.
     *
     * <p>All six values are converted from bytes to whole megabytes by integer division.
     *
     * @param memoryMXBean the bean to read the memory usage from
     * @return a line listing used/committed/max for heap and non-heap memory, in MB
     */
    private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
        final long bytesPerMegabyte = 0x100000L;
        MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
        MemoryUsage nonHeapUsage = memoryMXBean.getNonHeapMemoryUsage();
        return String.format(
                "Memory usage stats: [HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/comitted/max)]",
                (int)(heapUsage.getUsed() / bytesPerMegabyte),
                (int)(heapUsage.getCommitted() / bytesPerMegabyte),
                (int)(heapUsage.getMax() / bytesPerMegabyte),
                (int)(nonHeapUsage.getUsed() / bytesPerMegabyte),
                (int)(nonHeapUsage.getCommitted() / bytesPerMegabyte),
                (int)(nonHeapUsage.getMax() / bytesPerMegabyte));
    }

    /**
     * Formats the statistics of the given garbage collectors as a single log line.
     *
     * @param gcMXBeans the garbage collector beans to report on
     * @return the fixed prefix followed by one bracketed entry per collector (name,
     *         collection time in ms, collection count), entries separated by ", "
     */
    private String getGarbageCollectorStatsAsString(List<GarbageCollectorMXBean> gcMXBeans) {
        StringBuilder report = new StringBuilder("Garbage collector stats: ");
        // Empty before the first entry, ", " before every later one.
        String separator = "";
        for (GarbageCollectorMXBean collector : gcMXBeans) {
            String entry = String.format("[%s, GC TIME (ms): %d, GC COUNT: %d]", collector.getName(), collector.getCollectionTime(), collector.getCollectionCount());
            report.append(separator).append(entry);
            separator = ", ";
        }
        return report.toString();
    }

    /**
     * Creates a TaskManager that connects to the JobManager named in the global configuration.
     *
     * <p>The host is read from "jobmanager.rpc.address" and the port from
     * "jobmanager.rpc.port" (default 6123).
     *
     * @param mode the execution mode of the new TaskManager
     * @return the started TaskManager
     * @throws Exception if no JobManager address is configured, if the host name cannot
     *         be resolved, or if creating the TaskManager fails
     */
    public static TaskManager createTaskManager(ExecutionMode mode) throws Exception {
        LOG.info("Reading location of job manager from configuration");
        final String host = GlobalConfiguration.getString("jobmanager.rpc.address", null);
        final int rpcPort = GlobalConfiguration.getInteger("jobmanager.rpc.port", 6123);
        if (host == null) {
            throw new Exception("Job manager address not configured in the GlobalConfiguration.");
        }
        final InetSocketAddress jobManagerAddress;
        try {
            jobManagerAddress = new InetSocketAddress(InetAddress.getByName(host), rpcPort);
        }
        catch (UnknownHostException e) {
            LOG.error("Could not resolve JobManager host name.");
            throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
        }
        return createTaskManager(mode, jobManagerAddress);
    }

    /**
     * Creates a TaskManager for the given JobManager address, determining the
     * TaskManager's own address from that JobManager address.
     *
     * @param mode the execution mode of the new TaskManager
     * @param jobManagerAddress address of the JobManager to connect to
     * @return the started TaskManager
     * @throws Exception if the local address cannot be determined or creating the
     *         TaskManager fails
     */
    public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress) throws Exception {
        final InetAddress ownAddress;
        try {
            ownAddress = TaskManager.getTaskManagerAddress(jobManagerAddress);
        }
        catch (IOException e) {
            throw new Exception("The TaskManager failed to determine the IP address of the interface that connects to the JobManager.", e);
        }
        return createTaskManager(mode, jobManagerAddress, ownAddress);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Opens the four RPC proxies to the JobManager (job manager protocol, input
     * split provider, channel lookup and accumulators) and constructs a
     * TaskManager from them.
     *
     * <p>If any step fails, every proxy that was already opened is stopped
     * before the failure is propagated to the caller.
     *
     * @param mode the execution mode passed to the TaskManager constructor
     * @param jobManagerAddress the socket address all proxies connect to
     * @param taskManagerAddress the local address passed to the TaskManager constructor
     * @return the newly constructed TaskManager
     * @throws Exception if a proxy cannot be created (the {@link IOException} is
     *         logged and wrapped) or if the TaskManager constructor fails
     */
    public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress, InetAddress taskManagerAddress) throws Exception {
        LOG.info("Connecting to JobManager at: " + jobManagerAddress);

        JobManagerProtocol jobManager = null;
        InputSplitProviderProtocol splitProvider = null;
        ChannelLookupProtocol channelLookup = null;
        AccumulatorProtocol accumulators = null;

        // Flipped only once the TaskManager exists; drives the cleanup in the finally clause.
        boolean success = false;
        try {
            try {
                jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
            } catch (IOException e) {
                LOG.error("Could not connect to the JobManager: " + e.getMessage(), (Throwable)e);
                throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
            }

            try {
                splitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
            } catch (IOException e) {
                LOG.error(e.getMessage(), (Throwable)e);
                throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
            }

            try {
                channelLookup = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
            } catch (IOException e) {
                LOG.error(e.getMessage(), (Throwable)e);
                throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
            }

            try {
                accumulators = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
            } catch (IOException e) {
                LOG.error("Failed to initialize accumulator protocol: " + e.getMessage(), (Throwable)e);
                throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
            }

            TaskManager created = new TaskManager(mode, jobManager, splitProvider, channelLookup, accumulators, jobManagerAddress, taskManagerAddress);
            success = true;
            return created;
        } finally {
            if (!success) {
                // Release whatever was opened; stopProxy ignores proxies that are still null.
                TaskManager.stopProxy(jobManager);
                TaskManager.stopProxy(splitProvider);
                TaskManager.stopProxy(channelLookup);
                TaskManager.stopProxy(accumulators);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Command-line entry point of the TaskManager.
     *
     * <p>Parses the required {@code configDir} option and the optional option
     * named by {@code ARG_CONF_DIR}, loads the global configuration, optionally
     * sets {@code taskmanager.tmp.dirs} from the command line (only when the
     * loaded configuration has no value for that key), starts a TaskManager in
     * cluster mode and then blocks the calling thread on a private monitor.
     *
     * <p>The process exits with status 1 if the command line cannot be parsed
     * or the TaskManager fails to start.
     *
     * @param args the command-line arguments
     * @throws IOException declared by the signature
     */
    public static void main(String[] args) throws IOException {
        // OptionBuilder collects its settings statically; create(...) turns them into an Option.
        OptionBuilder.withArgName("config directory");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify configuration directory.");
        Option configDirOpt = OptionBuilder.create("configDir");
        configDirOpt.setRequired(true);

        OptionBuilder.withArgName("temporary directory (overwrites configured option)");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify temporary directory.");
        Option tempDir = OptionBuilder.create(ARG_CONF_DIR);
        tempDir.setRequired(false);

        Options cliOptions = new Options();
        cliOptions.addOption(configDirOpt);
        cliOptions.addOption(tempDir);

        CommandLine line = null;
        try {
            line = new GnuParser().parse(cliOptions, args);
        } catch (ParseException e) {
            System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
            System.exit(1);
        }

        String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
        String tempDirVal = line.getOptionValue(tempDir.getOpt(), null);

        GlobalConfiguration.loadConfiguration(configDir);

        // The command-line value is applied only if the configuration does not already define the key.
        boolean tmpDirsUnset = GlobalConfiguration.getString("taskmanager.tmp.dirs", null) == null;
        if (tempDirVal != null && tmpDirsUnset) {
            Configuration overrides = GlobalConfiguration.getConfiguration();
            overrides.setString("taskmanager.tmp.dirs", tempDirVal);
            LOG.info("Setting temporary directory to " + tempDirVal);
            GlobalConfiguration.includeConfiguration(overrides);
        }

        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");

        try {
            TaskManager.createTaskManager(ExecutionMode.CLUSTER);
        } catch (Throwable t) {
            LOG.error("Taskmanager startup failed: " + t.getMessage(), t);
            System.exit(1);
        }

        // Park the calling thread on a monitor nobody else can reach.
        final Object mon = new Object();
        synchronized (mon) {
            try {
                mon.wait();
            } catch (InterruptedException ex) {
                // An interrupt simply ends the wait; main then returns.
            }
        }
    }

    /**
     * Verifies that every entry names an existing, writable directory.
     *
     * <p>Entries are checked in array order and the first offending entry
     * aborts the check.
     *
     * @param tempDirs the directory paths to validate
     * @throws Exception if an entry is {@code null}, does not exist, is not a
     *         directory, or is not writable; the message names the entry
     */
    private static final void checkTempDirs(String[] tempDirs) throws Exception {
        int position = 0;
        for (String path : tempDirs) {
            // One-based position, used only for the "is null" message.
            position++;
            if (path == null) {
                throw new Exception("Temporary file directory #" + position + " is null.");
            }

            final File directory = new File(path);
            if (!directory.exists()) {
                throw new Exception("Temporary file directory '" + directory.getAbsolutePath() + "' does not exist.");
            }
            if (!directory.isDirectory()) {
                throw new Exception("Temporary file directory '" + directory.getAbsolutePath() + "' is not a directory.");
            }
            if (!directory.canWrite()) {
                throw new Exception("Temporary file directory '" + directory.getAbsolutePath() + "' is not writable.");
            }
        }
    }

    /**
     * Stops the given RPC proxy, logging (rather than propagating) anything
     * thrown while doing so.
     *
     * @param protocol the proxy to stop; {@code null} is ignored
     */
    private static final void stopProxy(VersionedProtocol protocol) {
        if (protocol == null) {
            return;
        }
        try {
            RPC.stopProxy(protocol);
        } catch (Throwable t) {
            LOG.error("Error while shutting down RPC proxy.", t);
        }
    }

    /**
     * Determines which local address should be used to reach the JobManager.
     *
     * <p>All addresses of all local network interfaces are probed in up to
     * three passes, each using the timeout of its {@code AddressDetectionState}:
     * <ol>
     *   <li>{@code ADDRESS}: only addresses sharing a common prefix with the
     *       JobManager address (see {@code hasCommonPrefix}) are tried;</li>
     *   <li>{@code FAST_CONNECT}: every address is tried;</li>
     *   <li>{@code SLOW_CONNECT}: every address is tried again with the longer
     *       timeout of that state.</li>
     * </ol>
     * The first address from which a connection succeeds is returned.
     *
     * @param jobManagerAddress the socket address of the JobManager
     * @return the first local address from which the JobManager could be reached
     * @throws IOException if enumerating the interfaces or a connection probe
     *         fails with an I/O error that is not handled by the probe itself
     * @throws RuntimeException if no address works after the last pass
     */
    private static InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
        AddressDetectionState strategy = AddressDetectionState.ADDRESS;
        while (true) {
            // getNetworkInterfaces() may return null when the machine reports no interfaces;
            // treat that like an empty enumeration instead of failing with a NullPointerException.
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            while (interfaces != null && interfaces.hasMoreElements()) {
                Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress candidate = addresses.nextElement();

                    // Decide whether this pass considers the candidate at all.
                    final boolean eligible;
                    switch (strategy) {
                        case ADDRESS:
                            eligible = TaskManager.hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), candidate.getAddress());
                            break;
                        case FAST_CONNECT:
                        case SLOW_CONNECT:
                            eligible = true;
                            break;
                        default:
                            throw new RuntimeException("Unkown address detection strategy: " + (Object)strategy);
                    }

                    if (eligible && TaskManager.tryToConnect(candidate, jobManagerAddress, strategy.getTimeout())) {
                        LOG.info("Determined " + candidate + " as the TaskTracker's own IP address");
                        return candidate;
                    }
                }
            }

            // Nothing worked in this pass: escalate to the next strategy or give up.
            switch (strategy) {
                case ADDRESS: {
                    strategy = AddressDetectionState.FAST_CONNECT;
                    break;
                }
                case FAST_CONNECT: {
                    strategy = AddressDetectionState.SLOW_CONNECT;
                    break;
                }
                case SLOW_CONNECT: {
                    throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '" + jobManagerAddress + "').");
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Defaulting to detection strategy {}", (Object)strategy);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Finds a currently free local port by binding a server socket to port 0
     * and reading back the port that was assigned.
     *
     * <p>Up to 50 attempts are made; the probing socket is always closed
     * again, and failures while closing it are ignored.
     *
     * @return a non-zero port number that was free at the time of probing
     * @throws RuntimeException if none of the attempts yields a port
     */
    private static int getAvailablePort() {
        int attempt = 0;
        while (attempt < 50) {
            attempt++;
            ServerSocket probe = null;
            try {
                probe = new ServerSocket(0);
                final int assigned = probe.getLocalPort();
                if (assigned != 0) {
                    return assigned;
                }
            } catch (IOException e) {
                LOG.debug("Unable to allocate port with exception {}", (Throwable)e);
            } finally {
                if (probe != null) {
                    try {
                        probe.close();
                    } catch (Throwable ignored) {
                        // Best effort: the socket only served to discover a port number.
                    }
                }
            }
        }
        throw new RuntimeException("Could not find a free permitted port on the machine.");
    }

    /**
     * Tells whether two raw IP addresses start with the same two bytes.
     *
     * <p>Addresses of different lengths (for example a 4-byte and a 16-byte
     * address) are never considered to share a prefix, because their leading
     * bytes are not comparable. {@code null} arrays and arrays shorter than
     * two bytes also yield {@code false} instead of raising an exception.
     *
     * @param address the first raw address
     * @param address2 the second raw address
     * @return {@code true} if both arrays have the same length of at least two
     *         bytes and agree in their first two bytes; {@code false} otherwise
     */
    private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
        if (address == null || address2 == null) {
            return false;
        }
        if (address.length != address2.length || address.length < 2) {
            return false;
        }
        return address[0] == address2[0] && address[1] == address2[1];
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Probes whether the target can be reached from a given local address by
     * binding a socket to that address (on any free port) and connecting.
     *
     * @param fromAddress the local address to bind to
     * @param toSocket the remote endpoint to connect to
     * @param timeout the connect timeout handed to {@link Socket#connect(SocketAddress, int)}
     * @return {@code true} if bind and connect succeeded; {@code false} if
     *         either raised an exception (which is logged)
     * @throws IOException if closing the probing socket fails
     */
    private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress + " with timeout " + timeout);
        }

        Socket probe = null;
        try {
            probe = new Socket();
            probe.bind(new InetSocketAddress(fromAddress, 0));
            probe.connect(toSocket, timeout);
            return true;
        } catch (Exception ex) {
            LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed with exception", (Throwable)ex);
            }
            return false;
        } finally {
            // A failure to close is not swallowed: it replaces the pending result and reaches the caller.
            if (probe != null) {
                probe.close();
            }
        }
    }

    /**
     * The phases of the local-address detection, each carrying the timeout
     * that is used for connection attempts made during that phase.
     */
    private static enum AddressDetectionState {
        /** First phase, with a timeout of 50. */
        ADDRESS(50),
        /** Second phase, with a timeout of 50. */
        FAST_CONNECT(50),
        /** Last phase, with a timeout of 1000. */
        SLOW_CONNECT(1000);

        /** Timeout of this phase; fixed at construction because enum constants are shared singletons. */
        private final int timeout;

        private AddressDetectionState(int timeout) {
            this.timeout = timeout;
        }

        /**
         * Returns the timeout associated with this phase.
         *
         * @return the timeout value given to the constant
         */
        public int getTimeout() {
            return this.timeout;
        }
    }
}

