Tags: apache-spark, spark-graphx

When joining vertices, am I forced to use MEMORY_ONLY caching?


Looking at the source of outerJoinVertices, I wonder whether this is a bug or a feature:

override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
      (other: RDD[(VertexId, U)])
      (updateF: (VertexId, VD, Option[U]) => VD2)
      (implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
    // The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
    // null if not
    if (eq != null) {
      vertices.cache() // <===== what if I wanted it serialized? 
      // updateF preserves type, so we can use incremental replication
      val newVerts = vertices.leftJoin(other)(updateF).cache()
      val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
        .updateVertices(changedVerts)
      new GraphImpl(newVerts, newReplicatedVertexView)
    } else {
      // updateF does not preserve type, so we must re-replicate all vertices
      val newVerts = vertices.leftJoin(other)(updateF)
      GraphImpl(newVerts, replicatedVertexView.edges)
    }
  }
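
As an aside, the `implicit eq: VD =:= VD2 = null` trick used above can be illustrated in isolation: the compiler supplies the `=:=` evidence only when the two type parameters are the same type; otherwise the `null` default stands. A minimal self-contained sketch (names are my own, not from GraphX):

```scala
object EqEvidenceDemo {
  // Mirrors the trick in outerJoinVertices: the compiler fills in `eq`
  // with =:= evidence only when A and B are the same type; otherwise
  // the default value null is used.
  def sameType[A, B](implicit eq: A =:= B = null): Boolean = eq != null

  def main(args: Array[String]): Unit = {
    println(sameType[Int, Int])    // evidence found -> true
    println(sameType[Int, String]) // no evidence, default null -> false
  }
}
```

This is why the method can branch at runtime on whether the update function preserves the vertex type.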

Questions

  1. If my graph / joined vertices are already cached via another StorageLevel (e.g. MEMORY_ONLY_SER), is this what causes the warning from org.apache.spark.graphx.impl.ShippableVertexPartitionOps: WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.?

  2. If so, is this a bug in Spark (observed in 1.3.1)? I couldn't find a JIRA issue for it, if there is one (though I didn't look too hard...).

  3. Why would it not be trivial to fix by giving this method a StorageLevel parameter?

  4. What are the workarounds? (The only one I can think of is to create a new Graph from vertices.join(otherVertices) and originalGraph.edges, or something like that... but it feels wrong.)


Solution

  • Well, I think it's actually not a bug.

    Looking at the code for VertexRDD, it overrides the cache method and persists with the targetStorageLevel that was set when the VertexRDD was created, rather than hard-coding MEMORY_ONLY:

      override def cache(): this.type = {
        partitionsRDD.persist(targetStorageLevel)
        this
      }