Consider the code:
val df1 = spark.table("t1").filter(col("c1") === lit(127))
val df2 = spark.sql("select x,y,z from ORCtable")
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R", $"z_R").show(500, false)
This produces the warning
WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.
and the code eventually fails with
java.lang.OutOfMemoryError: GC overhead limit exceeded.
But if I run the following code:
val df1 = spark.table("t1").filter(col("c1") === lit(127))
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000") // only difference here
// ORCtable has 1651343 rows, so it never exceeds the limit of 2000000
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R", $"z_R").show(500, false)
This produces the correct output. I'm at a loss as to why this happens and what actually changes between the two versions. Can someone help me make sense of this?
To answer my own question: the Spark physical execution plans are different for the two ways of generating the same dataframe, which can be verified by calling the .explain() method on each.
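For example (a minimal sketch; it reuses df1 and the join condition from the question, and assumes the usual functions imports):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit, trim, upper}

// Hypothetical helper: build df3 from a given df2 so both variants can be compared.
def buildDf3(df2: DataFrame): DataFrame =
  df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))),
    "leftouter")

// Per the explanation below, the first plan shows a broadcast-hash join,
// the second a sort-merge join.
buildDf3(spark.sql("select x,y,z from ORCtable")).explain()
buildDf3(spark.sql("select x,y,z from ORCtable limit 2000000")).explain()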
The first way uses a broadcast-hash join, which causes the java.lang.OutOfMemoryError: GC overhead limit exceeded, whereas the latter runs a sort-merge join, which is typically slower but does not strain garbage collection as much. This difference in physical execution plans is introduced by the additional limit clause on the df2 dataframe.
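A possible follow-up (my own assumption, not something verified on this data): if the broadcast-hash join is what exhausts memory, Spark can also be steered toward the sort-merge join without adding a limit by disabling automatic broadcasting:

// Assumption: with auto-broadcast disabled, the planner falls back to a
// sort-merge join for this equi-join instead of broadcasting one side.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Rebuild df3 and call .explain() again; the plan should now show a sort-merge join.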