scala, apache-spark, spark-graphx

How to retrieve the value of a property using the value of another property in RDDs


I have a links:JdbcRDD[String] which contains links in the form:

{"bob,michael"} 

respectively for the source and destination of each link. I can split each string to retrieve the strings that uniquely identify the source node and the destination node. I then have a users:RDD[(Long, Vertex)] that holds all the vertices in my graph. Each vertex has a stringId:String property and a nodeId:Long property.

I'd like to retrieve the nodeId from the stringId, but I don't know how to implement this logic, being rather new to both Scala and Spark. I am stuck with this code:

val reflinks = links.map { x =>
    // split each line into source and destination
    val row = x.split(',')
    // find the vertex matching each stringId
    val source = users.filter(_._2.stringId == row(0)).collect()
    val dest = users.filter(_._2.stringId == row(1)).collect()
    // return the link in GraphX format
    Edge(source(0)._1, dest(0)._1, "ref")
}

with this solution I get:

 org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Solution

  • Unfortunately, you cannot have nested RDDs in Spark. That is, you cannot access one RDD from inside the closure sent to another RDD's transformation.

    If you want to combine data from more than one RDD, you need to join them in some way. Here is one way to solve this problem:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._
    
    // These are some toy examples of the original data for the edges and the vertices
    val rawEdges = sc.parallelize(Array("m,a", "c,a", "g,c"))
    val rawNodes = sc.parallelize(Array( ("m", 1L), ("a", 2L), ("c", 3L), ("g", 4L)))
    
    val parsedEdges: RDD[(String, String)] = rawEdges.map(_.split(",")).map { case Array(x, y) => (x, y) }
    
    // The two joins here are required since we need to get the ID for both nodes of each edge
    // If you want to stay in the RDD domain, you need to do this double join.
    val resolvedFirstRdd = parsedEdges.join(rawNodes).map { case (srcTxt, (dstTxt, srcId)) => (dstTxt, srcId) }
    val edgeRdd = resolvedFirstRdd.join(rawNodes).map { case (dstTxt, (srcId, dstId)) => Edge(srcId, dstId, "ref") }
    
    // The println()s below are for testing only (they can be expensive to keep in real code)
    edgeRdd.foreach(println)
    val g = Graph(rawNodes.map(x => (x._2, x._1)), edgeRdd)
    
    println("In degrees")
    g.inDegrees.foreach(println) 
    println("Out degrees")
    g.outDegrees.foreach(println) 
    

    The print output for testing:

    Edge(3,2,ref)
    Edge(1,2,ref)
    Edge(4,3,ref)
    In degrees
    (3,1)
    (2,2)
    Out degrees
    (3,1)
    (1,1)
    (4,1)
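
    An alternative worth mentioning: if the node table is small enough to fit in driver memory, you can sidestep the double join by collecting it into a Map and broadcasting it to the executors. The sketch below uses a stand-in Edge case class and plain Scala collections so the lookup logic can run without a Spark context; in real Spark code you would build the map with sc.broadcast(rawNodes.collectAsMap()) and read nodeIds.value(...) inside the map.

    ```scala
    // Stand-in for org.apache.spark.graphx.Edge, so this runs without Spark
    case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

    val rawEdges = Seq("m,a", "c,a", "g,c")
    // In Spark: val nodeIds = sc.broadcast(rawNodes.collectAsMap())
    val nodeIds = Map("m" -> 1L, "a" -> 2L, "c" -> 3L, "g" -> 4L)

    val edges = rawEdges.map { line =>
      val Array(src, dst) = line.split(",")
      // In a real Spark map you would read nodeIds.value(src) instead
      Edge(nodeIds(src), nodeIds(dst), "ref")
    }
    ```

    This trades the shuffle cost of the two joins for one collect and a broadcast, which is usually faster when the vertex table is small; if it does not fit in driver memory, stick with the join approach above.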