I'm new to Spark and Scala, but I have to solve the following problem: I have one ORC file containing rows which I have to check against a condition coming from a hash map.
I build the hash map (filename, timestamp) with 120,000 entries this way (getTimestamp returns an Option[Long]):
val tgzFilesRDD = sc.textFile("...")
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
  (itr, getTimestamp(itr))
})
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap
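For illustration only, a helper of that shape could look roughly like the sketch below; the file-name pattern and regex are placeholders, not the code actually used:

// Simplified illustration: extract an epoch timestamp embedded in the tgz
// file name (e.g. "archive_1488012345000.tgz"); return None when the name
// does not match the assumed pattern.
def getTimestamp(fileName: String): Option[Long] = {
  val pattern = """.*_(\d+)\.tgz""".r
  fileName match {
    case pattern(ts) => scala.util.Try(ts.toLong).toOption
    case _           => None
  }
}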
And I retrieve the RDD with 6 million entries like this:
val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd
And do the check:
val sessionEventsToReport = sessionEventsRDD.filter(se => {
  val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
  se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
Is this the right and performant way to do it? Is caching recommended? Will the Map fileNameTimestamp get shuffled to the cluster nodes where the partitions are processed?
fileNameTimestamp will be serialized with every task, and with 120,000 entries that may be quite expensive. You should broadcast large objects and reference the broadcast variable instead:
val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)
Now only one copy of this object will be shipped to each worker. There is also no need to drop down to the RDD API, as the Dataset API has a filter method:
val sessionEvents = sessionDataDF.as[SessionEvent]
val sessionEventsToReport = sessionEvents.filter(se => {
  val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
  se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
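As a side note, if collecting the 120,000 entries to the driver ever becomes a bottleneck, the same check can also be expressed as a broadcast join instead of a broadcast variable. This is only an alternative sketch; it assumes the ORC data has archiveName and eventTimestamp columns (as the case class suggests), and the fileTimestamp column name is made up here:

import org.apache.spark.sql.functions.{broadcast, coalesce, col, lit}
import sqlContext.implicits._

// Small lookup table built from the (fileName, Option[Long]) pairs; a missing
// timestamp is mapped to Long.MaxValue, matching the filter above.
val fileTimestampsDF = fileNameTimestampRDD
  .map { case (name, tsOpt) => (name, tsOpt.getOrElse(Long.MaxValue)) }
  .toDF("archiveName", "fileTimestamp")

// broadcast() hints Spark to ship the small table to every executor,
// so the 6 million session events are never shuffled.
val sessionEventsToReportDF = sessionDataDF
  .join(broadcast(fileTimestampsDF), Seq("archiveName"), "left_outer")
  .where(col("eventTimestamp") < coalesce(col("fileTimestamp"), lit(Long.MaxValue)))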