Tags: apache-spark, apache-spark-sql, query-optimization

Order of evaluation of predicates in Spark SQL where clause


I am trying to understand the order of predicate evaluation in Spark SQL in order to improve the performance of a query.
Let's say I have the following query:

"select * from tbl where pred1 and pred2"

and let's say that neither predicate qualifies as a pushdown filter (for simplicity). Also assume that pred1 is computationally much more expensive than pred2 (say, regex pattern matching vs. a simple negation).
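
For concreteness, such a pair of predicates might look something like this in the DataFrame API; the table, column names and the regex here are made up purely for illustration:

import org.apache.spark.sql.functions._

// pred1: comparatively expensive regex pattern match (hypothetical column "description")
val pred1 = col("description").rlike("^[A-Z]{3}-\\d{4}$")
// pred2: cheap boolean negation (hypothetical column "archived")
val pred2 = !col("archived")

// equivalent to: select * from tbl where pred1 and pred2
val result = spark.table("tbl").where(pred1 && pred2)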

  • Is there any way to verify that Spark will evaluate pred2 before pred1?
  • Is this deterministic?
  • Is this controllable?
  • Is there any way to see the final execution plan?

Solution

  • General

    Good question.

    The answer below is inferred from testing a scenario and making deductions, as I could not find suitable documentation. This is a second attempt, since many of the statements found on the web could not be backed up.

    I think this question is not about AQE in Spark 3.x. Rather, it is about a DataFrame in, say, Stage N of a Spark application that has already acquired its data from sources at rest and is now being filtered with multiple predicates.

    The central point, then, is: does it matter how the predicates are ordered, or does Spark (Catalyst) re-order them to minimize the work to be done?

    • The premise here is that filtering the maximum amount of data out first makes more sense than evaluating a predicate that filters very little out.
      • This is a well-known RDBMS point about sargable predicates (a definition that has evolved over time).
        • A lot of that discussion focuses on indexes, which Spark and Hive do not have, although DataFrames are columnar.

    Point 1

    You can try the following with %sql:

     EXPLAIN EXTENDED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
    

    From this you can see whether any re-arranging of predicates is going on; I saw no such re-ordering in the Physical Plan in non-AQE mode on Databricks. Refer to https://docs.databricks.com/sql/language-manual/sql-ref-syntax-qry-explain.html.

    I read here and there that Catalyst can re-arrange filtering. To what extent would require a lot of research; I was not able to confirm it.

    Also an interesting read: https://www.waitingforcode.com/apache-spark-sql/catalyst-optimizer-in-spark-sql/read
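
    The same plans can also be inspected from Scala rather than %sql. A minimal sketch (the tiny DataFrame and the two predicates are made up purely to illustrate the call):

    import org.apache.spark.sql.functions._

    // Throw-away DataFrame, just to have something to explain.
    val planDemo = Seq((1, "abc"), (2, "xyz")).toDF("hc", "text")

    // explain(true) prints the Parsed, Analyzed, Optimized Logical and Physical plans,
    // so any combining or re-ordering of the two predicates would be visible there.
    planDemo.where($"hc" === -4 && $"text".rlike("^a")).explain(true)

    In Spark 3.x you can also pass a mode string, e.g. explain("formatted"), for a more compact, annotated physical plan.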

    Point 2

    I ran the following admittedly contrived examples: the same functional query, but with the predicates reversed, filtering a high-cardinality column on a value that does not in fact exist, and then compared the value of an accumulator that is incremented inside a UDF each time it is called.

    Scenario 1

    import org.apache.spark.sql.functions._
    
    // Helpers to generate random column values.
    def randomInt1to1000000000 = scala.util.Random.nextInt(1000000000) + 1
    def randomInt1to10 = scala.util.Random.nextInt(10) + 1
    def randomInt1to1000000 = scala.util.Random.nextInt(1000000) + 1
    
    // 1M rows: "hc" is high cardinality, "lc" is low cardinality.
    val df = sc.parallelize(Seq.fill(1000000){(randomInt1to1000000, randomInt1to1000000000, randomInt1to10)})
      .toDF("nuid", "hc", "lc")
      .withColumn("text", lpad($"nuid", 3, "0"))
      .withColumn("literal", lit(1))
    
    // Accumulator counts how many times the UDF is actually invoked.
    val accumulator = sc.longAccumulator("udf_call_count")
    
    spark.udf.register("myUdf", (x: String) => {
      accumulator.add(1)
      x.length
    })
    
    accumulator.reset()
    // UDF predicate first, impossible cheap predicate (hc = -4) second.
    df.where("myUdf(text) = 3 and hc = -4").select(max($"text")).show(false)
    println(s"Number of UDF calls ${accumulator.value}")
    

    returns:

    +---------+
    |max(text)|
    +---------+
    |null     |
    +---------+
    
    Number of UDF calls 1000000 
    

    Scenario 2

    import org.apache.spark.sql.functions._
    
    // Same helpers and data shape as Scenario 1.
    def randomInt1to1000000000 = scala.util.Random.nextInt(1000000000) + 1
    def randomInt1to10 = scala.util.Random.nextInt(10) + 1
    def randomInt1to1000000 = scala.util.Random.nextInt(1000000) + 1
    
    val dfA = sc.parallelize(Seq.fill(1000000){(randomInt1to1000000, randomInt1to1000000000, randomInt1to10)})
      .toDF("nuid", "hc", "lc")
      .withColumn("text", lpad($"nuid", 3, "0"))
      .withColumn("literal", lit(1))
    
    val accumulator = sc.longAccumulator("udf_call_count")
    
    spark.udf.register("myUdf", (x: String) => {
      accumulator.add(1)
      x.length
    })
    
    accumulator.reset()
    // Predicates reversed: impossible cheap predicate (hc = -4) first, UDF predicate second.
    dfA.where("hc = -4 and myUdf(text) = 3").select(max($"text")).show(false)
    println(s"Number of UDF calls ${accumulator.value}")
    

    returns:

    +---------+
    |max(text)|
    +---------+
    |null     |
    +---------+
    
    Number of UDF calls 0
    

    My conclusion here is that:

    • There is left-to-right evaluation, at least in this case: the accumulator value is 0 for Scenario 2, meaning the UDF is never called, as opposed to Scenario 1, where 1M calls are registered.

    • So the kind of predicate re-ordering that, say, Oracle and DB2 may perform for Stage 1 predicates does not appear to apply here.

    Point 3

    I note, however, the following from the manual (https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html):

    Evaluation order and null checking

    Spark SQL (including SQL and the DataFrame and Dataset APIs) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.

    Therefore, it is dangerous to rely on the side effects or order of evaluation of Boolean expressions, and the order of WHERE and HAVING clauses, since such expressions and clauses can be reordered during query optimization and planning. Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there’s no guarantee that the null check will happen before invoking the UDF. For example,

    spark.udf.register("strlen", (s: String) => s.length)
    spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
    

    This WHERE clause does not guarantee the strlen UDF to be invoked after filtering out nulls.

    To perform proper null checking, we recommend that you do either of the following:

    • Make the UDF itself null-aware and do null checking inside the UDF itself.
    • Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch.

    spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
    spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
    spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok
    

    A slight contradiction: the documentation says the evaluation order is not guaranteed, even though the tests above showed left-to-right behaviour in practice.
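
    If the cheap predicate really must be evaluated first, the guard pattern recommended in the docs can be applied to the Scenario 2 query, since IF / CASE WHEN only evaluates the branch it needs. A minimal sketch, assuming the df, myUdf and accumulator definitions from Scenario 1 are still in scope:

    accumulator.reset()
    // The UDF is only invoked for rows where the cheap predicate holds,
    // regardless of how Catalyst orders the top-level conjuncts.
    df.where("if(hc = -4, myUdf(text) = 3, false)").select(max($"text")).show(false)
    println(s"Number of UDF calls ${accumulator.value}")  // expected to stay at 0, since hc = -4 never matches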