Tags: scala, apache-spark, rdd, spark-graphx, map-function

New to Spark, mapping with graphx graphs - NullPointerException


My goal is to count triangles in multiple subgraphs of a common full graph. Each subgraph is defined by a constant set of nodes plus one node taken from an RDD[Long]. I'm new to Spark/GraphX, so this may be an improper use of map. The code I'm sharing will reproduce my error.

To start, I have a subgraph of a full graph declared as shown below

import org.apache.spark.rdd._
import org.apache.spark.graphx._
val nodes: RDD[(VertexId, String)] = sc.parallelize(Array((3L, "3"), (7L, "7"), (5L, "5"), (2L, "2"), (4L, "4")))
// Edge list of the full graph (named edges, since it holds Edge objects, not vertices)
val edges: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "a"), Edge(3L, 5L, "b"), Edge(2L, 5L, "c"), Edge(5L, 7L, "d"), Edge(2L, 7L, "e"), Edge(4L, 5L, "f")))
val graph: Graph[String,String] = Graph(nodes, edges, "z")

val baseNodes: Array[Long] = Array(2L,5L,7L)    
val subgraph = graph.subgraph(vpred = (vid,attr)=> baseNodes contains vid)

Then I declare an RDD[Long] of other nodes from the graph.

val testNodes: RDD[Long] = sc.parallelize(Array(3L,4L))

I want to add each testNode to the subgraph and count the triangles present at testNode.

val triangles: RDD[(Long,Int)] = testNodes.map{ newNode =>
  val newNodes: Array[Long] = baseNodes :+ newNode
  val newSubgraph = graph.subgraph(vpred = (vid,attr)=> newNodes contains vid)
  (newNode,findTriangles(7L,newSubgraph))
}
triangles.foreach(x=>x.toString)

My findTriangles works fine if I call it outside of the map function.

def findTriangles(id:Long,subgraph:Graph[String,String]): Int = {
  val triCounts = subgraph.triangleCount().vertices
  val count:Int = triCounts.filter{case(item,_)=> item == id}.map{case(_,count)=>count}.first
  count
}
val triangles = findTriangles(7L,subgraph) //1

But when I run my map function to calculate triangles, I get a NullPointerException. I think the problem is in using my graph val inside the mapping function. Is that the issue? Is there a way to work around this?


Solution

  • The problem is very likely the graph, as you suspect, rather than baseNodes. A Graph is backed by RDDs (its vertices and edges), and RDDs can only be created, transformed, or acted on from the driver. Inside testNodes.map the closure runs on the executors, where there is no usable SparkContext, so calling graph.subgraph or triangleCount there fails with a NullPointerException. Plain local values such as baseNodes are different: they are serialized with the closure and shipped to the executors, so a small read-only array like this one works as is. For larger read-only data, broadcasting it with sc.broadcast avoids resending it with every task, and baseNodes is a reasonable candidate for that since it is never modified inside the map. To work around the error, run the loop over the test nodes on the driver (for example, collect testNodes first if it is small) and build each subgraph there.
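
A minimal sketch of one way to restructure the code, assuming the NullPointerException comes from using the RDD-backed graph inside testNodes.map and that testNodes is small enough to collect to the driver. The object name TriangleSketch, the run method, and the local[*] master are illustrative assumptions, not part of the original code. The loop runs on the driver, and the base node set is broadcast as a read-only value.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TriangleSketch {
  // Number of triangles through vertex `id` in `g`; 0 if `id` is not in `g`.
  def findTriangles(id: VertexId, g: Graph[String, String]): Int =
    g.triangleCount().vertices
      .filter { case (vid, _) => vid == id }
      .map { case (_, count) => count }
      .collect()
      .headOption
      .getOrElse(0)

  // Hypothetical helper: builds the sample graph and counts triangles at
  // vertex 7 for each test node, looping on the driver instead of in an RDD map.
  def run(sc: SparkContext): Array[(Long, Int)] = {
    val nodes: RDD[(VertexId, String)] = sc.parallelize(
      Seq((3L, "3"), (7L, "7"), (5L, "5"), (2L, "2"), (4L, "4")))
    val edges: RDD[Edge[String]] = sc.parallelize(Seq(
      Edge(3L, 7L, "a"), Edge(3L, 5L, "b"), Edge(2L, 5L, "c"),
      Edge(5L, 7L, "d"), Edge(2L, 7L, "e"), Edge(4L, 5L, "f")))

    // Older Spark releases require a partitioned graph for triangleCount;
    // the call is harmless on newer ones.
    val graph: Graph[String, String] =
      Graph(nodes, edges, "z").partitionBy(PartitionStrategy.RandomVertexCut)

    // Read-only data used inside executor-side closures can be broadcast.
    val baseNodes = sc.broadcast(Set(2L, 5L, 7L))
    val testNodes: RDD[Long] = sc.parallelize(Seq(3L, 4L))

    // collect() returns a plain Array, so this map runs on the driver,
    // where calling graph.subgraph and triangleCount is allowed.
    testNodes.collect().map { newNode =>
      val sub = graph.subgraph(
        vpred = (vid, _) => vid == newNode || baseNodes.value.contains(vid))
      (newNode, findTriangles(7L, sub))
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("triangle-sketch").setMaster("local[*]"))
    try run(sc).foreach(println)
    finally sc.stop()
  }
}
```

For the sample graph, adding node 3 gives two triangles through vertex 7, (2,5,7) and (3,5,7), while adding node 4 leaves only (2,5,7), so the result should be the pairs (3,2) and (4,1). Each iteration launches its own Spark jobs, so this approach fits a modest number of test nodes; for a very large candidate set a different formulation would be needed.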