/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class MiniDispatcher
extends Dispatcher {
    private final ClusterEntrypoint.ExecutionMode executionMode;
    private boolean jobCancelled = false;

    public MiniDispatcher(RpcService rpcService, DispatcherId fencingToken, DispatcherServices dispatcherServices, @Nullable JobGraph jobGraph, @Nullable JobResult recoveredDirtyJob, DispatcherBootstrapFactory dispatcherBootstrapFactory, ClusterEntrypoint.ExecutionMode executionMode) throws Exception {
        super(rpcService, fencingToken, CollectionUtil.ofNullable((Object)jobGraph), CollectionUtil.ofNullable((Object)recoveredDirtyJob), dispatcherBootstrapFactory, dispatcherServices);
        this.executionMode = (ClusterEntrypoint.ExecutionMode)((Object)Preconditions.checkNotNull((Object)((Object)executionMode)));
    }

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        CompletableFuture<Acknowledge> acknowledgeCompletableFuture = super.submitJob(jobGraph, timeout);
        acknowledgeCompletableFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                this.onFatalError(new FlinkException("Failed to submit job " + jobGraph.getJobID() + " in job mode.", throwable));
            }
        });
        return acknowledgeCompletableFuture;
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        CompletableFuture<JobResult> jobResultFuture = super.requestJobResult(jobId, timeout);
        if (this.executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
            jobResultFuture.thenAccept(result -> {
                ApplicationStatus status;
                ApplicationStatus applicationStatus = status = result.getSerializedThrowable().isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
                if (!ApplicationStatus.UNKNOWN.equals((Object)result.getApplicationStatus())) {
                    this.log.info("Shutting down cluster because someone retrieved the job result and the status is globally terminal.");
                    this.shutDownFuture.complete(status);
                }
            });
        } else {
            this.log.info("Not shutting down cluster after someone retrieved the job result.");
        }
        return jobResultFuture;
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        this.jobCancelled = true;
        return super.cancelJob(jobId, timeout);
    }

    @Override
    protected CompletableFuture<Dispatcher.CleanupJobState> jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        CompletableFuture<Dispatcher.CleanupJobState> cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);
        return cleanupHAState.thenApply(cleanupJobState -> {
            JobStatus jobStatus = Objects.requireNonNull(archivedExecutionGraph.getState(), "JobStatus should not be null here.");
            if (jobStatus.isGloballyTerminalState() && (this.jobCancelled || this.executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) {
                this.log.info("Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}", new Object[]{jobStatus, this.jobCancelled, this.executionMode});
                this.shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
            }
            return cleanupJobState;
        });
    }
}

