apache-spark spark-graphx

Generate `VertexId` from pairs of `String`


I'm using GraphX to process some graph data on Spark. The input data is given as an RDD[(String, String)]. I used the following snippet to map each String to a VertexId and build the graph.

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph}

val input: RDD[(String, String)] = ...

// Collect every distinct vertex name and pair it with a unique Long id
val vertexIds = input.map(_._1)
                     .union(input.map(_._2))
                     .distinct()
                     .zipWithUniqueId()
                     .cache()

// Replace both endpoint names with their numeric ids via two joins
val edges = input.join(vertexIds)
                 .map { case (u, (v, uid)) => (v, uid) }
                 .join(vertexIds)
                 .map { case (v, (uid, vid)) => Edge(uid, vid, 1) }

val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges)

When I did a spot check of the top 1000 highest-degree nodes, I found that the GraphX result differs from the original input. Here's how I dump the high-degree nodes:

val top1000 = graph.outerJoinVertices(graph.outDegrees) {
  (_, vdata, deg) => (deg.getOrElse(0), vdata)  // outDegrees is Int-valued, so default to 0, not 0L
}.vertices.map(_._2).top(1000)                  // top() collects to the driver

sc.parallelize(top1000).saveAsTextFile(....)

I suspect .zipWithUniqueId assigns different ids on each evaluation. I tried:

  • inserting vertexIds.count() to force materialization, so that vertexIds doesn't get reevaluated.
  • inserting .sortBy(...).zipWithUniqueId() to make sure the ordering is deterministic.

Neither of them solves the problem (see the sketch below): the top 1000 degree nodes differ slightly between runs.
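
Concretely, the two attempts looked roughly like this (identity stands in for the elided sortBy key, and sortedVertexIds is just an illustrative name):

// Attempt 1: force materialization of the cached mapping
vertexIds.count()

// Attempt 2: impose a deterministic order before assigning ids
val sortedVertexIds = input.map(_._1)
                           .union(input.map(_._2))
                           .distinct()
                           .sortBy(identity)
                           .zipWithUniqueId()
                           .cache()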


Solution

  • I found two solutions to stabilize the String -> VertexId mapping. The underlying issue is that zipWithUniqueId derives each id from the element's partition index and position within the partition, and the shuffle behind distinct() is not guaranteed to produce the same layout when partitions are recomputed, so cache() alone cannot pin the ids down:

    • persist vertexIds to the filesystem and read it back:

      input.map(_._1)
           .union(input.map(_._2))
           .distinct()
           .zipWithUniqueId()
           .saveAsObjectFile("some location")
      // Read the mapping back so every downstream job sees exactly the same ids
      val vertexIds = sc.objectFile[(String, Long)]("some location")
      
    • use a collision-resistant hash function. I used Guava's murmur3_128 hash and took the first 8 bytes as the VertexId. With this approach you don't need any further joins at all, which is more efficient. See the sketch below.
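
      A minimal sketch of the hashing approach, assuming Guava is on the classpath (hashToVertexId is an illustrative helper name, not part of any API):

      import java.nio.charset.StandardCharsets
      import com.google.common.hash.Hashing
      import org.apache.spark.graphx.{Edge, Graph, VertexId}

      // Murmur3 128-bit hash of the vertex name; asLong() takes the first 8 bytes
      def hashToVertexId(s: String): VertexId =
        Hashing.murmur3_128().hashString(s, StandardCharsets.UTF_8).asLong()

      // Ids are now a pure function of the name, so no joins are required
      val vertices = input.flatMap { case (u, v) => Seq(u, v) }
                          .distinct()
                          .map(name => (hashToVertexId(name), name))
      val edges = input.map { case (u, v) => Edge(hashToVertexId(u), hashToVertexId(v), 1) }
      val graph = Graph(vertices, edges)

      One caveat: with 64-bit ids a collision is unlikely but not impossible for very large vertex sets, so two distinct strings could in principle be merged into one vertex.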