/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

public abstract class ClusterClient<T> {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    final Optimizer compiler;
    protected final Configuration flinkConfig;
    protected final FiniteDuration timeout;
    protected final HighAvailabilityServices highAvailabilityServices;
    private final boolean sharedHaServices;
    private boolean printStatusDuringExecution = true;
    protected JobExecutionResult lastJobExecutionResult;
    private boolean detachedJobSubmission = false;

    public ClusterClient(Configuration flinkConfig) throws Exception {
        this(flinkConfig, HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)flinkConfig, (Executor)Executors.directExecutor(), (HighAvailabilityServicesUtils.AddressResolution)HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION), false);
    }

    public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
        this.flinkConfig = (Configuration)Preconditions.checkNotNull((Object)flinkConfig);
        this.compiler = new Optimizer(new DataStatistics(), (CostEstimator)new DefaultCostEstimator(), flinkConfig);
        this.timeout = AkkaUtils.getClientTimeout((Configuration)flinkConfig);
        this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityServices);
        this.sharedHaServices = sharedHaServices;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() throws Exception {
        ClusterClient clusterClient = this;
        synchronized (clusterClient) {
            if (!this.sharedHaServices && this.highAvailabilityServices != null) {
                this.highAvailabilityServices.close();
            }
        }
    }

    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    public boolean getPrintStatusDuringExecution() {
        return this.printStatusDuringExecution;
    }

    public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException {
        return LeaderRetrievalUtils.retrieveLeaderConnectionInfo((LeaderRetrievalService)this.highAvailabilityServices.getDispatcherLeaderRetriever(), (FiniteDuration)this.timeout);
    }

    public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
        return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan)ClusterClient.getOptimizedPlan(compiler, prog, parallelism));
    }

    public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
            if (prog.isUsingProgramEntryPoint()) {
                OptimizedPlan optimizedPlan = ClusterClient.getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
                return optimizedPlan;
            }
            if (prog.isUsingInteractiveMode()) {
                OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
                if (parallelism > 0) {
                    env.setParallelism(parallelism);
                }
                FlinkPlan flinkPlan = env.getOptimizedPlan(prog);
                return flinkPlan;
            }
            throw new RuntimeException("Couldn't determine program mode.");
        }
        finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        Logger log = LoggerFactory.getLogger(ClusterClient.class);
        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            log.debug("Changing plan default parallelism from {} to {}", (Object)p.getDefaultParallelism(), (Object)parallelism);
            p.setDefaultParallelism(parallelism);
        }
        log.debug("Set parallelism {}, plan default parallelism {}", (Object)parallelism, (Object)p.getDefaultParallelism());
        return compiler.compile(p);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
            if (prog.isUsingProgramEntryPoint()) {
                JobWithJars jobWithJars = prog.getPlanWithJars();
                JobSubmissionResult jobSubmissionResult = this.run(jobWithJars, parallelism, prog.getSavepointSettings());
                return jobSubmissionResult;
            }
            if (prog.isUsingInteractiveMode()) {
                this.log.info("Starting program in interactive mode (detached: {})", (Object)this.isDetached());
                List<URL> libraries = prog.getAllLibraries();
                ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, this.isDetached(), prog.getSavepointSettings());
                ContextEnvironment.setAsContext(factory);
                try {
                    prog.invokeInteractiveModeForExecution();
                    if (this.lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                        throw new ProgramMissingJobException("The program didn't contain a Flink job.");
                    }
                    if (this.isDetached()) {
                        JobSubmissionResult jobSubmissionResult = ((DetachedEnvironment)factory.getLastEnvCreated()).finalizeExecute();
                        return jobSubmissionResult;
                    }
                    JobExecutionResult jobExecutionResult = this.lastJobExecutionResult;
                    return jobExecutionResult;
                }
                finally {
                    ContextEnvironment.unsetContext();
                }
            }
            throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
        }
        finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

    public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return this.run(program, parallelism, SavepointRestoreSettings.none());
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        OptimizedPlan optPlan = ClusterClient.getOptimizedPlan(this.compiler, jobWithJars, parallelism);
        return this.run((FlinkPlan)optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }

    public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
        return this.run(compiledPlan, libraries, classpaths, classLoader, SavepointRestoreSettings.none());
    }

    public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
        JobGraph job = ClusterClient.getJobGraph(this.flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
        return this.submitJob(job, classLoader);
    }

    public abstract CompletableFuture<JobStatus> getJobStatus(JobID var1);

    public abstract void cancel(JobID var1) throws Exception;

    public abstract String cancelWithSavepoint(JobID var1, @Nullable String var2) throws Exception;

    public abstract String stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3) throws Exception;

    public abstract CompletableFuture<String> triggerSavepoint(JobID var1, @Nullable String var2) throws FlinkException;

    public abstract CompletableFuture<Acknowledge> disposeSavepoint(String var1) throws FlinkException;

    public abstract CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

    public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception {
        return this.getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    }

    public abstract Map<String, OptionalFailure<Object>> getAccumulators(JobID var1, ClassLoader var2) throws Exception;

    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
        return ClusterClient.getOptimizedPlan(compiler, prog.getPlan(), parallelism);
    }

    public static JobGraph getJobGraph(Configuration flinkConfig, PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
        return ClusterClient.getJobGraph(flinkConfig, optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
    }

    public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
        JobGraph job;
        if (optPlan instanceof StreamingPlan) {
            job = ((StreamingPlan)optPlan).getJobGraph();
            job.setSavepointRestoreSettings(savepointSettings);
        } else {
            JobGraphGenerator gen = new JobGraphGenerator(flinkConfig);
            job = gen.compileJobGraph((OptimizedPlan)optPlan);
        }
        for (URL jar : jarFiles) {
            try {
                job.addJar(new Path(jar.toURI()));
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }
        job.setClasspaths(classpaths);
        return job;
    }

    public abstract String getWebInterfaceURL();

    public abstract T getClusterId();

    public void setDetached(boolean isDetached) {
        this.detachedJobSubmission = isDetached;
    }

    public boolean isDetached() {
        return this.detachedJobSubmission;
    }

    public Configuration getFlinkConfiguration() {
        return this.flinkConfig.clone();
    }

    public abstract JobSubmissionResult submitJob(JobGraph var1, ClassLoader var2) throws ProgramInvocationException;

    public void shutDownCluster() {
        throw new UnsupportedOperationException("The " + this.getClass().getSimpleName() + " does not support shutDownCluster.");
    }
}

