
Spark - GraphX: mapReduceTriplets vs aggregateMessages


I am working through this tutorial: http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html

At some point it uses the mapReduceTriplets operation, which returns the expected result:

// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
  // For each edge send a message to the destination vertex with the attribute of the source vertex
  edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
  // To combine messages take the message for the older follower
  (a, b) => if (a._2 > b._2) a else b
)

But IntelliJ warns me that mapReduceTriplets is deprecated (as of Spark 1.2.0) and should be replaced by aggregateMessages:

// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages()[(String, Int)](
  // For each edge send a message to the destination vertex with the attribute of the source vertex
  edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
  // To combine messages take the message for the older follower
  (a, b) => if (a._2 > b._2) a else b
)

So I ran the same code, but now I get no output. Is that expected, or do I need to change something because of the switch to aggregateMessages?


Solution

  • You probably need something like this:

    // The opening parenthesis must stay on this line; otherwise Scala ends the statement after [(String, Int)]
    val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
        // For each edge, send a message to the destination vertex with the attribute of the source vertex.
        // ctx is an EdgeContext: messages are sent through it, not returned.
        sendMsg = { ctx => ctx.sendToDst((ctx.srcAttr.name, ctx.srcAttr.age)) },
        // To combine messages, keep the message for the older follower
        mergeMsg = { (a, b) => if (a._2 > b._2) a else b }
    )
    

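    With aggregateMessages, sendMsg has type EdgeContext[VD, ED, A] => Unit: instead of returning an Iterator of (vertexId, message) pairs, it sends messages by calling sendToDst (or sendToSrc) on the context. An Iterator-returning lambda like the one in the question (once the stray () after aggregateMessages is removed) still compiles, because Scala discards the value of a function whose expected result type is Unit. No message is ever sent, so the resulting VertexRDD is empty, which is most likely why you see no output.

    You can find the aggregateMessages signature and more examples in the GraphX Programming Guide. Hope this helps.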
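To see why the Iterator-returning version produces nothing, here is a minimal plain-Scala model of the two contracts. It does not use Spark: `Triplet`, `Ctx`, `mapReduceStyle`, `aggregateStyle` and the follower graph are hypothetical stand-ins for `EdgeTriplet`, `EdgeContext`, `mapReduceTriplets`, `aggregateMessages` and the tutorial's data, written only to illustrate the difference in how messages are produced.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala model of GraphX's two message-passing contracts (hypothetical, no Spark involved).
object MessageContracts {
  type VertexId = Long
  case class User(name: String, age: Int)

  // Hypothetical stand-in for EdgeTriplet: both endpoint ids and attributes
  case class Triplet(srcId: VertexId, dstId: VertexId, srcAttr: User, dstAttr: User)

  // mapReduceTriplets style: the map function RETURNS (target id, message) pairs
  def mapReduceStyle[A](triplets: Seq[Triplet],
                        mapFunc: Triplet => Iterator[(VertexId, A)],
                        reduceFunc: (A, A) => A): Map[VertexId, A] =
    triplets.flatMap(mapFunc)
      .groupBy(_._1)
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(reduceFunc) }

  // Hypothetical stand-in for EdgeContext: sending a message is a side effect
  final class Ctx[A](t: Triplet, sink: ArrayBuffer[(VertexId, A)]) {
    def srcId: VertexId = t.srcId
    def dstId: VertexId = t.dstId
    def srcAttr: User = t.srcAttr
    def dstAttr: User = t.dstAttr
    def sendToSrc(msg: A): Unit = sink += ((t.srcId, msg))
    def sendToDst(msg: A): Unit = sink += ((t.dstId, msg))
  }

  // aggregateMessages style: sendMsg returns Unit; only sendToSrc/sendToDst produce messages
  def aggregateStyle[A](triplets: Seq[Triplet],
                        sendMsg: Ctx[A] => Unit,
                        mergeMsg: (A, A) => A): Map[VertexId, A] = {
    val sink = ArrayBuffer.empty[(VertexId, A)]
    triplets.foreach(t => sendMsg(new Ctx[A](t, sink)))
    sink.toSeq.groupBy(_._1).map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
  }

  // Hypothetical follower graph: an edge src -> dst means "src follows dst"
  val users: Map[VertexId, User] = Map(
    1L -> User("Alice", 28), 2L -> User("Bob", 27),
    3L -> User("Charlie", 65), 4L -> User("David", 42))
  val sampleTriplets: Seq[Triplet] =
    Seq((2L, 1L), (4L, 1L), (3L, 2L), (3L, 4L)).map { case (s, d) => Triplet(s, d, users(s), users(d)) }

  // Keep the message from the older follower
  val older: ((String, Int), (String, Int)) => (String, Int) = (a, b) => if (a._2 > b._2) a else b

  val viaMapReduce: Map[VertexId, (String, Int)] = mapReduceStyle[(String, Int)](sampleTriplets,
    t => Iterator((t.dstId, (t.srcAttr.name, t.srcAttr.age))), older)

  val viaAggregate: Map[VertexId, (String, Int)] = aggregateStyle[(String, Int)](sampleTriplets,
    ctx => ctx.sendToDst((ctx.srcAttr.name, ctx.srcAttr.age)), older)

  // The question's pattern: the Iterator is built, then discarded as Unit, so nothing is sent
  val viaIteratorLambda: Map[VertexId, (String, Int)] = aggregateStyle[(String, Int)](sampleTriplets,
    ctx => Iterator((ctx.dstId, (ctx.srcAttr.name, ctx.srcAttr.age))), older)
}
```

Both `viaMapReduce` and `viaAggregate` map vertex 1 to ("David", 42) and vertices 2 and 4 to ("Charlie", 65), while `viaIteratorLambda` is empty. Vertex 3 has no followers and gets no entry, just as a vertex that receives no messages is absent from the VertexRDD returned by GraphX.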