I'm using GraphX to process some graph data on Spark. The input data is given as an RDD[(String, String)]. I used the following snippet to map each String to a VertexId and build the graph.
val input: RDD[(String, String)] = ...
val vertexIds = input.map(_._1)
  .union(input.map(_._2))
  .distinct()
  .zipWithUniqueId()
  .cache()
val edges = input.join(vertexIds)
  .map { case (u, (v, uid)) => (v, uid) }
  .join(vertexIds)
  .map { case (v, (uid, vid)) => Edge(uid, vid, 1) }
val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges)
When I did a spot check of the 1000 highest-degree nodes, I found that GraphX's result differs from the original input. Here's how I dump the high-degree nodes:
graph.outerJoinVertices(graph.outDegrees) {
  (_, vdata, deg) => (deg.getOrElse(0), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)
I suspect zipWithUniqueId gives unstable IDs on each evaluation. I tried two things: calling vertexIds.count() to force materialization, so that vertexIds doesn't get re-evaluated, and inserting .sortBy(...) before .zipWithUniqueId() to make sure the ordering is the same. Neither solves the problem: the top-1000 degree results still differ slightly on each run.
I found two solutions to stabilize the String -> VertexId mapping. The first is to persist vertexIds to the file system:
input.map(_._1)
  .union(input.map(_._2))
  .distinct()
  .zipWithUniqueId()
  .saveAsObjectFile("some location")
val vertexIds = sc.objectFile[(String, Long)]("some location")
The second is to use a collision-resistant hash function. I used Guava's murmur3_128 hash and took the first 8 bytes as the vertexId. With this approach you don't need any further joins, which is more efficient.
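A minimal sketch of the hashing approach, assuming Guava is on the classpath (the toVertexId and buildGraph names are mine, not from the original code):

```scala
import com.google.common.hash.Hashing
import java.nio.charset.StandardCharsets
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Deterministically map a vertex name to a 64-bit id.
// Guava's HashCode.asLong() returns the first 8 bytes of the
// 128-bit murmur3 hash, which is stable across runs and executors,
// so the String -> VertexId mapping never changes between evaluations.
def toVertexId(s: String): VertexId =
  Hashing.murmur3_128().hashString(s, StandardCharsets.UTF_8).asLong()

def buildGraph(input: RDD[(String, String)]): Graph[String, Int] = {
  // No joins against a generated-id RDD: each endpoint is hashed directly.
  val edges = input.map { case (u, v) => Edge(toVertexId(u), toVertexId(v), 1) }
  val vertices = input
    .flatMap { case (u, v) => Seq((toVertexId(u), u), (toVertexId(v), v)) }
    .distinct()
  Graph(vertices, edges)
}
```

With 64-bit hashes, collisions are unlikely but not impossible for very large vertex sets (birthday bound around a few billion keys), so a one-off collision check may be worthwhile.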