scala apache-spark spark-graphx

GraphX VertexRDD NullPointerException


I am trying to do some message passing on a graph to calculate recursive features. I get an error when I define a graph whose vertices are the output of aggregateMessages. Code for context:

> val newGraph = Graph(newVertices, edges)

newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@2091594b

//This is the RDD that causes the problem
> val result = newGraph.aggregateMessages[List[Double]](
  {triplet => triplet.sendToDst(triplet.srcAttr)},
  {(a,b) => a.zip(b).map { case (x, y) => x + y }},
  {TripletFields.Src}) 

result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57

> result.take(1) 
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0)))

So far no problem, but when I try

> val newGraph2 = Graph(result, edges)

newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@710919e1

> val result2 = newGraph2.aggregateMessages[List[Double]](
  {triplet => triplet.sendToDst(triplet.srcAttr)},
  {(a,b) => a.zip(b).map { case (x, y) => x + y }},
  {TripletFields.Src})

> result2.count

I get the following (trimmed) error:

result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException
    at $anonfun$2.apply(<console>:62)
    at $anonfun$2.apply(<console>:62)
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
    at $anonfun$1.apply(<console>:61)
    at $anonfun$1.apply(<console>:61)
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
...
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
...
Caused by: java.lang.NullPointerException
  at $anonfun$2.apply(<console>:62)
  at $anonfun$2.apply(<console>:62)
  at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
  at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
  at $anonfun$1.apply(<console>:61)
  at $anonfun$1.apply(<console>:61)
  at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  ... 3 more

I don't think this is a type mismatch error, because aggregateMessages returns a VertexRDD. Any ideas why I am getting this problem?


Solution

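  • aggregateMessages does not return every node in the graph, only the nodes that receive at least one message. Graph(result, edges) therefore contains edges whose endpoints are missing from result. Because no default node value is passed in the graph definition, those missing nodes get a null attribute (the default for the defaultVertexAttr parameter of Graph.apply). On the second pass a null srcAttr is sent as a message, and the merge function calls zip on it, which is consistent with the NullPointerException at $anonfun$2 in the stack trace.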
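
A minimal sketch of the fix described above, assuming a zero vector is an acceptable default for nodes that received no message. The toy graph (1 -> 2 -> 3), the two-element feature lists, and the names `DefaultVertexAttrSketch`, `sumFromSources`, `secondPass` and `zeros` are made up for illustration and are not from the question; only `Graph.apply` with its third `defaultVertexAttr` argument and `aggregateMessages` are GraphX API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

object DefaultVertexAttrSketch {

  // Same message passing as in the question: every vertex receives the
  // attribute lists of its in-neighbours, summed element-wise.
  def sumFromSources(g: Graph[List[Double], Int]): VertexRDD[List[Double]] =
    g.aggregateMessages[List[Double]](
      triplet => triplet.sendToDst(triplet.srcAttr),
      (a, b) => a.zip(b).map { case (x, y) => x + y },
      TripletFields.Src)

  // Runs two rounds of message passing on a hypothetical toy graph 1 -> 2 -> 3.
  def secondPass(sc: SparkContext): Map[VertexId, List[Double]] = {
    val vertices: RDD[(VertexId, List[Double])] = sc.parallelize(Seq(
      (1L, List(1.0, 2.0)), (2L, List(3.0, 4.0)), (3L, List(5.0, 6.0))))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))

    // Vertex 1 has no incoming edge, so it is absent from `result`.
    val result = sumFromSources(Graph(vertices, edges))

    // Assumed default: a zero vector with the same length as the features.
    val zeros = List(0.0, 0.0)

    // The third argument is defaultVertexAttr: vertices that appear in
    // `edges` but not in `result` get `zeros` instead of null.
    val graph2 = Graph(result, edges, zeros)

    sumFromSources(graph2).collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("default-vertex-attr-sketch"))
    try secondPass(sc).foreach(println)
    finally sc.stop()
  }
}
```

Applied to the code in the question, this would amount to something like `Graph(result, edges, List(0.0, 0.0, 0.0, 0.0))`, with the default sized to match the feature lists. If a zero vector is not the right semantics and nodes without messages should keep their previous attribute, joining the messages back onto the old graph with `newGraph.outerJoinVertices(result) { (id, old, msg) => msg.getOrElse(old) }` may be a better fit.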