apache-spark, apache-kafka, spark-streaming, spark-graphx

Why does DStream.foreachRDD fail with java.io.NotSerializableException: org.apache.spark.SparkContext?


I need to build a graph with GraphX based on the data processed from Kafka. However, it seems that calling sc.parallelize() raises the error java.io.NotSerializableException: org.apache.spark.SparkContext

......
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)

lines.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    ......
    // Build a graph
    val vertRDD = sc.parallelize(vertices)
    val edgeRDD = sc.parallelize(edge)
    val graph = Graph(vertRDD, edgeRDD, defaultUser)
  })
})

How should I solve this problem?


Solution

  • Spark Streaming's foreachRDD operator runs its body on the driver once per batch interval; the function you pass it receives that batch's RDD, and the code you write against that RDD is what eventually gets turned into Spark jobs.

    foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit Apply a function to each RDD in this DStream. This is an output operator, so 'this' DStream will be registered as an output stream and therefore materialized.
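
    To make the driver/executor split concrete, here is a minimal sketch (reusing the ssc and lines values from the question) that is perfectly fine, because everything placed directly in the foreachRDD body executes on the driver, where the SparkContext is available:

    lines.foreachRDD { rdd =>
      // This whole block runs on the driver, once per batch interval.
      println(s"Records in this batch: ${rdd.count()}")         // count() triggers a Spark job
      val helper = ssc.sparkContext.parallelize(Seq(1, 2, 3))   // OK: parallelize is called on the driver
    }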

    RDD.foreachPartition is an action that will only happen on executors.

    foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this RDD.
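
    By contrast, the function passed to foreachPartition is serialized and shipped to executors, so it may only capture serializable values. A small sketch, again reusing the lines stream from the question, that does not touch sc at all:

    lines.foreachRDD { rdd =>
      val label = "kafka-batch"                   // a plain serializable value: safe to capture
      rdd.foreachPartition { partition =>
        // This closure runs on an executor; referencing sc here would force Spark
        // to serialize the SparkContext and fail with NotSerializableException.
        partition.foreach(record => println(s"[$label] $record"))
      }
    }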

    Before a task can be executed on executors it has to be serialized, and because SparkContext is not serializable you get the exception. This is how Spark makes sure, by design, that SparkContext (as sc) never ends up inside a task. That would not make sense anyway, since the entire scheduling infrastructure lives on the driver.

    SparkContext and RDD are only available on the driver. They are simply a way to describe your distributed computations that will eventually get "translated" to tasks that run on Spark executors.

    That's why you see the error message:

    java.io.NotSerializableException: org.apache.spark.SparkContext
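
    One way to restructure the code from the question, then, is to keep the Graph construction on the driver (directly inside foreachRDD) and express the per-record work as RDD transformations instead of calling sc.parallelize inside foreachPartition. The sketch below assumes hypothetical parseVertex and parseEdge functions for the Kafka payload and reuses defaultUser from the question:

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD

    lines.foreachRDD { rdd =>
      // Transformations are described here on the driver and executed lazily on executors.
      val vertRDD: RDD[(VertexId, String)] = rdd.map(parseVertex)   // hypothetical parser
      val edgeRDD: RDD[Edge[String]]       = rdd.map(parseEdge)     // hypothetical parser

      // Graph(...) is called on the driver, so no SparkContext ends up in a task closure.
      val graph = Graph(vertRDD, edgeRDD, defaultUser)
      graph.vertices.count()   // an action, so the graph actually gets materialized each batch
    }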

    BTW, I answered a similar question today (see Can SparkContext.textFile be used with a custom receiver?) so it looks like today is a SparkContext day :)