Tags: scala, apache-spark, spark-graphx

Spark GraphX: finding the most active user?


I have a graph of this form:

   _ 3 _
   /' '\
 (1)   (1)
 /       \
1--(2)--->2

I want to find the most active user, i.e. the one who follows the most. Here it is user 1, who follows user 2 twice and user 3 once. My graph has type Graph[Int,Int]:

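import org.apache.spark.graphx.{Edge, Graph}

// sc is the SparkContext, e.g. the one provided by spark-shell
val edges = Array(Edge(1,10,1), Edge(10,1,1), Edge(11,1,1), Edge(1,11,1), Edge(1,12,1))
val vertices = Array((12L,12), (10L,10), (11L,11), (1L,1))
val graph = Graph(sc.parallelize(vertices),sc.parallelize(edges),0)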

My idea is to group the edges by srcId, count using the resulting iterator, and then sort, but I am having trouble using the iterator because the types are quite complex:

graph.edges.groupBy(_.dstId).collect() has type:

Array[(org.apache.spark.graphx.VertexId,Iterable[org.apache.spark.graphx.Edge[Int]])]

Any ideas?


Solution

  • Your idea of grouping by srcId is good, since you are looking for the relation "follows" rather than "is followed by" (note that your example groups by dstId, which gives the latter).

    val group = graph.edges.groupBy(_.srcId)
    

    group now contains the edges going out of each vertex. Summing their attributes gives the total number of times each user follows other users.

    val followCount = group.map{
      case (vertex, edges) => (vertex, edges.map(_.attr).sum)
    }.collect
    

    This produces (the order of the elements may vary):

    Array((10,1), (11,1), (1,3))
    

    Now, to extract the user who follows the most, sort the array in descending order of count and take its head, which gives the most active user.

    val mostActiveUser = followCount.sortBy(- _._2).head
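
As a possible variation on the steps above, the per-user sum can be computed without materializing each group, and the maximum can be taken without a full sort. The sketch below is not part of the original answer: the object name `MostActiveUser`, the helper `mostActiveUser`, and the `local[*]` master are assumptions made for illustration. It uses `reduceByKey` to sum the edge attributes per source vertex and `reduce` to keep the pair with the largest total.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object MostActiveUser {
  // Hypothetical helper: returns (user, total follow count) for the user
  // whose outgoing edge attributes sum to the largest value.
  // Throws on a graph with no edges, because reduce fails on an empty RDD.
  def mostActiveUser(graph: Graph[Int, Int]): (VertexId, Int) =
    graph.edges
      .map(e => (e.srcId, e.attr))                  // key each edge by the follower
      .reduceByKey(_ + _)                           // sum follow counts per follower
      .reduce((a, b) => if (a._2 >= b._2) a else b) // keep the larger total

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch can run standalone.
    val conf = new SparkConf().setAppName("most-active-user").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Same data as in the question.
      val edges = Array(Edge(1L, 10L, 1), Edge(10L, 1L, 1), Edge(11L, 1L, 1),
                        Edge(1L, 11L, 1), Edge(1L, 12L, 1))
      val vertices = Array((12L, 12), (10L, 10), (11L, 11), (1L, 1))
      val graph = Graph(sc.parallelize(vertices), sc.parallelize(edges), 0)

      println(mostActiveUser(graph)) // prints (1,3)
    } finally {
      sc.stop()
    }
  }
}
```

If several users share the highest total, which one is returned is not specified here. On the already collected array, `followCount.maxBy(_._2)` should give the same result as sorting and taking the head, without the sort.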