I have a pair RDD and want to construct a GraphX Graph from it. I want the edges to be weighted, i.e. if an edge appears 3 times in the pair RDD, its weight should be 3.
Calling take(1) on the RDD returns:
res2: Array[(String, String)] = Array((905067378709,905458844980))
Solution for a directed graph
Suppose you have the following pair RDD containing the edges:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// sc is the SparkContext (predefined in spark-shell)
val data: RDD[(String, String)] = sc.parallelize(
  Seq(
    ("905067378709", "905458844980"),
    ("905067378709", "905458844980"),
    ("905458844980", "905067378709"),
    ("905067378709", "905458844980"),
    ("905458844982", "905458844984"),
    ("905067378709", "905458844984"),
    ("905067378712", "905067378709")))
Convert it to an RDD[(VertexId, VertexId)] (VertexId is a type alias for Long):
val edgesRDD: RDD[(VertexId, VertexId)] = data.map { case (a, b) => (a.toLong, b.toLong) }
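Note that a.toLong throws a NumberFormatException if any ID is not a valid Long. If the real data might contain such rows, one option is to drop them instead of failing the job. The sketch below shows that parsing logic on a plain Scala Seq; the object name ParseIds is hypothetical, and it assumes Scala 2.13+ for String.toLongOption (on 2.12, scala.util.Try(a.toLong).toOption is an equivalent). The same flatMap body can be applied to the RDD.

```scala
object ParseIds {
  // Parse (String, String) ID pairs into (Long, Long) pairs.
  // Any pair where either side is not a valid Long is dropped instead of throwing.
  def parsePairs(pairs: Seq[(String, String)]): Seq[(Long, Long)] =
    pairs.flatMap { case (a, b) =>
      for {
        src <- a.toLongOption // None if a is not numeric
        dst <- b.toLongOption // None if b is not numeric
      } yield (src, dst)
    }
}
```

For example, ParseIds.parsePairs(Seq(("905067378709", "905458844980"), ("n/a", "905458844980"))) keeps only the first pair.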
Then create the graph with Graph.fromEdgeTuples. This function builds a graph from just an RDD of edge tuples: it assigns every edge the attribute 1 and automatically creates every vertex mentioned by an edge, giving it the default value (the second argument, here also 1).
val graph = Graph.fromEdgeTuples(edgesRDD, 1)
// to print (collect first so the output appears on the driver, not in executor logs)
val vert: VertexRDD[Int] = graph.vertices
vert.collect.foreach(println)
val edg: EdgeRDD[Int] = graph.edges
edg.collect.foreach(println)
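To make explicit what fromEdgeTuples produces, here is a plain-Scala model of it, without Spark. It is only an illustration of the behaviour described above, not GraphX code: SimpleEdge and EdgeTuplesModel are hypothetical names. Each tuple becomes one edge with attribute 1 (duplicates are kept as separate edges), and each distinct endpoint becomes a vertex carrying the default value.

```scala
object EdgeTuplesModel {
  // Hypothetical stand-in for GraphX's Edge: source ID, destination ID, attribute.
  final case class SimpleEdge(src: Long, dst: Long, attr: Int)

  // Returns (vertices, edges): one vertex per distinct endpoint with defaultValue,
  // and one edge per input tuple with attribute 1 (duplicate tuples stay separate).
  def build[VD](tuples: Seq[(Long, Long)], defaultValue: VD): (Map[Long, VD], Seq[SimpleEdge]) = {
    val edges = tuples.map { case (s, d) => SimpleEdge(s, d, 1) }
    val vertices = tuples
      .flatMap { case (s, d) => Seq(s, d) }
      .distinct
      .map(id => id -> defaultValue)
      .toMap
    (vertices, edges)
  }
}
```

With the seven tuples from the question, this yields 7 edges (all with attribute 1) and 5 vertices.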
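Now we can calculate the weight of duplicate edges. groupEdges merges multiple edges between the same two vertices, but it only gives correct results if the graph has first been partitioned with partitionBy, so that identical edges end up in the same partition: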
val subgraph = graph.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)
.groupEdges((a, b) => a + b)
// to print (collect first so the output appears on the driver, not in executor logs)
val vert2: VertexRDD[Int] = subgraph.vertices
vert2.collect.foreach(println)
val edg2: EdgeRDD[Int] = subgraph.edges
edg2.collect.foreach(println)
The resulting edges are:
Edge(905067378712,905067378709,1)
Edge(905067378709,905458844984,1)
Edge(905067378709,905458844980,3)  // this edge occurred 3 times
Edge(905458844980,905067378709,1)
Edge(905458844982,905458844984,1)
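The merge step can be modelled the same way on plain Scala collections. This is a sketch of what groupEdges((a, b) => a + b) computes here, not Spark code; GroupEdgesModel and SimpleEdge are hypothetical names. Edges are grouped by (src, dst) and their attributes combined with the merge function. Because the key is the ordered pair, (a, b) and (b, a) stay separate, which is why the output above contains both Edge(905067378709,905458844980,3) and Edge(905458844980,905067378709,1).

```scala
object GroupEdgesModel {
  // Hypothetical stand-in for GraphX's Edge: source ID, destination ID, attribute.
  final case class SimpleEdge(src: Long, dst: Long, attr: Int)

  // Merge all edges that share the same (src, dst) pair, combining their
  // attributes with `merge`. Direction matters: (a, b) and (b, a) are different keys.
  def groupEdges(edges: Seq[SimpleEdge])(merge: (Int, Int) => Int): Seq[SimpleEdge] =
    edges
      .groupBy(e => (e.src, e.dst))
      .map { case ((s, d), group) => SimpleEdge(s, d, group.map(_.attr).reduce(merge)) }
      .toSeq
}
```

As a side note, GraphX can also perform this merge at construction time: Graph.fromEdgeTuples takes an optional uniqueEdges: Option[PartitionStrategy] parameter, and passing for example Some(PartitionStrategy.CanonicalRandomVertexCut) combines identical edges and sets the edge attribute to their sum.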