Tags: scala, rdd, orc

How to filter an RDD based on a hash map?


I'm new to Spark and Scala, but I have to solve the following problem: I have one ORC file containing rows which I have to check against a certain condition coming from a hash map.

I build the hash map (filename, timestamp) with 120,000 entries this way (getTimestamp returns an Option[Long]):

val tgzFilesRDD = sc.textFile("...")
// Pair each file name with its parsed timestamp (an Option[Long])
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
    (itr, getTimestamp(itr))
})
// Collect the pairs to the driver as an in-memory Map
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap
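
For reference, getTimestamp is not shown here; a minimal sketch of what such a helper might look like, assuming the timestamp is encoded in the file name (the naming scheme and parsing logic are purely illustrative):

// Hypothetical helper: extracts an epoch-millis timestamp from a name
// like "session_1499786400000.tgz"; the real implementation may differ.
def getTimestamp(fileName: String): Option[Long] = {
    val pattern = """.*_(\d+)\.tgz""".r
    fileName match {
        case pattern(ts) => scala.util.Try(ts.toLong).toOption
        case _           => None
    }
}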

Then I retrieve the RDD with 6 million entries like this:

val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
import sqlContext.implicits._ // needed for the SessionEvent Encoder
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd

And I do the check:

val sessionEventsToReport = sessionEventsRDD.filter(se => {
    // Look up this event's archive; unknown archives yield None
    val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
    // With no timestamp, Long.MaxValue lets the event pass the filter
    se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})

Is this the right and performant way to do it? Is caching recommended? Will the Map fileNameTimestamp get shuffled to the cluster nodes where the partitions are processed?


Solution

  • fileNameTimestamp will get serialized with each task, and with 120,000 entries that can be quite expensive. You should broadcast large objects like this and reference the broadcast variable instead (a cleanup note follows the example below):

    val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)
    

    Now only one copy of the map will be shipped to each worker. There is also no need to drop down to the RDD API, as the Dataset API has a filter method:

    val sessionEvents = sessionDataDF.as[SessionEvent]
    val sessionEventsToReport = sessionEvents.filter(se => {
        // Read the map through the broadcast handle on the executor
        val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
        se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
    })
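
    As the promised cleanup note: once the filtered Dataset has been materialized, the cached copies of the broadcast on the executors can be released. This is a small housekeeping sketch; the output path is a placeholder, and unpersist() is a standard method on Spark's Broadcast handle:

    // Materialize the result first (e.g. write it out), then
    // drop the cached broadcast copies on the executors
    sessionEventsToReport.write.orc("...")
    fileNameTimestampBC.unpersist()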