I have a dataset of (child, parent) pairs, and I need to find the ultimate parent of every child in it. The dataset has 1.3 million records. Sample data is given below.
c-1, p-1
p-1, p-2
p-2, p-3
p-3, p-4
In the above sample data, the ultimate parent of c-1 is p-4, the ultimate parent of p-1 is p-4, and so on. Sometimes, to find the ultimate parent of a child, I need to traverse multiple levels recursively. This is what I have tried so far.
Any suggestions on how to approach this problem?
To address both of the problems you mention, implementing CTEs in Spark using the GraphX Pregel API could come to your rescue.
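To make the idea concrete before bringing in Spark, here is a minimal plain-Scala sketch of the kind of iteration a Pregel program performs: every node repeatedly moves its "current ancestor" one level up until nothing changes. The object name `UltimateParentLocal` and the method `resolve` are made up for illustration, and the sketch assumes each child has at most one parent.

```scala
object UltimateParentLocal {
  // Resolve the ultimate parent of every node in a list of (child, parent) pairs.
  def resolve(pairs: Seq[(String, String)]): Map[String, String] = {
    val parentOf = pairs.toMap // assumption: at most one parent per child
    val nodes = pairs.flatMap { case (c, p) => Seq(c, p) }.distinct

    // Every node starts out as its own best-known ancestor.
    var top = nodes.map(n => n -> n).toMap
    var changed = true
    var rounds = 0

    // One pass of the loop plays the role of one superstep.
    // The cap on rounds keeps the loop finite if the data contains a cycle.
    while (changed && rounds <= nodes.size) {
      val next = top.map { case (n, t) => n -> parentOf.getOrElse(t, t) }
      changed = next != top
      top = next
      rounds += 1
    }
    top
  }
}
```

For the sample data, `UltimateParentLocal.resolve(Seq("c-1" -> "p-1", "p-1" -> "p-2", "p-2" -> "p-3", "p-3" -> "p-4"))` maps c-1, p-1, p-2 and p-3 to p-4. The number of rounds grows with the depth of the deepest chain, which is also what drives the number of supersteps in a distributed version.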
Here is some sample code that sets up the graph and calls the Pregel API.
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.util.hashing.MurmurHash3

// setup & call the pregel api
// NOTE: setMsg, sendMsg and mergeMsg are not shown here and must be defined separately.
// From the code below, the vertex attribute type is
//   (VertexId, Int, Any, List[String], Int, String, Int, Any)
// and the message type is
//   (Long, Int, Any, List[String], Int, Int)
def calcTopLevelHierarcy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any, (Int, Any, String, Int, Int))] = {
  // create the vertex RDD
  // primary key, root, path
  val verticesRDD = vertexDF
    .rdd
    .map { x => (x.get(0), x.get(1), x.get(2)) }
    .map { x => (MurmurHash3.stringHash(x._1.toString).toLong, (x._1.asInstanceOf[Any], x._2.asInstanceOf[Any], x._3.asInstanceOf[String])) }

  // create the edge RDD
  // top down relationship
  val EdgesRDD = edgeDF.rdd.map { x => (x.get(0), x.get(1)) }
    .map { x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong, "topdown") }

  // create graph
  val graph = Graph(verticesRDD, EdgesRDD).cache()
  val pathSeperator = """/"""

  // initialize id, level, root, path, iscyclic, isleaf
  val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)

  // add more dummy attributes to the vertices - id, level, root, path, isCyclic,
  // existing value of current vertex to build path, isleaf, pk
  val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1))

  val hrchyRDD = initialGraph.pregel(initialMsg,
    Int.MaxValue,
    EdgeDirection.Out)(
    setMsg,
    sendMsg,
    mergeMsg)

  // build the path from the list
  val hrchyOutRDD = hrchyRDD.vertices.map { case (id, v) => (v._8, (v._2, v._3, pathSeperator + v._4.reverse.mkString(pathSeperator), v._5, v._7)) }
  hrchyOutRDD
}
The method calcTopLevelHierarcy() takes DataFrames as input, which addresses your second point.
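If all you need is the ultimate parent, without level, path, cycle or leaf flags, a much smaller Pregel program may be enough. The following is a minimal sketch and not the code from the method above: the object `UltimateParentSketch`, the method `ultimateParents` and the `maxIterations` default are made up for illustration. It assumes the pairs are (child, parent) strings, that each child has at most one parent, and that MurmurHash3 produces no colliding vertex ids.

```scala
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import scala.util.hashing.MurmurHash3

object UltimateParentSketch {
  // Same hashing scheme as above; hash collisions are possible in principle.
  private def vid(name: String): VertexId = MurmurHash3.stringHash(name).toLong

  // Returns (node, ultimate parent) for every node that appears in the pairs.
  // maxIterations bounds the run when the data contains a cycle; it has to be
  // at least as large as the deepest chain for the result to be complete.
  def ultimateParents(pairs: RDD[(String, String)],
                      maxIterations: Int = 50): RDD[(String, String)] = {
    // Vertex attribute: (own name, best-known ancestor), initially itself.
    val vertices: RDD[(VertexId, (String, String))] =
      pairs.flatMap { case (c, p) => Seq(c, p) }.distinct().map(n => (vid(n), (n, n)))

    // Edges point from child to parent; the edge attribute is unused.
    val edges: RDD[Edge[Int]] = pairs.map { case (c, p) => Edge(vid(c), vid(p), 1) }

    val result = Graph(vertices, edges).pregel("", maxIterations, EdgeDirection.Either)(
      // vertex program: ignore the empty initial message, otherwise adopt the new ancestor
      (_, attr, msg) => if (msg.isEmpty) attr else (attr._1, msg),
      // send: a parent that knows a different ancestor passes it down to its child
      t => if (t.srcAttr._2 != t.dstAttr._2) Iterator((t.srcId, t.dstAttr._2)) else Iterator.empty,
      // merge: with one parent per child there is at most one message per vertex
      (a, _) => a
    )
    result.vertices.map { case (_, (name, top)) => (name, top) }
  }
}
```

Calling `UltimateParentSketch.ultimateParents(df.rdd.map(r => (r.getString(0), r.getString(1))))` on a two-column DataFrame `df` (a hypothetical name) of child and parent strings would feed it the data from the question. The number of supersteps grows with the depth of the hierarchy, so very deep chains need a larger `maxIterations`.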
Here is a very good link with some sample code. Please take a look.
Hope this helps.