Tags: scala, performance, apache-spark, rdd

Spark RDD: multiple reduceByKey or just once


I have code like the following:

// make an RDD according to an id
def makeRDD(id: Int, data: RDD[(VertexId, Double)]): RDD[(Long, Double)] = { ... }
val data: RDD[(VertexId, Double)] = ... // loaded from HDFS
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))

rst1 and rst2 give the same result. I thought rst1 requires more memory (100 times) but only one reduceByKey transformation; however, rst2 requires less memory but more reduceByKey transformations (99 of them). So, is this a time versus space tradeoff?

My question is: is my analysis above right, or does Spark translate the two variants in the same way internally?

P.S.: rst1 unions all the sub-RDDs and then applies reduceByKey, so reduceByKey is outside reduce. rst2 applies reduceByKey one pair at a time, so reduceByKey is inside reduce.


Solution

  • Long story short, both solutions are relatively inefficient, but the second one is worse than the first.

    Let's start by answering the last question. For the low-level RDD API there are only two types of global automatic optimizations:

    • using explicitly or implicitly cached task results instead of recomputing the complete lineage
    • combining multiple transformations which don't require a shuffle into a single ShuffleMapStage

    Everything else is pretty much a sequence of transformations which defines the DAG. This stands in contrast to the more restrictive, high-level Dataset (DataFrame) API, which makes specific assumptions about transformations and performs global optimizations of the execution plan.
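
    To make those two mechanisms concrete, here is a minimal sketch with hypothetical data (it assumes an existing SparkContext named sc; none of these values come from the question):

    import org.apache.spark.rdd.RDD

    // 1. Explicit caching: once `data` is cached, reusing it reads the
    //    cached blocks instead of recomputing its lineage.
    val data: RDD[(Long, Double)] = sc
      .parallelize(Seq((1L, 1.0), (2L, 2.0), (1L, 3.0)))
      .cache()

    // 2. Stage pipelining: mapValues and filter are narrow transformations,
    //    so Spark fuses them (together with the map side of reduceByKey)
    //    into a single ShuffleMapStage; the shuffle ends the stage.
    val result = data
      .mapValues(_ * 2)
      .filter { case (_, v) => v > 0 }
      .reduceByKey(_ + _)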

    Regarding your code: the biggest problem with the first solution is the growing lineage when you apply union iteratively. It makes some things, like failure recovery, expensive, and since RDDs are defined recursively, it can fail with a StackOverflowError. A less serious side effect is a growing number of partitions, which doesn't seem to be compensated for in the subsequent reduction*. You'll find a more detailed explanation in my answer to Stackoverflow due to long RDD Lineage, but what you really need here is a single union, like this:

    sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)
    

    This is actually an optimal solution, assuming you apply a truly reducing function.
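
    If you want to see the structural difference for yourself, a rough (non-authoritative) way is to compare the lineages, assuming sc, data, idList and makeRDD exactly as in the question:

    // The iterative reduce nests one UnionRDD per step, while sc.union
    // builds a single flat UnionRDD over all inputs.
    val nested = idList.map(id => makeRDD(id, data)).reduce(_ union _)
    val flat   = sc.union(idList.map(id => makeRDD(id, data)))

    println(nested.toDebugString)  // deep, recursive lineage
    println(flat.toDebugString)    // one shallow union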

    The second solution obviously suffers from the same problem; nevertheless, it gets worse. While the first approach requires only two stages with a single shuffle, this one requires a shuffle for each RDD. Since the number of partitions is growing and you use the default HashPartitioner, each piece of data has to be written to disk multiple times and most likely shuffled over the network multiple times. Ignoring low-level calculations, each record is shuffled O(N) times, where N is the number of RDDs you merge.
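
    As a rough illustration of that O(N) behaviour, here is a small hypothetical three-RDD version of the second variant (again assuming an existing SparkContext sc); its lineage contains one ShuffledRDD per pairwise step instead of a single shuffle at the end:

    val parts = Seq(
      sc.parallelize(Seq((1L, 1.0), (2L, 1.0))),
      sc.parallelize(Seq((1L, 2.0), (3L, 1.0))),
      sc.parallelize(Seq((2L, 3.0), (3L, 2.0)))
    )
    // Every step ends in reduceByKey, i.e. its own shuffle, so already
    // reduced records are shuffled again at each subsequent step.
    val stepwise = parts.reduce((l, r) => (l union r).reduceByKey(_ + _))
    println(stepwise.toDebugString)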

    Regarding memory usage, it is not obvious without knowing more about the data distribution, but in the worst-case scenario the second method can exhibit significantly worse behavior.

    If + works with constant space, the only requirement for the reduction is a hash map to store the results of the map-side combine. Since partitions are processed as a stream of data, without reading the complete content into memory, the total memory size for each task will be proportional to the number of unique keys, not the amount of data. Since the second method requires more tasks, overall memory usage will be higher than in the first case. On average it can be slightly better due to the data being partially organized, but that is rather unlikely to compensate for the additional costs.
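
    To make the map-side combine explicit, this is roughly what reduceByKey(_+_) boils down to in terms of combineByKey (a simplified sketch; rdd below stands for any RDD[(Long, Double)] and is not taken from the question):

    val combined = rdd.combineByKey[Double](
      (v: Double) => v,                     // createCombiner: first value seen for a key
      (acc: Double, v: Double) => acc + v,  // mergeValue: map-side combine in a per-task hash map
      (a: Double, b: Double) => a + b       // mergeCombiners: merge after the shuffle
    )
    // The map-side hash map holds one running sum per distinct key in the
    // partition, so per-task memory tracks the number of unique keys rather
    // than the number of records streamed through.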


    * If you want to learn how this can affect overall performance, see Spark iteration time increasing exponentially when using join. It is a slightly different problem, but it should give you some idea why controlling the number of partitions matters.