Looking at the source of outerJoinVertices, I wonder if this is a bug or a feature:
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
    (other: RDD[(VertexId, U)])
    (updateF: (VertexId, VD, Option[U]) => VD2)
    (implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  // The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
  // null if not
  if (eq != null) {
    vertices.cache() // <===== what if I wanted it serialized?
    // updateF preserves type, so we can use incremental replication
    val newVerts = vertices.leftJoin(other)(updateF).cache()
    val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
    val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
      .updateVertices(changedVerts)
    new GraphImpl(newVerts, newReplicatedVertexView)
  } else {
    // updateF does not preserve type, so we must re-replicate all vertices
    val newVerts = vertices.leftJoin(other)(updateF)
    GraphImpl(newVerts, replicatedVertexView.edges)
  }
}
Questions
1. If my graph / joined vertices are already cached via another StorageLevel (e.g. MEMORY_ONLY_SER), is that what causes the warning from org.apache.spark.graphx.impl.ShippableVertexPartitionOps ...

   WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.

2. If so, is this a bug in Spark (the code above is from 1.3.1)? I couldn't find a JIRA issue for it, if it is (but I didn't look too hard...).

3. Why is it not as trivial to fix as giving this method a StorageLevel parameter?

4. What are the workarounds for this? The only one I can think of is to create a new Graph from vertices.join(otherVertices) and originalGraph.edges, or something along those lines (see the sketch below), but it feels wrong...
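For reference, here is a minimal sketch of that workaround. The concrete types, the update logic, and the helper name joinAndRebuild are all placeholders of mine; the point is just that you do the join yourself and pick the storage level when rebuilding the graph:

    import org.apache.spark.graphx.{Graph, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Hypothetical helper: join the new attributes onto the vertices ourselves,
    // then rebuild the graph from the joined vertices plus the original edges,
    // choosing the storage level explicitly instead of relying on the cache()
    // call inside outerJoinVertices.
    def joinAndRebuild(
        originalGraph: Graph[Double, Int],
        otherVertices: RDD[(VertexId, String)],
        level: StorageLevel = StorageLevel.MEMORY_ONLY_SER): Graph[Double, Int] = {

      // VertexRDD is an RDD[(VertexId, VD)], so a plain leftOuterJoin works here.
      val joined: RDD[(VertexId, Double)] =
        originalGraph.vertices.leftOuterJoin(otherVertices).mapValues {
          case (attr, Some(_)) => attr + 1.0 // placeholder update logic
          case (attr, None)    => attr
        }

      // Rebuild with explicit vertex/edge storage levels.
      Graph(joined, originalGraph.edges,
        defaultVertexAttr = 0.0,
        edgeStorageLevel = level,
        vertexStorageLevel = level)
    }

Note that this rebuilds the graph (and its vertex index) from scratch, so it gives up the incremental replication used in the fast branch above, which is probably why it feels wrong.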
Well, I think it's actually not a bug.
Looking at the code for VertexRDD, it overrides the cache method and reuses the original StorageLevel that was set when the VertexRDD was created:
override def cache(): this.type = {
  partitionsRDD.persist(targetStorageLevel)
  this
}
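So the level that cache() will use is fixed when the VertexRDD is created. If I understand it right, the way to get MEMORY_ONLY_SER here is to ask for it at graph construction time, e.g. via the optional storage-level parameters of GraphLoader.edgeListFile (the path below is a placeholder, and sc is the usual SparkContext, e.g. from spark-shell):

    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.storage.StorageLevel

    // Setting the target levels at construction time means the internal
    // cache() calls (like the one in outerJoinVertices) persist at these
    // levels instead of the default MEMORY_ONLY.
    val graph = GraphLoader.edgeListFile(
      sc,
      "hdfs:///path/to/edges.txt",          // placeholder path
      edgeStorageLevel = StorageLevel.MEMORY_ONLY_SER,
      vertexStorageLevel = StorageLevel.MEMORY_ONLY_SER)

Graph.apply takes the same edgeStorageLevel / vertexStorageLevel parameters if you build the graph from RDDs yourself.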