Tags: scala, apache-spark, spark-graphx

Graph constructed using GraphX is not getting broadcast properly


I created a graph using GraphX and now I need to extract sub-graphs from the original graph. users_graph is an RDD that maps each user to its sub-graph. The problem is that these sub-graphs are never actually computed: I get a java.lang.NullPointerException when I try to operate on them.

class VertexProperty(val id:Long) extends Serializable
case class User(val userId:Long, var offset:Int, val userCode:String, val Name:String, val Surname:String, val organizational_unit:String, val UME:String, val person_type:String, val SOD_HIGH:String, val SOD_MEDIUM:String, val SOD_LOW:String, val Under_mitigated:String) extends VertexProperty(userId)
case class Account(val accountId:Long, var offset:Int, val userCode:String, val userId:String, val account_creation_date:String, var disabled:String, var forcechangepwd:String, var pwdlife:String, var numberloginerror:String, var lastchangepwd:String, var lastlogin:String, var lastwronglogin:String, var state:String, var expire:String, var last_cert_time:String, var creation_date:String, var creation_user:String,var challenge_counter:String, var challenge_failed_attempt:String) extends VertexProperty(accountId) //Check if userCode is actually the code in this example.
case class Application(var applicationId:Long, var offset:Int, var Name:String, var Description:String, var Target:String, var Owner:String, var Ownercode:String, var Creation_date:String, var Creation_user:String) extends VertexProperty(applicationId)
case class Entitlement(val entitlementId:Long, var offset:Int, val Name:String, var Code:String, var Description:String, var Type:String, var Application:String, var Administrative:String, var Parent_ID:String, var Owner_code:String, var Scope_type:String, var Business_name:String, var Business_policy:String, var SOD_high:String, var SOD_medium:String, var SOD_low:String) extends VertexProperty(entitlementId)

/*
Some code for computing vertexRDD and edges
*/    

import org.apache.spark.graphx._

val graph: Graph[VertexProperty,String] = Graph(vertexRDD, edges, new VertexProperty(-1))
val triplets = graph.triplets
val temp = triplets.map(t => t.attr)
val distinct_users = temp.distinct.filter(t => t != "NULL")

// broadcasting the Graph and dereferencing it inside map is what fails
var bcast_graph = sc.broadcast(graph)
val users_graph = distinct_users.map(du => du -> bcast_graph.value.subgraph(epred = t => t.attr == du))

Solution

  • Long story short, you can't broadcast a Graph because it contains RDDs (a couple of them, actually). And you cannot use a Graph in a map function over an RDD, because it is composed of RDDs.

    Like I said, it's a long story why you cannot do either of those -- they're really two sides of the same coin, and it's the same issue you are up against either way.

    Spark is built on a master / slave concept. RDDs, and the meta-actions associated with them, are defined in the master's memory space. But the code -- the ... inside map(...) -- gets executed on the slaves (called executors). Your map code cannot refer to a different RDD in any way, and broadcast is never going to help, because an RDD reference only means anything on the master.
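
    Stripped of the GraphX specifics, the same failure reproduces with two plain RDDs. A minimal sketch (rddA and rddB are hypothetical; depending on the Spark version the runtime error is a NullPointerException or a SparkException):

        val rddA = sc.parallelize(1 to 3)
        val rddB = sc.parallelize(4 to 6)

        // rddB here is only a driver-side handle; the closure below is shipped
        // to the executors, where that handle cannot drive any computation
        val broken = rddA.map(x => rddB.filter(_ > x).count())
        broken.collect() // fails at runtime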

    What can you do instead? You have two options:

    1. Collect the data you need using collect(), then either broadcast that data or just refer to it inside your map code. collect() pulls all the data into the master, but -- most importantly for your issue -- it means you can now refer to the data without going through an RDD reference, so you can ship it out to your executors, either with broadcast or simply by closing over it in your map(...) code (Spark will send copies of the data to the executors). Which of these works -- or whether either will -- depends on your data size, speed expectations, etc. See the first sketch below.
    2. Use RDD.join() or RDD.cogroup() to work on the two datasets at the same time; see the second sketch at the end of this answer.
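
    For this particular question, option 1 fits well: distinct_users is just a small set of edge labels. A minimal sketch, assuming that set fits comfortably in driver memory (userLabels and usersGraphs are hypothetical names):

        // pull the labels down to the driver -- they are plain strings, not RDDs
        val userLabels: Array[String] = distinct_users.collect()

        // graph.subgraph is a driver-side call, so this is now legal;
        // each resulting Graph is still a lazy, distributed structure
        val usersGraphs: Map[String, Graph[VertexProperty, String]] =
          userLabels.map(du => du -> graph.subgraph(epred = t => t.attr == du)).toMap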

    Both of these are complicated by the fact that you are working with a higher-order structure -- a GraphX Graph. You will have to work on the Graph.vertices and Graph.edges RDDs separately, do your collect() or join() there, and then rebuild the final Graph by stitching the appropriate RDDs back together.
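
    As a sketch of the join() route from option 2, keyed on the edge attribute (edgesByAttr, usersKeyed, taggedEdges, and someUser are hypothetical names):

        // key both sides by the edge attribute / user label
        val edgesByAttr = graph.edges.map(e => (e.attr, e))
        val usersKeyed  = distinct_users.map(du => (du, ()))

        // keep only edges whose attribute matches a distinct user,
        // remembering which user each edge belongs to
        val taggedEdges = edgesByAttr.join(usersKeyed).map { case (du, (e, _)) => (du, e) }

        // stitch a per-user Graph back together, e.g. for one label:
        val someUser = "u42" // hypothetical
        val edgesForUser = taggedEdges.filter(_._1 == someUser).map(_._2)
        val userGraph: Graph[VertexProperty, String] =
          Graph(graph.vertices, edgesForUser, new VertexProperty(-1))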