Tags: apache-spark, rdd

How long does RDD remain in memory?


Considering that memory is limited, I had a feeling that Spark automatically removes RDDs from each node. I'd like to know: is this time configurable? How does Spark decide when to evict an RDD from memory?

Note: I'm not talking about rdd.cache()


Solution

  • I'd like to know: is this time configurable? How does Spark decide when to evict an RDD from memory?

    An RDD is an object just like any other. If you don't persist/cache it, it behaves like any other object in a managed language: it becomes eligible for garbage collection once no live root references point to it.
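
    To make that concrete, here is a minimal sketch (a local-mode session; names and sizes are illustrative): the only strong reference to the RDD lives inside a method, so once the method returns, the driver-side object is unreachable and the JVM may collect it at any point.

      import org.apache.spark.sql.SparkSession

      object RddLifetimeSketch {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName("rdd-lifetime-sketch")
            .master("local[*]")
            .getOrCreate()
          val sc = spark.sparkContext

          def sumOnce(): Long = {
            // Not persisted/cached: this local val is the only root
            // keeping the driver-side RDD object alive.
            val rdd = sc.parallelize(1L to 1000000L)
            rdd.reduce(_ + _)
          } // `rdd` is unreachable from here on and is garbage-collected
            // like any other object. For persisted RDDs and shuffle state,
            // that GC is also what triggers the ContextCleaner described below.

          println(sumOnce())
          System.gc() // only a hint; you cannot schedule eviction of a single RDD
          spark.stop()
        }
      }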

    The "how" part, as @Jacek points out, is the responsibility of an object called ContextCleaner. If you want the details, this is what its cleaning loop looks like:

    private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
      while (!stopped) {
        try {
          val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
            .map(_.asInstanceOf[CleanupTaskWeakReference])
          // Synchronize here to avoid being interrupted on stop()
          synchronized {
            reference.foreach { ref =>
              logDebug("Got cleaning task " + ref.task)
              referenceBuffer.remove(ref)
              ref.task match {
                case CleanRDD(rddId) =>
                  doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                case CleanShuffle(shuffleId) =>
                  doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                case CleanBroadcast(broadcastId) =>
                  doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                case CleanAccum(accId) =>
                  doCleanupAccum(accId, blocking = blockOnCleanupTasks)
                case CleanCheckpoint(rddId) =>
                  doCleanCheckpoint(rddId)
              }
            }
          }
        } catch {
          case ie: InterruptedException if stopped => // ignore
          case e: Exception => logError("Error in cleaning thread", e)
        }
      }
    }
    
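    The loop above is driven by the JVM's weak-reference machinery: Spark registers each tracked object together with a ReferenceQueue, and once the garbage collector proves the object unreachable, its weak reference shows up on the queue and the matching cleanup task fires. The following standalone sketch (not Spark code, just the underlying JDK pattern) shows that same poll-after-GC flow:

      import java.lang.ref.{ReferenceQueue, WeakReference}

      object WeakRefSketch {
        def main(args: Array[String]): Unit = {
          val queue = new ReferenceQueue[Object]()
          var payload = new Object()                  // the only strong root
          val ref = new WeakReference(payload, queue) // tracked weakly

          payload = null // drop the strong reference
          System.gc()    // encourage a collection (a hint, as always)

          // Blocks (with a timeout) until GC enqueues the reference,
          // which is the same poll keepCleaning() does above.
          val cleared = queue.remove(10000)
          println(s"reference enqueued: ${cleared eq ref}")
        }
      }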

    If you want to learn more, I suggest browsing Spark's source or, even better, reading @Jacek's book, "Mastering Apache Spark" (it has a section dedicated to ContextCleaner).
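
    As for the "is this time configurable" part: there is no per-RDD TTL, but a few ContextCleaner settings influence when cleanup gets a chance to run. The keys below are real Spark configs (the blocking flag is what populates blockOnCleanupTasks in the snippet above); the values are just illustrative:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        .appName("cleaner-tuning")
        // Driver-side reference tracking (the ContextCleaner); on by default.
        .config("spark.cleaner.referenceTracking", "true")
        // Whether cleanup calls block until executors confirm removal.
        .config("spark.cleaner.referenceTracking.blocking", "true")
        // Periodically force a driver GC so weakly-reachable objects are
        // actually discovered even if the driver heap rarely fills (default 30min).
        .config("spark.cleaner.periodicGC.interval", "30min")
        .getOrCreate()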