apache-spark, apache-spark-sql, catalyst-optimizer

Which optimizations do UDFs not benefit from?


Spark UDFs expose properties such as nullable, deterministic, and dataType. Going by this, they should benefit from optimizations such as ConstantFolding. Which other optimizations do they benefit from, and which ones can they not benefit from? I ask because many presentations treat UDFs as a black box that does not benefit from Catalyst optimizations at all, yet they clearly benefit from ConstantFolding.
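
For concreteness, these properties are visible on the expression behind a UDF call; a quick sketch (the names are illustrative):

    import org.apache.spark.sql.functions.{lit, udf}

    // The Column returned by calling a UDF wraps a ScalaUDF expression,
    // which exposes the properties mentioned above.
    val double = udf((a: Int) => a * 2)
    val call = double(lit(21)).expr

    println(call.deterministic) // true by default
    println(call.nullable)
    println(call.dataType)      // IntegerType for an Int-returning UDF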


Solution

  • Spark handles UDFs by wrapping them in a class. For example, when you write the following:

    import org.apache.spark.sql.functions.udf

    val example = udf((a: Int) => a * 2)

    The udf function creates a UserDefinedFunction; its apply method wraps your closure in a ScalaUDF expression (returned inside a Column). ScalaUDF extends Expression, and its doGenCode method does the following:

    ...
        val callFunc =
          s"""
             |$boxedType $resultTerm = null;
             |try {
             |  $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult);
             |} catch (Exception e) {
             |  throw new org.apache.spark.SparkException($errorMsgTerm, e);
             |}
           """.stripMargin
    
        ev.copy(code =
          code"""
             |$evalCode
             |${initArgs.mkString("\n")}
             |$callFunc
    ...
    

    This generated code converts the Catalyst value of the column/expression to a Scala type (because your UDF operates on Scala types) and then calls your lambda. The deterministic, nullable, and dataType members belong to the ScalaUDF wrapper (it extends Expression), not to your function. If you want to benefit fully from expression-level optimizations, you would have to write a custom expression that extends Expression or one of its subclasses.
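
    The flags the wrapper does expose can at least be set explicitly through the UserDefinedFunction API. A small sketch (Spark 2.3+, reusing the UDF from above):

    import org.apache.spark.sql.functions.udf

    // These flags live on the wrapper, not in the closure, so they are declared
    // through the wrapper's API rather than inferred from the function body.
    val example = udf((a: Int) => a * 2)
      .asNondeterministic() // disables optimizations that assume determinism
      .asNonNullable()      // declares that the UDF never returns null

    Catalyst reads these flags from the wrapper; it still never looks inside the closure itself.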

    Now take the following redundant filter as an example:

    val redundantUdf = udf((a: Long) => true)
    someDf.filter(redundantUdf(someDf("col1"))).explain()
    

    The optimized logical plan would look something like this:

    Project [_1#5736 AS Type#5739, _2#5737L AS sts#5740L]
    +- Filter UDF(_2#5737L)
       +- LocalRelation [_1#5736, _2#5737L]
    

    As you can see, the filter is still performed even though it is redundant and will always evaluate to true.

    Whereas the following:

    someDf.filter(expr("true")).explain()
    

    would give the following optimized logical plan:

    LocalRelation [Type#5739, sts#5740L]
    

    It prunes out the filter using the PruneFilters rule, because the condition is a literal true that the optimizer can evaluate on its own.

    This doesn't mean all optimizations are excluded; some still work with UDFs, such as CombineFilters, which merges the predicates of two consecutive filters. For example:

    == Analyzed Logical Plan ==
    _1: string, _2: string
    Filter UDF(_1#2)
    +- Filter UDF(_1#2)
       +- LocalRelation [_1#2, _2#3]
    
    == Optimized Logical Plan ==
    Filter (UDF(_1#2) && UDF(_1#2))
    +- LocalRelation [_1#2, _2#3]
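
    For reference, a rough sketch that would produce plans like the ones above (the data and UDF are assumptions, and spark is an active SparkSession):

    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    val nonEmpty = udf((s: String) => s.nonEmpty)
    val df = Seq(("a", "b"), ("c", "d")).toDF() // columns _1, _2

    // Two consecutive filters over the same UDF; CombineFilters merges the predicates.
    df.filter(nonEmpty(df("_1")))
      .filter(nonEmpty(df("_1")))
      .explain(true) // prints the analyzed and optimized logical plans, among others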
    

    This optimization works because it depends only on the deterministic flag, and UDFs are deterministic by default. So UDFs still benefit from simple optimizations that do not depend on the function they wrap; they miss the rest because the closure is in a format Catalyst doesn't understand: Catalyst operates on expression trees, and your closure is just a Scala function. UDFs also lose out in other places, such as the generated Java code and Spark type information.
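
    If you want to see the codegen difference yourself, Spark's debug helpers can dump the generated Java. A sketch (output elided; it assumes someDf has a numeric column col1, as in the earlier example):

    import org.apache.spark.sql.execution.debug._
    import org.apache.spark.sql.functions.udf

    val isPositive = udf((x: Long) => x > 0)

    // The UDF version converts/boxes its input and calls the closure through the
    // ScalaUDF wrapper; the native comparison compiles down to a primitive check.
    someDf.filter(isPositive(someDf("col1"))).debugCodegen()
    someDf.filter(someDf("col1") > 0).debugCodegen()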