I'm facing an issue with a Spark application. Here is a simplified version of my code:
def main(args: Array[String]) {
  // Initializing the Spark context
  val sc = new SparkContext()
  val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
  System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)

  // Getting files from TGZ archives; this returns an RDD of (filename, input stream) tuples
  val archivesRDD: RDD[(String, PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz")
  val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
    logger.debug("Getting files from archive : " + tgzStream._1)
    utils.getFilesFromTgzStream(tgzStream._2)
  })

  // We run the same process with 3 different "modes"
  val modes = Seq("mode1", "mode2", "mode3")

  // We cache the RDD beforehand
  val nb = filesRDD.cache().count()
  logger.debug(nb + " files as input")

  modes.map(mode => {
    logger.debug("Processing files with mode : " + mode)
    myProcessor.process(mode, filesRDD)
  })

  filesRDD.unpersist() // I tried with or without this
  [...]
}
The generated logs are (for example, with 3 archives as input):
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
3 files as input
Processing files with mode : mode1
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
Processing files with mode : mode2
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
Processing files with mode : mode3
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
My Spark configuration: [...]
What I understand from these logs is that the file extraction is performed 4 times instead of once! This obviously leads to heap space issues and performance problems...
Am I doing something wrong?
EDIT: I also tried using modes.foreach(...) instead of map, but nothing changed...
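For what it's worth, here is a sketch of how one could check whether filesRDD actually stays in memory between the mode runs. It relies on sc.getRDDStorageInfo, a developer API on SparkContext that reports per-RDD cache status; the setName call just makes the RDD easy to find in the report:

import org.apache.spark.storage.RDDInfo

// Diagnostic sketch: report how much of filesRDD is actually cached
filesRDD.setName("filesRDD")
sc.getRDDStorageInfo.find(_.name == "filesRDD").foreach { info =>
  logger.debug(s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"mem=${info.memSize / 1024 / 1024} MB, disk=${info.diskSize / 1024 / 1024} MB")
}

If numCachedPartitions is lower than numPartitions right before a mode is processed, the cache has been partially evicted and the lineage (including the archive extraction) is recomputed.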
Okay, after A LOT of tests, I finally resolved this issue. There were in fact two problems:
1. I underestimated the size of my input data: Spark's cache and persist functions are inefficient if the RDD is too big to be stored entirely within 60% of the total memory. I knew that, but I thought my input data wasn't so big; in fact, my RDD was 80 GB. But 60% of my memory (which is 160 GB) is still more than 80 GB, so what happened? The answer is in problem #2...
2. My partitions were too big: somewhere in my code, the number of partitions of my RDD was set to 100, so I had 100 partitions of 1.6 GB each. The problem is that my data is composed of strings of dozens of megabytes each, so my partitions weren't full, and 10 GB of used memory actually contained only 7 or 8 GB of real data (see the sizing sketch after this list).
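As a back-of-the-envelope check (a sketch in plain Scala; the 80 GB figure comes from my measurements above and ~128 MB is the partition size recommended by the Spark documentation):

// Rough partition sizing
val rddSizeMB = 80L * 1024                        // my RDD was ~80 GB
val targetPartitionMB = 128L                      // Spark docs recommend ~128 MB per partition
val minPartitions = rddSizeMB / targetPartitionMB // = 640
// 100 partitions was far too few; rounding 640 up to 1000 gives
// partitions of ~80 MB each, comfortably under the 128 MB target.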
To solve these problems, I had to use persist(StorageLevel.MEMORY_ONLY_SER),
which increases computation time but dramatically reduces memory use (according to this benchmark), AND set the partition number to 1000 (following the Spark documentation, which recommends partitions of ~128 MB).
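Put together, the fixed part of the code looks roughly like this (a sketch; the two changes are repartition(1000) and the serialized storage level, everything else is as in the question):

import org.apache.spark.storage.StorageLevel

// Repartition to 1000 partitions (~80 MB each for an 80 GB RDD)
// and cache the serialized form, which trades CPU time for memory
val filesRDD: RDD[String] = archivesRDD
  .flatMap(tgzStream => utils.getFilesFromTgzStream(tgzStream._2))
  .repartition(1000)
  .persist(StorageLevel.MEMORY_ONLY_SER)

val nb = filesRDD.count() // materializes the cache once, before the mode loop
logger.debug(nb + " files as input")

With this in place, the archives are extracted a single time and all three modes read from the cached, serialized RDD.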