Tags: scala, apache-spark, dataframe, indexing, udf

spark dataframe udf mapping indices to values


I have a Spark DataFrame where one column contains indices into a list. I would like to write a UDF that creates a new column with the values associated with those indices.

E.g.

Suppose I have the following dataframe and array:

val df = spark.createDataFrame(Seq((0, Array(1, 1, 2)), (1, Array(1, 2, 0))))
df.show()
+---+---------+
| _1|       _2|
+---+---------+
|  0|[1, 1, 2]|
|  1|[1, 2, 0]|
+---+---------+
val sArray = Array("a", "b", "c")

I would like to be able to map the indices in _2 to their values in sArray, leading to this:

+---+---------+---------+
| _1|       _2|       _3|
+---+---------+---------+
|  0|[1, 1, 2]|[b, b, c]|
|  1|[1, 2, 0]|[b, c, a]|
+---+---------+---------+

I have been trying to do this with a udf:

def indexer (values: Array[String]) = 
  udf((indices: Array[Int]) => indices.map(values(_)))
df.withColumn("_3", indexer(sArray)($"_2"))

However, when I do this, I get the following error:

Failed to execute user defined function

... Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I

What is going wrong here? How can I fix this?


Solution

  • When operating on an ArrayType column in a DataFrame, the actual type passed into a UDF is mutable.WrappedArray. The failure you see is the result of trying to cast this WrappedArray into the Array[Int] your function expects.

    The fix is rather simple - define the function to expect a mutable.WrappedArray[Int]:

    import scala.collection.mutable
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.udf
    // Accept the WrappedArray that Spark actually passes for an ArrayType column
    def indexer(values: Array[String]): UserDefinedFunction =
      udf((indices: mutable.WrappedArray[Int]) => indices.map(values(_)))