Tags: algorithm, scala, apache-spark, spark-graphx

How to sum edge weights with GraphX


I have a Graph[(String, String), Int], where each edge carries an Int weight. What I want to do is, for each user, collect all in-edges and sum the weights associated with them.

Say the data looks like this:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.graphx._

    val sc: SparkContext

    // Create an RDD for the vertices
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")),
                           (7L, ("jgonzal", "postdoc")),
                           (5L, ("franklin", "prof")),
                           (2L, ("istoica", "prof"))))

    // Create an RDD for the edges
    val relationships: RDD[Edge[Int]] =
      sc.parallelize(Array(Edge(3L, 7L, 12),
                           Edge(5L, 3L, 1),
                           Edge(2L, 5L, 3),
                           Edge(5L, 7L, 5)))

    // Define a default user in case there are relationships with missing users
    val defaultUser = ("John Doe", "Missing")

    // Build the initial graph
    val graph = Graph(users, relationships, defaultUser)

My ideal outcome is a DataFrame with the vertex IDs and the summed weight values. It is basically a weighted in-degree measure: for example, vertex 7L receives edges with weights 12 and 5, so its value is 17.

    id    value
    3L    1
    5L    3
    7L    17
    2L    0

Solution

Use aggregateMessages: for each triplet, send the edge attribute (the weight) to the destination vertex, and merge incoming messages by summing them. Passing TripletFields.EdgeOnly tells GraphX that only the edge attribute is needed, so it can avoid shipping vertex attributes:

    // assumes a SparkSession's implicits are in scope for toDF,
    // e.g. import spark.implicits._
    val temp = graph.aggregateMessages[Int](
      triplet => triplet.sendToDst(triplet.attr),
      _ + _,
      TripletFields.EdgeOnly
    ).toDF("id", "value")

    temp.show()
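
Note that aggregateMessages only returns vertices that received at least one message, so a vertex with no in-edges (2L here) is absent from temp rather than appearing with value 0. A minimal sketch of one way to include such vertices, assuming the graph built above and a SparkSession whose implicits are in scope for toDF, is to left-join the sums back onto the full vertex set:

    import org.apache.spark.graphx._

    // Per-vertex sums of incoming edge weights (vertices with no
    // in-edges receive no message and are missing from this RDD)
    val sums: VertexRDD[Int] = graph.aggregateMessages[Int](
      triplet => triplet.sendToDst(triplet.attr),
      _ + _,
      TripletFields.EdgeOnly
    )

    // leftJoin keeps every vertex in the graph; vertices with no
    // incoming messages default to 0
    val weightedInDegree = graph.vertices
      .leftJoin(sums) { (_, _, optSum) => optSum.getOrElse(0) }
      .toDF("id", "value")

    weightedInDegree.show()

This yields a row for every vertex, matching the desired output where 2L shows 0.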