apache-spark, mapreduce, rdd

How to avoid large intermediate result before reduce?


I'm getting an error in a Spark job that surprises me:

 Total size of serialized results of 102 tasks (1029.6 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)

My job is like this:

def add(a, b): return a + b
sums = rdd.mapPartitions(func).reduce(add)

rdd has ~500 partitions, and func takes the rows in each partition and returns a large array (a NumPy array of 1.3M doubles, ~10 MB). I'd like to sum all of these per-partition results.

Spark seems to be holding the full result of mapPartitions(func) in memory (about 5 GB) instead of processing it incrementally, which would need only about 30 MB.

Instead of increasing spark.driver.maxResultSize, is there a way to perform the reduce more incrementally?


Update: Actually, I'm kind of surprised that more than two results are ever held in memory.


Solution

  • When using reduce, Spark applies the final reduction on the driver. If func returns a single object per partition, this is effectively equivalent to:

    reduce(add, rdd.collect())
    

    You may use treeReduce:

    import math
    
    # Keep the maximum useful depth (one merge level per halving of the partitions)
    rdd.treeReduce(add, depth=int(math.ceil(math.log2(rdd.getNumPartitions()))))
    

    or toLocalIterator:

    sum(rdd.toLocalIterator())
    

    The former will recursively merge partition results on the workers, at the cost of increased network exchange. You can use the depth parameter to tune the performance.

    The latter will collect only a single partition at a time, but it might require re-evaluation of the rdd, and a significant part of the work will be performed by the driver.
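
    To make the two options concrete for the job in the question, here is a minimal sketch. It assumes func yields exactly one NumPy array per partition; the 1.3M array length is taken from the question, and rdd, func and add are the names used above:

    import math
    import numpy as np

    part_sums = rdd.mapPartitions(func)   # one ~10 MB array per partition

    # Option 1: merge the partial sums on the executors in a tree pattern
    depth = max(2, int(math.ceil(math.log2(part_sums.getNumPartitions()))))
    total = part_sums.treeReduce(add, depth=depth)

    # Option 2 (same result): stream one partition result at a time to the driver
    total = np.zeros(1300000)              # array length assumed from the question
    for partial in part_sums.toLocalIterator():
        total += partial                   # only one ~10 MB array is held at once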

    Depending on the exact logic used in func, you can also improve work distribution by splitting the matrix into blocks and performing the addition block by block, for example using BlockMatrix.
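
    As a rough illustration of the block-by-block idea (a sketch only, using a plain reduceByKey rather than the BlockMatrix API; the block size and the compute_partition_array helper are placeholders, not part of the original job):

    import numpy as np

    BLOCK = 100000                         # elements per block, chosen arbitrarily here

    def func_blocks(rows):
        # hypothetical variant of func that yields (block_index, block) pairs
        arr = compute_partition_array(rows)    # placeholder for whatever func computes
        for i in range(0, arr.size, BLOCK):
            yield (i // BLOCK, arr[i:i + BLOCK])

    # Per-block sums are computed on the executors via reduceByKey, so the driver
    # only ever receives the already-summed blocks (~10 MB in total).
    block_sums = rdd.mapPartitions(func_blocks).reduceByKey(lambda a, b: a + b)
    blocks = sorted(block_sums.collect(), key=lambda kv: kv[0])
    total = np.concatenate([blk for _, blk in blocks])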