Tags: garbage-collection, apache-spark, accumulator

How to ensure garbage collection of unused accumulators?


I have run into a problem where accumulators in Spark cannot be garbage collected.

def newIteration(lastParams: Accumulable[Params, (Int, Int, Int)], lastChosens: RDD[Document], i: Int): Params = {
    if (i == maxIteration)
        return lastParams.value

    val size1: Int = 100
    val size2: Int = 1000

    // each iteration generates a new accumulator
    val params = sc.accumulable(Params(size1, size2))

    // there is a map operation here;
    // if I only use lastParams, the result is not updated,
    // but params solves this problem
    val chosen = data.map {
        case Document(docID, content) => {
            lastParams += (docID, content, -1)
            val newContent = lastParams.localValue.update(docID, content)
            lastParams += (docID, newContent, 1)
            params += (docID, newContent, 1)
            Document(docID, newContent)
        }
    }.cache()
    chosen.count()
    lastChosens.unpersist()
    return newIteration(params, chosen, i + 1)
}

The problem is that the allocated memory keeps growing until it hits the memory limit. It seems that lastParams is never garbage collected. The RDD and Broadcast classes have an unpersist() method, but I cannot find anything like it for accumulators in the documentation.

Why can't an Accumulable be garbage collected automatically, or is there a better solution?
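
For comparison, here is a minimal, self-contained sketch (hypothetical, assuming Spark 1.x in local mode; it is not part of the code above) showing that RDDs and broadcast variables can be released explicitly with unpersist(), while an accumulator has no comparable method:

import org.apache.spark.{SparkConf, SparkContext}

object ReleaseComparison {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("release-comparison").setMaster("local[*]"))

        val rdd = sc.parallelize(1 to 1000).cache()
        rdd.count()
        rdd.unpersist()          // RDD: explicit release is available

        val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
        sc.parallelize(1 to 10).map(i => lookup.value.getOrElse(i, "?")).count()
        lookup.unpersist()       // Broadcast: explicit release is available

        val acc = sc.accumulator(0)   // Accumulator: no unpersist()-like method exists
        sc.parallelize(1 to 10).foreach(x => acc += x)
        println(acc.value)

        sc.stop()
    }
}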


Solution

  • UPDATE (April 22nd, 2016): SPARK-3885 "Provide mechanism to remove accumulators once they are no longer used" is now resolved.

    There is ongoing work to support automatically garbage-collecting accumulators once they are no longer referenced; see SPARK-3885 to track progress. Spark PR #4021, currently under review, is a patch for this feature, and I expect it to be included in Spark 1.3.0. (See the sketch after this note for what per-iteration accumulators look like once the fix is in place.)
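
For reference, here is a minimal sketch (hypothetical, not from the original answer) of per-iteration accumulators after the fix: with the AccumulatorV2-based API in Spark 2.x, the driver tracks accumulators through weak references, so an accumulator that your code no longer references becomes eligible for garbage collection.

import org.apache.spark.{SparkConf, SparkContext}

object PerIterationAccumulators {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("per-iteration-acc").setMaster("local[*]"))
        val data = sc.parallelize(1 to 100000).cache()

        val maxIteration = 5
        var i = 0
        while (i < maxIteration) {
            // A fresh accumulator per iteration; once the loop moves on, the previous
            // one is unreferenced and can be reclaimed by the JVM garbage collector.
            val sum = sc.longAccumulator(s"sum-iter-$i")
            data.foreach(x => sum.add(x.toLong))
            println(s"iteration $i: ${sum.value}")
            i += 1
        }

        sc.stop()
    }
}

The same pattern should carry over to a custom AccumulatorV2 subclass in place of longAccumulator if you need Params-style state per iteration.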