Tags: apache-spark, lazy-evaluation, bloom-filter

BloomFilter mergeInPlace() producing unexpected behavior


The Spark Scala code snippet below reproduces the behavior I'm trying to understand. At a high level, we construct two tuples, each containing a DataFrame and a Bloom filter built from that DataFrame's id column. We then filter b to remove any rows whose id might be contained in a's Bloom filter, and store the union of this filtered result and a as c.

import org.apache.spark.sql.functions.{col, udf}

val a_df = Seq(("b", List("4", "16"))).toDF("id", "data")
val a_bloom_filter = a_df.stat.bloomFilter(col("id"), 2, 0.000001)
val a = (a_df, a_bloom_filter)

println("a")
a_df.show(20, false)

val b_df = Seq(("b", List("4")), ("c", List("4"))).toDF("id", "data")
val b_bloom_filter = b_df.stat.bloomFilter(col("id"), 2, 0.000001)
val b = (b_df, b_bloom_filter)

println("b")
b_df.show(20, false)

val a_bloom_filter_udf = udf((s: String) => !a._2.mightContain(s))
val filtered_b_df = b._1.filter(a_bloom_filter_udf(col("id")))

val c = a._1.union(filtered_b_df)

println("c")
c.show(20, false)

val merged_bloom_filter = a._2.mergeInPlace(b._2)

println("c")
c.show(20, false)

Running this in the Spark REPL produces output I do not understand:

a
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+

b
+---+----+
|id |data|
+---+----+
|b  |[4] |
|c  |[4] |
+---+----+

c
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
|c  |[4]    |
+---+-------+

c
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+

Specifically, why does c seem to change when we perform the mergeInPlace operation? My expectation is that c would not change between calls to show.

Reading the documentation for mergeInPlace, I see that "The mutations happen to this instance." Here, "this instance" is a._2, i.e. a's Bloom filter.
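To see that documented mutation in isolation, here is a minimal sketch that uses `org.apache.spark.util.sketch.BloomFilter` (the type `stat.bloomFilter` returns) directly, assuming the `spark-sketch` module is on the classpath. `putString` is used as a stand-in for how `stat.bloomFilter` fills the filter from the id column, and the object name is hypothetical. It checks that `mergeInPlace` hands back the receiver itself and that the receiver now reports the merged-in id.

```scala
import org.apache.spark.util.sketch.BloomFilter

// Hypothetical demo object; parameters mirror the question's (2, 0.000001).
object MergeInPlaceDemo {
  def run(): (Boolean, Boolean, Boolean) = {
    val aFilter = BloomFilter.create(2, 0.000001)
    aFilter.putString("b")
    val bFilter = BloomFilter.create(2, 0.000001)
    bFilter.putString("b")
    bFilter.putString("c")

    val aHadCBefore = aFilter.mightContainString("c")
    // mergeInPlace ORs bFilter's bits into aFilter and returns aFilter itself.
    val merged = aFilter.mergeInPlace(bFilter)
    val sameObject = merged eq aFilter
    // Bloom filters have no false negatives, so aFilter must now report "c".
    val aHasCAfter = aFilter.mightContainString("c")
    (aHadCBefore, sameObject, aHasCAfter)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the return value is the same object, `merged_bloom_filter` in the question is just another name for `a._2`.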

My current hypothesis is that the mergeInPlace call mutates a._2 so that a._2 := a._2 OR b._2, which in this example effectively means (b, c). Then, for the second show(), everything seems to be re-evaluated with this new a._2 = (b, c), so

  • the UDF is re-evaluated with a._2 = (b, c)
  • filtered_b_df is re-evaluated to an empty DF because both of b._1's IDs are found in the "new" Bloom filter a._2
  • c is re-evaluated as the union between a._1 and filtered_b_df (an empty DF), so effectively just a._1, which is exactly what we see from the second show()
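The hypothesis above can be reproduced without Spark. The sketch below is a plain-Scala analog, not Spark code: a hypothetical `ToyFilter` (an exact mutable set standing in for the Bloom filter) and a thunk standing in for c's lazy plan. The thunk only consults the filter when it is invoked, much as each `show()` re-runs the plan and the UDF with whatever state a._2 has at that moment.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Bloom filter: an exact mutable set of ids.
// Unlike a real Bloom filter it has no false positives, which is enough here.
final class ToyFilter(initial: String*) {
  private val ids = mutable.Set[String](initial: _*)
  def mightContain(id: String): Boolean = ids.contains(id)
  // Mutates this instance and returns it, mirroring BloomFilter.mergeInPlace.
  def mergeInPlace(other: ToyFilter): ToyFilter = { ids ++= other.ids; this }
}

object LazyReevaluationDemo {
  def run(): (Seq[String], Seq[String]) = {
    val aIds = Seq("b")
    val bIds = Seq("b", "c")
    val aFilter = new ToyFilter("b")
    val bFilter = new ToyFilter("b", "c")

    // Plays the role of c: nothing is computed until the thunk is called,
    // and every call reads aFilter's current contents.
    val c: () => Seq[String] = () => aIds ++ bIds.filter(id => !aFilter.mightContain(id))

    val firstShow = c()           // evaluated against the original aFilter
    aFilter.mergeInPlace(bFilter) // mutates the object the thunk captured
    val secondShow = c()          // re-evaluated against the mutated aFilter
    (firstShow, secondShow)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The first call yields `List(b, c)` and the second `List(b)`, matching the two `show()` outputs for c.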

This belief seems to be reinforced by the fact that calling b._2.mergeInPlace(a._2) instead of a._2.mergeInPlace(b._2) produces the expected behavior (i.e. c does not change), presumably because a._2 is not clobbered, so whatever re-evaluation happens still sees the original filter.
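The reversed call can be checked the same way. This is a minimal sketch, again assuming `spark-sketch` on the classpath and a hypothetical object name: `keep` has the same shape as the question's UDF (`!a._2.mightContain(id)`), and merging a's filter into b's filter leaves that predicate's answer unchanged because only the receiver, b's filter, is mutated.

```scala
import org.apache.spark.util.sketch.BloomFilter

// Hypothetical demo of the reversed merge: only the receiver (bFilter) changes.
object ReverseMergeDemo {
  def run(): (Boolean, Boolean) = {
    val aFilter = BloomFilter.create(2, 0.000001)
    aFilter.putString("b")
    val bFilter = BloomFilter.create(2, 0.000001)
    bFilter.putString("b")
    bFilter.putString("c")

    // Same shape as the question's UDF: keep ids that a's filter does not report.
    val keep: String => Boolean = id => !aFilter.mightContainString(id)

    val keepCBefore = keep("c")
    bFilter.mergeInPlace(aFilter) // mutates bFilter, leaves aFilter untouched
    val keepCAfter = keep("c")
    (keepCBefore, keepCAfter)
  }

  def main(args: Array[String]): Unit = println(run())
}
```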


Solution

  • Yep. You answered it yourself, even though you find it odd. show() is an action, and every action (re-)triggers execution of c's full lineage, including the UDF that reads a._2.

    The re-evaluation happens as you describe, so your explanation for val merged_bloom_filter = a._2.mergeInPlace(b._2) is correct. Your explanation for the reversed call is not: re-evaluation happens there too, but the filter that gets mutated, b._2, is never consulted by your code on re-execution, because the UDF only reads a._2.

    So, if you were seeking confirmation, it is confirmed, and there is no unexpected behaviour.
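If the goal is a merged filter that does not disturb c, one option is to merge into a brand-new filter instead of into a._2. A minimal sketch, assuming `spark-sketch` on the classpath; `mergedCopy` is a hypothetical helper, and it assumes both inputs were built with the same expected item count and false-positive rate (here the question's 2 and 0.000001), since otherwise `mergeInPlace` throws `IncompatibleMergeException`. In the shell this would amount to something like `BloomFilter.create(2, 0.000001).mergeInPlace(a._2).mergeInPlace(b._2)`; checking `isCompatible` first is a cheap way to confirm the parameters really match what `stat.bloomFilter` produced.

```scala
import org.apache.spark.util.sketch.BloomFilter

object FreshMergeDemo {
  // Hypothetical helper: merge x and y into a new filter so neither input is mutated.
  // Assumes x and y share expectedNumItems and fpp; otherwise mergeInPlace throws
  // IncompatibleMergeException.
  def mergedCopy(x: BloomFilter, y: BloomFilter, expectedNumItems: Long, fpp: Double): BloomFilter =
    BloomFilter.create(expectedNumItems, fpp).mergeInPlace(x).mergeInPlace(y)

  def run(): (Boolean, Boolean, Boolean, Boolean) = {
    val aFilter = BloomFilter.create(2, 0.000001)
    aFilter.putString("b")
    val bFilter = BloomFilter.create(2, 0.000001)
    bFilter.putString("b")
    bFilter.putString("c")

    val aHadC = aFilter.mightContainString("c")
    val merged = mergedCopy(aFilter, bFilter, 2, 0.000001)
    (
      aHadC == aFilter.mightContainString("c"), // aFilter's answer for "c" is unchanged
      merged.mightContainString("b"),
      merged.mightContainString("c"),
      merged ne aFilter                         // the merge result is a separate object
    )
  }

  def main(args: Array[String]): Unit = println(run())
}
```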