Search code examples
rapache-sparkcheckpointing

checkpointing DataFrames in SparkR


I am looping over a number of csv data files using R/spark. About 1% of each file must be retained (filtered based on certain criteria) and merged with the next data file (I have used union/rbind). However, as the loop runs, the lineage of the data gets longer and longer as spark remembers all the previous datasets and filter()-s.

Is there a way to do checkpointing in spark R API? I have learned that spark 2.1 has checkpointing for DataFrames but this seems not to be made available from R.


Solution

  • We got the same issue with Scala/GraphX on a quite large graph (few billions of data) and the search for connected components .

    I'm not sure what is available in R for your specific version, but a usual workaround is to break the lineage by "saving" the data then reloading it. In our case, we break the lineage every 15 iterations:

    def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String, iterationCount: Int, numPartitions: Int): Graph[VD, ED] = {
        val path = checkpointDir + "/iter-" + iterationCount
        saveGraph(g, path)
        g.unpersist()
        loadGraph(path, numPartitions)
    }