/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DispatcherException;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.FunctionWithException;

public abstract class Dispatcher
extends FencedRpcEndpoint<DispatcherId>
implements DispatcherGateway,
LeaderContender,
SubmittedJobGraphStore.SubmittedJobGraphListener {
    public static final String DISPATCHER_NAME = "dispatcher";
    private final Configuration configuration;
    private final SubmittedJobGraphStore submittedJobGraphStore;
    private final RunningJobsRegistry runningJobsRegistry;
    private final HighAvailabilityServices highAvailabilityServices;
    private final ResourceManagerGateway resourceManagerGateway;
    private final JobManagerSharedServices jobManagerSharedServices;
    private final HeartbeatServices heartbeatServices;
    private final BlobServer blobServer;
    private final FatalErrorHandler fatalErrorHandler;
    private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
    private final LeaderElectionService leaderElectionService;
    private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private final HistoryServerArchivist historyServerArchivist;
    @Nullable
    private final String metricQueryServicePath;
    @Nullable
    protected final String restAddress;
    private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
    private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);

    public Dispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist) throws Exception {
        super(rpcService, endpointId);
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityServices);
        this.resourceManagerGateway = (ResourceManagerGateway)Preconditions.checkNotNull((Object)resourceManagerGateway);
        this.heartbeatServices = (HeartbeatServices)Preconditions.checkNotNull((Object)heartbeatServices);
        this.blobServer = (BlobServer)Preconditions.checkNotNull((Object)blobServer);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.submittedJobGraphStore = (SubmittedJobGraphStore)Preconditions.checkNotNull((Object)submittedJobGraphStore);
        this.jobManagerMetricGroup = (JobManagerMetricGroup)Preconditions.checkNotNull((Object)jobManagerMetricGroup);
        this.metricQueryServicePath = metricServiceQueryPath;
        this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, this.blobServer);
        this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
        this.jobManagerRunnerFutures = new HashMap<JobID, CompletableFuture<JobManagerRunner>>(16);
        this.leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
        this.restAddress = restAddress;
        this.historyServerArchivist = (HistoryServerArchivist)Preconditions.checkNotNull((Object)historyServerArchivist);
        this.archivedExecutionGraphStore = (ArchivedExecutionGraphStore)Preconditions.checkNotNull((Object)archivedExecutionGraphStore);
        this.jobManagerRunnerFactory = (JobManagerRunnerFactory)Preconditions.checkNotNull((Object)jobManagerRunnerFactory);
        this.jobManagerTerminationFutures = new HashMap<JobID, CompletableFuture<Void>>(2);
    }

    @Override
    public CompletableFuture<Void> postStop() {
        this.log.info("Stopping dispatcher {}.", (Object)this.getAddress());
        CompletableFuture<Void> allJobManagerRunnersTerminationFuture = this.terminateJobManagerRunnersAndGetTerminationFuture();
        return FutureUtils.runAfterwards(allJobManagerRunnersTerminationFuture, () -> {
            Exception exception = null;
            try {
                this.jobManagerSharedServices.shutdown();
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
            }
            try {
                this.submittedJobGraphStore.stop();
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
            }
            try {
                this.leaderElectionService.stop();
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
            }
            this.jobManagerMetricGroup.close();
            if (exception != null) {
                throw exception;
            }
            this.log.info("Stopped dispatcher {}.", (Object)this.getAddress());
        });
    }

    @Override
    public void start() throws Exception {
        super.start();
        this.submittedJobGraphStore.start(this);
        this.leaderElectionService.start(this);
        this.registerDispatcherMetrics(this.jobManagerMetricGroup);
    }

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
        JobID jobId = jobGraph.getJobID();
        this.log.info("Submitting job {} ({}).", (Object)jobId, (Object)jobGraph.getName());
        try {
            jobSchedulingStatus = this.runningJobsRegistry.getJobSchedulingStatus(jobId);
        }
        catch (IOException e) {
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), (Throwable)e));
        }
        if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || this.jobManagerRunnerFutures.containsKey(jobId)) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", new Object[]{jobSchedulingStatus}))));
        }
        CompletionStage persistAndRunFuture = this.waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob).thenApply(ignored -> Acknowledge.get());
        return ((CompletableFuture)persistAndRunFuture).exceptionally(throwable -> {
            Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)throwable);
            this.log.error("Failed to submit job {}.", (Object)jobId, (Object)strippedThrowable);
            throw new CompletionException((Throwable)((Object)new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable)));
        });
    }

    private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
        this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
        CompletableFuture<Void> runJobFuture = this.runJob(jobGraph);
        return runJobFuture.whenComplete(BiConsumerWithException.unchecked((ignored, throwable) -> {
            if (throwable != null) {
                this.submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
            }
        }));
    }

    private CompletableFuture<Void> runJob(JobGraph jobGraph) {
        Preconditions.checkState((!this.jobManagerRunnerFutures.containsKey(jobGraph.getJobID()) ? 1 : 0) != 0);
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.createJobManagerRunner(jobGraph);
        this.jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
        return ((CompletableFuture)jobManagerRunnerFuture.thenApply(FunctionUtils.nullFn())).whenCompleteAsync((ignored, throwable) -> {
            if (throwable != null) {
                this.jobManagerRunnerFutures.remove(jobGraph.getJobID());
            }
        }, (Executor)this.getMainThreadExecutor());
    }

    private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
        RpcService rpcService = this.getRpcService();
        CompletableFuture jobManagerRunnerFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> this.jobManagerRunnerFactory.createJobManagerRunner(ResourceID.generate(), jobGraph, this.configuration, rpcService, this.highAvailabilityServices, this.heartbeatServices, this.blobServer, this.jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(this.jobManagerMetricGroup), this.fatalErrorHandler)), rpcService.getExecutor());
        return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
    }

    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
        JobID jobId = jobManagerRunner.getJobGraph().getJobID();
        jobManagerRunner.getResultFuture().whenCompleteAsync((archivedExecutionGraph, throwable) -> {
            if (jobManagerRunner == this.jobManagerRunnerFutures.get(jobId).getNow(null)) {
                if (archivedExecutionGraph != null) {
                    this.jobReachedGloballyTerminalState((ArchivedExecutionGraph)archivedExecutionGraph);
                } else {
                    Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)throwable);
                    if (strippedThrowable instanceof JobNotFinishedException) {
                        this.jobNotFinished(jobId);
                    } else {
                        this.jobMasterFailed(jobId, strippedThrowable);
                    }
                }
            } else {
                this.log.debug("There is a newer JobManagerRunner for the job {}.", (Object)jobId);
            }
        }, (Executor)this.getMainThreadExecutor());
        jobManagerRunner.start();
        return jobManagerRunner;
    }

    @Override
    public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
        return CompletableFuture.completedFuture(Collections.unmodifiableSet(new HashSet<JobID>(this.jobManagerRunnerFutures.keySet())));
    }

    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        return CompletableFuture.supplyAsync(() -> {
            this.log.info("Disposing savepoint {}.", (Object)savepointPath);
            try {
                Checkpoints.disposeSavepoint(savepointPath, this.configuration, classLoader, this.log);
            }
            catch (IOException | FlinkException e) {
                throw new CompletionException(new FlinkException(String.format("Could not dispose savepoint %s.", savepointPath), e));
            }
            return Acknowledge.get();
        }, this.jobManagerSharedServices.getScheduledExecutorService());
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        return jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
    }

    @Override
    public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        return jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.stop(timeout));
    }

    @Override
    public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        return jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.rescaleJob(newParallelism, rescalingBehaviour, timeout));
    }

    @Override
    public CompletableFuture<String> requestRestAddress(Time timeout) {
        if (this.restAddress != null) {
            return CompletableFuture.completedFuture(this.restAddress);
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new DispatcherException("The Dispatcher has not been started with a REST endpoint.")));
    }

    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
        CompletableFuture<ResourceOverview> taskManagerOverviewFuture = this.resourceManagerGateway.requestResourceOverview(timeout);
        List optionalJobInformation = this.queryJobMastersForInformation(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
        FutureUtils.ConjunctFuture allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
        CompletionStage allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
        JobsOverview completedJobsOverview = this.archivedExecutionGraphStore.getStoredJobsOverview();
        return ((CompletableFuture)allJobsFuture).thenCombine(taskManagerOverviewFuture, (runningJobsStatus, resourceOverview) -> {
            JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
            return new ClusterOverview((ResourceOverview)resourceOverview, allJobsOverview);
        });
    }

    @Override
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
        List individualOptionalJobDetails = this.queryJobMastersForInformation(jobMasterGateway -> jobMasterGateway.requestJobDetails(timeout));
        FutureUtils.ConjunctFuture optionalCombinedJobDetails = FutureUtils.combineAll(individualOptionalJobDetails);
        CompletionStage combinedJobDetails = optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
        Collection<JobDetails> completedJobDetails = this.archivedExecutionGraphStore.getAvailableJobDetails();
        return ((CompletableFuture)combinedJobDetails).thenApply(runningJobDetails -> {
            ArrayList<JobDetails> allJobDetails = new ArrayList<JobDetails>(completedJobDetails.size() + runningJobDetails.size());
            allJobDetails.addAll((Collection<JobDetails>)runningJobDetails);
            allJobDetails.addAll(completedJobDetails);
            return new MultipleJobsDetails(allJobDetails);
        });
    }

    @Override
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        CompletionStage jobStatusFuture = jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
        return ((CompletableFuture)jobStatusFuture).exceptionally(throwable -> {
            JobDetails jobDetails = this.archivedExecutionGraphStore.getAvailableJobDetails(jobId);
            if (jobDetails == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException((Throwable)throwable));
            }
            return jobDetails.getStatus();
        });
    }

    @Override
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobID jobId, JobVertexID jobVertexId) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        return jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId));
    }

    @Override
    public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        CompletionStage archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
        return ((CompletableFuture)archivedExecutionGraphFuture).exceptionally(throwable -> {
            ArchivedExecutionGraph serializableExecutionGraph = this.archivedExecutionGraphStore.get(jobId);
            if (serializableExecutionGraph == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException((Throwable)throwable));
            }
            return serializableExecutionGraph;
        });
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.jobManagerRunnerFutures.get(jobId);
        if (jobManagerRunnerFuture == null) {
            ArchivedExecutionGraph archivedExecutionGraph = this.archivedExecutionGraphStore.get(jobId);
            if (archivedExecutionGraph == null) {
                return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
            }
            return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
        }
        return ((CompletableFuture)jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture)).thenApply(JobResult::createFrom);
    }

    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
        if (this.metricQueryServicePath != null) {
            return CompletableFuture.completedFuture(Collections.singleton(this.metricQueryServicePath));
        }
        return CompletableFuture.completedFuture(Collections.emptyList());
    }

    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
        return this.resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
    }

    @Override
    public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
        return CompletableFuture.completedFuture(this.blobServer.getPort());
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob, Time timeout) {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
        return jobMasterGatewayFuture.thenCompose(jobMasterGateway -> jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
    }

    @Override
    public CompletableFuture<Acknowledge> shutDownCluster() {
        this.shutDown();
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
        CompletableFuture<Void> cleanupFuture = this.removeJob(jobId, cleanupHA);
        this.registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
    }

    private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
        Preconditions.checkState((!this.jobManagerTerminationFutures.containsKey(jobId) ? 1 : 0) != 0);
        this.jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
        jobManagerRunnerTerminationFuture.thenRunAsync(() -> {
            CompletableFuture<Void> terminationFuture = this.jobManagerTerminationFutures.remove(jobId);
            if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
                this.jobManagerTerminationFutures.put(jobId, terminationFuture);
            }
        }, this.getUnfencedMainThreadExecutor());
    }

    private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.jobManagerRunnerFutures.remove(jobId);
        CompletionStage<Object> jobManagerRunnerTerminationFuture = jobManagerRunnerFuture != null ? jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync) : CompletableFuture.completedFuture(null);
        return ((CompletableFuture)jobManagerRunnerTerminationFuture).thenRunAsync(() -> this.cleanUpJobData(jobId, cleanupHA), this.getRpcService().getExecutor());
    }

    private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
        this.jobManagerMetricGroup.removeJob(jobId);
        boolean cleanupHABlobs = false;
        if (cleanupHA) {
            try {
                this.submittedJobGraphStore.removeJobGraph(jobId);
                cleanupHABlobs = true;
            }
            catch (Exception e) {
                this.log.warn("Could not properly remove job {} from submitted job graph store.", (Object)jobId, (Object)e);
            }
            try {
                this.runningJobsRegistry.clearJob(jobId);
            }
            catch (IOException e) {
                this.log.warn("Could not properly remove job {} from the running jobs registry.", (Object)jobId, (Object)e);
            }
        } else {
            try {
                this.submittedJobGraphStore.releaseJobGraph(jobId);
            }
            catch (Exception e) {
                this.log.warn("Could not properly release job {} from submitted job graph store.", (Object)jobId, (Object)e);
            }
        }
        this.blobServer.cleanupJob(jobId, cleanupHABlobs);
    }

    private void terminateJobManagerRunners() {
        this.log.info("Stopping all currently running jobs of dispatcher {}.", (Object)this.getAddress());
        HashSet<JobID> jobsToRemove = new HashSet<JobID>(this.jobManagerRunnerFutures.keySet());
        for (JobID jobId : jobsToRemove) {
            this.removeJobAndRegisterTerminationFuture(jobId, false);
        }
    }

    private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
        this.terminateJobManagerRunners();
        Collection<CompletableFuture<Void>> values = this.jobManagerTerminationFutures.values();
        return FutureUtils.completeAll(values);
    }

    @VisibleForTesting
    Collection<JobGraph> recoverJobs() throws Exception {
        this.log.info("Recovering all persisted jobs.");
        Collection<JobID> jobIds = this.submittedJobGraphStore.getJobIds();
        try {
            return this.recoverJobGraphs(jobIds);
        }
        catch (Exception e) {
            for (JobID jobId : jobIds) {
                try {
                    this.submittedJobGraphStore.releaseJobGraph(jobId);
                }
                catch (Exception ie) {
                    e.addSuppressed(ie);
                }
            }
            throw e;
        }
    }

    @Nonnull
    private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
        ArrayList<JobGraph> jobGraphs = new ArrayList<JobGraph>(jobIds.size());
        for (JobID jobId : jobIds) {
            JobGraph jobGraph = this.recoverJob(jobId);
            if (jobGraph == null) {
                throw new FlinkJobNotFoundException(jobId);
            }
            jobGraphs.add(jobGraph);
        }
        return jobGraphs;
    }

    @Nullable
    private JobGraph recoverJob(JobID jobId) throws Exception {
        this.log.debug("Recover job {}.", (Object)jobId);
        SubmittedJobGraph submittedJobGraph = this.submittedJobGraphStore.recoverJobGraph(jobId);
        if (submittedJobGraph != null) {
            return submittedJobGraph.getJobGraph();
        }
        return null;
    }

    protected void onFatalError(Throwable throwable) {
        this.fatalErrorHandler.onFatalError(throwable);
    }

    protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        Preconditions.checkArgument((boolean)archivedExecutionGraph.getState().isGloballyTerminalState(), (String)"Job %s is in state %s which is not globally terminal.", (Object[])new Object[]{archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState()});
        this.log.info("Job {} reached globally terminal state {}.", (Object)archivedExecutionGraph.getJobID(), (Object)archivedExecutionGraph.getState());
        this.archiveExecutionGraph(archivedExecutionGraph);
        JobID jobId = archivedExecutionGraph.getJobID();
        this.removeJobAndRegisterTerminationFuture(jobId, true);
    }

    private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
        try {
            this.archivedExecutionGraphStore.put(archivedExecutionGraph);
        }
        catch (IOException e) {
            this.log.info("Could not store completed job {}({}).", new Object[]{archivedExecutionGraph.getJobName(), archivedExecutionGraph.getJobID(), e});
        }
        CompletableFuture<Acknowledge> executionGraphFuture = this.historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
        executionGraphFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                this.log.info("Could not archive completed job {}({}) to the history server.", new Object[]{archivedExecutionGraph.getJobName(), archivedExecutionGraph.getJobID(), throwable});
            }
        });
    }

    protected void jobNotFinished(JobID jobId) {
        this.log.info("Job {} was not finished by JobManager.", (Object)jobId);
        this.removeJobAndRegisterTerminationFuture(jobId, false);
    }

    private void jobMasterFailed(JobID jobId, Throwable cause) {
        this.onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
    }

    private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = this.jobManagerRunnerFutures.get(jobId);
        if (jobManagerRunnerFuture == null) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
        }
        CompletionStage leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
        return ((CompletableFuture)leaderGatewayFuture).thenApplyAsync(jobMasterGateway -> {
            if (this.jobManagerRunnerFutures.containsKey(jobId)) {
                return jobMasterGateway;
            }
            throw new CompletionException((Throwable)((Object)new FlinkJobNotFoundException(jobId)));
        }, (Executor)this.getMainThreadExecutor());
    }

    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
        return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
    }

    @Nonnull
    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
        int numberJobsRunning = this.jobManagerRunnerFutures.size();
        ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<CompletableFuture<Optional<T>>>(numberJobsRunning);
        for (JobID jobId : this.jobManagerRunnerFutures.keySet()) {
            CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = this.getJobMasterGatewayFuture(jobId);
            CompletionStage optionalRequest = ((CompletableFuture)jobMasterGatewayFuture.thenCompose(queryFunction::apply)).handle((value, throwable) -> Optional.ofNullable(value));
            optionalJobInformation.add((CompletableFuture<Optional<T>>)optionalRequest);
        }
        return optionalJobInformation;
    }

    @Override
    public void grantLeadership(UUID newLeaderSessionID) {
        this.runAsyncWithoutFencing(() -> {
            this.log.info("Dispatcher {} was granted leadership with fencing token {}", (Object)this.getAddress(), (Object)newLeaderSessionID);
            CompletionStage recoveredJobsFuture = this.recoveryOperation.thenApplyAsync(FunctionUtils.uncheckedFunction(ignored -> this.recoverJobs()), this.getRpcService().getExecutor());
            CompletionStage fencingTokenFuture = ((CompletableFuture)recoveredJobsFuture).thenComposeAsync(recoveredJobs -> this.tryAcceptLeadershipAndRunJobs(newLeaderSessionID, (Collection<JobGraph>)recoveredJobs), this.getUnfencedMainThreadExecutor());
            CompletionStage confirmationFuture = ((CompletableFuture)fencingTokenFuture).thenCombineAsync(recoveredJobsFuture, BiFunctionWithException.unchecked((confirmLeadership, recoveredJobs) -> {
                if (confirmLeadership.booleanValue()) {
                    this.leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
                } else {
                    for (JobGraph recoveredJob : recoveredJobs) {
                        this.submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
                    }
                }
                return null;
            }), this.getRpcService().getExecutor());
            ((CompletableFuture)confirmationFuture).whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    this.onFatalError(ExceptionUtils.stripCompletionException((Throwable)throwable));
                }
            });
            this.recoveryOperation = confirmationFuture;
        });
    }

    private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
        DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
        if (this.leaderElectionService.hasLeadership(newLeaderSessionID)) {
            this.log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", (Object)this.getAddress(), (Object)dispatcherId);
            this.setNewFencingToken(dispatcherId);
            ArrayList<CompletableFuture<Void>> runFutures = new ArrayList<CompletableFuture<Void>>(recoveredJobs.size());
            for (JobGraph recoveredJob : recoveredJobs) {
                CompletableFuture<Void> runFuture = this.waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
                runFutures.add(runFuture);
            }
            return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
        }
        this.log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", (Object)this.getAddress(), (Object)dispatcherId);
        return CompletableFuture.completedFuture(false);
    }

    private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
        CompletionStage jobManagerTerminationFuture = this.getJobTerminationFuture(jobId).exceptionally(throwable -> {
            throw new CompletionException((Throwable)((Object)new DispatcherException(String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId), (Throwable)throwable)));
        });
        return ((CompletableFuture)jobManagerTerminationFuture).thenComposeAsync(FunctionUtils.uncheckedFunction(ignored -> {
            this.jobManagerTerminationFutures.remove(jobId);
            return (CompletableFuture)action.apply((Object)jobGraph);
        }), (Executor)this.getMainThreadExecutor());
    }

    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
        if (this.jobManagerRunnerFutures.containsKey(jobId)) {
            return FutureUtils.completedExceptionally((Throwable)((Object)new DispatcherException(String.format("Job with job id %s is still running.", jobId))));
        }
        return this.jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }

    @VisibleForTesting
    CompletableFuture<Void> getRecoveryOperation() {
        return this.recoveryOperation;
    }

    private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
        if (this.getFencingToken() != null) {
            this.clearDispatcherState();
        }
        this.setFencingToken(dispatcherId);
    }

    private void clearDispatcherState() {
        this.terminateJobManagerRunners();
    }

    private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
        jobManagerMetricGroup.gauge("numRunningJobs", () -> (long)this.jobManagerRunnerFutures.size());
    }

    @Override
    public void revokeLeadership() {
        this.runAsyncWithoutFencing(() -> {
            this.log.info("Dispatcher {} was revoked leadership.", (Object)this.getAddress());
            this.setNewFencingToken(null);
        });
    }

    @Override
    public void handleError(Exception exception) {
        this.onFatalError((Throwable)((Object)new DispatcherException("Received an error from the LeaderElectionService.", exception)));
    }

    @Override
    public void onAddedJobGraph(JobID jobId) {
        this.runAsync(() -> {
            if (!this.jobManagerRunnerFutures.containsKey(jobId)) {
                CompletionStage recoveredJob = this.recoveryOperation.thenApplyAsync(FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(this.recoverJob(jobId))), this.getRpcService().getExecutor());
                DispatcherId dispatcherId = (DispatcherId)((Object)((Object)this.getFencingToken()));
                CompletionStage submissionFuture = ((CompletableFuture)recoveredJob).thenComposeAsync(jobGraphOptional -> jobGraphOptional.map(FunctionUtils.uncheckedFunction(jobGraph -> this.tryRunRecoveredJobGraph((JobGraph)jobGraph, dispatcherId).thenAcceptAsync(FunctionUtils.uncheckedConsumer(isRecoveredJobRunning -> {
                    if (!isRecoveredJobRunning.booleanValue()) {
                        this.submittedJobGraphStore.releaseJobGraph(jobId);
                    }
                }), this.getRpcService().getExecutor()))).orElse(CompletableFuture.completedFuture(null)), this.getUnfencedMainThreadExecutor());
                ((CompletableFuture)submissionFuture).whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        this.onFatalError((Throwable)((Object)new DispatcherException(String.format("Could not start the added job %s", jobId), ExceptionUtils.stripCompletionException((Throwable)throwable))));
                    }
                });
                this.recoveryOperation = submissionFuture;
            }
        });
    }

    private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
        if (this.leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
            JobID jobId = jobGraph.getJobID();
            if (this.jobManagerRunnerFutures.containsKey(jobId)) {
                this.log.debug("Ignore added JobGraph because the job {} is already running.", (Object)jobId);
                return CompletableFuture.completedFuture(true);
            }
            if (this.runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
                return this.waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
            }
            this.log.debug("Ignore added JobGraph because the job {} has already been completed.", (Object)jobId);
        }
        return CompletableFuture.completedFuture(false);
    }

    @Override
    public void onRemovedJobGraph(JobID jobId) {
        this.runAsync(() -> {
            try {
                this.removeJobAndRegisterTerminationFuture(jobId, false);
            }
            catch (Exception e) {
                this.onFatalError((Throwable)((Object)new DispatcherException(String.format("Could not remove job %s.", jobId), e)));
            }
        });
    }

    public static enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory
    {
        INSTANCE;


        @Override
        public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }

    @FunctionalInterface
    public static interface JobManagerRunnerFactory {
        public JobManagerRunner createJobManagerRunner(ResourceID var1, JobGraph var2, Configuration var3, RpcService var4, HighAvailabilityServices var5, HeartbeatServices var6, BlobServer var7, JobManagerSharedServices var8, JobManagerJobMetricGroupFactory var9, FatalErrorHandler var10) throws Exception;
    }
}

