Search code examples
apache-sparkgoogle-cloud-dataproc

Spark streaming on dataproc throws FileNotFoundException


When I try to submit a spark streaming job to google dataproc cluster, I get this exception:

16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
        at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
        at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

Full output here.

It seems this error happens when hadoop configuration is not correctly defined in spark-env.sh - link1, link2

Is it configurable somewhere? Any pointers on how to resolve it?

Running the same code in local mode works fine:

sparkConf.setMaster("local[4]")

For additional context: the job was invoked like this:

gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false

This is the boilerplate setup code:

  lazy val conf = {
    val c = new SparkConf().setAppName(this.getClass.getName)
    c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)

    if (isLocal) c.setMaster("local[4]")
    c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    c.set("spark.streaming.blockInterval", "1s")
  }

  lazy val ssc = if (checkPointingEnabled) {
    StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
  } else {
    createStreamingContext()
  }

  private def getCheckPointDirectory: String = {
    if (isLocal) localCheckPointPath else checkPointPath
  }

  private def createStreamingContext(): StreamingContext = {
    val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
    s.checkpoint(getCheckPointDirectory)
    s
  }

Thanks in advance


Solution

  • Is it possible that this wasn't the first time you ran the job with the given checkpoint directory, as in the checkpoint directory already contains a checkpoint?

    This happens because the checkpoint hard-codes the exact jarfile arguments used to submit the YARN application, and when running on Dataproc with a --jars flag pointing to GCS, this is actually syntactic sugar for Dataproc automatically staging your jarfile from GCS into a local file path /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar that's only used temporarily for the duration of a single job-run, since Spark isn't able to invoke the jarfile directly out of GCS without staging it locally.

    However, in a subsequent job, the previous tmp jarfile will already be deleted, but the new job tries to refer to that old location hard-coded into the checkpoint data.

    There are also additional issues caused by hard-coding in the checkpoint data; for example, Dataproc also uses YARN "tags" to track jobs, and will conflict with YARN if an old Dataproc job's "tag" is reused in a new YARN application. To run your streaming application, you'll need to first clear out your checkpoint directory if possible to start from a clean slate, and then:

    1. You must place the job jarfile somewhere on the master node before starting the job, and then your "--jar" flag must specify "file:///path/on/master/node/to/jarfile.jar".

    When you specify a "file:///" path dataproc knows its already on the master node so it doesn't re-stage into a /tmp directory, so in that case it's safe for the checkpoint to point to some fixed local directory on the master.

    You can do this either with an init action or you can submit a quick pig job (or just ssh into the master and download that jarfile):

    # Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
    gcloud dataproc jobs submit pig --cluster my-test-cluster \
        --execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"
    
    # Submit the first attempt of the job
    gcloud dataproc jobs submit spark --cluster my-test-cluster \
        --class com.company.skyfall.Skyfall \
        --jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
        --properties spark.ui.showConsoleProgress=false
    
    1. Dataproc relies on spark.yarn.tags under the hood to track YARN applications associated with jobs. However, the checkpoint holds a stale spark.yarn.tags which causes Dataproc to get confused with new applications that seem to be associated with old jobs.

    For now, it only "cleans up" suspicious YARN applications as long as the recent killed jobid is held in memory, so rebooting the dataproc agent will fix this.

    # Kill the job through the UI or something before the next step.
    # Now use "pig sh" to restart the dataproc agent
    gcloud dataproc jobs submit pig --cluster my-test-cluster \
        --execute "sh systemctl restart google-dataproc-agent.service"
    
    # Re-run your job without needing to change anything else,
    # it'll be fine now if you ever need to resubmit it and it
    # needs to recover from the checkpoint again.
    

    Keep in mind though that by nature of checkpoints this means you won't be able to change the arguments you pass on subsequent runs, because the checkpoint recovery is used to clobber your command-line settings.