Spark RDD: countByValue returns a Map, and I want to sort it by key in ascending or descending order.
val s = flightsObjectRDD.map(f => (f.dep_delay / 60).toInt).countByValue() // countByValue is an action and returns a Map to the driver
s.toSeq.sortBy(_._1)
The code above works as expected. But can countByValue itself return the result already sorted? How can I implement it that way?
At this point you leave the Big Data realm and enter Scala itself, with all of its collection types that are immutable or mutable, sorted or hashed, or a combination of these. I think that is the reason for the initial -1. Nice folks out there, anyway.
Take this example. countByValue returns a Map to the driver, so it is only of interest for small amounts of data. A Map is also a collection of (key, value) pairs, but it is immutable and hashed, so it has no guaranteed order and we need to manipulate it. This is what you can do. First, sort the Map on the key in ascending order:
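import scala.collection.immutable.ListMap // ListMap keeps its entries in insertion order
val rdd1 = sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8),("MAN",8),("HR",5),("HR",6),("HR",5)))
val map = rdd1.countByValue() // action: the counts come back to the driver as a Map
val res1 = ListMap(map.toSeq.sortBy(_._1): _*) // ascending sort on the key part of the Map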
res1: scala.collection.immutable.ListMap[(String, Int),Long] = Map((ADMIN,5) -> 1, (HR,5) -> 3, (HR,6) -> 1, (MAN,8) -> 2, (RD,4) -> 1, (SALES,4) -> 1, (SER,6) -> 1)
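The same ascending sort can be tried without a cluster. The following is only a minimal sketch under assumptions: it emulates countByValue on a local Seq with groupBy, and the names KeySortSketch, countLocally and sortedAscending are made up for illustration, not part of Spark. It rebuilds a ListMap from the sorted pairs so that insertion order carries the sort.

```scala
import scala.collection.immutable.ListMap

object KeySortSketch {
  // Hypothetical local stand-in for RDD.countByValue: counts how often each element occurs.
  def countLocally[T](data: Seq[T]): Map[T, Long] =
    data.groupBy(identity).map { case (k, vs) => (k, vs.size.toLong) }

  // ListMap iterates in insertion order, so inserting key-sorted pairs keeps them sorted.
  def sortedAscending[K: Ordering, V](m: scala.collection.Map[K, V]): ListMap[K, V] =
    ListMap(m.toList.sortBy(_._1): _*)
}
```

Pasted into a REPL, KeySortSketch.sortedAscending(KeySortSketch.countLocally(Seq("b", "a", "c", "a"))) should iterate over its keys in the order a, b, c.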
However, you cannot call reverse on the Map itself, because a hashed Map has no defined order to reverse. The next best thing is to sort the entries as a sequence and reverse that:
val res2 = map.toList.sortBy(_._1).reverse
val res22 = map.toSeq.sortBy(_._1).reverse
res2: List[((String, Int), Long)] = List(((SER,6),1), ((SALES,4),1), ((RD,4),1), ((MAN,8),2), ((HR,6),1), ((HR,5),3), ((ADMIN,5),1))
res22: Seq[((String, Int), Long)] = ArrayBuffer(((SER,6),1), ((SALES,4),1), ((RD,4),1), ((MAN,8),2), ((HR,6),1), ((HR,5),3), ((ADMIN,5),1))
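But you cannot apply .toMap to the result of .reverse here, as the default Map will hash the keys and lose the sort. So you must make a compromise: keep the result as a List or Seq, or rebuild an insertion-ordered ListMap from it as in the ascending example.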
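If a map is still wanted for the descending case, one option is to sort with a reversed Ordering and store the pairs in a ListMap, which iterates in insertion order. This is a sketch rather than the only way to do it; DescendingSketch and sortedDescending are hypothetical names, and the input map stands in for what countByValue returned above.

```scala
import scala.collection.immutable.ListMap

object DescendingSketch {
  // Sort by key using the reversed Ordering, then keep that order in a ListMap.
  def sortedDescending[K, V](m: scala.collection.Map[K, V])(implicit ord: Ordering[K]): ListMap[K, V] =
    ListMap(m.toList.sortBy(_._1)(ord.reverse): _*)
}
```

Calling DescendingSketch.sortedDescending(map) on the counts from the example should yield the entries from (SER,6) down to (ADMIN,5). Lookups in a ListMap are linear in its size, which is acceptable here because countByValue results are small enough to fit on the driver anyway.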