scala · apache-spark · nullpointerexception · spark-graphx

SparkException: Job aborted due to stage failure: NullPointerException when working with Spark-Graphx


I'm new to Scala and I'm trying to solve this error.


The scenario I'm working on is this. I have 3 tables:

  • user: containing ID and name

  • business: containing ID and name

  • reviews: containing user.ID and business.ID

Only users write reviews and only businesses receive reviews. The graph will look something like this:

Graph example

What I'm looking for is:

For each user, I want to know the other users that reviewed the same businesses.

I did these actions to create the graph:

val users = sqlContext.sql("Select user_id as ID from user")
val business = sqlContext.sql("Select business_id as ID from business")
users.write.mode(SaveMode.Append).saveAsTable("user_busin_db")
business.write.mode(SaveMode.Append).saveAsTable("user_busin_db")
val user_bus = sqlContext.sql("Select ID from user_busin_db")
val reviews = sqlContext.sql("Select user_id, business_id from review")

The table user_bus will be used for vertex creation.

After that, I created the graph with GraphX using this code:

def str2Long(s: String) = s.##.toLong

val vertex: RDD[(VertexId, String)] = user_bus.rdd.map(x => (str2Long(x(0).asInstanceOf[String]),(x(0).asInstanceOf[String])))
val edge:RDD[Edge[String]] = reviews.rdd.map(row => Edge(str2Long(row(0).asInstanceOf[String]), str2Long(row(1).asInstanceOf[String]), "review"))

val default = "missing"
val myGraph = Graph(vertex, edge, default)

myGraph.cache()

Now, to answer my question, I tried an aggregateMessages for both users and businesses with this code:

val userAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => {
      triplet.sendToSrc((List(triplet.dstId)))
  },
  (a,b) => (a.union(b))
)

val businessAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => {
      triplet.sendToDst((List(triplet.srcId)))
  },
  (a,b) => (a.union(b))
) 

And then the code that gives me the error. To collect, for each user, the other users that reviewed the same businesses, I wrote this:

userAggregate.map(userAggr =>
    (userAggr._1, userAggr._2.flatMap(userAggrListElem =>
        userAggr._2.patch(0,businessAggregate.filter(busAggr => busAggr._1 == userAggrListElem).map(row => row._2).take(1)(0),userAggr._2.size+1))))

If I try to use .collect or .count on it, I get this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 138.0 failed 1 times, most recent failure: Lost task 1.0 in stage 138.0 (TID 2807, localhost): java.lang.NullPointerException
at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94)
    at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:102)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:101)
    at scala.collection.immutable.List.flatMap(List.scala:327)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:101)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1134)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:105)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:115)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:117)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:119)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:121)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:123)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:125)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:127)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:129)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:131)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:135)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:137)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:139)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:141)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:143)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:145)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:147)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw.<init>(<console>:149)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw.<init>(<console>:151)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw.<init>(<console>:153)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw.<init>(<console>:155)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print$lzycompute(<console>:7)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print(<console>:6)
Caused by: java.lang.NullPointerException
    at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94)
    at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:102)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:101)
    at scala.collection.immutable.List.flatMap(List.scala:327)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:101)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

The algorithm works well if I use a subset of userAggregate; for instance, with take(1) I get this result:

Array[(org.apache.spark.graphx.VertexId, List[Long])] = Array((-1324024017,List(-1851582020, -1799460264, -1614007919, -1573604682, ...)))

Which is: (user_ID, List(user_IDs that reviewed the same businesses, ...))

Now I think there is a problem with the vertices: somewhere there is an unconnected vertex that gives me the NullPointerException, but I'm not able to find it and delete it from my graph. What can I do to solve this problem?


Solution

  • TL;DR This is not valid Spark code.

    This is expected behavior. Nesting transformations is not allowed in Apache Spark: you cannot reference one distributed dataset (here businessAggregate) inside the closure of a transformation on another (here userAggregate.map). The NullPointerException is thrown because businessAggregate is not usable on the executors where that closure runs.
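
    As a minimal sketch of one way around this, assuming the userAggregate and businessAggregate VertexRDDs built above: instead of filtering businessAggregate inside userAggregate.map, flatten the per-user aggregate into (businessId, userId) pairs and let Spark perform a join. (The names userToBusiness, businessToUsers and coReviewers are illustrative, not from the original code.)

    // Flatten the per-user aggregate into (businessId, userId) pairs
    val userToBusiness = userAggregate.flatMap { case (userId, businesses) =>
      businesses.map(businessId => (businessId, userId))
    }

    // businessAggregate is already keyed by businessId: (businessId, List[userId])
    val businessToUsers: RDD[(VertexId, List[Long])] = businessAggregate

    // Join on businessId, regroup by user, and drop each user from its own list
    val coReviewers = userToBusiness
      .join(businessToUsers)                               // (businessId, (userId, List[userId]))
      .map { case (_, (userId, users)) => (userId, users) }
      .reduceByKey(_ ++ _)
      .map { case (userId, users) => (userId, users.distinct.filterNot(_ == userId)) }

    coReviewers.take(1)  // e.g. (user_ID, List(user_IDs that reviewed the same businesses))

    Because the join is a single distributed operation rather than a transformation nested inside another, actions such as .count or .collect on coReviewers no longer need to evaluate one RDD inside another task.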