
How to compute the average degree of neighbors with GraphX


I want to compute the average degree of neighbors for each node in my graph. Say we have a graph like this:

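import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Create an RDD for vertices (assumes an existing SparkContext `sc`, as in spark-shell)
val users: RDD[(VertexId, String)] = sc.parallelize(
                    Array((3L, "rxin"),
                          (7L, "jgonzal"),
                          (5L, "franklin"),
                          (2L, "istoica")))
// Create an RDD for edges
val relationships: RDD[Edge[Int]] = sc.parallelize(
                    Array(Edge(3L, 7L, 12),
                          Edge(5L, 3L, 1),
                          Edge(2L, 5L, 3),
                          Edge(5L, 7L, 5)))
// Build the initial Graph
val graph = Graph(users, relationships)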

EDIT: To give an idea of the expected outcome, take node 5 and its neighbors:

  • node 3 which has degree = 2
  • node 7 which has degree = 2
  • node 2 which has degree = 1

The output for this measure is simply the mean degree of the neighbors of node 5: (2+2+1)/3 ≈ 1.667

Ideally, the links to node 5 itself would be excluded from this computation, but that doesn't really matter to me for now.

END EDIT
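
To double-check the arithmetic above without Spark, here is a minimal plain-Scala sketch over the same four edges, treated as undirected. The names `AvgNeighborDegreeCheck`, `neighbors`, `degree`, `avgNeighborDegree` and the `excludeSelfLink` flag are made up for this illustration and are not part of GraphX; the flag covers the variant where the link back to the node itself is not counted.

```scala
// Plain-Scala sanity check of the example (no Spark required).
// All names here are illustrative, not part of GraphX.
object AvgNeighborDegreeCheck {
  // Same edges as the example graph, treated as undirected.
  val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

  // Neighbors of each node: every edge contributes in both directions.
  val neighbors: Map[Long, Seq[Long]] =
    edges
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2) }

  // Degree = number of incident edges.
  val degree: Map[Long, Int] = neighbors.map { case (id, ns) => id -> ns.size }

  // Mean degree of a node's neighbors. With excludeSelfLink = true, each
  // neighbor's degree is reduced by 1 to ignore the link back to `id`.
  def avgNeighborDegree(id: Long, excludeSelfLink: Boolean = false): Double = {
    val ns = neighbors(id)
    val adjust = if (excludeSelfLink) 1 else 0
    ns.map(n => degree(n) - adjust).sum.toDouble / ns.size
  }

  def main(args: Array[String]): Unit = {
    println(avgNeighborDegree(5L))                         // (2 + 2 + 1) / 3
    println(avgNeighborDegree(5L, excludeSelfLink = true)) // (1 + 1 + 0) / 3
  }
}
```

This is only meant to pin down the target numbers on a toy graph; it does not scale like a GraphX job.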

I am trying to apply aggregateMessages, but I don't know how to retrieve the degree of each node while inside the aggregateMessages call:

val neideg = g.aggregateMessages[(Long, Double)](
    triplet => {
      val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr) // BUT HERE I SHOULD GIVE ALSO THE DEGREE
      triplet.sendToDst(1L, comparedAttrs)
      triplet.sendToSrc(1L, comparedAttrs)
    },
    { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) })

val aveneideg = neideg.mapValues(kv => kv._2 / kv._1.toDouble).toDF("id", "aveneideg")

I also have a function that does the sum:

def compareAttrs(xs: (Int, String), ys: (Int, String)): Double = {
    xs._1.toDouble + ys._1.toDouble
}

How can I also pass the degree of those nodes to compareAttrs?

Of course, I would be more than happy to see a simpler and smarter solution for this task than the one I am trying to craft.


Solution

  • I'm not sure if this is what you're after, but you could go with something like this:

    // degree of every vertex that has at least one edge
    val degrees = graph.degrees
    // now we have a graph where the attribute of each vertex is its degree
    val graphWithDegrees = graph.outerJoinVertices(degrees) { (_, _, optDegree) =>
        optDegree.getOrElse(0) // vertices without edges are absent from `degrees`
    }
    
    // now each vertex sends its degree to its neighbours
    // we aggregate them in a list, so each vertex gets the degrees
    // of all its neighbours
    val neighboursDegreeAndCount = graphWithDegrees.aggregateMessages[List[Long]](
        sendMsg = triplet => {
            val srcDegree = triplet.srcAttr
            val dstDegree = triplet.dstAttr
            triplet.sendToDst(List(srcDegree))
            triplet.sendToSrc(List(dstDegree))
        },
        mergeMsg = (x, y) => x ++ y
    ).mapValues(degrees => degrees.sum / degrees.size.toDouble)
    
    // now if you want it in the original graph you can do
    // outerJoinVertices again, and now the attr of each vertex
    // in the graph is the avg degree of its neighbours
    graph.outerJoinVertices(neighboursDegreeAndCount) { (_, _, optAvgDegree) =>
        // a vertex without neighbours has no average; 0.0 is just a placeholder
        optAvgDegree.getOrElse(0.0)
    }
    

    So for your example, collecting neighboursDegreeAndCount gives: Array((5,1.6666666666666667), (2,3.0), (3,2.5), (7,2.5))
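
    If the lists of neighbour degrees get large, a possible variant is to send a (sum, count) pair instead of a list, which is closer to what the question was attempting. The following is a sketch under that assumption, not a tuned implementation: the object name `AvgNeighbourDegree`, the helper `avgNeighbourDegree`, and the local `SparkContext` setup are illustrative choices, while `degrees`, `outerJoinVertices`, `aggregateMessages` and `mapValues` are the same GraphX calls used above.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

object AvgNeighbourDegree {
  // Average degree of each vertex's neighbours, ignoring edge direction.
  // Vertices without any edge receive no message and are absent from the result.
  def avgNeighbourDegree[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Double] = {
    // Replace every vertex attribute with its degree (0 for isolated vertices).
    val withDegrees: Graph[Int, ED] =
      graph.outerJoinVertices(graph.degrees) { (_, _, deg) => deg.getOrElse(0) }

    withDegrees
      .aggregateMessages[(Long, Int)](
        // Each endpoint tells the other its degree, plus a count of 1.
        ctx => {
          ctx.sendToDst((ctx.srcAttr.toLong, 1))
          ctx.sendToSrc((ctx.dstAttr.toLong, 1))
        },
        // Merge by adding up degree sums and neighbour counts.
        (a, b) => (a._1 + b._1, a._2 + b._2)
      )
      .mapValues { case (sum, count) => sum.toDouble / count }
  }

  def main(args: Array[String]): Unit = {
    // Local context for illustration only; in spark-shell, `sc` already exists.
    val conf = new SparkConf().setAppName("avg-neighbour-degree").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val users: RDD[(VertexId, String)] = sc.parallelize(
        Seq((3L, "rxin"), (7L, "jgonzal"), (5L, "franklin"), (2L, "istoica")))
      val relationships: RDD[Edge[Int]] = sc.parallelize(
        Seq(Edge(3L, 7L, 12), Edge(5L, 3L, 1), Edge(2L, 5L, 3), Edge(5L, 7L, 5)))
      val graph = Graph(users, relationships)

      // Should agree with the per-vertex averages listed above.
      avgNeighbourDegree(graph).collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

    The merge function only ever holds two numbers per vertex, so the message size stays constant regardless of how many neighbours a vertex has.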