What is the difference between a UDF and a vectorized UDF in Spark 3? The Spark documentation describes the vectorized UDF as a new feature.
I know that in Spark 3, a user-defined function (UDF) is a function you can define in a language such as Python or Scala and apply to data in a Spark DataFrame or Dataset. A UDF takes one or more columns as input and produces a new column as output.
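For example, a regular UDF can be applied to a DataFrame column, or registered under a name and called from SQL. A minimal sketch (the names add_one and nums are just illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Register the function under a name so it is callable from SQL as well.
spark.udf.register("add_one", (x: Int) => x + 1)

Seq(1, 2, 3).toDF("num").createOrReplaceTempView("nums")
spark.sql("SELECT num, add_one(num) AS num_plus_one FROM nums").show()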
I experimented with a simple unit test and found that a plain udf works on one element at a time, while the "vectorized" version in my test operates on an Array or struct.
A vectorized UDF is designed to improve UDF performance by letting the function process many rows at once instead of one row at a time.
Like a regular UDF, a vectorized UDF takes one or more columns as input and produces a new column as output. The difference is that each invocation receives a batch of values rather than a single row, which can yield significant performance gains because per-call overhead is amortized across the batch.
Note: in Spark's own terminology, vectorized UDFs are the Arrow-based pandas UDFs available in PySpark; the Scala UDF API does not offer a vectorized variant. The example below instead applies a regular Scala UDF to array columns, so each call handles a whole collection of values per row rather than a single scalar, which mimics batch-style processing.
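If the goal is batch-at-a-time processing on the JVM, one option (not a UDF at all, but worth knowing) is Dataset.mapPartitions, which hands your function an iterator over an entire partition. A minimal sketch, separate from the test below:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(1, 2, 3).toDS()

// The function sees a whole partition at once, so any expensive setup
// (connections, model loading, ...) can be done once per partition
// instead of once per row.
val plusOne = ds.mapPartitions { nums =>
  nums.map(_ + 1)
}

plusOne.show()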
Below is the full example comparing the two approaches:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

/**
 * Compares a plain scalar UDF with a UDF that operates on array columns.
 * Note: Spark's built-in vectorized UDFs are the Arrow-based pandas UDFs
 * in PySpark; the array-based UDF below is a regular Scala UDF that simply
 * receives a whole collection per row.
 */
class VectorizedUDFsSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll {

  private var spark: SparkSession = _

  override def beforeAll(): Unit = {
    spark = SparkSession.builder()
      .appName("VectorizedUDFsSpec")
      .master("local[*]")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    spark.stop()
  }

  test("addOne UDF should add one to a number") {
    // Assign to a val first: implicits cannot be imported from a var.
    val spark1 = spark
    import spark1.implicits._

    val addOne = udf((x: Int) => x + 1)
    val df: DataFrame = Seq(1, 2, 3).toDF("num")
    val dfWithAddOne = df.withColumn("num_plus_one", addOne($"num"))

    val result = dfWithAddOne.collect()
    assert(result.length === 3)
    assert(result(0).getAs[Int]("num_plus_one") === 2)
    assert(result(1).getAs[Int]("num_plus_one") === 3)
    assert(result(2).getAs[Int]("num_plus_one") === 4)
  }

  test("addOneVectorized UDF should add one to a column of numbers") {
    val spark1 = spark
    import spark1.implicits._

    // A regular UDF whose input is a whole array per row.
    val addOneVectorized = udf((xs: Seq[Int]) => xs.map(_ + 1))
    val df: DataFrame = Seq(Array(1, 2, 3), Array(2), Array(3)).toDF("num")
    val dfWithAddOneVectorized = df.withColumn("num_plus_one", addOneVectorized($"num"))
    dfWithAddOneVectorized.show(false)

    val result = dfWithAddOneVectorized.collect()
    assert(result.length === 3)
    assert(result(0).getAs[Seq[Int]]("num_plus_one") === Seq(2, 3, 4))
    assert(result(1).getAs[Seq[Int]]("num_plus_one") === Seq(3))
    assert(result(2).getAs[Seq[Int]]("num_plus_one") === Seq(4))
  }
}
Output of the regular UDF:
+---+------------+
|num|num_plus_one|
+---+------------+
|1 |2 |
|2 |3 |
|3 |4 |
+---+------------+
Output of the array-based UDF:
+---------+------------+
|num |num_plus_one|
+---------+------------+
|[1, 2, 3]|[2, 3, 4] |
|[2] |[3] |
|[3] |[4] |
+---------+------------+
Conclusion:
The key difference between a regular UDF and a vectorized UDF in Spark 3 is that a vectorized UDF processes a batch of rows per invocation instead of a single row, which can significantly improve performance. In PySpark this takes the form of Arrow-based pandas UDFs; in Scala, applying a regular UDF to array columns, as shown above, gives batch-like behavior within each row.