
Parent-child relationship model in PySpark using GraphX/Spark


I have a dataset that contains (child, parent) pairs, and I need to find the ultimate parent of every child in it. The dataset has 1.3 million records. Sample data is given below.

c-1, p-1
p-1, p-2
p-2, p-3
p-3, p-4

In the sample data above, the ultimate parent of c-1 is p-4, the ultimate parent of p-1 is also p-4, and so on. Sometimes finding the ultimate parent of a child means traversing multiple levels recursively. This is what I have tried so far:
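To make the definition concrete, here is a minimal plain-Python sketch (not Spark code) that walks up the parent chain for each child in the sample data. The function name `ultimate_parent` is hypothetical, it assumes each child has exactly one parent, and the `seen` set is an added assumption so that cyclic data raises an error instead of looping forever. Running one such walk per child is essentially the recursive lookup that becomes slow when every step is a separate DataFrame operation.

```python
def ultimate_parent(child, parent_of):
    """Follow child -> parent links until reaching a node that has no parent."""
    seen = {child}
    node = child
    while node in parent_of:
        node = parent_of[node]
        if node in seen:  # cycle in the data: stop instead of looping forever
            raise ValueError(f"cycle detected at {node!r}")
        seen.add(node)
    return node


# (child, parent) pairs from the question's sample data
pairs = [("c-1", "p-1"), ("p-1", "p-2"), ("p-2", "p-3"), ("p-3", "p-4")]
parent_of = dict(pairs)  # assumes each child has exactly one parent

for child, _ in pairs:
    print(child, "->", ultimate_parent(child, parent_of))
```

Every child in the sample resolves to p-4.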

  1. I tried to create a Spark DataFrame and recursively find the parent of every child, but this approach takes a very long time.
  2. I tried to create a UDF to apply to every row of the dataset. However, the UDF needs to look up parents in another DataFrame (the lookup dataset), and Spark does not allow using a DataFrame inside a UDF, so this approach did not work either.
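One common way to restructure the first approach (a technique swapped in here, not taken from the answer below) is to update every row in the same pass instead of recursing per child, and to let each pass jump to the ancestor's current ancestor. This is known as pointer jumping or path doubling. With DataFrames, each pass would roughly correspond to one left self-join of the (child, ancestor) table on ancestor = child, and the number of passes grows with about log2 of the hierarchy depth rather than with the depth itself. The sketch below simulates those passes with plain Python dicts; `resolve_roots` and `max_passes` are hypothetical names, and it assumes one parent per child.

```python
def resolve_roots(pairs, max_passes=64):
    """Pointer jumping: repeatedly replace each node's ancestor with that ancestor's ancestor."""
    ancestor = dict(pairs)  # child -> current best-known ancestor
    for passes in range(1, max_passes + 1):
        # One "self-join" pass: look up the ancestor of every current ancestor.
        # Nodes without a parent (roots) map to themselves.
        updated = {c: ancestor.get(a, a) for c, a in ancestor.items()}
        if updated == ancestor:  # no row changed, so every ancestor is a root
            return ancestor, passes
        ancestor = updated
    # Cyclic data can oscillate forever, so give up after max_passes.
    raise ValueError("no convergence; the data may contain a cycle")


pairs = [("c-1", "p-1"), ("p-1", "p-2"), ("p-2", "p-3"), ("p-3", "p-4")]
roots, passes = resolve_roots(pairs)
print(roots)   # {'c-1': 'p-4', 'p-1': 'p-4', 'p-2': 'p-4', 'p-3': 'p-4'}
print(passes)  # 3
```

In Spark, the equality check would typically become a count of rows whose ancestor changed in the latest pass.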

Any suggestions on how to approach this problem?


Solution

  • To address both of the problems you describe, you could implement the equivalent of a recursive CTE in Spark with the GraphX Pregel API.

    Here is some sample code (in Scala, since GraphX has no Python API):

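    import org.apache.spark.graphx.{Edge, EdgeDirection, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame
    import scala.util.hashing.MurmurHash3

    // setMsg, sendMsg and mergeMsg (the Pregel vertex program, send function and
    // merge function) are not shown here; they are defined in the linked post.

    // Map a key to a 64-bit vertex id. A single 32-bit MurmurHash3 value is likely
    // to collide at ~1.3 million keys, so two differently seeded hashes are combined.
    def toVertexId(key: Any): Long = {
      val s = key.toString
      (MurmurHash3.stringHash(s).toLong << 32) | (MurmurHash3.stringHash(s, 42) & 0xffffffffL)
    }

    // set up & call the pregel api
    // returns (pk, (level, root, path, isCyclic, isLeaf)) for every vertex
    def calcTopLevelHierarcy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any, (Int, Any, String, Int, Int))] = {

      // create the vertex RDD
      // vertexDF columns: primary key, root, path
      val verticesRDD = vertexDF.rdd.map { row =>
        (toVertexId(row.get(0)), (row.get(0), row.get(1), row.getString(2)))
      }

      // create the edge RDD
      // top-down relationship: column 0 is the parent, column 1 the child
      // (for (child, parent) data like the question's, swap the columns first)
      val edgesRDD = edgeDF.rdd.map { row =>
        Edge(toVertexId(row.get(0)), toVertexId(row.get(1)), "topdown")
      }

      // create graph
      val graph = Graph(verticesRDD, edgesRDD).cache()

      val pathSeparator = "/"

      // initial message: id, level, root, path, isCyclic, isLeaf
      val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)

      // extend each vertex with: id, level, root, path, isCyclic,
      // existing value of the current vertex (to build the path), isLeaf, pk
      val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1))

      val hrchyRDD = initialGraph.pregel(initialMsg,
        Int.MaxValue,
        EdgeDirection.Out)(
        setMsg,
        sendMsg,
        mergeMsg)

      // build the path string from the (reversed) list of path elements
      val hrchyOutRDD = hrchyRDD.vertices.map { case (id, v) =>
        (v._8, (v._2, v._3, pathSeparator + v._4.reverse.mkString(pathSeparator), v._5, v._7))
      }

      hrchyOutRDD
    }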
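To see roughly what the Pregel iterations do without GraphX, here is a plain-Python simulation of vertex-centric supersteps. It is not the setMsg/sendMsg/mergeMsg logic from the linked post, which is not shown here; the function name `pregel_roots` and the (root, level) message format are assumptions made for this illustration. Edges point top-down, parent to child, like the "topdown" edges in the Scala code. In each superstep, every vertex that changed sends its current root to its children, the children adopt it, and the loop stops once no messages are sent, which is also the condition under which `pregel` halts.

```python
from collections import defaultdict


def pregel_roots(child_parent_pairs, max_supersteps=1000):
    """Simulate Pregel-style supersteps that push each root down to its descendants."""
    children = defaultdict(list)  # top-down adjacency: parent -> children
    has_parent = set()
    vertices = set()
    for child, parent in child_parent_pairs:
        children[parent].append(child)
        has_parent.add(child)
        vertices.update((child, parent))

    # Initial state: every vertex is its own root at level 0.
    state = {v: (v, 0) for v in vertices}
    # Superstep 0: only top-level vertices (those with no parent) are active.
    active = {v for v in vertices if v not in has_parent}

    for _ in range(max_supersteps):
        # sendMsg: each active vertex sends (root, level + 1) along its out-edges.
        inbox = {}
        for v in active:
            root, level = state[v]
            for c in children[v]:
                # mergeMsg: with one parent per child there is nothing to merge;
                # otherwise the last message simply wins in this sketch.
                inbox[c] = (root, level + 1)
        if not inbox:  # no messages were sent, so the computation halts
            return state
        # vertex program: each vertex that received a message adopts it.
        for v, msg in inbox.items():
            state[v] = msg
        active = set(inbox)
    raise ValueError("did not converge; the data may contain a cycle")


pairs = [("c-1", "p-1"), ("p-1", "p-2"), ("p-2", "p-3"), ("p-3", "p-4")]
state = pregel_roots(pairs)
print(state["c-1"])  # ('p-4', 4)
```

Each superstep moves the root one level down, so a hierarchy of depth d needs about d supersteps; the Scala code leaves that bound open with `Int.MaxValue`.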
    

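    The method calcTopLevelHierarcy() takes the vertex and edge data as DataFrames, so the lookup data becomes part of the graph instead of something a UDF has to reference (which addresses your second point).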
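The question's data is (child, parent), while the edges in the sample code are described as top-down. A minimal sketch of the reshaping, assuming the edge input should hold (parent, child) rows and that every id appearing in either column needs a vertex, is shown below in plain Python; in PySpark the same steps would be a column swap for the edges and a union of both columns followed by `distinct()` for the vertex keys. The helper name `to_graph_inputs` is hypothetical, and the extra root and path columns that vertexDF carries are not derived here.

```python
def to_graph_inputs(child_parent_pairs):
    """Turn (child, parent) rows into sorted vertex keys and top-down (parent, child) edges."""
    edges = [(parent, child) for child, parent in child_parent_pairs]
    vertex_keys = sorted({v for pair in child_parent_pairs for v in pair})
    return vertex_keys, edges


pairs = [("c-1", "p-1"), ("p-1", "p-2"), ("p-2", "p-3"), ("p-3", "p-4")]
vertex_keys, edges = to_graph_inputs(pairs)
print(vertex_keys)  # ['c-1', 'p-1', 'p-2', 'p-3', 'p-4']
print(edges)        # [('p-1', 'c-1'), ('p-2', 'p-1'), ('p-3', 'p-2'), ('p-4', 'p-3')]
```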

    Here is a very good link with some sample code. Please take a look.

    Hope this helps.