/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.OutputStream;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkJobInvoker;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkJobServerDriver
extends JobServerDriver {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);

    public static void main(String[] args) throws Exception {
        FileSystems.setDefaultPipelineOptions((PipelineOptions)PipelineOptionsFactory.create());
        FlinkJobServerDriver.fromParams(args).run();
    }

    private static void printUsage(CmdLineParser parser) {
        System.err.println(String.format("Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName()));
        parser.printUsage((OutputStream)System.err);
        System.err.println();
    }

    public static FlinkJobServerDriver fromParams(String[] args) {
        FlinkServerConfiguration configuration = new FlinkServerConfiguration();
        CmdLineParser parser = new CmdLineParser((Object)configuration);
        try {
            parser.parseArgument(args);
        }
        catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", (Throwable)e);
            FlinkJobServerDriver.printUsage(parser);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
        return FlinkJobServerDriver.fromConfig(configuration);
    }

    public static FlinkJobServerDriver fromConfig(FlinkServerConfiguration configuration) {
        return FlinkJobServerDriver.create(configuration, FlinkJobServerDriver.createJobServerFactory((JobServerDriver.ServerConfiguration)configuration), FlinkJobServerDriver.createArtifactServerFactory((JobServerDriver.ServerConfiguration)configuration));
    }

    public static FlinkJobServerDriver create(FlinkServerConfiguration configuration, ServerFactory jobServerFactory, ServerFactory artifactServerFactory) {
        return new FlinkJobServerDriver(configuration, jobServerFactory, artifactServerFactory);
    }

    private FlinkJobServerDriver(FlinkServerConfiguration configuration, ServerFactory jobServerFactory, ServerFactory artifactServerFactory) {
        super((JobServerDriver.ServerConfiguration)configuration, jobServerFactory, artifactServerFactory);
    }

    protected JobInvoker createJobInvoker() {
        return FlinkJobInvoker.create((FlinkServerConfiguration)this.configuration);
    }

    public static class FlinkServerConfiguration
    extends JobServerDriver.ServerConfiguration {
        @Option(name="--flink-master-url", usage="Flink master url to submit job.")
        private String flinkMasterUrl = "[auto]";
        @Option(name="--flink-conf-dir", usage="Directory containing Flink YAML configuration files. These properties will be set to all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.")
        private String flinkConfDir = null;

        String getFlinkMasterUrl() {
            return this.flinkMasterUrl;
        }

        @Nullable
        String getFlinkConfDir() {
            return this.flinkConfDir;
        }
    }
}

