Search code examples
scalaapache-sparkspark-graphx

how to compute vertex similarity to neighbors in graphx


Suppose to have a simple graph like:

val users = sc.parallelize(Array(
                 (1L, Seq("M", 2014, 40376, null, "N", 1, "Rajastan")),
                 (2L, Seq("M", 2009, 20231, null, "N", 1, "Rajastan")),
                 (3L, Seq("F", 2016, 40376, null, "N", 1, "Rajastan"))
            ))                                
val edges = sc.parallelize(Array(
                 Edge(1L, 2L, ""), 
                 Edge(1L, 3L, ""), 
                 Edge(2L, 3L, "")))
val graph = Graph(users, edges)

I'd like to compute how much each vertex is similar to its neighbors on each attribute.

The ideal output (an RDD or DataFrame) would hold these results:

1L: 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0
2L: 0.5, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0
3L: 0.0, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0

For instance, the first value for 1L means that on 2 neighbors, just 1 share the same value...

I am playing with aggregateMessage just to count how many neighbors have a similar attribute value but with no avail so far:

val result = graph.aggregateMessages[(Int, Seq[Any])](
    // build the message
    sendMsg = {
        // map function
        triplet =>
        // send message to destination vertex
        triplet.sendToDst(1, triplet.srcAttr)
        // send message to source vertex 
        triplet.sendToSrc(1, triplet.dstAttr)
    }, // trying to count neighbors with similar property
    { case ((cnt1, sender), (cnt2, receiver)) =>
        val prop1 = if(sender(0) == receiver(0)) 1d else 0d
        val prop2 = if(Math.abs(sender(1).asInstanceOf[Int] - receiver(1).asInstanceOf[Int])<3) 1d else 0d
        val prop3 = if(sender(2) == receiver(2)) 1d else 0d
        val prop4 = if(sender(3) == receiver(3)) 1d else 0d
        val prop5 = if(sender(4) == receiver(4)) 1d else 0d
        val prop6 = if(sender(5) == receiver(5)) 1d else 0d
        val prop7 = if(sender(6) == receiver(6)) 1d else 0d
        (cnt1 + cnt2, Seq(prop1, prop2, prop3, prop4, prop5, prop6, prop7))
    }
)

this gives me the correct neighborhood size for each vertex but is not summing up the values right:

//> (1,(2,List(0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0)))
//| (2,(2,List(0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)))
//| (3,(2,List(1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0)))

Solution

  • It doesn't sum values because there is no sum in your code. Moreover your logic is wrong. mergeMsg receives messages not (message, current) pairs. Try something like this:

    import breeze.linalg.DenseVector
    
    def compareAttrs(xs: Seq[Any], ys: Seq[Any]) = 
      DenseVector(xs.zip(ys).map{ case (x, y) => if (x == y) 1L else 0L}.toArray)
    
    val result = graph.aggregateMessages[(Long, DenseVector[Long])](
      triplet => {
        val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr)
        triplet.sendToDst(1L, comparedAttrs)
        triplet.sendToSrc(1L, comparedAttrs)
      },
      { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) }
    )
    
    result.mapValues(kv => (kv._2.map(_.toDouble) / kv._1.toDouble)).collect
    // Array(
    //   (1,DenseVector(0.5, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)),
    //   (2,DenseVector(0.5, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0)), 
    //   (3,DenseVector(0.0, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)))