/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.EstablishedResourceManagerConnection;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class TaskExecutor
extends RpcEndpoint
implements TaskExecutorGateway {
    /** Base name handed to {@code AkkaRpcServiceUtils.createRandomName} when naming this endpoint. */
    public static final String TASK_MANAGER_NAME = "taskmanager";
    /** Source of the resource manager leader retriever; also passed to the job leader service on start. */
    private final HighAvailabilityServices haServices;
    /** Service bundle from which most components below are obtained; shut down in {@code postStop()}. */
    private final TaskManagerServices taskExecutorServices;
    /** Configuration of this task manager (number of slots, timeout, log/stdout paths). */
    private final TaskManagerConfiguration taskManagerConfiguration;
    /** Heartbeat manager used for heartbeats requested by job managers. */
    private final HeartbeatManager<Void, AccumulatorReport> jobManagerHeartbeatManager;
    /** Heartbeat manager used for heartbeats requested by the resource manager. */
    private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
    /** Handler supplied at construction time for fatal errors. */
    private final FatalErrorHandler fatalErrorHandler;
    /** Provides the permanent and transient blob services used for task data and file uploads. */
    private final BlobCacheService blobCacheService;
    /** Location of this task manager, taken from the task executor services. */
    private final TaskManagerLocation taskManagerLocation;
    /** Metric group to which per-task metric groups are added; closed in {@code postStop()}. */
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    /** Provides task-local state stores and releases local state per allocation id. */
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;
    /** Network environment, taken from the task executor services. */
    private final NetworkEnvironment networkEnvironment;
    /** Job manager connections keyed by resource id; iterated on shutdown to disassociate from each. */
    private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
    /** Table of task slots and the tasks running in them. */
    private final TaskSlotTable taskSlotTable;
    /** Table of job manager connections looked up by job id. */
    private final JobManagerTable jobManagerTable;
    /** Service to which jobs are added and on which reconnects are requested. */
    private final JobLeaderService jobLeaderService;
    /** Leader retriever for the resource manager; started in {@code start()}, stopped in {@code postStop()}. */
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    /** Hardware description extracted from the system using the memory manager's memory size. */
    private final HardwareDescription hardwareDescription;
    /** Address of the most recently announced resource manager leader; {@code null} initially. */
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;
    /** Not assigned in the constructor, so initially {@code null}. */
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;
    /** Connection to the resource manager; closed in {@code postStop()} when non-null. */
    @Nullable
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
    /** Id of the current registration timeout; {@code null} initially. */
    @Nullable
    private UUID currentRegistrationTimeoutId;

    /**
     * Creates a task executor endpoint with a randomly generated name based on
     * {@link #TASK_MANAGER_NAME}.
     *
     * <p>The slot count of the configuration must be positive and all service arguments must be
     * non-null. The slot table, job manager table, job leader service, location, local state
     * store manager and network environment are taken from {@code taskExecutorServices}; the
     * resource manager leader retriever is taken from {@code haServices}. Two heartbeat managers
     * (job manager and resource manager) are created from {@code heartbeatServices}.
     *
     * @param rpcService RPC service hosting this endpoint; also supplies the scheduled executor
     *     for the heartbeat managers
     * @param taskManagerConfiguration configuration of this task manager
     * @param haServices high availability services
     * @param taskExecutorServices bundle of task executor services
     * @param heartbeatServices factory for the heartbeat managers
     * @param taskManagerMetricGroup metric group of this task manager
     * @param blobCacheService blob cache service
     * @param fatalErrorHandler handler for fatal errors
     */
    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices haServices, TaskManagerServices taskExecutorServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler) {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

        Preconditions.checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");

        // Directly injected collaborators.
        this.taskManagerConfiguration = Preconditions.checkNotNull(taskManagerConfiguration);
        this.taskExecutorServices = Preconditions.checkNotNull(taskExecutorServices);
        this.haServices = Preconditions.checkNotNull(haServices);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
        this.blobCacheService = Preconditions.checkNotNull(blobCacheService);

        // Components derived from the service bundles.
        this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
        this.jobManagerTable = taskExecutorServices.getJobManagerTable();
        this.jobLeaderService = taskExecutorServices.getJobLeaderService();
        this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation();
        this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
        this.networkEnvironment = taskExecutorServices.getNetworkEnvironment();
        this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
        this.jobManagerConnections = new HashMap<>(4);

        // Both heartbeat managers are registered under this task manager's resource id.
        final ResourceID ownResourceId = taskExecutorServices.getTaskManagerLocation().getResourceID();
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
            ownResourceId,
            new JobManagerHeartbeatListener(),
            rpcService.getScheduledExecutor(),
            this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
            ownResourceId,
            new ResourceManagerHeartbeatListener(),
            rpcService.getScheduledExecutor(),
            this.log);

        this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getMemoryManager().getMemorySize());

        // No resource manager is known yet.
        this.resourceManagerAddress = null;
        this.resourceManagerConnection = null;
        this.currentRegistrationTimeoutId = null;
    }

    /**
     * Starts the endpoint and its services: the resource manager leader retriever, the task slot
     * table, the job leader service and finally the registration timeout.
     *
     * <p>A failure to start the leader retriever is reported through {@code onFatalError}; the
     * remaining services are still started afterwards.
     *
     * @throws Exception if the super class or one of the other services fails to start
     */
    @Override
    public void start() throws Exception {
        super.start();

        try {
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        } catch (Exception leaderRetrieverFailure) {
            onFatalError(leaderRetrieverFailure);
        }

        taskSlotTable.start(new SlotActionsImpl());

        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        startRegistrationTimeout();
    }

    /**
     * Shuts the task executor down.
     *
     * <p>Closes the resource manager connection (if any), disassociates from every job manager,
     * stops both heartbeat managers and the resource manager leader retriever, shuts the task
     * executor services down and closes the metric group. Errors raised along the way are
     * collected via {@code ExceptionUtils.firstOrSuppressed} so that the remaining steps still run.
     *
     * @return a completed future, or a future failed with a {@link FlinkException} wrapping the
     *     collected errors
     */
    @Override
    public CompletableFuture<Void> postStop() {
        log.info("Stopping TaskExecutor {}.", getAddress());

        Throwable shutdownError = null;

        if (resourceManagerConnection != null) {
            resourceManagerConnection.close();
        }

        for (JobManagerConnection connection : jobManagerConnections.values()) {
            try {
                disassociateFromJobManager(connection, new FlinkException("The TaskExecutor is shutting down."));
            } catch (Throwable disassociationFailure) {
                shutdownError = ExceptionUtils.firstOrSuppressed(disassociationFailure, shutdownError);
            }
        }

        jobManagerHeartbeatManager.stop();
        resourceManagerHeartbeatManager.stop();

        try {
            resourceManagerLeaderRetriever.stop();
        } catch (Exception retrieverFailure) {
            shutdownError = ExceptionUtils.firstOrSuppressed(retrieverFailure, shutdownError);
        }

        try {
            taskExecutorServices.shutDown();
        } catch (Throwable servicesFailure) {
            shutdownError = ExceptionUtils.firstOrSuppressed(servicesFailure, shutdownError);
        }

        taskManagerMetricGroup.close();

        if (shutdownError == null) {
            log.info("Stopped TaskExecutor {}.", getAddress());
            return CompletableFuture.completedFuture(null);
        }
        return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", shutdownError));
    }

    /**
     * Samples the stack trace of the given task {@code numSamples} times.
     *
     * <p>Delegates to the private overload with a fresh trace list and a fresh result future.
     * The {@code timeout} argument is not used by this implementation.
     *
     * @param executionAttemptId task to sample
     * @param sampleId id echoed back in the response
     * @param numSamples number of samples to take
     * @param delayBetweenSamples delay between two consecutive samples
     * @param maxStackTraceDepth maximum depth per trace; values {@code <= 0} mean unlimited
     * @param timeout unused
     * @return future completed with the collected samples
     */
    @Override
    public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(ExecutionAttemptID executionAttemptId, int sampleId, int numSamples, Time delayBetweenSamples, int maxStackTraceDepth, Time timeout) {
        final List<StackTraceElement[]> collectedTraces = new ArrayList<>(numSamples);
        final CompletableFuture<StackTraceSampleResponse> sampleFuture = new CompletableFuture<>();
        return requestStackTraceSample(
            executionAttemptId,
            sampleId,
            numSamples,
            delayBetweenSamples,
            maxStackTraceDepth,
            collectedTraces,
            sampleFuture);
    }

    /**
     * Takes one stack trace sample and, if more samples are outstanding, schedules the next round.
     *
     * <p>If the task can no longer be sampled but earlier rounds produced traces, the future is
     * completed with the partial result and sampling stops; no further rounds are scheduled
     * because the future is already complete and they could not add anything to it.
     *
     * @param executionAttemptId task to sample
     * @param sampleId id echoed back in the response
     * @param numSamples number of samples still to take, including this one
     * @param delayBetweenSamples delay before the next round
     * @param maxStackTraceDepth maximum depth per trace; values {@code <= 0} mean unlimited
     * @param currentTraces traces collected so far; appended to by this call
     * @param resultFuture future completed once sampling is finished
     * @return {@code resultFuture}
     * @throws IllegalStateException if the task cannot be sampled and no trace was collected yet
     */
    private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(ExecutionAttemptID executionAttemptId, int sampleId, int numSamples, Time delayBetweenSamples, int maxStackTraceDepth, List<StackTraceElement[]> currentTraces, CompletableFuture<StackTraceSampleResponse> resultFuture) {
        Optional<StackTraceElement[]> stackTrace = this.getStackTrace(executionAttemptId, maxStackTraceDepth);
        if (stackTrace.isPresent()) {
            currentTraces.add(stackTrace.get());
        } else if (!currentTraces.isEmpty()) {
            // The task stopped running mid-sampling: hand back what we have and stop here
            // instead of scheduling rounds that can no longer contribute to the result.
            resultFuture.complete(new StackTraceSampleResponse(sampleId, executionAttemptId, currentTraces));
            return resultFuture;
        } else {
            throw new IllegalStateException(String.format("Cannot sample task %s. Either the task is not known to the task manager or it is not running.", executionAttemptId));
        }

        if (numSamples > 1) {
            this.scheduleRunAsync(() -> this.requestStackTraceSample(executionAttemptId, sampleId, numSamples - 1, delayBetweenSamples, maxStackTraceDepth, currentTraces, resultFuture), delayBetweenSamples.getSize(), delayBetweenSamples.getUnit());
        } else {
            resultFuture.complete(new StackTraceSampleResponse(sampleId, executionAttemptId, currentTraces));
        }
        return resultFuture;
    }

    /**
     * Returns the current stack trace of the task's executing thread.
     *
     * @param executionAttemptId task whose thread should be inspected
     * @param maxStackTraceDepth maximum number of frames to keep; values {@code <= 0} keep all
     * @return the (possibly truncated) stack trace, or empty if the slot table has no such task
     *     or the task is not in state {@code RUNNING}
     */
    private Optional<StackTraceElement[]> getStackTrace(ExecutionAttemptID executionAttemptId, int maxStackTraceDepth) {
        final Task task = taskSlotTable.getTask(executionAttemptId);
        if (task == null || task.getExecutionState() != ExecutionState.RUNNING) {
            return Optional.empty();
        }

        final StackTraceElement[] fullTrace = task.getExecutingThread().getStackTrace();
        if (maxStackTraceDepth <= 0) {
            return Optional.of(fullTrace);
        }

        final int keptFrames = Math.min(maxStackTraceDepth, fullTrace.length);
        return Optional.of(Arrays.copyOfRange(fullTrace, 0, keptFrames));
    }

    /**
     * Validates a task deployment, builds the {@link Task} and starts its thread.
     *
     * <p>Every {@link TaskSubmissionException} raised during validation or construction is
     * converted into an exceptionally completed future by the outer catch block. The
     * {@code timeout} argument is not used by this implementation.
     *
     * @param tdd deployment descriptor of the task
     * @param jobMasterId leader id of the submitting job master; must match the id stored in the
     *     job manager connection for the task's job
     * @param timeout unused
     * @return future completed with an acknowledgement once the task thread has been started, or
     *     completed exceptionally with a {@link TaskSubmissionException}
     */
    @Override
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
        try {
            boolean taskAdded;
            TaskInformation taskInformation;
            JobInformation jobInformation;
            JobID jobId = tdd.getJobId();
            // A job manager connection must already exist for the job.
            JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobId);
            if (jobManagerConnection == null) {
                String message = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            // Reject submissions from a job master whose leader id differs from the connected one.
            if (!Objects.equals((Object)jobManagerConnection.getJobMasterId(), (Object)jobMasterId)) {
                String message = "Rejecting the task submission because the job manager leader id " + (Object)((Object)jobMasterId) + " does not match the expected job manager leader id " + (Object)((Object)jobManagerConnection.getJobMasterId()) + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            // The slot for this job/allocation must be markable as active.
            if (!this.taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
                String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + (Object)((Object)tdd.getAllocationId()) + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            // Re-integrate descriptor data that was offloaded to the permanent blob service.
            try {
                tdd.loadBigData(this.blobCacheService.getPermanentBlobService());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }
            // Deserialize job and task information with this class' class loader.
            try {
                jobInformation = (JobInformation)tdd.getSerializedJobInformation().deserializeValue(this.getClass().getClassLoader());
                taskInformation = (TaskInformation)tdd.getSerializedTaskInformation().deserializeValue(this.getClass().getClassLoader());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
            }
            // The job id in the descriptor and in the deserialized job information must agree.
            if (!jobId.equals((Object)jobInformation.getJobId())) {
                throw new TaskSubmissionException("Inconsistent job ID information inside TaskDeploymentDescriptor (" + tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
            }
            // Gather the per-task collaborators: metrics, input splits, job-manager-side services
            // and the task-local state store.
            TaskMetricGroup taskMetricGroup = this.taskManagerMetricGroup.addTaskForJob(jobInformation.getJobId(), jobInformation.getJobName(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), taskInformation.getTaskName(), tdd.getSubtaskIndex(), tdd.getAttemptNumber());
            RpcInputSplitProvider inputSplitProvider = new RpcInputSplitProvider(jobManagerConnection.getJobManagerGateway(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), this.taskManagerConfiguration.getTimeout());
            TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
            PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
            TaskLocalStateStore localStateStore = this.localStateStoresManager.localStateStoreForSubtask(jobId, tdd.getAllocationId(), taskInformation.getJobVertexId(), tdd.getSubtaskIndex());
            JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
            TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(jobId, tdd.getExecutionAttemptId(), localStateStore, taskRestore, checkpointResponder);
            Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(), tdd.getAllocationId(), tdd.getSubtaskIndex(), tdd.getAttemptNumber(), tdd.getProducedPartitions(), tdd.getInputGates(), tdd.getTargetSlotNumber(), this.taskExecutorServices.getMemoryManager(), this.taskExecutorServices.getIOManager(), this.taskExecutorServices.getNetworkEnvironment(), this.taskExecutorServices.getBroadcastVariableManager(), taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, this.blobCacheService, libraryCache, this.taskExecutorServices.getFileCache(), this.taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, this.getRpcService().getExecutor());
            this.log.info("Received task {}.", (Object)task.getTaskInfo().getTaskNameWithSubtasks());
            // Register the task with its slot; the thread is only started if registration succeeded.
            try {
                taskAdded = this.taskSlotTable.addTask(task);
            }
            catch (SlotNotActiveException | SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }
            if (taskAdded) {
                task.startTaskThread();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            // addTask returned false: reported as a duplicate execution id.
            String message = "TaskManager already contains a task for id " + (Object)((Object)task.getExecutionId()) + '.';
            this.log.debug(message);
            throw new TaskSubmissionException(message);
        }
        catch (TaskSubmissionException e) {
            // All validation/construction failures surface through the returned future.
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Cancels the execution of the given task.
     *
     * <p>The {@code timeout} argument is not used by this implementation.
     *
     * @param executionAttemptID task to cancel
     * @param timeout unused
     * @return future completed with an acknowledgement if cancellation was triggered, or
     *     completed exceptionally with a {@link TaskException} if the task is unknown or
     *     cancelling it failed
     */
    @Override
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.cancelExecution();
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Throwable t) {
                return FutureUtils.completedExceptionally(new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t));
            }
        }
        // The message previously said "to stop" (copied from stopTask), which mislabelled the
        // failed operation in logs and in the exception sent back to the caller.
        String message = "Cannot find task to cancel for execution " + executionAttemptID + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }

    /**
     * Stops the execution of the given task.
     *
     * <p>The {@code timeout} argument is not used by this implementation.
     *
     * @param executionAttemptID task to stop
     * @param timeout unused
     * @return future completed with an acknowledgement if the stop was triggered, or completed
     *     exceptionally with a {@link TaskException} if the task is unknown or stopping failed
     */
    @Override
    public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
            log.debug(message);
            return FutureUtils.completedExceptionally(new TaskException(message));
        }

        try {
            task.stopExecution();
        } catch (Throwable stopFailure) {
            return FutureUtils.completedExceptionally(new TaskException("Cannot stop task for execution " + executionAttemptID + '.', stopFailure));
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Updates the input channels of the given task with new partition information.
     *
     * <p>Each update is handed to the RPC service's executor. If an update fails with an
     * {@link IOException} or {@link InterruptedException}, the task is failed externally. If the
     * task is not found, the update is discarded and still acknowledged. The {@code timeout}
     * argument is not used by this implementation.
     *
     * @param executionAttemptID task whose input gates should be updated
     * @param partitionInfos partition information to apply
     * @param timeout unused
     * @return future completed with an acknowledgement, or completed exceptionally with a
     *     {@link PartitionException} as soon as a partition info refers to an input gate the
     *     task does not have (updates for earlier entries have already been submitted)
     */
    @Override
    public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        for (final PartitionInfo partitionInfo : partitionInfos) {
            final IntermediateDataSetID dataSetId = partitionInfo.getIntermediateDataSetID();
            final SingleInputGate inputGate = task.getInputGateById(dataSetId);

            if (inputGate == null) {
                return FutureUtils.completedExceptionally(new PartitionException("No reader with ID " + dataSetId + " for task " + executionAttemptID + " was found."));
            }

            // Run the channel update off the calling thread.
            getRpcService().execute(() -> {
                try {
                    inputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
                } catch (IOException | InterruptedException e) {
                    log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
                    try {
                        task.failExternally(e);
                    } catch (RuntimeException re) {
                        log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
                    }
                }
            });
        }

        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Releases all result partitions produced by the given task execution.
     *
     * <p>Any {@link Throwable} raised while releasing is escalated via {@code onFatalError}.
     *
     * @param executionAttemptID execution whose produced partitions should be released
     */
    @Override
    public void failPartition(ExecutionAttemptID executionAttemptID) {
        log.info("Discarding the results produced by task execution {}.", executionAttemptID);

        try {
            networkEnvironment
                .getResultPartitionManager()
                .releasePartitionsProducedBy(executionAttemptID);
        } catch (Throwable releaseFailure) {
            onFatalError(releaseFailure);
        }
    }

    /**
     * Forwards a heartbeat request from a job manager to the job manager heartbeat manager,
     * passing {@code null} as payload.
     *
     * @param resourceID resource id of the requesting job manager
     */
    @Override
    public void heartbeatFromJobManager(ResourceID resourceID) {
        jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Forwards a heartbeat request from the resource manager to the resource manager heartbeat
     * manager, passing {@code null} as payload.
     *
     * @param resourceID resource id of the requesting resource manager
     */
    @Override
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Triggers a checkpoint barrier on the given task.
     *
     * @param executionAttemptID task on which to trigger the checkpoint
     * @param checkpointId id of the checkpoint
     * @param checkpointTimestamp timestamp of the checkpoint
     * @param checkpointOptions options for performing the checkpoint
     * @return future completed with an acknowledgement, or completed exceptionally with a
     *     {@link CheckpointException} if the task is unknown
     */
    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message));
        }

        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Notifies the given task that a checkpoint has completed.
     *
     * @param executionAttemptID task to notify
     * @param checkpointId id of the completed checkpoint
     * @param checkpointTimestamp timestamp of the checkpoint; only used for logging here
     * @return future completed with an acknowledgement, or completed exceptionally with a
     *     {@link CheckpointException} if the task is unknown
     */
    @Override
    public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
        log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message));
        }

        task.notifyCheckpointComplete(checkpointId);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Handles a slot request from the resource manager.
     *
     * <p>The request is rejected if this task executor is not connected to the resource manager
     * with the given leader id. A free slot is allocated for the job/allocation; a non-free slot
     * is only accepted if it is already allocated for exactly this job and allocation id. If a
     * job manager connection for the job already exists the slots are offered to it right away,
     * otherwise the job is added to the job leader service.
     *
     * <p>Every {@link TaskManagerException} thrown inside the method is converted into an
     * exceptionally completed future by the outer catch block. The {@code timeout} argument is
     * not used by this implementation.
     *
     * @param slotId slot being requested
     * @param jobId job for which the slot is requested
     * @param allocationId allocation id of the request
     * @param targetAddress address passed to the job leader service for the job
     * @param resourceManagerId leader id of the requesting resource manager
     * @param timeout unused
     * @return future completed with an acknowledgement, or completed exceptionally with a
     *     {@link TaskManagerException}
     */
    @Override
    public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
        this.log.info("Receive slot request {} for job {} from resource manager with leader id {}.", new Object[]{allocationId, jobId, resourceManagerId});
        try {
            // Only accept requests from the resource manager we are currently connected to.
            if (!this.isConnectedToResourceManager(resourceManagerId)) {
                String message = String.format("TaskManager is not connected to the resource manager %s.", new Object[]{resourceManagerId});
                this.log.debug(message);
                throw new TaskManagerException(message);
            }
            if (this.taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                // Free slot: try to allocate it for this job/allocation.
                if (!this.taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, this.taskManagerConfiguration.getTimeout())) {
                    this.log.info("Could not allocate slot for {}.", (Object)allocationId);
                    throw new SlotAllocationException("Could not allocate slot.");
                }
                this.log.info("Allocated slot for {}.", (Object)allocationId);
            } else if (!this.taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
                // Slot is taken by another job/allocation: report the current occupant.
                String message = "The slot " + slotId + " has already been allocated for a different job.";
                this.log.info(message);
                AllocationID allocationID = this.taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
                throw new SlotOccupiedException(message, allocationID, this.taskSlotTable.getOwningJob(allocationID));
            }
            // Job manager already known: offer the slots directly.
            if (this.jobManagerTable.contains(jobId)) {
                this.offerSlotsToJobManager(jobId);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            // Otherwise register the job with the job leader service.
            try {
                this.jobLeaderService.addJob(jobId, targetAddress);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            catch (Exception e) {
                // Adding the job failed: undo the allocation before reporting the failure.
                try {
                    this.taskSlotTable.freeSlot(allocationId);
                }
                catch (SlotNotFoundException slotNotFoundException) {
                    // Escalated as fatal; the cleanup below still runs afterwards.
                    this.onFatalError(slotNotFoundException);
                }
                this.localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
                // Sanity check: a slot that is still not free after freeing is escalated as fatal.
                // Both paths end in the same SlotAllocationException.
                if (this.taskSlotTable.isSlotFree(slotId.getSlotNumber())) throw new SlotAllocationException("Could not add job to job leader service.", e);
                this.onFatalError(new Exception("Could not free slot " + slotId));
                throw new SlotAllocationException("Could not add job to job leader service.", e);
            }
        }
        catch (TaskManagerException taskManagerException) {
            // All rejections surface through the returned future.
            return FutureUtils.completedExceptionally(taskManagerException);
        }
    }

    /**
     * Frees the slot with the given allocation id via {@code freeSlotInternal} and acknowledges.
     *
     * <p>The {@code timeout} argument is not used by this implementation.
     *
     * @param allocationId allocation id of the slot to free
     * @param cause reason for freeing the slot
     * @param timeout unused
     * @return future completed with an acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
        freeSlotInternal(allocationId, cause);

        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Uploads the task manager's log or stdout file to the transient blob service.
     *
     * <p>The path is taken from the task manager configuration depending on {@code fileType};
     * any other file type is treated as unavailable. The {@code timeout} argument is not used by
     * this implementation.
     *
     * @param fileType which file to upload
     * @param timeout unused
     * @return future completed with the blob key of the uploaded file, or completed exceptionally
     *     with a {@link FlinkException} if no path is configured, the file does not exist, or
     *     the upload fails
     */
    @Override
    public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
        log.debug("Request file {} upload.", fileType);

        final String filePath;
        switch (fileType) {
            case LOG:
                filePath = taskManagerConfiguration.getTaskManagerLogPath();
                break;
            case STDOUT:
                filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
                break;
            default:
                filePath = null;
        }

        // No (or an empty) configured path means the file is unavailable.
        if (filePath == null || filePath.isEmpty()) {
            log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
        }

        final File file = new File(filePath);
        if (!file.exists()) {
            log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
        }

        final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
        final TransientBlobKey uploadedKey;
        try (FileInputStream fileContent = new FileInputStream(file)) {
            uploadedKey = transientBlobService.putTransient(fileContent);
        } catch (IOException e) {
            log.debug("Could not upload file {}.", fileType, e);
            return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
        }

        return CompletableFuture.completedFuture(uploadedKey);
    }

    /**
     * Closes the connection to the JobManager of the given job and then asks the job leader
     * service to reconnect for that job.
     *
     * @param jobId job whose JobManager connection is closed
     * @param cause reason passed on to {@code closeJobManagerConnection}
     */
    @Override
    public void disconnectJobManager(JobID jobId, Exception cause) {
        closeJobManagerConnection(jobId, cause);
        jobLeaderService.reconnect(jobId);
    }

    /**
     * Handles a disconnect from the ResourceManager by delegating to
     * {@code reconnectToResourceManager}.
     *
     * @param cause reason for the disconnect
     */
    @Override
    public void disconnectResourceManager(Exception cause) {
        reconnectToResourceManager(cause);
    }

    /**
     * Records the address of the new ResourceManager leader and reconnects to it.
     *
     * @param newLeaderAddress address of the new leader; may be {@code null}, in which case the
     *     stored address becomes {@code null} as well
     * @param newResourceManagerId id of the new leader
     */
    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        final String reason = String.format("ResourceManager leader changed to new address %s", resourceManagerAddress);
        reconnectToResourceManager(new FlinkException(reason));
    }

    /**
     * Builds a {@code ResourceManagerAddress} from the given leader address and id.
     *
     * @param newLeaderAddress leader address, or {@code null} if there is none
     * @param newResourceManagerId leader id; asserted to be non-null when an address is given
     *     (the assertion is only evaluated when assertions are enabled)
     * @return the combined address, or {@code null} if {@code newLeaderAddress} is {@code null}
     */
    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String newLeaderAddress, @Nullable ResourceManagerId newResourceManagerId) {
        if (newLeaderAddress != null) {
            assert (newResourceManagerId != null);
            return new ResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        }
        return null;
    }

    /**
     * Closes the current ResourceManager connection and then tries to connect again.
     *
     * @param cause reason handed to {@code closeResourceManagerConnection}
     */
    private void reconnectToResourceManager(Exception cause) {
        closeResourceManagerConnection(cause);
        tryConnectToResourceManager();
    }

    /**
     * Connects to the ResourceManager, but only if a ResourceManager address is currently known.
     */
    private void tryConnectToResourceManager() {
        if (resourceManagerAddress == null) {
            return;
        }
        connectToResourceManager();
    }

    /**
     * Creates and starts a new registration connection to the ResourceManager stored in
     * {@code resourceManagerAddress}.
     *
     * <p>The {@code assert} statements spell out the state expected on entry: an address is
     * known and neither an established nor a pending connection exists. They are only evaluated
     * when assertions are enabled.
     */
    private void connectToResourceManager() {
        assert (resourceManagerAddress != null);
        assert (establishedResourceManagerConnection == null);
        assert (resourceManagerConnection == null);

        log.info("Connecting to ResourceManager {}.", (Object)resourceManagerAddress);

        resourceManagerConnection = new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            getAddress(),
            getResourceID(),
            taskManagerLocation.dataPort(),
            hardwareDescription,
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener());
        resourceManagerConnection.start();
    }

    /**
     * Completes the connection to a ResourceManager after a successful registration.
     *
     * <p>In order: sends the initial slot report (a failed report triggers a reconnect, handled
     * on the executor returned by {@code getMainThreadExecutor()}), starts heartbeat monitoring
     * of the ResourceManager, points the blob cache at the blob server named in the cluster
     * information, stores the established connection and stops the registration timeout.
     *
     * @param resourceManagerGateway gateway used to talk to the ResourceManager
     * @param resourceManagerResourceId resource id of the ResourceManager
     * @param taskExecutorRegistrationId registration id assigned to this TaskExecutor
     * @param clusterInformation source of the blob server hostname and port
     */
    private void establishResourceManagerConnection(final ResourceManagerGateway resourceManagerGateway, ResourceID resourceManagerResourceId, InstanceID taskExecutorRegistrationId, ClusterInformation clusterInformation) {
        resourceManagerGateway
            .sendSlotReport(
                getResourceID(),
                taskExecutorRegistrationId,
                taskSlotTable.createSlotReport(getResourceID()),
                taskManagerConfiguration.getTimeout())
            .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure != null) {
                        reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", failure));
                    }
                },
                (Executor)getMainThreadExecutor());

        resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<SlotReport>(){

            @Override
            public void receiveHeartbeat(ResourceID resourceID, SlotReport slotReport) {
                resourceManagerGateway.heartbeatFromTaskManager(resourceID, slotReport);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) {
                // Intentionally empty: nothing is done when a heartbeat is requested here.
            }
        });

        final String blobHost = clusterInformation.getBlobServerHostname();
        final int blobPort = clusterInformation.getBlobServerPort();
        blobCacheService.setBlobServerAddress(new InetSocketAddress(blobHost, blobPort));

        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(resourceManagerGateway, resourceManagerResourceId, taskExecutorRegistrationId);
        stopRegistrationTimeout();
    }

    /**
     * Tears down the established ResourceManager connection (if any) and the pending
     * registration connection (if any), then starts the registration timeout.
     *
     * <p>The cause is attached to the log output only when debug logging is enabled; otherwise a
     * shorter info message is written.
     *
     * @param cause reason for closing; also sent to the ResourceManager via
     *     {@code disconnectTaskManager}
     */
    private void closeResourceManagerConnection(Exception cause) {
        if (establishedResourceManagerConnection != null) {
            final ResourceID rmResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
            if (!log.isDebugEnabled()) {
                log.info("Close ResourceManager connection {}.", (Object)rmResourceId);
            } else {
                log.debug("Close ResourceManager connection {}.", (Object)rmResourceId, (Object)cause);
            }

            resourceManagerHeartbeatManager.unmonitorTarget(rmResourceId);
            establishedResourceManagerConnection
                .getResourceManagerGateway()
                .disconnectTaskManager(getResourceID(), cause);
            establishedResourceManagerConnection = null;
        }

        if (resourceManagerConnection != null) {
            // Only a connection that never finished registering is reported as "terminated".
            if (!resourceManagerConnection.isConnected()) {
                if (!log.isDebugEnabled()) {
                    log.info("Terminating registration attempts towards ResourceManager {}.", (Object)resourceManagerConnection.getTargetAddress());
                } else {
                    log.debug("Terminating registration attempts towards ResourceManager {}.", (Object)resourceManagerConnection.getTargetAddress(), (Object)cause);
                }
            }
            resourceManagerConnection.close();
            resourceManagerConnection = null;
        }

        startRegistrationTimeout();
    }

    /**
     * Starts a new registration timeout if a maximum registration duration is configured.
     *
     * <p>A fresh UUID is stored in {@code currentRegistrationTimeoutId} and handed to the
     * scheduled callback, so {@code registrationTimeout} can tell whether the timeout that fired
     * is still the current one.
     */
    private void startRegistrationTimeout() {
        final Time maxDuration = taskManagerConfiguration.getMaxRegistrationDuration();
        if (maxDuration == null) {
            return;
        }
        final UUID timeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = timeoutId;
        scheduleRunAsync(() -> registrationTimeout(timeoutId), maxDuration);
    }

    /**
     * Clears {@code currentRegistrationTimeoutId}; a timeout scheduled earlier then no longer
     * matches in {@code registrationTimeout} and has no effect.
     */
    private void stopRegistrationTimeout() {
        currentRegistrationTimeoutId = null;
    }

    /**
     * Callback of a registration timeout: reports a fatal error if the given id is still the
     * current registration timeout id, and does nothing otherwise.
     *
     * @param registrationTimeoutId id of the timeout that fired
     */
    private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
        if (!registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
            return;
        }
        final Time maxDuration = taskManagerConfiguration.getMaxRegistrationDuration();
        final String message = String.format("Could not register at the ResourceManager within the specified maximum registration duration %s. This indicates a problem with this instance. Terminating now.", maxDuration);
        onFatalError(new RegistrationTimeoutException(message));
    }

    /**
     * Offers all slots currently allocated for the given job to the job's JobManager.
     *
     * <p>Nothing is offered when no JobManager connection exists for the job or when the job has
     * no allocated slots. The response is handled on the executor returned by
     * {@code getMainThreadExecutor()}:
     * <ul>
     *   <li>on a {@code TimeoutException} the offer is retried;</li>
     *   <li>on any other failure all offered slots are freed;</li>
     *   <li>on success, and only if the JobManager connection is still the one the offer was
     *       made to, accepted slots are marked active (a slot that cannot be marked active is
     *       failed at the JobManager) and all remaining, i.e. rejected, slots are freed.</li>
     * </ul>
     *
     * @param jobId job whose allocated slots are offered
     */
    private void offerSlotsToJobManager(JobID jobId) {
        JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobId);
        if (jobManagerConnection == null) {
            this.log.debug("There is no job manager connection to the leader of job {}.", (Object)jobId);
            return;
        }
        if (!this.taskSlotTable.hasAllocatedSlots(jobId)) {
            this.log.debug("There are no unassigned slots for the job {}.", (Object)jobId);
            return;
        }

        this.log.info("Offer reserved slots to the leader of job {}.", (Object)jobId);
        JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
        Iterator<TaskSlot> reservedSlotsIterator = this.taskSlotTable.getAllocatedSlots(jobId);
        // Remember the leader the offer is made to, so a response that arrives after a leader
        // change can be recognised and discarded.
        JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
        HashSet<SlotOffer> reservedSlots = new HashSet<SlotOffer>(2);
        while (reservedSlotsIterator.hasNext()) {
            reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
        }

        CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(this.getResourceID(), reservedSlots, this.taskManagerConfiguration.getTimeout());
        acceptedSlotsFuture.whenCompleteAsync((acceptedSlots, throwable) -> {
            if (throwable != null) {
                if (throwable instanceof TimeoutException) {
                    this.log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                    this.offerSlotsToJobManager(jobId);
                } else {
                    this.log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", throwable);
                    for (SlotOffer reservedSlot : reservedSlots) {
                        this.freeSlotInternal(reservedSlot.getAllocationId(), throwable);
                    }
                }
                return;
            }

            if (!this.isJobManagerConnectionValid(jobId, jobMasterId)) {
                this.log.debug("Discard offer slot response since there is a new leader for the job {}.", (Object)jobId);
                return;
            }

            for (SlotOffer acceptedSlot : acceptedSlots) {
                // Identify the slot by its allocation id; the job id says nothing about which
                // of the job's slots could not be activated.
                String message = "Could not mark slot " + acceptedSlot.getAllocationId() + " active.";
                try {
                    if (!this.taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
                        this.log.debug(message);
                        jobMasterGateway.failSlot(this.getResourceID(), acceptedSlot.getAllocationId(), new FlinkException(message));
                    }
                }
                catch (SlotNotFoundException e) {
                    // Log this branch as well and keep the original exception as the cause.
                    this.log.debug(message, (Throwable)e);
                    jobMasterGateway.failSlot(this.getResourceID(), acceptedSlot.getAllocationId(), new FlinkException(message, (Throwable)e));
                }
                reservedSlots.remove(acceptedSlot);
            }

            // Whatever is left in reservedSlots was not accepted by the JobManager.
            Exception rejectionCause = new Exception("The slot was rejected by the JobManager.");
            for (SlotOffer rejectedSlot : reservedSlots) {
                this.freeSlotInternal(rejectedSlot.getAllocationId(), rejectionCause);
            }
        }, (Executor)this.getMainThreadExecutor());
    }

    /**
     * Establishes the connection to the JobManager that leads the given job.
     *
     * <p>If a connection for the job already exists and its job master id equals the fencing
     * token of the given gateway, the call is ignored. An existing connection to a different
     * leader is closed first. Afterwards the new connection is registered, heartbeat monitoring
     * of the JobManager is started, and the job's slots are offered to it.
     *
     * @param jobId job the JobManager is responsible for
     * @param jobMasterGateway gateway of the new job leader
     * @param registrationSuccess registration response; supplies the JobManager's resource id
     */
    private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
        if (jobManagerTable.contains(jobId)) {
            final JobManagerConnection existing = jobManagerTable.get(jobId);
            final boolean sameLeader = Objects.equals((Object)existing.getJobMasterId(), jobMasterGateway.getFencingToken());
            if (sameLeader) {
                log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken());
                return;
            }
            closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
        }

        log.info("Establish JobManager connection for job {}.", (Object)jobId);

        final ResourceID jmResourceId = registrationSuccess.getResourceID();
        final JobManagerConnection connection = associateWithJobManager(jobId, jmResourceId, jobMasterGateway);
        jobManagerConnections.put(jmResourceId, connection);
        jobManagerTable.put(jobId, connection);

        jobManagerHeartbeatManager.monitorTarget(jmResourceId, new HeartbeatTarget<AccumulatorReport>(){

            @Override
            public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
                jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
                // Intentionally empty: nothing is done when a heartbeat is requested here.
            }
        });

        offerSlotsToJobManager(jobId);
    }

    /**
     * Closes the connection to the JobManager of the given job.
     *
     * <p>In order: fails all tasks of the job, marks the job's active slots inactive (a slot
     * that cannot be marked inactive is freed), removes the connection from the job manager
     * table and, if one was registered, stops heartbeat monitoring and disassociates from the
     * JobManager.
     *
     * @param jobId job whose JobManager connection is closed
     * @param cause reason for closing; used as the cause of the task failure and passed on when
     *     disassociating from the JobManager
     */
    private void closeJobManagerConnection(JobID jobId, Exception cause) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Close JobManager connection for job {}.", (Object)jobId, (Object)cause);
        } else {
            this.log.info("Close JobManager connection for job {}.", (Object)jobId);
        }

        Iterator<Task> tasks = this.taskSlotTable.getTasks(jobId);
        FlinkException failureCause = new FlinkException("JobManager responsible for " + jobId + " lost the leadership.", (Throwable)cause);
        while (tasks.hasNext()) {
            tasks.next().failExternally(failureCause);
        }

        Iterator<AllocationID> activeSlots = this.taskSlotTable.getActiveSlots(jobId);
        FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
        while (activeSlots.hasNext()) {
            AllocationID activeSlot = activeSlots.next();
            try {
                if (!this.taskSlotTable.markSlotInactive(activeSlot, this.taskManagerConfiguration.getTimeout())) {
                    this.freeSlotInternal(activeSlot, freeingCause);
                }
            }
            catch (SlotNotFoundException e) {
                // Log the allocation id of the slot that failed, not the job id: the message
                // talks about a slot and the job may own several.
                this.log.debug("Could not mark the slot {} inactive.", (Object)activeSlot, (Object)e);
            }
        }

        JobManagerConnection jobManagerConnection = this.jobManagerTable.remove(jobId);
        if (jobManagerConnection != null) {
            try {
                this.jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
                this.jobManagerConnections.remove(jobManagerConnection.getResourceID());
                this.disassociateFromJobManager(jobManagerConnection, cause);
            }
            catch (IOException e) {
                this.log.warn("Could not properly disassociate from JobManager {}.", (Object)jobManagerConnection.getJobManagerGateway().getAddress(), (Object)e);
            }
        }
    }

    /**
     * Builds the {@code JobManagerConnection} for a job, wiring up the RPC-backed helpers that
     * talk to the given JobManager gateway, and registers queryable state for the job.
     *
     * @param jobID job the connection belongs to; must not be null
     * @param resourceID resource id of the JobManager; must not be null
     * @param jobMasterGateway gateway of the JobManager; must not be null
     * @return the newly created connection
     */
    private JobManagerConnection associateWithJobManager(JobID jobID, ResourceID resourceID, JobMasterGateway jobMasterGateway) {
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(jobMasterGateway);

        TaskManagerActionsImpl actions = new TaskManagerActionsImpl(jobMasterGateway);
        RpcCheckpointResponder responder = new RpcCheckpointResponder(jobMasterGateway);
        BlobLibraryCacheManager libraryCache = new BlobLibraryCacheManager(
            blobCacheService.getPermanentBlobService(),
            taskManagerConfiguration.getClassLoaderResolveOrder(),
            taskManagerConfiguration.getAlwaysParentFirstLoaderPatterns());
        RpcResultPartitionConsumableNotifier consumableNotifier = new RpcResultPartitionConsumableNotifier(
            jobMasterGateway,
            getRpcService().getExecutor(),
            taskManagerConfiguration.getTimeout());
        RpcPartitionStateChecker stateChecker = new RpcPartitionStateChecker(jobMasterGateway);

        registerQueryableState(jobID, jobMasterGateway);

        return new JobManagerConnection(
            jobID,
            resourceID,
            jobMasterGateway,
            actions,
            responder,
            libraryCache,
            consumableNotifier,
            stateChecker);
    }

    /**
     * Undoes {@code associateWithJobManager}: unregisters the job's queryable-state listener and
     * location oracle (where the respective component is present), tells the JobManager that
     * this TaskExecutor disconnects, and shuts down the connection's library cache manager.
     *
     * @param jobManagerConnection connection to dissolve; must not be null
     * @param cause reason sent to the JobManager
     * @throws IOException declared by this method; the visible code does not show which call
     *     raises it
     */
    private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
        Preconditions.checkNotNull(jobManagerConnection);

        KvStateRegistry registry = networkEnvironment.getKvStateRegistry();
        if (registry != null) {
            registry.unregisterListener(jobManagerConnection.getJobID());
        }

        KvStateClientProxy proxy = networkEnvironment.getKvStateProxy();
        if (proxy != null) {
            proxy.updateKvStateLocationOracle(jobManagerConnection.getJobID(), null);
        }

        jobManagerConnection.getJobManagerGateway().disconnectTaskManager(getResourceID(), cause);
        jobManagerConnection.getLibraryCacheManager().shutdown();
    }

    /**
     * Hooks the given job into the queryable-state components of the network environment.
     *
     * <p>A registry listener is registered only if both the state server and the state registry
     * are present; the location oracle is updated only if the client proxy is present.
     *
     * @param jobId job to register
     * @param jobMasterGateway gateway of the job's JobManager
     */
    private void registerQueryableState(JobID jobId, JobMasterGateway jobMasterGateway) {
        KvStateServer server = networkEnvironment.getKvStateServer();
        KvStateRegistry registry = networkEnvironment.getKvStateRegistry();
        if (server != null && registry != null) {
            registry.registerListener(jobId, new RpcKvStateRegistryListener(jobMasterGateway, server.getServerAddress()));
        }

        KvStateClientProxy proxy = networkEnvironment.getKvStateProxy();
        if (proxy != null) {
            proxy.updateKvStateLocationOracle(jobId, jobMasterGateway);
        }
    }

    /**
     * Fails the task of the given execution attempt, if it is known to the task slot table.
     *
     * <p>Anything thrown while failing the task is logged and not propagated.
     *
     * @param executionAttemptID execution attempt whose task should fail
     * @param cause failure cause handed to the task
     */
    private void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
        final Task target = taskSlotTable.getTask(executionAttemptID);
        if (target == null) {
            log.debug("Cannot find task to fail for execution {}.", (Object)executionAttemptID);
            return;
        }
        try {
            target.failExternally(cause);
        }
        catch (Throwable failure) {
            log.error("Could not fail task {}.", (Object)executionAttemptID, (Object)failure);
        }
    }

    /**
     * Sends the given task execution state to the JobManager and fails the task if the update
     * completes exceptionally. The completion callback is run on the executor returned by
     * {@code getMainThreadExecutor()}.
     *
     * @param jobMasterGateway gateway of the JobManager to notify
     * @param taskExecutionState state to report
     */
    private void updateTaskExecutionState(JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        final ExecutionAttemptID attemptId = taskExecutionState.getID();
        jobMasterGateway
            .updateTaskExecutionState(taskExecutionState)
            .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure != null) {
                        failTask(attemptId, failure);
                    }
                },
                (Executor)getMainThreadExecutor());
    }

    /**
     * Removes the task of the given execution attempt from the task slot table and reports its
     * final execution state to the JobManager.
     *
     * <p>A removed task that is not yet in a terminal state is failed first. If no task is found
     * for the id, only an error is logged.
     *
     * @param jobMasterGateway gateway of the JobManager to notify
     * @param executionAttemptID execution attempt whose task is unregistered
     */
    private void unregisterTaskAndNotifyFinalState(JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        final Task removed = taskSlotTable.removeTask(executionAttemptID);
        if (removed == null) {
            log.error("Cannot find task with ID {} to unregister.", (Object)executionAttemptID);
            return;
        }

        if (!removed.getExecutionState().isTerminal()) {
            try {
                removed.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
            }
            catch (Exception failure) {
                log.error("Could not properly fail task.", (Throwable)failure);
            }
        }

        log.info(
            "Un-registering task and sending final execution state {} to JobManager for task {} {}.",
            new Object[]{removed.getExecutionState(), removed.getTaskInfo().getTaskName(), removed.getExecutionId()});

        final AccumulatorSnapshot accumulators = removed.getAccumulatorRegistry().getSnapshot();
        final TaskExecutionState finalState = new TaskExecutionState(
            removed.getJobID(),
            removed.getExecutionId(),
            removed.getExecutionState(),
            removed.getFailureCause(),
            accumulators,
            removed.getMetricGroup().getIOMetricGroup().createSnapshot());
        updateTaskExecutionState(jobMasterGateway, finalState);
    }

    /**
     * Frees the slot with the given allocation id.
     *
     * <p>If the task slot table returns a slot index other than {@code -1}, the ResourceManager
     * (when connected) is told that the slot is available again. If that was the last
     * allocation of the owning job, the job is removed from the job leader service and its
     * JobManager connection is closed. Local state for the allocation id is released afterwards,
     * including when the slot was not found.
     *
     * @param allocationId allocation id of the slot to free; must not be null
     * @param cause reason for freeing the slot; its message is logged
     */
    private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
        Preconditions.checkNotNull(allocationId);
        log.debug("Free slot with allocation id {} because: {}", (Object)allocationId, (Object)cause.getMessage());

        try {
            // Look up the owner before freeing the slot.
            final JobID owningJob = taskSlotTable.getOwningJob(allocationId);
            final int freedIndex = taskSlotTable.freeSlot(allocationId, cause);

            if (freedIndex != -1) {
                if (isConnectedToResourceManager()) {
                    ResourceManagerGateway rmGateway = establishedResourceManagerConnection.getResourceManagerGateway();
                    rmGateway.notifySlotAvailable(
                        establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                        new SlotID(getResourceID(), freedIndex),
                        allocationId);
                }

                if (owningJob != null && taskSlotTable.getAllocationIdsPerJob(owningJob).isEmpty()) {
                    try {
                        jobLeaderService.removeJob(owningJob);
                    }
                    catch (Exception removalFailure) {
                        log.info("Could not remove job {} from JobLeaderService.", (Object)owningJob, (Object)removalFailure);
                    }
                    closeJobManagerConnection(owningJob, new FlinkException("TaskExecutor " + getAddress() + " has no more allocated slots for job " + owningJob + '.'));
                }
            }
        }
        catch (SlotNotFoundException notFound) {
            log.debug("Could not free slot for allocation id {}.", (Object)allocationId, (Object)notFound);
        }

        localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
    }

    /**
     * Frees the slot with the given allocation id if the task slot table considers the timeout
     * ticket valid; an invalid timeout is only logged.
     *
     * @param allocationId allocation id of the slot that timed out; must not be null
     * @param ticket timeout ticket to validate; must not be null
     */
    private void timeoutSlot(AllocationID allocationId, UUID ticket) {
        Preconditions.checkNotNull(allocationId);
        Preconditions.checkNotNull(ticket);

        if (!taskSlotTable.isValidTimeout(allocationId, ticket)) {
            log.debug("Received an invalid timeout for allocation id {} with ticket {}.", (Object)allocationId, (Object)ticket);
            return;
        }
        freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out."));
    }

    /**
     * @return {@code true} if an established ResourceManager connection is currently stored
     */
    private boolean isConnectedToResourceManager() {
        return establishedResourceManagerConnection != null;
    }

    /**
     * Checks whether this TaskExecutor is connected to the ResourceManager with the given id.
     *
     * @param resourceManagerId id to compare with the id of the stored ResourceManager address
     * @return {@code true} if a connection is established, an address is known, and the
     *     address's ResourceManager id equals the given id
     */
    private boolean isConnectedToResourceManager(ResourceManagerId resourceManagerId) {
        if (establishedResourceManagerConnection == null || resourceManagerAddress == null) {
            return false;
        }
        return resourceManagerAddress.getResourceManagerId().equals((Object)resourceManagerId);
    }

    /**
     * Checks whether the connection stored for the given job belongs to the given job master.
     *
     * @param jobId job to look up in the job manager table
     * @param jobMasterId expected job master id
     * @return {@code true} if a connection exists for the job and its job master id equals
     *     {@code jobMasterId}
     */
    private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId jobMasterId) {
        final JobManagerConnection current = jobManagerTable.get(jobId);
        if (current == null) {
            return false;
        }
        return Objects.equals((Object)current.getJobMasterId(), (Object)jobMasterId);
    }

    /**
     * @return the resource id taken from this TaskExecutor's {@code taskManagerLocation}
     */
    public ResourceID getResourceID() {
        return taskManagerLocation.getResourceID();
    }

    /**
     * Logs the given error and forwards it to the fatal error handler.
     *
     * @param t the fatal error
     */
    void onFatalError(Throwable t) {
        try {
            log.error("Fatal error occurred in TaskExecutor {}.", (Object)getAddress(), (Object)t);
        }
        catch (Throwable ignored) {
            // Ignored so that the fatal error handler below is still invoked even if logging
            // itself fails.
        }
        fatalErrorHandler.onFatalError(t);
    }

    /**
     * @return the current (possibly {@code null}) registration connection to the ResourceManager
     */
    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return resourceManagerConnection;
    }

    /**
     * @return the heartbeat manager used for the ResourceManager
     */
    @VisibleForTesting
    HeartbeatManager<Void, SlotReport> getResourceManagerHeartbeatManager() {
        return resourceManagerHeartbeatManager;
    }

    /**
     * Heartbeat listener for the ResourceManager connection of the enclosing
     * {@code TaskExecutor}.
     */
    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, SlotReport> {
        private ResourceManagerHeartbeatListener() {
        }

        /**
         * Schedules, via {@code runAsync}, a reconnect to the ResourceManager if the timed-out
         * id is the id of the currently established connection; otherwise the timeout is only
         * logged and ignored.
         *
         * @param resourceId resource id whose heartbeat timed out
         */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceId) {
            TaskExecutor.this.runAsync(() -> {
                final boolean isCurrentResourceManager =
                    TaskExecutor.this.establishedResourceManagerConnection != null
                        && TaskExecutor.this.establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId);
                if (!isCurrentResourceManager) {
                    TaskExecutor.this.log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", (Object)resourceId);
                    return;
                }
                TaskExecutor.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                final String reason = String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId);
                TaskExecutor.this.reconnectToResourceManager(new TaskManagerException(reason));
            });
        }

        /** Does nothing; the payload type is {@code Void}. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /**
         * Creates the slot report through {@code callAsync}, using the configured timeout.
         *
         * @param resourceID not used by this implementation
         * @return future of the slot report of the enclosing TaskExecutor
         */
        @Override
        public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
            return TaskExecutor.this.callAsync(
                () -> TaskExecutor.this.taskSlotTable.createSlotReport(TaskExecutor.this.getResourceID()),
                TaskExecutor.this.taskManagerConfiguration.getTimeout());
        }
    }

    /**
     * Heartbeat listener for the JobManager connections of the enclosing {@code TaskExecutor}.
     */
    private class JobManagerHeartbeatListener
    implements HeartbeatListener<Void, AccumulatorReport> {
        private JobManagerHeartbeatListener() {
        }

        /**
         * Schedules, via {@code runAsync}, the closing of the JobManager connection registered
         * for the timed-out resource id and a reconnect for its job. Nothing beyond logging
         * happens if no connection is registered for the id.
         *
         * @param resourceID resource id whose heartbeat timed out
         */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            TaskExecutor.this.runAsync(() -> {
                TaskExecutor.this.log.info("The heartbeat of JobManager with id {} timed out.", (Object)resourceID);
                if (!TaskExecutor.this.jobManagerConnections.containsKey(resourceID)) {
                    return;
                }
                final JobManagerConnection timedOut = (JobManagerConnection)TaskExecutor.this.jobManagerConnections.get(resourceID);
                if (timedOut == null) {
                    return;
                }
                TaskExecutor.this.closeJobManagerConnection(timedOut.getJobID(), new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
                TaskExecutor.this.jobLeaderService.reconnect(timedOut.getJobID());
            });
        }

        /** Does nothing; the payload type is {@code Void}. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /**
         * Collects the accumulator snapshots of all tasks of the job that belongs to the given
         * JobManager. Calls {@code validateRunsInMainThread()} first.
         *
         * @param resourceID resource id of the JobManager
         * @return a completed future with the accumulator report; the report is empty if no
         *     connection is registered for the id
         */
        @Override
        public CompletableFuture<AccumulatorReport> retrievePayload(ResourceID resourceID) {
            TaskExecutor.this.validateRunsInMainThread();
            final JobManagerConnection connection = (JobManagerConnection)TaskExecutor.this.jobManagerConnections.get(resourceID);
            if (connection == null) {
                return CompletableFuture.completedFuture(new AccumulatorReport(Collections.emptyList()));
            }

            final JobID jobId = connection.getJobID();
            final ArrayList<AccumulatorSnapshot> snapshots = new ArrayList<AccumulatorSnapshot>(16);
            for (Iterator<Task> taskIterator = TaskExecutor.this.taskSlotTable.getTasks(jobId); taskIterator.hasNext(); ) {
                snapshots.add(taskIterator.next().getAccumulatorRegistry().getSnapshot());
            }
            return CompletableFuture.completedFuture(new AccumulatorReport(snapshots));
        }
    }

    /**
     * {@code SlotActions} implementation that hands slot requests over to the enclosing
     * {@code TaskExecutor} through {@code runAsync}.
     */
    private class SlotActionsImpl
    implements SlotActions {
        private SlotActionsImpl() {
        }

        /**
         * Schedules the freeing of the slot with the given allocation id.
         *
         * @param allocationId allocation id of the slot to free
         */
        @Override
        public void freeSlot(AllocationID allocationId) {
            TaskExecutor.this.runAsync(() -> {
                final FlinkException reason = new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationId + '.');
                TaskExecutor.this.freeSlotInternal(allocationId, reason);
            });
        }

        /**
         * Schedules the timeout handling for the slot with the given allocation id.
         *
         * @param allocationId allocation id of the slot that timed out
         * @param ticket timeout ticket, validated by the enclosing TaskExecutor
         */
        @Override
        public void timeoutSlot(AllocationID allocationId, UUID ticket) {
            // The qualifier is required: an unqualified call would resolve to this method.
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
        }
    }

    /**
     * {@code TaskManagerActions} implementation bound to one JobManager gateway; it forwards
     * task callbacks to the enclosing {@code TaskExecutor}.
     */
    private final class TaskManagerActionsImpl
    implements TaskManagerActions {
        private final JobMasterGateway jobMasterGateway;

        /**
         * @param jobMasterGateway gateway of the JobManager to report to; must not be null
         */
        private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        }

        /**
         * Schedules, via {@code runAsync}, the unregistration of the task and the report of its
         * final state to the JobManager.
         */
        @Override
        public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
            TaskExecutor.this.runAsync(
                () -> TaskExecutor.this.unregisterTaskAndNotifyFinalState(this.jobMasterGateway, executionAttemptID));
        }

        /**
         * Logs the message and cause and forwards the cause to the fatal error handler.
         */
        @Override
        public void notifyFatalError(String message, Throwable cause) {
            try {
                TaskExecutor.this.log.error(message, cause);
            }
            catch (Throwable ignored) {
                // Ignored so that the fatal error handler below is still invoked even if
                // logging itself fails.
            }
            TaskExecutor.this.fatalErrorHandler.onFatalError(cause);
        }

        /**
         * Schedules, via {@code runAsync}, the failing of the task of the given execution
         * attempt.
         */
        @Override
        public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
            // The qualifier is required: an unqualified call would resolve to this method.
            TaskExecutor.this.runAsync(() -> TaskExecutor.this.failTask(executionAttemptID, cause));
        }

        /**
         * Forwards the state update directly (without {@code runAsync}) to the enclosing
         * TaskExecutor, using this instance's JobManager gateway.
         */
        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            TaskExecutor.this.updateTaskExecutionState(this.jobMasterGateway, taskExecutionState);
        }
    }

    /**
     * Listener for the outcome of the registration at the ResourceManager.
     */
    private final class ResourceManagerRegistrationListener
    implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> {
        private ResourceManagerRegistrationListener() {
        }

        /**
         * Schedules, via {@code runAsync}, the establishment of the ResourceManager connection.
         * The scheduled action does nothing if the given connection is no longer the enclosing
         * TaskExecutor's current {@code resourceManagerConnection}.
         *
         * @param connection connection whose registration succeeded
         * @param success registration response
         */
        @Override
        public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
            final ResourceID rmResourceId = success.getResourceManagerId();
            final InstanceID registrationId = success.getRegistrationId();
            final ClusterInformation clusterInfo = success.getClusterInformation();
            final ResourceManagerGateway gateway = (ResourceManagerGateway)connection.getTargetGateway();

            TaskExecutor.this.runAsync(() -> {
                if (TaskExecutor.this.resourceManagerConnection != connection) {
                    return;
                }
                TaskExecutor.this.establishResourceManagerConnection(gateway, rmResourceId, registrationId, clusterInfo);
            });
        }

        /** Treats a registration failure as a fatal error. */
        @Override
        public void onRegistrationFailure(Throwable failure) {
            TaskExecutor.this.onFatalError(failure);
        }
    }

    /**
     * Listener for job leader changes; forwards them to the enclosing {@code TaskExecutor}.
     */
    private final class JobLeaderListenerImpl
    implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        /**
         * Schedules, via {@code runAsync}, the establishment of the connection to the new job
         * leader.
         */
        @Override
        public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
            TaskExecutor.this.runAsync(
                () -> TaskExecutor.this.establishJobManagerConnection(jobId, jobManagerGateway, registrationMessage));
        }

        /**
         * Logs the leadership loss and schedules, via {@code runAsync}, the closing of the
         * JobManager connection of the job.
         */
        @Override
        public void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId) {
            TaskExecutor.this.log.info("JobManager for job {} with leader id {} lost leadership.", (Object)jobId, (Object)jobMasterId);
            TaskExecutor.this.runAsync(() -> {
                final Exception reason = new Exception("Job leader for job id " + jobId + " lost leadership.");
                TaskExecutor.this.closeJobManagerConnection(jobId, reason);
            });
        }

        /** Treats any error of the job leader service as a fatal error. */
        @Override
        public void handleError(Throwable throwable) {
            TaskExecutor.this.onFatalError(throwable);
        }
    }

    /**
     * Listener for ResourceManager leader changes; forwards them to the enclosing
     * {@code TaskExecutor}.
     */
    private final class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /**
         * Schedules, via {@code runAsync}, the handling of the new leader. The session id is
         * converted with {@code ResourceManagerId.fromUuidOrNull} inside the scheduled action.
         */
        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            TaskExecutor.this.runAsync(() -> {
                TaskExecutor.this.notifyOfNewResourceManagerLeader(leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID));
            });
        }

        /** Treats any leader retrieval error as a fatal error. */
        @Override
        public void handleError(Exception exception) {
            TaskExecutor.this.onFatalError(exception);
        }
    }
}

