/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.JobStatusProvider;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.ExecutionGraphToInputsLocationsRetrieverAdapter;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.InputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.KvStateHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerUtils;
import org.apache.flink.runtime.scheduler.StateLocationRetriever;
import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
import org.apache.flink.runtime.scheduler.metrics.JobStatusMetrics;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.runtime.util.BoundedFIFOQueue;
import org.apache.flink.runtime.util.IntArrayList;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/**
 * Abstract base class for schedulers that drive a job through an {@link ExecutionGraph}.
 *
 * <p>The constructor creates (and restores) the execution graph together with the checkpoint
 * services it needs. Most {@link SchedulerNG} operations are then implemented here by delegating
 * to the graph or to one of the handler objects held as fields. Subclasses supply the actual
 * scheduling behaviour through the abstract hooks declared in this class.
 */
public abstract class SchedulerBase
implements SchedulerNG,
CheckpointScheduling {
    /** Logger passed in by the creator of this scheduler. */
    private final Logger log;
    /** Job graph this scheduler was created for. */
    private final JobGraph jobGraph;
    /** Execution graph that is created and restored in the constructor. */
    private final ExecutionGraph executionGraph;
    /** Scheduling topology obtained from {@link #executionGraph}. */
    private final SchedulingTopology schedulingTopology;
    /** Looks up a vertex's preferred location based on its state. */
    protected final StateLocationRetriever stateLocationRetriever;
    /** Adapter that retrieves input locations from {@link #executionGraph}. */
    protected final InputsLocationsRetriever inputsLocationsRetriever;
    /** Store for completed checkpoints; shut down once the execution graph terminates. */
    private final CompletedCheckpointStore completedCheckpointStore;
    /** Cleaner handed to the checkpoint store on shutdown and closed in {@link #closeAsync()}. */
    private final CheckpointsCleaner checkpointsCleaner;
    /** Checkpoint ID counter; shut down once the execution graph terminates. */
    private final CheckpointIDCounter checkpointIdCounter;
    /** Metric group on which the job metrics are registered in {@link #startScheduling()}. */
    protected final JobManagerJobMetricGroup jobManagerJobMetricGroup;
    /** Records vertex modifications (version increments) for the scheduling vertices. */
    protected final ExecutionVertexVersioner executionVertexVersioner;
    /** Delegate for the KvState related requests. */
    private final KvStateHandler kvStateHandler;
    /** Delegate for input-split, partition-state and checkpoint-message requests. */
    private final ExecutionGraphHandler executionGraphHandler;
    /** Delegate that initializes, starts, disposes and addresses operator coordinators. */
    protected final OperatorCoordinatorHandler operatorCoordinatorHandler;
    /** Executor against which the main-thread assertions of this class are made. */
    private final ComponentMainThreadExecutor mainThreadExecutor;
    /** Bounded history of root failures; sized by {@code WebOptions.MAX_EXCEPTION_HISTORY_SIZE}. */
    private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
    /** Factory used to create and restore the execution graph. */
    private final ExecutionGraphFactory executionGraphFactory;
    /** Settings for the job status metrics, read from the job master configuration. */
    private final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings;
    /** Deployment state time metrics; passed to the graph factory and registered on start. */
    private final DeploymentStateTimeMetrics deploymentStateTimeMetrics;

    /**
     * Creates the scheduler and eagerly builds the {@link ExecutionGraph} it operates on.
     *
     * <p>The order of initialisation is significant: the checkpoint store, the checkpoint ID
     * counter and the deployment metrics are inputs to the execution graph and are therefore
     * created first; the handlers wrap the finished graph and are created afterwards.
     *
     * @param log logger to use; must not be null
     * @param jobGraph job to schedule; must not be null
     * @param ioExecutor executor handed to the checkpoint store factory and the graph handler
     * @param jobMasterConfiguration configuration read for metrics and exception-history settings
     * @param checkpointsCleaner cleaner passed on to the execution graph factory
     * @param checkpointRecoveryFactory factory for checkpoint services; must not be null
     * @param jobManagerJobMetricGroup metric group of the job; must not be null
     * @param executionVertexVersioner versioner for execution vertices; must not be null
     * @param initializationTimestamp timestamp forwarded to the execution graph factory
     * @param mainThreadExecutor main thread executor of the owning component
     * @param jobStatusListener listener registered on the new execution graph
     * @param executionGraphFactory factory that creates and restores the execution graph
     * @param vertexParallelismStore parallelism information forwarded to the graph factory
     * @throws Exception if creating the checkpoint services or the execution graph fails
     */
    public SchedulerBase(
            Logger log,
            JobGraph jobGraph,
            Executor ioExecutor,
            Configuration jobMasterConfiguration,
            CheckpointsCleaner checkpointsCleaner,
            CheckpointRecoveryFactory checkpointRecoveryFactory,
            JobManagerJobMetricGroup jobManagerJobMetricGroup,
            ExecutionVertexVersioner executionVertexVersioner,
            long initializationTimestamp,
            ComponentMainThreadExecutor mainThreadExecutor,
            JobStatusListener jobStatusListener,
            ExecutionGraphFactory executionGraphFactory,
            VertexParallelismStore vertexParallelismStore)
            throws Exception {
        this.log = Preconditions.checkNotNull(log);
        this.jobGraph = Preconditions.checkNotNull(jobGraph);
        this.executionGraphFactory = executionGraphFactory;
        this.jobManagerJobMetricGroup = Preconditions.checkNotNull(jobManagerJobMetricGroup);
        this.executionVertexVersioner = Preconditions.checkNotNull(executionVertexVersioner);
        this.mainThreadExecutor = mainThreadExecutor;
        this.checkpointsCleaner = checkpointsCleaner;

        // Checkpoint services and metrics are prerequisites of the execution graph.
        this.completedCheckpointStore =
                SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
                        jobGraph,
                        jobMasterConfiguration,
                        Preconditions.checkNotNull(checkpointRecoveryFactory),
                        ioExecutor,
                        log);
        this.checkpointIdCounter =
                SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
                        jobGraph, Preconditions.checkNotNull(checkpointRecoveryFactory));
        this.jobStatusMetricsSettings =
                MetricOptions.JobStatusMetricsSettings.fromConfiguration(jobMasterConfiguration);
        this.deploymentStateTimeMetrics =
                new DeploymentStateTimeMetrics(jobGraph.getJobType(), this.jobStatusMetricsSettings);

        this.executionGraph =
                this.createAndRestoreExecutionGraph(
                        this.completedCheckpointStore,
                        checkpointsCleaner,
                        this.checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener,
                        vertexParallelismStore);
        this.schedulingTopology = this.executionGraph.getSchedulingTopology();

        // Everything below wraps the execution graph that was just created.
        this.stateLocationRetriever =
                vertexId -> this.getExecutionVertex(vertexId).getPreferredLocationBasedOnState();
        this.inputsLocationsRetriever =
                new ExecutionGraphToInputsLocationsRetrieverAdapter(this.executionGraph);
        this.kvStateHandler = new KvStateHandler(this.executionGraph);
        this.executionGraphHandler =
                new ExecutionGraphHandler(
                        this.executionGraph, log, ioExecutor, this.mainThreadExecutor);
        this.operatorCoordinatorHandler =
                new DefaultOperatorCoordinatorHandler(this.executionGraph, this::handleGlobalFailure);
        this.operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);
        this.exceptionHistory =
                new BoundedFIFOQueue<>(
                        jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
    }

    /**
     * Shuts down the completed checkpoint store and the checkpoint ID counter.
     *
     * <p>Both shutdowns are always attempted. Failures are collected (the later one is attached
     * to the earlier one as suppressed) and logged as a single error instead of being rethrown.
     * If the thread is interrupted while waiting for the ID counter to shut down, the interrupt
     * flag is restored so that the interruption is not lost.
     *
     * @param jobStatus status passed on to both checkpoint services
     */
    private void shutDownCheckpointServices(JobStatus jobStatus) {
        Exception exception = null;

        try {
            completedCheckpointStore.shutdown(jobStatus, checkpointsCleaner);
        } catch (Exception e) {
            exception = e;
        }

        try {
            checkpointIdCounter.shutdown(jobStatus).get();
        } catch (InterruptedException e) {
            // get() clears the interrupt status when it throws; restore it before carrying on.
            Thread.currentThread().interrupt();
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            log.error("Error while shutting down checkpoint services.", exception);
        }
    }

    /**
     * Maps the sentinel parallelism {@code -1} to {@code 1}; any other value is returned as is.
     *
     * @param parallelism parallelism as stored on a vertex
     * @return {@code 1} if {@code parallelism} is {@code -1}, otherwise {@code parallelism}
     */
    private static int normalizeParallelism(int parallelism) {
        return parallelism == -1 ? 1 : parallelism;
    }

    /**
     * Computes the default max parallelism of a vertex from its normalized parallelism.
     *
     * @param vertex vertex whose parallelism is used
     * @return result of {@code KeyGroupRangeAssignment.computeDefaultMaxParallelism} for the
     *     normalized parallelism
     */
    public static int getDefaultMaxParallelism(JobVertex vertex) {
        final int normalizedParallelism = normalizeParallelism(vertex.getParallelism());
        return KeyGroupRangeAssignment.computeDefaultMaxParallelism(normalizedParallelism);
    }

    /**
     * Computes a {@link VertexParallelismStore} using this class's own parallelism normalization.
     *
     * @param vertices vertices to compute parallelism information for
     * @param defaultMaxParallelismFunc supplies the max parallelism for vertices that have none
     *     configured
     * @return store with one entry per vertex
     */
    public static VertexParallelismStore computeVertexParallelismStore(
            Iterable<JobVertex> vertices, Function<JobVertex, Integer> defaultMaxParallelismFunc) {
        return computeVertexParallelismStore(
                vertices, defaultMaxParallelismFunc, SchedulerBase::normalizeParallelism);
    }

    /**
     * Computes a {@link VertexParallelismStore} for the given vertices.
     *
     * <p>A vertex whose max parallelism is {@code -1} gets the value produced by {@code
     * defaultMaxParallelismFunc}, and that value may later be overridden. A vertex with any other
     * max parallelism keeps it, and attempts to override it are rejected with an error message.
     *
     * @param vertices vertices to compute parallelism information for
     * @param defaultMaxParallelismFunc supplies the max parallelism when a vertex has {@code -1}
     * @param normalizeParallelismFunc applied to each vertex's parallelism before it is stored
     * @return store with one entry per vertex
     */
    public static VertexParallelismStore computeVertexParallelismStore(
            Iterable<JobVertex> vertices,
            Function<JobVertex, Integer> defaultMaxParallelismFunc,
            Function<Integer, Integer> normalizeParallelismFunc) {
        final DefaultVertexParallelismStore parallelismStore = new DefaultVertexParallelismStore();

        for (JobVertex jobVertex : vertices) {
            final int parallelism = normalizeParallelismFunc.apply(jobVertex.getParallelism());

            final int configuredMaxParallelism = jobVertex.getMaxParallelism();
            final boolean autoConfigured = configuredMaxParallelism == -1;
            final int maxParallelism =
                    autoConfigured
                            ? defaultMaxParallelismFunc.apply(jobVertex)
                            : configuredMaxParallelism;

            final DefaultVertexParallelismInfo info =
                    new DefaultVertexParallelismInfo(
                            parallelism,
                            maxParallelism,
                            // Only an automatically derived max parallelism may be replaced later.
                            requestedMax ->
                                    autoConfigured
                                            ? Optional.empty()
                                            : Optional.of(
                                                    "Cannot override a configured max parallelism."));
            parallelismStore.setParallelismInfo(jobVertex.getID(), info);
        }

        return parallelismStore;
    }

    /**
     * Computes a {@link VertexParallelismStore} using {@link #getDefaultMaxParallelism(JobVertex)}
     * for vertices without a configured max parallelism.
     *
     * @param vertices vertices to compute parallelism information for
     * @return store with one entry per vertex
     */
    public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> vertices) {
        return computeVertexParallelismStore(vertices, SchedulerBase::getDefaultMaxParallelism);
    }

    /**
     * Computes a {@link VertexParallelismStore} for all vertices of the given job graph.
     *
     * @param jobGraph job graph whose vertices are used
     * @return store with one entry per vertex
     */
    public static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph) {
        final Iterable<JobVertex> jobVertices = jobGraph.getVertices();
        return computeVertexParallelismStore(jobVertices);
    }

    /**
     * Creates and restores the execution graph through the factory, then wires it up: installs
     * the internal task failure listener, registers the job status listener and starts the graph
     * with the main thread executor.
     *
     * @return the started execution graph
     * @throws Exception if the factory fails to create or restore the graph
     */
    private ExecutionGraph createAndRestoreExecutionGraph(
            CompletedCheckpointStore completedCheckpointStore,
            CheckpointsCleaner checkpointsCleaner,
            CheckpointIDCounter checkpointIdCounter,
            long initializationTimestamp,
            ComponentMainThreadExecutor mainThreadExecutor,
            JobStatusListener jobStatusListener,
            VertexParallelismStore vertexParallelismStore)
            throws Exception {
        final TaskDeploymentDescriptorFactory.PartitionLocationConstraint locationConstraint =
                TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                        jobGraph.getJobType());

        final ExecutionGraph graph =
                executionGraphFactory.createAndRestoreExecutionGraph(
                        jobGraph,
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        locationConstraint,
                        initializationTimestamp,
                        new DefaultVertexAttemptNumberStore(),
                        vertexParallelismStore,
                        deploymentStateTimeMetrics,
                        log);

        graph.setInternalTaskFailuresListener(new UpdateSchedulerNgOnInternalFailuresListener(this));
        graph.registerJobStatusListener(jobStatusListener);
        graph.start(mainThreadExecutor);
        return graph;
    }

    /**
     * Resets every given vertex for a new execution, in iteration order.
     *
     * @param vertices IDs of the vertices to reset
     */
    protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices) {
        for (ExecutionVertexID vertexId : vertices) {
            resetForNewExecution(vertexId);
        }
    }

    /**
     * Resets a single execution vertex for a new execution.
     *
     * @param executionVertexId ID of the vertex to reset
     */
    protected void resetForNewExecution(ExecutionVertexID executionVertexId) {
        final ExecutionVertex vertex = getExecutionVertex(executionVertexId);
        vertex.resetForNewExecution();
    }

    /**
     * Restores state for the given vertices and informs the operator coordinators.
     *
     * <p>Without a checkpoint coordinator nothing can be restored, so the operator coordinators
     * are only notified: of an empty global restore, or of a subtask reset with checkpoint ID
     * {@code -1}. With a checkpoint coordinator, pending checkpoints are aborted first; then
     * either all involved job vertices are restored (global recovery), or only the involved
     * subtasks are restored and the operator coordinators are told which checkpoint was used.
     *
     * @param vertices vertices that are being restarted
     * @param isGlobalRecovery whether this is a global recovery
     * @throws Exception if restoring state or resetting a coordinator fails
     */
    protected void restoreState(Set<ExecutionVertexID> vertices, boolean isGlobalRecovery) throws Exception {
        final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();

        if (coordinator == null) {
            if (!isGlobalRecovery) {
                notifyCoordinatorsOfSubtaskRestore(
                        getInvolvedExecutionJobVerticesAndSubtasks(vertices), -1L);
            } else {
                notifyCoordinatorsOfEmptyGlobalRestore();
            }
            return;
        }

        coordinator.abortPendingCheckpoints(
                new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));

        if (isGlobalRecovery) {
            coordinator.restoreLatestCheckpointedStateToAll(
                    getInvolvedExecutionJobVertices(vertices), true);
            return;
        }

        final Map<ExecutionJobVertex, IntArrayList> subtasksByJobVertex =
                getInvolvedExecutionJobVerticesAndSubtasks(vertices);
        final OptionalLong restoredCheckpointId =
                coordinator.restoreLatestCheckpointedStateToSubtasks(subtasksByJobVertex.keySet());
        // -1 is passed on when no checkpoint was restored.
        notifyCoordinatorsOfSubtaskRestore(subtasksByJobVertex, restoredCheckpointId.orElse(-1L));
    }

    /**
     * Tells the operator coordinators of each job vertex which of its subtasks were reset.
     *
     * <p>Note that the subtask lists in {@code restoredSubtasks} are drained while notifying
     * (for job vertices that have coordinators), so the map's values are consumed by this call.
     *
     * @param restoredSubtasks reset subtask indices, grouped by job vertex
     * @param checkpointId checkpoint ID passed on to every coordinator
     */
    private void notifyCoordinatorsOfSubtaskRestore(Map<ExecutionJobVertex, IntArrayList> restoredSubtasks, long checkpointId) {
        restoredSubtasks.forEach(
                (jobVertex, subtaskIndices) -> {
                    final Collection<OperatorCoordinatorHolder> holders =
                            jobVertex.getOperatorCoordinators();
                    if (!holders.isEmpty()) {
                        while (!subtaskIndices.isEmpty()) {
                            final int subtaskIndex = subtaskIndices.removeLast();
                            holders.forEach(
                                    holder -> holder.subtaskReset(subtaskIndex, checkpointId));
                        }
                    }
                });
    }

    /**
     * Resets the operator coordinators of every initialized job vertex to checkpoint ID {@code
     * -1} with {@code null} checkpoint data. Job vertices that are not initialized are skipped.
     *
     * @throws Exception if a coordinator fails to reset
     */
    private void notifyCoordinatorsOfEmptyGlobalRestore() throws Exception {
        for (ExecutionJobVertex jobVertex : getExecutionGraph().getAllVertices().values()) {
            if (jobVertex.isInitialized()) {
                for (OperatorCoordinatorHolder holder : jobVertex.getOperatorCoordinators()) {
                    holder.resetToCheckpoint(-1L, null);
                }
            }
        }
    }

    /**
     * Collects the distinct job vertices that the given execution vertices belong to.
     *
     * @param executionVertices IDs of the execution vertices of interest
     * @return a new set containing the job vertex of each execution vertex
     */
    private Set<ExecutionJobVertex> getInvolvedExecutionJobVertices(Set<ExecutionVertexID> executionVertices) {
        return executionVertices.stream()
                .map(this::getExecutionVertex)
                .map(ExecutionVertex::getJobVertex)
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Groups the subtask indices of the given execution vertices by their job vertex.
     *
     * @param executionVertices IDs of the execution vertices of interest
     * @return a new map from job vertex to the subtask indices involved
     */
    private Map<ExecutionJobVertex, IntArrayList> getInvolvedExecutionJobVerticesAndSubtasks(Set<ExecutionVertexID> executionVertices) {
        final Map<ExecutionJobVertex, IntArrayList> subtasksByJobVertex = new HashMap<>();
        for (ExecutionVertexID vertexId : executionVertices) {
            final ExecutionVertex vertex = getExecutionVertex(vertexId);
            subtasksByJobVertex
                    .computeIfAbsent(vertex.getJobVertex(), unused -> new IntArrayList(32))
                    .add(vertex.getParallelSubtaskIndex());
        }
        return subtasksByJobVertex;
    }

    /**
     * Moves the current execution attempt of every given vertex to {@code SCHEDULED}.
     *
     * @param verticesToDeploy IDs of the vertices about to be deployed
     */
    protected void transitionToScheduled(List<ExecutionVertexID> verticesToDeploy) {
        for (ExecutionVertexID vertexId : verticesToDeploy) {
            final Execution currentAttempt = getExecutionVertex(vertexId).getCurrentExecutionAttempt();
            currentAttempt.transitionState(ExecutionState.SCHEDULED);
        }
    }

    /**
     * Records the failure cause on the execution graph; does nothing when {@code cause} is null.
     *
     * @param cause failure cause, may be null
     * @param timestamp timestamp passed on together with the cause
     */
    protected void setGlobalFailureCause(@Nullable Throwable cause, long timestamp) {
        if (cause == null) {
            return;
        }
        executionGraph.initFailureCause(cause, timestamp);
    }

    /** Returns the main thread executor this scheduler was constructed with. */
    protected ComponentMainThreadExecutor getMainThreadExecutor() {
        return mainThreadExecutor;
    }

    /**
     * Fails the job: bumps the version of all vertices, cancels pending slot requests and fails
     * the execution graph. Once the job has terminated, the failure is archived as a global
     * failure in the exception history.
     *
     * @param cause reason for failing the job
     * @param timestamp timestamp of the failure
     */
    protected void failJob(Throwable cause, long timestamp) {
        incrementVersionsOfAllVertices();
        cancelAllPendingSlotRequestsInternal();
        executionGraph.failJob(cause, timestamp);

        final CompletableFuture<JobStatus> terminationFuture = getJobTerminationFuture();
        terminationFuture.thenRun(() -> archiveGlobalFailure(cause));
    }

    /** Returns the scheduling topology that was taken from the execution graph on construction. */
    protected final SchedulingTopology getSchedulingTopology() {
        return schedulingTopology;
    }

    /** Returns the result partition availability checker of the execution graph. */
    protected final ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
        return executionGraph.getResultPartitionAvailabilityChecker();
    }

    /** Asks the execution graph to transition to running. */
    protected final void transitionToRunning() {
        executionGraph.transitionToRunning();
    }

    /**
     * Looks up an execution vertex in the execution graph.
     *
     * <p>No validation is performed: an unknown job vertex ID results in a {@link
     * NullPointerException} and an out-of-range subtask index in an {@link
     * ArrayIndexOutOfBoundsException}.
     *
     * @param executionVertexId ID consisting of job vertex ID and subtask index
     * @return the execution vertex at that subtask index of the job vertex
     */
    public ExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
        final ExecutionJobVertex jobVertex =
                executionGraph.getAllVertices().get(executionVertexId.getJobVertexId());
        final ExecutionVertex[] subtasks = jobVertex.getTaskVertices();
        return subtasks[executionVertexId.getSubtaskIndex()];
    }

    /**
     * Looks up a job vertex in the execution graph.
     *
     * @param jobVertexId ID of the job vertex
     * @return whatever the graph's vertex map holds for that ID
     */
    public ExecutionJobVertex getExecutionJobVertex(JobVertexID jobVertexId) {
        final Map<JobVertexID, ExecutionJobVertex> verticesById = executionGraph.getAllVertices();
        return verticesById.get(jobVertexId);
    }

    /** Returns the job graph this scheduler was constructed with. */
    protected JobGraph getJobGraph() {
        return jobGraph;
    }

    /**
     * Returns the number of restarts as tracked by the concrete scheduler.
     *
     * <p>{@link #startScheduling()} exposes this value through the {@code numRestarts} and {@code
     * fullRestarts} gauges.
     */
    protected abstract long getNumberOfRestarts();

    /**
     * Records a modification for every vertex of the scheduling topology.
     *
     * @return the versions returned by the versioner, keyed by vertex ID
     */
    private Map<ExecutionVertexID, ExecutionVertexVersion> incrementVersionsOfAllVertices() {
        return executionVertexVersioner.recordVertexModifications(
                IterableUtils.toStream(schedulingTopology.getVertices())
                        .map(Vertex::getId)
                        .collect(Collectors.toSet()));
    }

    /**
     * Subclass hook invoked by {@link #failJob}, {@link #closeAsync()} and {@link #cancel()}
     * right after the versions of all vertices have been incremented. Judging by the name,
     * implementations cancel slot requests that are still pending; the exact behaviour is
     * defined by the subclass.
     */
    protected abstract void cancelAllPendingSlotRequestsInternal();

    /**
     * Requests a job status transition on the execution graph.
     *
     * @param current status the graph is expected to be in
     * @param newState status to transition to
     */
    protected void transitionExecutionGraphState(JobStatus current, JobStatus newState) {
        executionGraph.transitionState(current, newState);
    }

    /**
     * Returns the checkpoint coordinator of the execution graph. Callers in this class treat a
     * {@code null} result as "no coordinator available".
     */
    @VisibleForTesting
    CheckpointCoordinator getCheckpointCoordinator() {
        return executionGraph.getCheckpointCoordinator();
    }

    /** Returns the execution graph created by the constructor. */
    @VisibleForTesting
    public ExecutionGraph getExecutionGraph() {
        return executionGraph;
    }

    /**
     * Starts scheduling: registers the job metrics, starts all operator coordinators and then
     * hands over to {@link #startSchedulingInternal()}. Must be called from the main thread.
     */
    @Override
    public final void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();

        final long initializationTimestamp =
                executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
        registerJobMetrics(
                jobManagerJobMetricGroup,
                executionGraph,
                this::getNumberOfRestarts,
                deploymentStateTimeMetrics,
                executionGraph::registerJobStatusListener,
                initializationTimestamp,
                jobStatusMetricsSettings);

        operatorCoordinatorHandler.startAllOperatorCoordinators();
        startSchedulingInternal();
    }

    /**
     * Registers the job-level metrics on the given metric group.
     *
     * <p>Registers the {@code downtime} and {@code uptime} gauges, exposes the restart count
     * under both {@code numRestarts} and {@code fullRestarts}, registers the job status metrics
     * (and subscribes them as a job status listener), and finally registers the deployment time
     * metrics.
     *
     * @param metrics group to register on
     * @param jobStatusProvider source for the up/down time gauges
     * @param numberOfRestarts gauge reporting the restart count
     * @param deploymentTimeMetrics deployment metrics to register
     * @param jobStatusListenerRegistrar receives the job status metrics as a listener
     * @param initializationTimestamp timestamp passed to the job status metrics
     * @param jobStatusMetricsSettings settings passed to the job status metrics
     */
    public static void registerJobMetrics(
            MetricGroup metrics,
            JobStatusProvider jobStatusProvider,
            Gauge<Long> numberOfRestarts,
            DeploymentStateTimeMetrics deploymentTimeMetrics,
            Consumer<JobStatusListener> jobStatusListenerRegistrar,
            long initializationTimestamp,
            MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) {
        metrics.gauge("downtime", new DownTimeGauge(jobStatusProvider));
        metrics.gauge("uptime", new UpTimeGauge(jobStatusProvider));
        // The same gauge backs both metric names.
        metrics.gauge("numRestarts", numberOfRestarts);
        metrics.gauge("fullRestarts", numberOfRestarts);

        final JobStatusMetrics statusMetrics =
                new JobStatusMetrics(initializationTimestamp, jobStatusMetricsSettings);
        statusMetrics.registerMetrics(metrics);
        jobStatusListenerRegistrar.accept(statusMetrics);

        deploymentTimeMetrics.registerMetrics(metrics);
    }

    /**
     * Subclass hook called at the end of {@link #startScheduling()}, after the job metrics have
     * been registered and the operator coordinators have been started.
     */
    protected abstract void startSchedulingInternal();

    /**
     * Stops the scheduler. Must be called from the main thread.
     *
     * <p>Before suspending the graph, a continuation is attached to its termination future that
     * shuts down the checkpoint services on the main thread executor and then closes the
     * checkpoints cleaner. Afterwards vertex versions are bumped, pending slot requests are
     * cancelled, the execution graph is suspended and all operator coordinators are disposed.
     *
     * @return future that completes once the checkpoint services are shut down and the
     *     checkpoints cleaner is closed
     */
    public CompletableFuture<Void> closeAsync() {
        mainThreadExecutor.assertRunningInMainThread();
        final FlinkException cause = new FlinkException("Scheduler is being stopped.");

        final CompletableFuture<Void> checkpointServicesShutdown =
                executionGraph
                        .getTerminationFuture()
                        .thenAcceptAsync(this::shutDownCheckpointServices, getMainThreadExecutor());
        final CompletableFuture<Void> checkpointServicesShutdownFuture =
                FutureUtils.composeAfterwards(
                        checkpointServicesShutdown, checkpointsCleaner::closeAsync);
        FutureUtils.assertNoException(checkpointServicesShutdownFuture);

        incrementVersionsOfAllVertices();
        cancelAllPendingSlotRequestsInternal();
        executionGraph.suspend(cause);
        operatorCoordinatorHandler.disposeAllOperatorCoordinators();

        return checkpointServicesShutdownFuture;
    }

    /**
     * Cancels the job: bumps the version of all vertices, cancels pending slot requests and
     * cancels the execution graph. Must be called from the main thread.
     */
    @Override
    public void cancel() {
        mainThreadExecutor.assertRunningInMainThread();

        incrementVersionsOfAllVertices();
        cancelAllPendingSlotRequestsInternal();
        executionGraph.cancel();
    }

    /** Returns the termination future of the execution graph. */
    @Override
    public CompletableFuture<JobStatus> getJobTerminationFuture() {
        return executionGraph.getTerminationFuture();
    }

    /**
     * Archives a global failure, using the graph's {@code FAILED} status timestamp and the
     * current execution attempts of all execution vertices.
     *
     * @param failure the failure to archive
     */
    protected final void archiveGlobalFailure(Throwable failure) {
        final long failedTimestamp = executionGraph.getStatusTimestamp(JobStatus.FAILED);
        final Set<Execution> currentAttempts =
                StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), false)
                        .map(ExecutionVertex::getCurrentExecutionAttempt)
                        .collect(Collectors.toSet());
        archiveGlobalFailure(failure, failedTimestamp, currentAttempts);
    }

    /**
     * Adds a global-failure entry to the exception history and logs it at debug level.
     *
     * @param failure the failure to archive
     * @param timestamp timestamp stored with the entry
     * @param executions executions stored with the entry
     */
    private void archiveGlobalFailure(Throwable failure, long timestamp, Iterable<Execution> executions) {
        final RootExceptionHistoryEntry entry =
                RootExceptionHistoryEntry.fromGlobalFailure(failure, timestamp, executions);
        exceptionHistory.add(entry);
        log.debug("Archive global failure.", failure);
    }

    /**
     * Archives a handled failure in the exception history.
     *
     * <p>If the snapshot names a root cause execution, an entry is built from the snapshot;
     * otherwise the failure is archived as a global failure.
     *
     * @param failureHandlingResult snapshot describing the failure
     */
    protected final void archiveFromFailureHandlingResult(FailureHandlingResultSnapshot failureHandlingResult) {
        final Optional<Execution> rootCauseExecution = failureHandlingResult.getRootCauseExecution();

        if (!rootCauseExecution.isPresent()) {
            archiveGlobalFailure(
                    failureHandlingResult.getRootCause(),
                    failureHandlingResult.getTimestamp(),
                    failureHandlingResult.getConcurrentlyFailedExecution());
            return;
        }

        final RootExceptionHistoryEntry rootEntry =
                RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(failureHandlingResult);
        exceptionHistory.add(rootEntry);
        log.debug(
                "Archive local failure causing attempt {} to fail: {}",
                rootCauseExecution.get().getAttemptId(),
                rootEntry.getExceptionAsString());
    }

    /**
     * Applies a task state update to the execution graph.
     *
     * @param taskExecutionState the reported state transition
     * @return {@code true} if the attempt is registered and the graph accepted the update,
     *     {@code false} otherwise
     */
    @Override
    public final boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
        final Execution execution =
                executionGraph.getRegisteredExecutions().get(taskExecutionState.getID());

        if (execution == null || !executionGraph.updateState(taskExecutionState)) {
            return false;
        }

        onTaskExecutionStateUpdate(execution, taskExecutionState);
        return true;
    }

    /**
     * Dispatches an accepted state update to {@link #onTaskFinished} or {@link #onTaskFailed}.
     *
     * <p>Nothing happens if the execution's current state differs from the reported state, or if
     * the reported state is neither {@code FINISHED} nor {@code FAILED}.
     */
    private void onTaskExecutionStateUpdate(Execution execution, TaskExecutionStateTransition taskExecutionState) {
        final ExecutionState currentState = execution.getState();
        final ExecutionState reportedState = taskExecutionState.getExecutionState();
        if (currentState != reportedState) {
            return;
        }

        switch (reportedState) {
            case FINISHED:
                onTaskFinished(execution);
                break;
            case FAILED:
                onTaskFailed(execution);
                break;
            default:
                break;
        }
    }

    /**
     * Subclass hook called after the execution graph accepted a state update to {@code
     * FINISHED} and the given execution is in that state.
     */
    protected abstract void onTaskFinished(Execution var1);

    /**
     * Subclass hook called after the execution graph accepted a state update to {@code FAILED}
     * and the given execution is in that state.
     */
    protected abstract void onTaskFailed(Execution var1);

    /**
     * Delegates the input split request to the execution graph handler. Must be called from the
     * main thread.
     *
     * @throws IOException if the handler fails to provide the split
     */
    @Override
    public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
        mainThreadExecutor.assertRunningInMainThread();
        return executionGraphHandler.requestNextInputSplit(vertexID, executionAttempt);
    }

    /**
     * Delegates the partition state request to the execution graph handler. Must be called from
     * the main thread.
     *
     * @throws PartitionProducerDisposedException as raised by the handler
     */
    @Override
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
        mainThreadExecutor.assertRunningInMainThread();
        return executionGraphHandler.requestPartitionState(intermediateResultId, resultPartitionId);
    }

    /** Returns the current content of the exception history as a list copy. */
    @VisibleForTesting
    public Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
        return exceptionHistory.toArrayList();
    }

    /**
     * Creates an archived copy of the execution graph, bundled with the exception history. Must
     * be called from the main thread.
     */
    @Override
    public ExecutionGraphInfo requestJob() {
        mainThreadExecutor.assertRunningInMainThread();
        final ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(executionGraph);
        return new ExecutionGraphInfo(archivedGraph, getExceptionHistory());
    }

    /** Returns the current state of the execution graph. */
    @Override
    public JobStatus requestJobStatus() {
        return executionGraph.getState();
    }

    /** Creates the job details from the execution graph. Must be called from the main thread. */
    @Override
    public JobDetails requestJobDetails() {
        mainThreadExecutor.assertRunningInMainThread();
        return JobDetails.createDetailsForJob(executionGraph);
    }

    /**
     * Delegates the KvState location lookup to the KvState handler. Must be called from the main
     * thread.
     */
    @Override
    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
        mainThreadExecutor.assertRunningInMainThread();
        return kvStateHandler.requestKvStateLocation(jobId, registrationName);
    }

    /**
     * Forwards a KvState registration to the KvState handler. Must be called from the main
     * thread.
     */
    @Override
    public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
        mainThreadExecutor.assertRunningInMainThread();
        kvStateHandler.notifyKvStateRegistered(
                jobId,
                jobVertexId,
                keyGroupRange,
                registrationName,
                kvStateId,
                kvStateServerAddress);
    }

    /**
     * Forwards a KvState unregistration to the KvState handler. Must be called from the main
     * thread.
     */
    @Override
    public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException {
        mainThreadExecutor.assertRunningInMainThread();
        kvStateHandler.notifyKvStateUnregistered(
                jobId, jobVertexId, keyGroupRange, registrationName);
    }

    /** Forwards the accumulator snapshot to the execution graph. Main thread only. */
    @Override
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        mainThreadExecutor.assertRunningInMainThread();
        executionGraph.updateAccumulators(accumulatorSnapshot);
    }

    /**
     * Triggers a savepoint and optionally cancels the job once it has been stored. Must be
     * called from the main thread.
     *
     * <p>With {@code cancelJob} set, the checkpoint scheduler is stopped before the savepoint is
     * triggered. If the savepoint fails it is started again; if it succeeds the job is
     * cancelled. The completion handling runs on the main thread executor.
     *
     * @param targetDirectory directory passed to the checkpoint coordinator
     * @param cancelJob whether to cancel the job after a successful savepoint
     * @param formatType savepoint format
     * @return future with the external pointer of the savepoint; a failure is reported wrapped
     *     in a {@link CompletionException}
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean cancelJob, SavepointFormatType formatType) {
        mainThreadExecutor.assertRunningInMainThread();

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(
                checkpointCoordinator, targetDirectory, getJobId(), log);

        log.info(
                "Triggering {}savepoint for job {}.",
                cancelJob ? "cancel-with-" : "",
                jobGraph.getJobID());

        if (cancelJob) {
            stopCheckpointScheduler();
        }

        final CompletableFuture<String> savepointPath =
                checkpointCoordinator
                        .triggerSavepoint(targetDirectory, formatType)
                        .thenApply(CompletedCheckpoint::getExternalPointer);

        return savepointPath.handleAsync(
                (path, failure) -> {
                    if (failure == null) {
                        if (cancelJob) {
                            log.info(
                                    "Savepoint stored in {}. Now cancelling {}.",
                                    path,
                                    jobGraph.getJobID());
                            cancel();
                        }
                        return path;
                    }

                    // The scheduler was stopped above; resume it since the job keeps running.
                    if (cancelJob) {
                        startCheckpointScheduler();
                    }
                    throw new CompletionException(failure);
                },
                mainThreadExecutor);
    }

    /**
     * Triggers a manual checkpoint. Must be called from the main thread.
     *
     * @return future with the external pointer of the checkpoint; a failure is reported wrapped
     *     in a {@link CompletionException}
     * @throws IllegalStateException if the execution graph has no checkpoint coordinator
     */
    @Override
    public CompletableFuture<String> triggerCheckpoint() {
        mainThreadExecutor.assertRunningInMainThread();

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        final JobID jobID = jobGraph.getJobID();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(String.format("Job %s is not a streaming job.", jobID));
        }

        log.info("Triggering a manual checkpoint for job {}.", jobID);

        final CompletableFuture<String> checkpointPath =
                checkpointCoordinator
                        .triggerCheckpoint(false)
                        .thenApply(CompletedCheckpoint::getExternalPointer);

        return checkpointPath.handleAsync(
                (path, failure) -> {
                    if (failure == null) {
                        return path;
                    }
                    throw new CompletionException(failure);
                },
                mainThreadExecutor);
    }

    /**
     * Stops periodic checkpoint scheduling. If no checkpoint coordinator is available, this is
     * only logged.
     */
    @Override
    public void stopCheckpointScheduler() {
        final CheckpointCoordinator coordinator = getCheckpointCoordinator();
        if (coordinator != null) {
            coordinator.stopCheckpointScheduler();
            return;
        }
        log.info("Periodic checkpoint scheduling could not be stopped due to the CheckpointCoordinator being shutdown.");
    }

    /**
     * Starts periodic checkpoint scheduling if it is configured. Must be called from the main
     * thread.
     *
     * <p>This is a best-effort operation: if no checkpoint coordinator is available, this is
     * only logged, and an {@link IllegalStateException} thrown while starting the scheduler is
     * logged at debug level instead of being propagated.
     */
    @Override
    public void startCheckpointScheduler() {
        mainThreadExecutor.assertRunningInMainThread();

        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            log.info("Periodic checkpoint scheduling could not be started due to the CheckpointCoordinator being shutdown.");
            return;
        }
        if (!checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            return;
        }

        try {
            checkpointCoordinator.startCheckpointScheduler();
        } catch (IllegalStateException e) {
            // Deliberately not rethrown (presumably the coordinator was shut down concurrently),
            // but leave a trace so the skipped start is diagnosable.
            log.debug("Ignoring failure to start the periodic checkpoint scheduler.", e);
        }
    }

    /** Forwards a checkpoint acknowledgement to the execution graph handler. */
    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        executionGraphHandler.acknowledgeCheckpoint(
                jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
    }

    /** Forwards a declined checkpoint message to the execution graph handler. */
    @Override
    public void declineCheckpoint(DeclineCheckpoint decline) {
        executionGraphHandler.declineCheckpoint(decline);
    }

    /**
     * Forwards reported checkpoint metrics to the execution graph handler. The {@code jobID}
     * argument is not passed on.
     */
    @Override
    public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics) {
        executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
    }

    /**
     * Stops the job with a synchronous savepoint. Must be called from the main thread.
     *
     * <p>The checkpoint scheduler is stopped, the termination futures of all current execution
     * attempts are collected, and then the synchronous savepoint is triggered. A {@link
     * StopWithSavepointTerminationManager} combines both outcomes into the returned future.
     *
     * @param targetDirectory savepoint directory, may be null
     * @param terminate passed on to the checkpoint coordinator when triggering the savepoint
     * @param formatType savepoint format
     * @return future produced by the termination manager
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType) {
        mainThreadExecutor.assertRunningInMainThread();

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(
                checkpointCoordinator, targetDirectory, executionGraph.getJobID(), log);

        log.info("Triggering stop-with-savepoint for job {}.", jobGraph.getJobID());
        stopCheckpointScheduler();

        // Collect the termination futures before triggering the savepoint.
        final CompletableFuture<Collection<ExecutionState>> executionTerminations =
                getCombinedExecutionTerminationFuture();
        final CompletableFuture<CompletedCheckpoint> savepoint =
                checkpointCoordinator.triggerSynchronousSavepoint(
                        terminate, targetDirectory, formatType);

        final StopWithSavepointTerminationManager terminationManager =
                new StopWithSavepointTerminationManager(
                        new StopWithSavepointTerminationHandlerImpl(
                                jobGraph.getJobID(), this, log));
        return terminationManager.stopWithSavepoint(
                savepoint, executionTerminations, mainThreadExecutor);
    }

    /**
     * Combines the terminal state futures of the current execution attempts of all execution
     * vertices into a single future.
     */
    private CompletableFuture<Collection<ExecutionState>> getCombinedExecutionTerminationFuture() {
        final List<CompletableFuture<ExecutionState>> terminalStateFutures =
                StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), false)
                        .map(ExecutionVertex::getCurrentExecutionAttempt)
                        .map(Execution::getTerminalStateFuture)
                        .collect(Collectors.toList());
        return FutureUtils.combineAll(terminalStateFutures);
    }

    /**
     * Forwards an operator event to the operator coordinator handler.
     *
     * @throws FlinkException as raised by the handler
     */
    @Override
    public void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt) throws FlinkException {
        operatorCoordinatorHandler.deliverOperatorEventToCoordinator(
                taskExecutionId, operatorId, evt);
    }

    /**
     * Forwards a coordination request to the operator coordinator handler.
     *
     * @return the handler's future response
     * @throws FlinkException as raised by the handler
     */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException {
        return operatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(
                operator, request);
    }

    /** Returns the ID of the job graph this scheduler was constructed with. */
    @VisibleForTesting
    JobID getJobId() {
        return jobGraph.getJobID();
    }
}

