scala, pyspark, user-defined-functions, scalatest, spark3

Spark3 Vector UDF vs Regular UDF


What is the difference between a UDF and a vectorized UDF in Spark 3? The Spark documentation describes vectorized UDFs as a new feature.

I know that in Spark 3, a user-defined function (UDF) is a function that you can define in a programming language such as Python or Scala and apply to data in a Spark DataFrame or Dataset. A UDF takes one or more columns as input and produces a new column as output.


Solution

  • I experimented with a simple unit test case and found that a regular UDF operates on one element at a time, while a "vectorized" UDF operates on an Array or struct, i.e. a whole batch of values per invocation.

    A vectorized UDF is a new feature in Spark 3 that is designed to improve the performance of UDFs by allowing them to process multiple rows at once, instead of processing one row at a time.

    A vectorized UDF takes one or more columns as input and produces a new column as output, just like a regular UDF. However, instead of processing each row individually, a vectorized UDF operates on a batch of rows at once, which can result in significant performance improvements.
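    The batching idea can be sketched in plain Python (no Spark involved): the batch version produces the same result as the per-row version, but it is invoked once per batch rather than once per row, which is where the overhead saving comes from.

```python
# Illustration only: per-row vs. per-batch application of the same logic.

def add_one(x: int) -> int:
    # Regular-UDF style: called once for every row.
    return x + 1

def add_one_batch(xs: list) -> list:
    # Vectorized-UDF style: one call handles a whole batch of rows.
    return [x + 1 for x in xs]

rows = [1, 2, 3]

per_row = [add_one(x) for x in rows]   # three function calls
per_batch = add_one_batch(rows)        # one function call

assert per_row == per_batch == [2, 3, 4]
print(per_batch)  # [2, 3, 4]
```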

    Note: in Scala there is no dedicated vectorized UDF API, so the example below approximates batch processing with a regular UDF applied to array columns: each invocation handles a whole array of values rather than a single element.
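    For reference, PySpark's actual vectorized UDF API is the Pandas UDF: the wrapped function receives and returns a `pandas.Series` (one batch of column values), and Spark transfers the batches via Apache Arrow. A minimal sketch follows; the Spark wiring is left in comments because it needs a running SparkSession:

```python
import pandas as pd

def add_one(batch: pd.Series) -> pd.Series:
    # One call processes a whole batch of column values.
    return batch + 1

# In PySpark this function would be registered as a vectorized UDF:
#   from pyspark.sql.functions import pandas_udf
#   add_one_udf = pandas_udf(add_one, returnType="long")
#   df.withColumn("num_plus_one", add_one_udf("num"))

print(add_one(pd.Series([1, 2, 3])).tolist())  # [2, 3, 4]
```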

    Below is the full example:

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.scalatest.BeforeAndAfterAll
    import org.scalatest.funsuite.AnyFunSuite
    import org.scalatest.matchers.should.Matchers
    
    /**
     * Compares a regular per-element UDF with a UDF applied to array columns,
     * where each invocation processes a whole batch of values.
     */
    class VectorizedUDFsSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll {
      private var spark: SparkSession = _
    
      override def beforeAll(): Unit = {
        spark = SparkSession.builder()
          .appName("VectorizedUDFsSpec")
          .master("local[*]")
          .getOrCreate()
      }
    
      override def afterAll(): Unit = {
        spark.stop()
      }
    
      test("addOne UDF should add one to a number") {
        val spark1 = spark
        import spark1.implicits._
        val addOne = udf((x: Int) => x + 1)
    
        val df: DataFrame = Seq(1, 2, 3).toDF("num")
        val dfWithAddOne = df.withColumn("num_plus_one", addOne($"num"))
    
        val result = dfWithAddOne.collect()
    
        assert(result.length === 3)
        assert(result(0).getAs[Int]("num_plus_one") === 2)
        assert(result(1).getAs[Int]("num_plus_one") === 3)
        assert(result(2).getAs[Int]("num_plus_one") === 4)
      }
    
      test("addOneVectorized UDF should add one to a column of numbers") {
        val spark1 = spark
        import spark1.implicits._
        val addOneVectorized = udf((xs: Seq[Int]) => xs.map(_ + 1))
    
        val df: DataFrame = Seq(Array(1, 2, 3), Array(2), Array(3)).toDF("num")
        val dfWithAddOneVectorized = df.withColumn("num_plus_one", addOneVectorized($"num"))
        dfWithAddOneVectorized.show(false)
        val result = dfWithAddOneVectorized.collect()
    
        assert(result.length === 3)
        assert(result(0).getAs[Seq[Int]]("num_plus_one") === Seq(2,3,4))
        assert(result(1).getAs[Seq[Int]]("num_plus_one") === Seq(3))
        assert(result(2).getAs[Seq[Int]]("num_plus_one") === Seq(4))
      }
    }
    

    Output of the regular UDF:

    +---+------------+
    |num|num_plus_one|
    +---+------------+
    |1  |2           |
    |2  |3           |
    |3  |4           |
    +---+------------+
    

    Output of the "vectorized" UDF:

    +---------+------------+
    |num      |num_plus_one|
    +---------+------------+
    |[1, 2, 3]|[2, 3, 4]   |
    |[2]      |[3]         |
    |[3]      |[4]         |
    +---------+------------+
    

    Conclusion:

    The key difference between a regular UDF and a vectorized UDF in Spark 3 is that a vectorized UDF processes a batch of rows per invocation instead of one row at a time, which amortizes per-call overhead and can significantly improve performance.