apache-spark, hdfs, hadoop-yarn

Running Spark jobs on a YARN cluster with additional files


I'm writing a simple Spark application that takes an input RDD, sends it to an external script via pipe, and writes that script's output to a file. The driver code looks like this:

import org.apache.spark.SparkFiles

val input = args(0)
val scriptPath = args(1)
val output = args(2)
val sc = getSparkContext
if (args.length == 4) {
  // Here I pass an additional argument containing an absolute path to the script
  // on my local machine, used only for local testing
  sc.addFile(args(3))
}

sc.textFile(input).pipe(Seq("python2", SparkFiles.get(scriptPath))).saveAsTextFile(output)

When I run it on my local machine it works fine. But when I submit it to a YARN cluster via

spark-submit --master yarn --deploy-mode cluster --files /absolute/path/to/local/test.py --class somepackage.PythonLauncher path/to/driver.jar path/to/input/part-* test.py path/to/output

it fails with an exception.

Lost task 1.0 in stage 0.0 (TID 1, rwds2.1dmp.ru): java.lang.Exception: Subprocess exited with status 2

I've tried different variations of the pipe command. For instance, .pipe("cat") works fine and behaves as expected, but .pipe(Seq("cat", scriptPath)) also fails, with error code 1, so it seems that Spark can't resolve the path to the script on the cluster nodes.

Any suggestions?


Solution

  • I don't use Python myself, but here are some clues that may be useful to you (from the source of SparkSubmitArguments in Spark 1.3):

    • --py-files PY_FILES, Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.

    • --files FILES, Comma-separated list of files to be placed in the working directory of each executor.

    • --archives ARCHIVES, Comma-separated list of archives to be extracted into the working directory of each executor.

    Also, your arguments to spark-submit should follow this form (see the sketch below this list for how everything fits together):

    Usage: spark-submit [options] <app jar | python file> [app arguments]
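
Putting these notes together, here is a minimal, hypothetical sketch of how the driver and the submit command could look in cluster mode. It assumes that a file shipped with --files ends up in each executor's working directory (as the option description above says), so the piped command can refer to it by its bare name; the paths, class name, and helper names below are illustrative, not taken from the original post.

// Hypothetical driver sketch, submitted for example as:
//   spark-submit --master yarn --deploy-mode cluster \
//     --files /absolute/path/to/local/test.py \
//     --class somepackage.PythonLauncher path/to/driver.jar \
//     path/to/input/part-* test.py path/to/output
import org.apache.spark.{SparkConf, SparkContext}

object PythonLauncher {
  def main(args: Array[String]): Unit = {
    val input = args(0)       // input path (e.g. on HDFS)
    val scriptName = args(1)  // bare file name, e.g. "test.py"
    val output = args(2)      // output path

    val sc = new SparkContext(new SparkConf().setAppName("PythonLauncher"))

    // Files passed with --files are placed in each executor's working
    // directory, so the subprocess can be started with just the file name.
    sc.textFile(input)
      .pipe(Seq("python2", scriptName))
      .saveAsTextFile(output)

    sc.stop()
  }
}

In cluster mode, referring to the script by its bare name as above avoids depending on how SparkFiles resolves --files paths on the executors; the sc.addFile/SparkFiles.get path can still be kept for local testing, as in the original driver.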