Tags: hadoop, apache-spark, hadoop-yarn, kerberos, cloudera-cdh

Running from a local IDE against a remote Spark cluster


We have a kerberized cluster with Spark running on Yarn. At the moment, we write our Spark code in Scala locally, then build a fat JAR which we copy over to the cluster and then run spark-submit. I would instead like to write Spark code on my local PC and have it run against the cluster directly. Is there a straightforward way to do this? The Spark docs don't seem to have any such pattern.

FYI, my local machine is running Windows and the cluster is running CDH.


Solution

  • While cricket007's answer works for spark-submit, here is what I did to run against a remote cluster using IntelliJ:

    First, make sure the JARs on the client and server sides are identical. Since we are using CDH 5.7.1, I made sure all my JARs came from that specific distribution.

    Set HADOOP_CONF_DIR and YARN_CONF_DIR as described in cricket007's answer. Set "spark.yarn.principal" and "spark.yarn.keytab" as appropriate in the Spark conf.
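
    For illustration, HADOOP_CONF_DIR and YARN_CONF_DIR are plain environment variables, so in IntelliJ they can be set in the run configuration; a small sanity check at the top of the driver (nothing here is CDH-specific) fails fast if they are missing:

    // Verify the IDE run configuration exports the conf directories the
    // YARN client needs; the principal/keytab are set on the SparkConf.
    for (v <- Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR"))
      require(sys.env.contains(v), s"$v is not set in the run configuration")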

    If connecting to HDFS, make sure the following exclusion rule is set in build.sbt (excluding javax.servlet typically avoids the servlet-api signer/version clashes between the Hadoop client JARs and the servlet classes Spark already pulls in):

    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0-cdh5.7.1" excludeAll ExclusionRule(organization = "javax.servlet")
    

    Make sure the spark-launcher and spark-yarn JARs are listed in build.sbt:

    libraryDependencies += "org.apache.spark" %% "spark-launcher" % "1.6.0-cdh5.7.1"
    
    libraryDependencies += "org.apache.spark" %% "spark-yarn" % "1.6.0-cdh5.7.1"
    

    Find the CDH JARs on the server and copy them to a known location on HDFS (a scripted version of this upload step is sketched after the listings below). Then add the following lines to your code:

    final val CDH_JAR_PATH = "/opt/cloudera/parcels/CDH/jars"
    
    final val hadoopJars: Seq[String] = Seq[String](
      "hadoop-annotations-2.6.0-cdh5.7.1.jar",
      "hadoop-ant-2.6.0-cdh5.7.1.jar",
      "hadoop-ant-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-archive-logs-2.6.0-cdh5.7.1.jar",
      "hadoop-archives-2.6.0-cdh5.7.1.jar",
      "hadoop-auth-2.6.0-cdh5.7.1.jar",
      "hadoop-aws-2.6.0-cdh5.7.1.jar",
      "hadoop-azure-2.6.0-cdh5.7.1.jar",
      "hadoop-capacity-scheduler-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-common-2.6.0-cdh5.7.1.jar",
      "hadoop-core-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-datajoin-2.6.0-cdh5.7.1.jar",
      "hadoop-distcp-2.6.0-cdh5.7.1.jar",
      "hadoop-examples-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-examples.jar",
      "hadoop-extras-2.6.0-cdh5.7.1.jar",
      "hadoop-fairscheduler-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-gridmix-2.6.0-cdh5.7.1.jar",
      "hadoop-gridmix-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-hdfs-2.6.0-cdh5.7.1.jar",
      "hadoop-hdfs-nfs-2.6.0-cdh5.7.1.jar",
      "hadoop-kms-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-app-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-common-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-core-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-hs-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-hs-plugins-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-jobclient-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-nativetask-2.6.0-cdh5.7.1.jar",
      "hadoop-mapreduce-client-shuffle-2.6.0-cdh5.7.1.jar",
      "hadoop-nfs-2.6.0-cdh5.7.1.jar",
      "hadoop-openstack-2.6.0-cdh5.7.1.jar",
      "hadoop-rumen-2.6.0-cdh5.7.1.jar",
      "hadoop-sls-2.6.0-cdh5.7.1.jar",
      "hadoop-streaming-2.6.0-cdh5.7.1.jar",
      "hadoop-streaming-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-tools-2.6.0-mr1-cdh5.7.1.jar",
      "hadoop-yarn-api-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-applications-distributedshell-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-client-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-common-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-registry-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-server-common-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-server-nodemanager-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-server-resourcemanager-2.6.0-cdh5.7.1.jar",
      "hadoop-yarn-server-web-proxy-2.6.0-cdh5.7.1.jar",
      "hbase-hadoop2-compat-1.2.0-cdh5.7.1.jar",
      "hbase-hadoop-compat-1.2.0-cdh5.7.1.jar")
    
    final val sparkJars: Seq[String] = Seq[String](
      "spark-1.6.0-cdh5.7.1-yarn-shuffle.jar",
      "spark-assembly-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
      "spark-avro_2.10-1.1.0-cdh5.7.1.jar",
      "spark-bagel_2.10-1.6.0-cdh5.7.1.jar",
      "spark-catalyst_2.10-1.6.0-cdh5.7.1.jar",
      "spark-core_2.10-1.6.0-cdh5.7.1.jar",
      "spark-examples-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
      "spark-graphx_2.10-1.6.0-cdh5.7.1.jar",
      "spark-hive_2.10-1.6.0-cdh5.7.1.jar",
      "spark-launcher_2.10-1.6.0-cdh5.7.1.jar",
      "spark-mllib_2.10-1.6.0-cdh5.7.1.jar",
      "spark-network-common_2.10-1.6.0-cdh5.7.1.jar",
      "spark-network-shuffle_2.10-1.6.0-cdh5.7.1.jar",
      "spark-repl_2.10-1.6.0-cdh5.7.1.jar",
      "spark-sql_2.10-1.6.0-cdh5.7.1.jar",
      "spark-streaming-flume-sink_2.10-1.6.0-cdh5.7.1.jar",
      "spark-streaming-flume_2.10-1.6.0-cdh5.7.1.jar",
      "spark-streaming-kafka_2.10-1.6.0-cdh5.7.1.jar",
      "spark-streaming_2.10-1.6.0-cdh5.7.1.jar",
      "spark-unsafe_2.10-1.6.0-cdh5.7.1.jar",
      "spark-yarn_2.10-1.6.0-cdh5.7.1.jar")
    
    // Joins the JAR names into one colon-separated classpath string;
    // drop(1) strips the leading colon left over from the fold.
    def getClassPath(jarNames: Seq[String], pathPrefix: String): String =
      jarNames.foldLeft("")((cp, name) => s"$cp:$pathPrefix/$name").drop(1)
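
    As an aside, the "copy them to a known location on HDFS" step can be scripted with Hadoop's FileSystem API rather than done by hand. A minimal sketch, assuming it runs on a cluster node and that "/user/myuser/cdh-jars" is a placeholder target directory:

    import java.io.File
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Uploads every JAR from the CDH parcel directory into one HDFS directory.
    val fs = FileSystem.get(new Configuration()) // reads fs.defaultFS from the conf dirs
    val target = new Path("/user/myuser/cdh-jars") // hypothetical location; use your own
    fs.mkdirs(target)
    new File(CDH_JAR_PATH).listFiles
      .filter(_.getName.endsWith(".jar"))
      .foreach(jar => fs.copyFromLocalFile(new Path(jar.getAbsolutePath), target))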

    Add these lines when creating a SparkConf:

    .set("spark.driver.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
    .set("spark.executor.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
    .set("spark.yarn.jars", "hdfs://$YOUR_MACHINE/PATH_TO_JARS/*")
    

    Your program should work now.
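
    For reference, a minimal sketch of the full driver setup, assuming the hadoopJars, sparkJars, and getClassPath definitions above are in scope, and using placeholder values for the app name, principal, and keytab path (keep your own HDFS URL in place of the PATH_TO_JARS placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("ide-to-yarn")                        // placeholder app name
      .setMaster("yarn-client")                         // Spark 1.6 client-mode master
      .set("spark.yarn.principal", "me@EXAMPLE.COM")    // placeholder principal
      .set("spark.yarn.keytab", "C:/keytabs/me.keytab") // placeholder keytab path
      .set("spark.driver.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
      .set("spark.executor.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
      .set("spark.yarn.jars", "hdfs://$YOUR_MACHINE/PATH_TO_JARS/*")

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).sum()) // trivial job to confirm the cluster answers
    sc.stop()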