apache-spark, spark-graphx

Cartesian product between the vertices of a GraphX graph


I would like to compute a Cartesian product of the nodes of a graph in order to build their distance matrix. This may not be a very good approach, so any suggestion is welcome.

This is my code, and it is not working. I get no warning or exception; it simply does nothing. I suspect the cause is that I am taking the Cartesian product of an RDD with itself, but I don't know how to fix it, or how to write a nested loop or something similar that would let me compute this matrix.

val indexes1 = graph.vertices.map(_._1)
val indexes2 = graph.vertices.map(_._1)

val cartesian = indexes1.cartesian(indexes2).cache()
cartesian.map(pair => matrix.updated(pair._1, shortPathBetween(pair._1, pair._2)))

def shortPathBetween(v1:VertexId, v2:VertexId) : Int = {
    val path = ShortestPaths.run(graph, Seq(v2))
    val shortestPath = path.vertices.filter({case (vId, _ ) => vId == v1})
        .first()
        ._2
        .get(v2)

    shortestPath.getOrElse(-1)
}

Solution

  • I would approach this with the Pregel API, which traverses the graph from every node in parallel. If you keep track of the distances and update them with the edge weight while traversing, each vertex ends up holding its distance to every other (reachable) vertex.

    Take, for example, this directed graph:

    [Figure: directed graph]

    You can build this graph in Spark GraphX like this:

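    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import spark.implicits._ // needs a SparkSession named `spark` (as in spark-shell)

    // One row per edge: (edge id, source vertex, destination vertex, weight)
    val graphData = List(
      (0, 0, 1, 10.0),
      (1, 0, 2, 5.0),
      (2, 1, 2, 2.0),
      (3, 1, 3, 1.0),
      (4, 2, 1, 3.0),
      (5, 2, 3, 9.0),
      (6, 2, 4, 2.0),
      (7, 3, 4, 4.0),
      (8, 4, 0, 7.0),
      (9, 4, 3, 5.0)
    ).toDF("id", "from", "to", "distance")

    // Every id that occurs as a source or destination becomes a vertex (attribute = its id)
    val vertexRDD: RDD[(Long, Int)] = graphData.rdd.flatMap(r => Seq(r.getInt(1), r.getInt(2))).distinct().map(i => (i.toLong, i))
    // Columns 1, 2 and 3 are "from", "to" and "distance"
    val edgeRDD: RDD[Edge[Double]] = graphData.rdd.map(x => Edge(x.getInt(1), x.getInt(2), x.getDouble(3)))
    val graph: Graph[Int, Double] = Graph(vertexRDD, edgeRDD)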
    

    The pregel call takes three functions:

    • vprog, the vertex program, which computes a vertex's new state from the merged incoming message (here the state is a Map[VertexId, Double] of known distances, initially empty, and vprog simply replaces it with the message)
    • sendMsg, which runs on the edge triplets in each iteration (here it adds the edge weight to the distances collected at the destination and returns an Iterator of messages to deliver in the next iteration)
    • mergeMsg, which merges two messages bound for the same vertex (here two Map[VertexId, Double]s into one, keeping the shortest distance per key)

    In code this could look like:

    import org.apache.spark.graphx.{EdgeTriplet, VertexId}

    // Vertex program: the merged incoming message becomes the new vertex state
    def vprog(id: VertexId, orig: Map[VertexId, Double], newly: Map[VertexId, Double]): Map[VertexId, Double] = newly

    def mergeMsg(a: Map[VertexId, Double], b: Map[VertexId, Double]): Map[VertexId, Double] = (a.toList ++ b.toList).groupBy(_._1).map{ // mapValues is not serializable :-(
        case (id, v) => id -> v.map(_._2).min // keep shortest distance in case of duplicate
    }

    def sendMsg(trip: EdgeTriplet[Map[VertexId, Double], Double]): Iterator[(VertexId, Map[VertexId, Double])] = {
        val w = trip.attr // weight of edge from src -> dst
        // distances collected at dst, each increased by the edge weight
        val distances = trip.dstAttr.map { case (id, d) => id -> (d + w) } ++
          Map(trip.srcId -> 0.0, trip.dstId -> w) // distance from src to itself is 0, to dst the edge weight

        // Send only while src is still missing a vertex that is reachable through dst
        if (!distances.keySet.subsetOf(trip.srcAttr.keySet))
          Iterator((trip.srcId, distances))
        else
          Iterator.empty
    }
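
    To see what a single message looks like without starting Spark, here is a minimal plain-Scala sketch of the two rules above. The object name MessageSketch and the helper names relax and merge are made up for illustration, and the two input maps are hypothetical intermediate states of vertices 1 and 4, not values taken from an actual run.

```scala
object MessageSketch {
  type VertexId = Long // same alias GraphX uses

  // Message built for the source of an edge src -> dst with weight w,
  // given the distances already collected at dst (mirrors sendMsg).
  def relax(srcId: VertexId, dstId: VertexId, w: Double,
            dstDistances: Map[VertexId, Double]): Map[VertexId, Double] =
    dstDistances.map { case (id, d) => id -> (d + w) } ++
      Map(srcId -> 0.0, dstId -> w)

  // Same rule as mergeMsg: keep the smaller distance per target vertex.
  def merge(a: Map[VertexId, Double], b: Map[VertexId, Double]): Map[VertexId, Double] =
    (a.toList ++ b.toList).groupBy(_._1).map { case (id, v) => id -> v.map(_._2).min }

  def main(args: Array[String]): Unit = {
    // Vertex 2 has the out-edges 2 -> 1 (3.0) and 2 -> 4 (2.0) in the example graph.
    val via1 = relax(2L, 1L, 3.0, Map(1L -> 0.0, 3L -> 1.0)) // assumed state of vertex 1
    val via4 = relax(2L, 4L, 2.0, Map(4L -> 0.0, 3L -> 5.0)) // assumed state of vertex 4
    // Merged distances: 1 -> 3.0, 2 -> 0.0, 3 -> 4.0, 4 -> 2.0 (print order may vary)
    println(merge(via1, via4))
  }
}
```

    Vertex 3 is reachable through both neighbours (4.0 via vertex 1, 7.0 via vertex 4), and the merge keeps the 4.0.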
    

    Then run pregel, take the resulting vertices, and pivot each vertex's map to get a distance matrix.

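    import org.apache.spark.graphx.EdgeDirection
    import org.apache.spark.sql.functions.{explode, min}
    import spark.implicits._ // for toDF and the $"..." column syntax

    val initMap = Map.empty[VertexId, Double]

    val result = graph
        .mapVertices((_,_) => initMap)
        .pregel(
          initialMsg = initMap,
          activeDirection = EdgeDirection.Out
        )(vprog, sendMsg, mergeMsg)
        .vertices
        .toDF("id","map")
        .select($"id", explode($"map")) // one row per (id, key, value)
        .groupBy("id")
        .pivot("key")
        .agg(min("value"))
        .orderBy("id")

    result.show(false) // show returns Unit, so it is kept out of the val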
    

    The result will look like this:

    +---+----+----+----+----+---+
    |id |0   |1   |2   |3   |4  |
    +---+----+----+----+----+---+
    |0  |0.0 |8.0 |5.0 |11.0|7.0|
    |1  |11.0|0.0 |2.0 |1.0 |4.0|
    |2  |9.0 |3.0 |0.0 |4.0 |2.0|
    |3  |11.0|21.0|16.0|0.0 |4.0|
    |4  |7.0 |15.0|12.0|5.0 |0.0|
    +---+----+----+----+----+---+
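
    For a graph this small, the table can be cross-checked locally. The following is a minimal sketch and not part of the GraphX solution: a plain Floyd-Warshall pass over the same edge list, with the object name FloydWarshallCheck and the helper allPairs made up for illustration. It assumes the vertices are numbered 0 until n.

```scala
object FloydWarshallCheck {
  // Same (from, to, distance) triples as the example edge list.
  val edges: Seq[(Int, Int, Double)] = Seq(
    (0, 1, 10.0), (0, 2, 5.0), (1, 2, 2.0), (1, 3, 1.0), (2, 1, 3.0),
    (2, 3, 9.0), (2, 4, 2.0), (3, 4, 4.0), (4, 0, 7.0), (4, 3, 5.0))

  // All-pairs shortest distances for vertices 0 until n;
  // unreachable pairs stay at Double.PositiveInfinity.
  def allPairs(n: Int, es: Seq[(Int, Int, Double)]): Array[Array[Double]] = {
    val d = Array.fill(n, n)(Double.PositiveInfinity)
    for (i <- 0 until n) d(i)(i) = 0.0
    for ((from, to, w) <- es) d(from)(to) = math.min(d(from)(to), w)
    // Allow each vertex k in turn as an intermediate stop on the path i -> j.
    for (k <- 0 until n; i <- 0 until n; j <- 0 until n)
      if (d(i)(k) + d(k)(j) < d(i)(j)) d(i)(j) = d(i)(k) + d(k)(j)
    d
  }

  def main(args: Array[String]): Unit =
    allPairs(5, edges).foreach(row => println(row.mkString("\t")))
}
```

    On the example edges this yields 9.0 for 0 → 3 (via 2 and 1) and 19.0 for 3 → 1 (via 4, 0 and 2), where the table shows 11.0 and 21.0; all other entries agree. That suggests the stopping rule in sendMsg can end propagation before every distance is minimal, so it is worth validating the Pregel output this way on small inputs.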
    

    There may be other or better ways, but this seems less computationally intensive than computing the shortest path for every pair of nodes in a Cartesian product ;-)