/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.net.URL;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkExecutionEnvironments {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);

    /**
     * Creates a Flink batch {@link ExecutionEnvironment} for the given pipeline options.
     *
     * <p>Convenience overload that delegates to the three-argument variant with a {@code null}
     * configuration directory.
     *
     * @param options pipeline options the environment is configured from
     * @param filesToStage files handed to a remote environment when a remote master is configured
     * @return the configured batch execution environment
     */
    public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage) {
        final String noExplicitConfDir = null;
        return createBatchExecutionEnvironment(options, filesToStage, noExplicitConfDir);
    }

    /**
     * Creates and configures a Flink batch {@link ExecutionEnvironment}.
     *
     * <p>The kind of environment is selected by {@code options.getFlinkMaster()}:
     * <ul>
     *   <li>{@code [local]} - a local environment built from the loaded Flink configuration;</li>
     *   <li>{@code [collection]} - a {@link CollectionEnvironment};</li>
     *   <li>{@code [auto]} - whatever {@link ExecutionEnvironment#getExecutionEnvironment()} returns;</li>
     *   <li>anything else - parsed as {@code host[:port]} and used to create a remote environment,
     *       with the port defaulting to the configured {@link RestOptions#PORT}.</li>
     * </ul>
     *
     * <p>The resolved parallelism is written back into {@code options} via
     * {@code options.setParallelism(...)}.
     *
     * @param options pipeline options the environment is configured from (mutated: parallelism)
     * @param filesToStage files passed to the remote environment when a remote master is used
     * @param confDir Flink configuration directory, or {@code null} to use the default lookup
     * @return the configured batch execution environment
     */
    @VisibleForTesting
    static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
        LOG.info("Creating a Batch Execution Environment.");
        String masterUrl = options.getFlinkMaster();
        Configuration flinkConfiguration = getFlinkConfiguration(confDir);

        // Must be the common supertype: the branches below produce local, collection,
        // context-provided and remote environments, not only LocalEnvironment instances.
        ExecutionEnvironment flinkBatchEnv;
        if ("[local]".equals(masterUrl)) {
            flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(flinkConfiguration);
        } else if ("[collection]".equals(masterUrl)) {
            flinkBatchEnv = new CollectionEnvironment();
        } else if ("[auto]".equals(masterUrl)) {
            flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
        } else {
            // Remote master given as host[:port]; fall back to the configured REST port.
            int defaultPort = flinkConfiguration.getInteger(RestOptions.PORT);
            HostAndPort hostAndPort = HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort);
            flinkConfiguration.setInteger(RestOptions.PORT, hostAndPort.getPort());
            flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(
                    hostAndPort.getHost(),
                    hostAndPort.getPort(),
                    flinkConfiguration,
                    filesToStage.toArray(new String[0]));
            LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort());
        }

        flinkBatchEnv.getConfig().setExecutionMode(ExecutionMode.valueOf(options.getExecutionModeForBatch()));

        boolean isCollectionEnv = flinkBatchEnv instanceof CollectionEnvironment;
        // Apply an explicitly requested parallelism first so that it is visible to
        // flinkBatchEnv.getParallelism() below; collection environments are skipped.
        if (options.getParallelism() != -1 && !isCollectionEnv) {
            flinkBatchEnv.setParallelism(options.getParallelism());
        }
        // Collection environments are always pinned to parallelism 1.
        int parallelism = isCollectionEnv
                ? 1
                : determineParallelism(options.getParallelism(), flinkBatchEnv.getParallelism(), flinkConfiguration);
        flinkBatchEnv.setParallelism(parallelism);
        // Publish the effective value back to the pipeline options.
        options.setParallelism(parallelism);

        if (options.getObjectReuse()) {
            flinkBatchEnv.getConfig().enableObjectReuse();
        } else {
            flinkBatchEnv.getConfig().disableObjectReuse();
        }
        applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options);
        return flinkBatchEnv;
    }

    /**
     * Creates a Flink {@link StreamExecutionEnvironment} for the given pipeline options.
     *
     * <p>Convenience overload that delegates to the three-argument variant with a {@code null}
     * configuration directory.
     *
     * @param options pipeline options the environment is configured from
     * @param filesToStage files handed to a remote environment when a remote master is configured
     * @return the configured streaming execution environment
     */
    public static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage) {
        final String noExplicitConfDir = null;
        return createStreamExecutionEnvironment(options, filesToStage, noExplicitConfDir);
    }

    /**
     * Creates and configures a Flink {@link StreamExecutionEnvironment}.
     *
     * <p>The kind of environment is selected by {@code options.getFlinkMaster()}:
     * <ul>
     *   <li>{@code [local]} - a local streaming environment built from the loaded Flink configuration;</li>
     *   <li>{@code [auto]} - whatever {@link StreamExecutionEnvironment#getExecutionEnvironment()} returns;</li>
     *   <li>anything else - parsed as {@code host[:port]} and used to create a remote environment
     *       that submits jobs with the savepoint restore settings taken from {@code options}.</li>
     * </ul>
     *
     * <p>The resolved parallelism is written back into {@code options} via
     * {@code options.setParallelism(...)}. In the options read below, the value {@code -1} is
     * treated as "not set" for retries, retry delay, checkpoint interval, checkpoint timeout and
     * minimum pause between checkpoints.
     *
     * @param options pipeline options the environment is configured from (mutated: parallelism)
     * @param filesToStage files passed to the remote environment when a remote master is used
     * @param confDir Flink configuration directory, or {@code null} to use the default lookup
     * @return the configured streaming execution environment
     * @throws IllegalArgumentException if a checkpoint interval is set but is smaller than 1
     */
    @VisibleForTesting
    static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
        LOG.info("Creating a Streaming Environment.");
        String masterUrl = options.getFlinkMaster();
        Configuration flinkConfiguration = getFlinkConfiguration(confDir);

        // Must be StreamExecutionEnvironment (not Object): every call below, and the return
        // statement, rely on that static type.
        StreamExecutionEnvironment flinkStreamEnv;
        if ("[local]".equals(masterUrl)) {
            flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(
                    StreamExecutionEnvironment.getDefaultLocalParallelism(), flinkConfiguration);
        } else if ("[auto]".equals(masterUrl)) {
            flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        } else {
            // Remote master given as host[:port]; fall back to the configured REST port.
            int defaultPort = flinkConfiguration.getInteger(RestOptions.PORT);
            HostAndPort hostAndPort = HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort);
            flinkConfiguration.setInteger(RestOptions.PORT, hostAndPort.getPort());
            SavepointRestoreSettings savepointRestoreSettings = options.getSavepointPath() != null
                    ? SavepointRestoreSettings.forPath(options.getSavepointPath(), options.getAllowNonRestoredState())
                    : SavepointRestoreSettings.none();
            flinkStreamEnv = new BeamFlinkRemoteStreamEnvironment(
                    hostAndPort.getHost(),
                    hostAndPort.getPort(),
                    flinkConfiguration,
                    savepointRestoreSettings,
                    filesToStage.toArray(new String[0]));
            LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort());
        }

        int parallelism = determineParallelism(options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfiguration);
        flinkStreamEnv.setParallelism(parallelism);
        if (options.getMaxParallelism() > 0) {
            flinkStreamEnv.setMaxParallelism(options.getMaxParallelism());
        }
        // Publish the effective value back to the pipeline options.
        options.setParallelism(parallelism);

        if (options.getObjectReuse()) {
            flinkStreamEnv.getConfig().enableObjectReuse();
        } else {
            flinkStreamEnv.getConfig().disableObjectReuse();
        }
        flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        int numRetries = options.getNumberOfExecutionRetries();
        if (numRetries != -1) {
            flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
        }
        long retryDelay = options.getExecutionRetryDelay();
        if (retryDelay != -1L) {
            flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
        }

        // Checkpoint-related options are only applied when checkpointing itself is enabled.
        long checkpointInterval = options.getCheckpointingInterval();
        if (checkpointInterval != -1L) {
            if (checkpointInterval < 1L) {
                throw new IllegalArgumentException("The checkpoint interval must be positive");
            }
            flinkStreamEnv.enableCheckpointing(checkpointInterval, CheckpointingMode.valueOf(options.getCheckpointingMode()));
            if (options.getCheckpointTimeoutMillis() != -1L) {
                flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(options.getCheckpointTimeoutMillis());
            }
            boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
            boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
            if (externalizedCheckpoint) {
                flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                        retainOnCancellation
                                ? CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
                                : CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
            }
            long minPauseBetweenCheckpoints = options.getMinPauseBetweenCheckpoints();
            if (minPauseBetweenCheckpoints != -1L) {
                flinkStreamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
            }
            boolean failOnCheckpointingErrors = options.getFailOnCheckpointingErrors();
            flinkStreamEnv.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
        }

        applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
        if (options.getAutoWatermarkInterval() != null) {
            flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
        }
        if (options.getStateBackendFactory() != null) {
            // Instantiate the user-supplied factory class and let it build the backend.
            FlinkStateBackendFactory stateBackendFactory = InstanceBuilder.ofType(FlinkStateBackendFactory.class)
                    .fromClass(options.getStateBackendFactory())
                    .build();
            StateBackend stateBackend = stateBackendFactory.createStateBackend(options);
            flinkStreamEnv.setStateBackend(stateBackend);
        }
        return flinkStreamEnv;
    }

    /**
     * Picks the parallelism to use, taking the first positive value in this order: the pipeline
     * options, the execution environment, the {@link CoreOptions#DEFAULT_PARALLELISM} entry of the
     * Flink configuration. If none is positive, a warning is logged and 1 is returned.
     *
     * @param pipelineOptionsParallelism parallelism requested through the pipeline options
     * @param envParallelism parallelism reported by the execution environment
     * @param configuration Flink configuration consulted only when both other values are not positive
     * @return a parallelism that is always greater than zero
     */
    private static int determineParallelism(int pipelineOptionsParallelism, int envParallelism, Configuration configuration) {
        final int resolved;
        if (pipelineOptionsParallelism > 0) {
            resolved = pipelineOptionsParallelism;
        } else if (envParallelism > 0) {
            resolved = envParallelism;
        } else {
            // The configuration is read lazily, only once the cheaper sources are exhausted.
            final int configured = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
            if (configured > 0) {
                resolved = configured;
            } else {
                LOG.warn("No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism");
                resolved = 1;
            }
        }
        return resolved;
    }

    /**
     * Loads the Flink configuration.
     *
     * @param flinkConfDir directory to load the configuration from, or {@code null} to use the
     *     no-argument {@link GlobalConfiguration#loadConfiguration()} lookup
     * @return the loaded configuration
     */
    private static Configuration getFlinkConfiguration(@Nullable String flinkConfDir) {
        if (flinkConfDir != null) {
            return GlobalConfiguration.loadConfiguration(flinkConfDir);
        }
        return GlobalConfiguration.loadConfiguration();
    }

    /**
     * Copies the latency tracking interval from the pipeline options into the execution config.
     *
     * @param config execution config to update
     * @param options pipeline options supplying the interval
     */
    private static void applyLatencyTrackingInterval(ExecutionConfig config, FlinkPipelineOptions options) {
        config.setLatencyTrackingInterval(options.getLatencyTrackingInterval());
    }

    private static class BeamFlinkRemoteStreamEnvironment
    extends RemoteStreamEnvironment {
        private final SavepointRestoreSettings restoreSettings;

        public BeamFlinkRemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, SavepointRestoreSettings restoreSettings, String ... jarFiles) {
            super(host, port, clientConfiguration, jarFiles, null);
            this.restoreSettings = restoreSettings;
        }

        protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
            RestClusterClient client;
            List globalClasspaths = Collections.emptyList();
            String host = super.getHost();
            int port = super.getPort();
            if (LOG.isInfoEnabled()) {
                LOG.info("Running remotely at {}:{}", (Object)host, (Object)port);
            }
            ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader());
            Configuration configuration = new Configuration();
            configuration.addAll(super.getClientConfiguration());
            configuration.setString(JobManagerOptions.ADDRESS, host);
            configuration.setInteger(JobManagerOptions.PORT, port);
            configuration.setInteger(RestOptions.PORT, port);
            try {
                client = new RestClusterClient(configuration, (Object)"RemoteStreamEnvironment");
            }
            catch (Exception e) {
                throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), (Throwable)e);
            }
            client.setPrintStatusDuringExecution(this.getConfig().isSysoutLoggingEnabled());
            try {
                JobExecutionResult e = client.run((FlinkPlan)streamGraph, jarFiles, globalClasspaths, usercodeClassLoader, this.restoreSettings).getJobExecutionResult();
                return e;
            }
            catch (ProgramInvocationException e) {
                throw e;
            }
            catch (Exception e) {
                String term = e.getMessage() == null ? "." : ": " + e.getMessage();
                throw new ProgramInvocationException("The program execution failed" + term, (Throwable)e);
            }
            finally {
                try {
                    client.shutdown();
                }
                catch (Exception e) {
                    LOG.warn("Could not properly shut down the cluster client.", (Throwable)e);
                }
            }
        }
    }
}

