Search code examples
javahadoopapache-sparkspark-streaminghadoop-yarn

Cannot set configuration when submitting Spark Streaming jobs to YARN with Client within Java code


I am finding ways to submit my Spark Streaming jobs to YARN within Java code, and finally decided to use org.apache.spark.deploy.yarn.Client to submit. Everything looks fine, but now I find I cannot set configurations of either Spark or Spark Streaming, such as spark.dynamicAllocation.enabled.

I have tried various methods, like setting SparkConf which is used to create JavaSparkContext, setting SparkConf which is used to create ClientArguments and Client, and using System.setProperty(xxx), but none of them work. I intend to set the configuration dynamically, but even if I modify the spark-defaults.conf, nothing changed.

I have also tried other ways to submit jobs, like using SparkSubmit.main(xxx) and Runtime.getRuntime.exec("spark-submit", "xxx"), but beside this problem, they have even more problems, not seeming like recommended methods.

Could anyone tell me a workaround?


Solution

  • You can use SparkLauncher to run you Spark jobs on Yarn cluster from java code. For example I used it to run my spark jobs from my java web application, spark job jar was packaged into a web app jar.

    If you use spark version 1.5 and lower it's going to look like this (see SparkLauncher package):

        Process sparkLauncherProcess = new SparkLauncher()
            .setSparkHome(SPARK_HOME)
            .setJavaHome(JAVA_HOME)
            .setAppResource(SPARK_JOB_JAR_PATH)
            .setMainClass(SPARK_JOB_MAIN_CLASS)
            .addAppArgs("arg1", "arg2")
            .setMaster("yarn-cluster")
            .setConf("spark.dynamicAllocation.enabled", "true")
            .launch();
         sparkLauncherProcess.waitFor();
    

    If you use spark version 1.6 and higher it's going to look like this (see SparkLauncher package SparkAppHandle has some additional functionality):

        SparkAppHandle handle = new SparkLauncher()
            .setSparkHome(SPARK_HOME)
            .setJavaHome(JAVA_HOME)
            .setAppResource(SPARK_JOB_JAR_PATH)
            .setMainClass(SPARK_JOB_MAIN_CLASS)
            .addAppArgs("arg1", "arg2")
            .setMaster("yarn-cluster")
            .setConf("spark.dynamicAllocation.enabled", "true")
            .startApplication();
    

    The only dependency you need is:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-launcher_2.11</artifactId>
            <version>1.5.0</version>
            <scope>provided</scope>
        </dependency>