
Weird behavior of DataFrame operations


Consider the code:

import org.apache.spark.sql.functions.{coalesce, col, lit, trim, upper}
import spark.implicits._

val df1 = spark.table("t1").filter(col("c1") === lit(127))
val df2 = spark.sql("select x,y,z from ORCtable")
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R", $"z_R").show(500, false)

This produces the warning WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again. and the code eventually fails with java.lang.OutOfMemoryError: GC overhead limit exceeded.

But if I run the following code:

val df1 = spark.table("t1").filter(col("c1") === lit(127))
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000")  // only difference from the first version
// ORCtable has 1651343 rows, so the limit of 2000000 is never actually reached
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R", $"z_R").show(500, false)

This produces the correct output. I'm at a loss as to why this happens and what actually changes between the two versions. Can someone help me make sense of this?


Solution

  • To answer my own question: the Spark physical execution plans are different for the two ways of generating the same DataFrame, which can be verified by calling the .explain() method (see the sketch after this answer).

    The first version uses a broadcast hash join, which causes the java.lang.OutOfMemoryError: GC overhead limit exceeded, whereas the second runs a sort-merge join, which is typically slower but does not strain garbage collection as much. A sketch of a workaround that forces the sort-merge join without the limit clause also follows below.

    This difference in physical execution plans is introduced by the additional filter operation on the df2 DataFrame (the only change between the two versions is the limit clause).
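
Below is a minimal sketch of how to compare the two physical plans with .explain(). It reuses the table and column names from the question (t1, ORCtable, c1, x, y, z, a) and assumes a spark-shell session where spark is the SparkSession; the expectation is that the first plan contains BroadcastHashJoin and the second SortMergeJoin.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit, trim, upper}

val df1 = spark.table("t1").filter(col("c1") === lit(127))

// Build the same left-outer join against an arbitrary right-hand DataFrame
def joined(right: DataFrame): DataFrame = {
  val r = right.toDF(right.columns.map(_ + "_R"): _*)
  df1.join(r,
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))),
    "leftouter")
}

// Plan without the limit clause: expected to show BroadcastHashJoin
joined(spark.sql("select x,y,z from ORCtable")).explain()

// Plan with the limit clause: expected to show SortMergeJoin
joined(spark.sql("select x,y,z from ORCtable limit 2000000")).explain()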
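
As a possible workaround (not part of the original answer), disabling the automatic broadcast threshold should make Spark fall back to a sort-merge join for the original query, with no limit clause needed. spark.sql.autoBroadcastJoinThreshold is a standard Spark SQL setting; whether this actually avoids the GC pressure in a given environment is an assumption to verify.

// Disable automatic broadcast joins for this session (not part of the original answer)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df2 = spark.sql("select x,y,z from ORCtable")  // no limit clause needed
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))),
  "leftouter")
df3.explain()  // should now report SortMergeJoin instead of BroadcastHashJoin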