/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;

public class HiveSparkClientFactory {
    protected static final transient Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
    private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
    private static final String SPARK_DEFAULT_MASTER = "yarn-cluster";
    private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
    private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
    private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";

    public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws IOException, SparkException {
        Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveconf);
        String master = sparkConf.get("spark.master");
        if (master.equals("local") || master.startsWith("local[")) {
            return LocalHiveSparkClient.getInstance(HiveSparkClientFactory.generateSparkConf(sparkConf));
        }
        return new RemoteHiveSparkClient(hiveconf, sparkConf);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
        HashMap<String, String> sparkConf = new HashMap<String, String>();
        sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
        sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
        sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
        sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING);
        InputStream inputStream = null;
        try {
            inputStream = HiveSparkClientFactory.class.getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
            if (inputStream != null) {
                LOG.info((Object)"loading spark properties from:spark-defaults.conf");
                Properties properties = new Properties();
                properties.load(new InputStreamReader(inputStream, "UTF-8"));
                for (String propertyName : properties.stringPropertyNames()) {
                    if (!propertyName.startsWith("spark")) continue;
                    String value = properties.getProperty(propertyName);
                    sparkConf.put(propertyName, properties.getProperty(propertyName));
                    LOG.info((Object)String.format("load spark property from %s (%s -> %s).", SPARK_DEFAULT_CONF_FILE, propertyName, value));
                }
            }
        }
        catch (IOException e) {
            LOG.info((Object)"Failed to open spark configuration file:spark-defaults.conf", (Throwable)e);
        }
        finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                }
                catch (IOException e) {
                    LOG.debug((Object)"Failed to close inputstream.", (Throwable)e);
                }
            }
        }
        String sparkMaster = hiveConf.get("spark.master");
        if (sparkMaster == null) {
            sparkMaster = (String)sparkConf.get("spark.master");
        }
        if (sparkMaster.equals(SPARK_DEFAULT_MASTER)) {
            sparkConf.put("spark.yarn.maxAppAttempts", "1");
        }
        Iterator<String> iterator = hiveConf.iterator();
        while (iterator.hasNext()) {
            String value;
            Map.Entry entry = (Map.Entry)((Object)iterator.next());
            String propertyName = (String)entry.getKey();
            if (propertyName.startsWith("spark")) {
                value = hiveConf.get(propertyName);
                sparkConf.put(propertyName, value);
                LOG.info((Object)String.format("load spark property from hive configuration (%s -> %s).", propertyName, value));
            } else if (propertyName.startsWith("yarn") && (sparkMaster.equals("yarn-client") || sparkMaster.equals(SPARK_DEFAULT_MASTER))) {
                value = hiveConf.get(propertyName);
                sparkConf.put("spark.hadoop." + propertyName, value);
                LOG.info((Object)String.format("load yarn property from hive configuration in %s mode (%s -> %s).", sparkMaster, propertyName, value));
            }
            if (!RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains((Object)propertyName)) continue;
            value = RpcConfiguration.getValue((HiveConf)hiveConf, (String)propertyName);
            sparkConf.put(propertyName, value);
            LOG.info((Object)String.format("load RPC property from hive configuration (%s -> %s).", propertyName, value));
        }
        HashSet classes = Sets.newHashSet((Iterable)Splitter.on((String)",").trimResults().omitEmptyStrings().split((CharSequence)Strings.nullToEmpty((String)((String)sparkConf.get("spark.kryo.classesToRegister")))));
        classes.add(VectorizedRowBatch.class.getName());
        classes.add(BytesWritable.class.getName());
        classes.add(HiveKey.class.getName());
        sparkConf.put("spark.kryo.classesToRegister", Joiner.on((String)",").join((Iterable)classes));
        return sparkConf;
    }

    static SparkConf generateSparkConf(Map<String, String> conf) {
        SparkConf sparkConf = new SparkConf(false);
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            sparkConf.set(entry.getKey(), entry.getValue());
        }
        return sparkConf;
    }
}

