I am trying to join two datasets coming from two big text files. Both text files contain two columns, as below:
*col1* *document*
abc 1
aab 1
... ...
ccd 2
abc 2
... ...
I join these two files based on their first columns and try to find how many common col1 values there are for the documents. Both text files are 10 GB in size. When I run my script, Spark creates 6 stages, each with 287 partitions: 4 distinct stages, one foreach and one map. Everything goes well until the mapping stage, which is the 5th. At that stage Spark stops processing partitions and instead spills to disk; after roughly ten thousand spills it fails with an error about insufficient disk space.
I have 4 cores and 8 GB of RAM. I gave the JVM all of the memory with -Xmx8g. I also tried set("spark.shuffle.spill", "true").
My script:
{
...
val conf = new SparkConf()
  .setAppName("ngram_app")
  .setMaster("local[4]")
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)

val emp = sc.textFile("...doc1.txt").map { line =>
  val parts = line.split("\t")
  (parts(5), parts(0))
}

val emp_new = sc.textFile("...doc2.txt").map { line =>
  val parts = line.split("\t")
  (parts(3), parts(1))
}

val finalemp = emp_new.join(emp)
  .map { case (nk1, (parts1, val1)) => (parts1 + "-" + val1, 1) }
  .reduceByKey((a, b) => a + b)

finalemp.foreach(println)
}
What should I do to avoid that much spilling?
It looks like you need to change the memory setting for Spark. If you use the spark-submit script, you simply add --executor-memory 8G to your command. Setting -Xmx8g affects the launching JVM, but not Spark itself (which I believe defaults to 256MB).
Note that, as a rule of thumb, you should not assign more than 75% of the available memory to a Spark job.
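As a sketch of what that command could look like (the jar name, class name, and 6g value are illustrative, not from your script): since your script runs with local[4], the executors live inside the driver JVM, so in local mode --driver-memory is the flag that actually controls the heap available to shuffles and joins. 6g keeps you under 75% of your 8 GB of RAM:

```shell
# Illustrative spark-submit invocation -- jar and class names are placeholders.
# With --master local[4] there is no separate executor process, so
# --driver-memory sets the heap used for the join and its shuffle buffers.
spark-submit \
  --master local[4] \
  --driver-memory 6g \
  --class NgramApp \
  ngram_app.jar
```

If you later move off local mode to a real cluster, --executor-memory becomes the relevant setting instead.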