We are using Apache Spark 2.1.1 to generate some daily reports. These reports are generated from some daily data, which we persist before running a report on each unit separately and unioning them all together. Here is a simplified version of what we are doing:
def unitReport(d: Date, df: DataFrame, u: String): DataFrame = ... // Builds a report based on unit `u`
val date: Date = ... // Date to run the report
val dailyData: DataFrame = someDailyData.persist() // Daily data
val units: Seq[String] = Seq("Unit_A", "Unit_B", "Unit_C")
val report: DataFrame =
  units.map(unitReport(date, dailyData, _)) // Report for each unit.
    .reduce((a, b) => a.union(b)) // Join all the units together.
After this, we write the report to HDFS as a CSV, concatenate the parts together, and email the report out.
We've started to have problems with the largest of these reports, which runs on about fifty units. We keep raising the max result size (now at 10G) as well as driver memory, and we keep hitting the limit. The confusing things here are that a) we never pull results back to the driver and b) the final report takes up only 145k and 1298 lines in CSV form, so why are we exceeding 8G of maxResultSize? We feel like there is something we don't understand about how Spark manages memory, what exactly is included in resultSize, and what gets sent back to the driver, but we have had a hard time finding any explanation or documentation. Here is a snippet of the final stages of the report, right before it starts running out of memory, to give you an idea of the complexity of the report:
[Stage 2297:===========================================> (4822 + 412) / 5316]
[Stage 2297:===========================================> (4848 + 394) / 5316]
[Stage 2297:============================================> (4877 + 370) / 5316]
[Stage 2297:============================================> (4909 + 343) / 5316]
[Stage 2297:============================================> (4944 + 311) / 5316]
[Stage 2297:============================================> (4964 + 293) / 5316]
[Stage 2297:============================================> (4980 + 278) / 5316]
[Stage 2297:=============================================> (4996 + 266) / 5316]
[Stage 2297:=============================================> (5018 + 246) / 5316]
We have found what we think to be a similar memory effect with the following code:
import org.apache.spark.mllib.random.RandomRDDs._
val df = normalRDD(sc, 1000000000L, 1000000).toDF()
df.filter($"value" > 0.9).count()
While this code only returns a simple count, we eventually hit this out-of-memory error on the driver:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:174)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.Growable$class.loop$1(Growable.scala:53)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)
When we monitor the logs on the driver, we find that it is doing full garbage collections CONSTANTLY with the overall memory creeping up:
2.095: [GC [PSYoungGen: 64512K->8399K(74752K)] 64512K->8407K(244224K), 0.0289150 secs] [Times: user=0.05 sys=0.02, real=0.02 secs]
3.989: [GC [PSYoungGen: 72911K->10235K(139264K)] 72919K->10709K(308736K), 0.0257280 secs] [Times: user=0.04 sys=0.02, real=0.02 secs]
5.936: [GC [PSYoungGen: 139259K->10231K(139264K)] 139733K->67362K(308736K), 0.0741340 secs] [Times: user=0.40 sys=0.12, real=0.07 secs]
10.842: [GC [PSYoungGen: 139255K->10231K(268288K)] 196386K->86311K(437760K), 0.0678030 secs] [Times: user=0.28 sys=0.07, real=0.07 secs]
19.282: [GC [PSYoungGen: 268279K->10236K(268288K)] 344359K->122829K(437760K), 0.0642890 secs] [Times: user=0.32 sys=0.10, real=0.06 secs]
22.981: [GC [PSYoungGen: 268284K->30989K(289792K)] 380877K->143582K(459264K), 0.0811960 secs] [Times: user=0.20 sys=0.07, real=0.08 secs]
Does anyone have any ideas what is going on? Any explanation or pointers to documentation would be greatly appreciated.
It's hard to know for sure, but I'm guessing this has to do with the total number of partitions in the DataFrame that results from the reduction. That number potentially grows with the number of units, because the number of partitions in a.union(b) is the sum of the partition counts of a and b.
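To check the partition arithmetic yourself, here is a minimal sketch. It assumes a local SparkSession; the master setting, the app name, and the toy Datasets `a` and `b` are illustrative and not part of the original report. It only prints the partition counts so you can compare the union against the sum of its inputs.

```scala
import org.apache.spark.sql.SparkSession

// Local session purely for illustration; master and app name are assumptions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("union-partition-count")
  .getOrCreate()

// Two toy Datasets, each created with an explicit number of partitions.
val a = spark.range(0L, 1000L, 1L, 4) // requested 4 partitions
val b = spark.range(0L, 1000L, 1L, 6) // requested 6 partitions

// Capture the counts before stopping the session.
val aParts = a.rdd.getNumPartitions
val bParts = b.rdd.getNumPartitions
val unionParts = a.union(b).rdd.getNumPartitions

println(s"a: $aParts, b: $bParts, a.union(b): $unionParts")

spark.stop()
```

Running the same check on the real report (report.rdd.getNumPartitions) should tell you how many tasks the driver has to track for the final stage.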
While the data itself isn't stored on or sent to the driver, the driver does manage objects representing all the partitions and the tasks assigned to each of them. If your DataFrame ends up with millions of partitions, the driver will create (and later garbage-collect) millions of objects.
So, try changing the union operation to include a coalesce operation that limits the total number of partitions:
val MaxParts = dailyData.rdd.partitions.length * 2 // or anything, but something reasonable
val report: DataFrame =
  units.map(unitReport(date, dailyData, _))
    .reduce((a, b) => a.union(b).coalesce(MaxParts))
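As a rough way to see the effect of the coalesce on a toy example, the sketch below compares the two reductions side by side. Everything in it is a stand-in: `toyUnitReport`, `toyUnits`, the 4 partitions per unit, and `maxParts = 8` are hypothetical values chosen for illustration, and the session runs locally.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Local session purely for illustration; master and app name are assumptions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("bounded-union")
  .getOrCreate()

// Hypothetical stand-in for unitReport: each unit yields a small
// DataFrame that was created with 4 partitions.
def toyUnitReport(u: String): DataFrame =
  spark.range(0L, 100L, 1L, 4).toDF("value").withColumn("unit", lit(u))

val toyUnits = Seq("Unit_A", "Unit_B", "Unit_C", "Unit_D", "Unit_E")
val maxParts = 8 // arbitrary cap chosen for this sketch

// Plain union: partition counts add up with every unit.
val unbounded = toyUnits.map(toyUnitReport).reduce((a, b) => a.union(b))

// Union followed by coalesce: the count is capped after every step.
val bounded = toyUnits.map(toyUnitReport)
  .reduce((a, b) => a.union(b).coalesce(maxParts))

// Capture results before stopping the session.
val unboundedParts = unbounded.rdd.getNumPartitions
val boundedParts = bounded.rdd.getNumPartitions
val sameRowCount = unbounded.count() == bounded.count()

println(s"unbounded: $unboundedParts, bounded: $boundedParts, same rows: $sameRowCount")

spark.stop()
```

coalesce merges existing partitions without a shuffle, so the rows are unchanged and only the number of tasks in the final stage shrinks.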