Tags: scala, apache-spark, spark-streaming, rdd, broadcasting

How to access a lookup (broadcast) RDD (or dataset) inside another RDD's map function


I am new to Spark and Scala and just started learning. I am using Spark 1.0.0 on CDH 5.1.3.

I have a lookup RDD named dbTableKeyValueMap: RDD[(String, String)], and I want to use it to process my fileRDD (each row has 300+ columns). This is the code:

val get = fileRDD.map({x =>
  val tmp = dbTableKeyValueMap.lookup(x)
  tmp
})

Running this locally either hangs or, after some time, fails with:

scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)

I understand that accessing one RDD inside another is problematic once data locality and the size of the collections come into play. A Cartesian product is not an option for me because the records in the file RDD are huge (each row has 300+ columns).

In Hadoop MapReduce (Java) I would load dbTableKeyValueMap through the distributed cache in the setup method and then use it inside map; I want to do the equivalent inside a Spark map, but I have not found a simple example for this use case. I want to iterate over the fileRDD rows one by one and apply transformations, clean-ups, lookups, etc. to each column for further processing. Alternatively, is there a way to use dbTableKeyValueMap as a Scala collection instead of a Spark RDD?

Please help


Solution

  • Thanks! The easiest thing to do was to convert the lookup RDD into a plain Scala collection, broadcast it, and I was good to go. I can now access it inside transformations on any RDD:

    // Collect the lookup RDD to the driver as a local immutable Map, then broadcast it
    val scalaMap = dbTableKeyValueMap.collectAsMap.toMap
    val broadCastLookupMap = sc.broadcast(scalaMap)

    // The broadcast value is a plain Scala Map, so it can be used inside any RDD transformation
    val get = fileRDD.map { x =>
      broadCastLookupMap.value(x) // throws NoSuchElementException if the key is missing
    }
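    As a minimal follow-up sketch (not from the original post): since the stated goal was to run the lookup against each of the 300+ columns of a row, the same broadcast map can be applied per column. The pipe delimiter, the getOrElse fallback to the original value, and the name transformed are assumptions for illustration only. Also note that collectAsMap pulls the whole lookup RDD to the driver, so this approach only works while the lookup table fits in memory.

    // Hypothetical per-column use of the same broadcast map;
    // the "|" delimiter and the fallback behaviour are assumed, not from the post
    val transformed = fileRDD.map { line =>
      val cols = line.split("\\|", -1)                         // -1 keeps trailing empty columns
      cols.map(c => broadCastLookupMap.value.getOrElse(c, c))  // look up each column, keep the original value if missing
          .mkString("|")
    }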
    

    This simple solution should be documented somewhere for early learners; it took me a while to figure out.

    Thanks for the help...