Tags: scala, apache-spark, rdd, accumulator

How to split a sorted RDD into n parts and get first element from each part?


I'm trying to extract some elements from a sorted RDD[String]. I tried zipWithIndex and then filtering the RDD to keep the indices whose remainder modulo n is zero:

val expectedSize = 165
val n = rddOfStrings.count / expectedSize

val resultArray = rddOfStrings.sortBy(x => x)
  .zipWithIndex
  .filter(x => x._2 % n == 0)
  .map(_._1)
  .collect

The problem here is that n is not always an integer. If I make it a Double, the size of resultArray does not come out equal to expectedSize (it is off by one in either direction). How can I make this return a collection of exactly expectedSize elements?

P.S. I also tried a Spark accumulator, passing a collection object to all the executors, but it failed because the dataset is very large.


Solution

  • The 165 parts cannot really be equal: some of them will have to be larger than others unless the total size is a multiple of 165.

    To get the parts as evenly distributed as possible, you can use a non-rounded n, create a Stream of 0, n, 2n, 3n, ..., round each element of that Stream to get the indices of the elements you're after, and then filter the RDD using contains:

    val expectedSize = 165
    // keep n as a Double so the step between sampled indices is not truncated
    val n: Double = rddOfStrings.count.toDouble / expectedSize
    
    // 0, n, 2n, 3n, ... rounded to the nearest whole index
    val indices = Stream.iterate(0D)(x => x + n)
      .map(math.round)
      .take(expectedSize)
      .toList
    
    val resultArray = rddOfStrings.sortBy(x => x)
      .zipWithIndex                         // pair each element with its position
      .filter(x => indices.contains(x._2))  // keep only the sampled positions
      .map(_._1)
      .collect
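
    As a quick sanity check of the rounding: with, say, 10 elements and expectedSize = 3, n is roughly 3.33, the Stream yields 0.0, 3.33, 6.67, and rounding gives List(0, 3, 7), three indices as evenly spaced as the data allows.
    
    One possible refinement (not part of the original answer): indices.contains is a linear scan, so every element of the RDD pays O(expectedSize) per membership test. Converting the list to a Set makes each lookup effectively constant-time; a minimal sketch, reusing rddOfStrings and indices from above:
    
    // hypothetical variant: same logic, but with O(1) index lookups
    val indexSet = indices.toSet
    
    val resultArray = rddOfStrings.sortBy(x => x)
      .zipWithIndex
      .filter { case (_, i) => indexSet.contains(i) }
      .map { case (s, _) => s }
      .collect
    
    With only 165 indices the difference is small, but the Set version scales better if expectedSize grows.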