I have a Spark DataFrame where one column consists of indices into a list. I would like to write a UDF that lets me create a new column with the values associated with those indices.
E.g.
Suppose I have the following DataFrame and array:
val df = spark.createDataFrame(Seq((0, Array(1, 1, 2)), (1, Array(1, 2, 0))))
df.show()
+---+---------+
| _1|       _2|
+---+---------+
|  0|[1, 1, 2]|
|  1|[1, 2, 0]|
+---+---------+
val sArray = Array("a", "b", "c")
I would like to be able to map the indices in _2 to their values in sArray, leading to this:
+---+---------+---------+
| _1|       _2|       _3|
+---+---------+---------+
|  0|[1, 1, 2]|[b, b, c]|
|  1|[1, 2, 0]|[b, c, a]|
+---+---------+---------+
I have been trying to do this with a UDF:
def indexer(values: Array[String]) =
  udf((indices: Array[Int]) => indices.map(values(_)))
df.withColumn("_3", indexer(sArray)($"_2"))
However, when I do this, I get the following error:
Failed to execute user defined function
... Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
What is going wrong here? How can I fix this?
When operating on an ArrayType column in a DataFrame, the actual type passed into a UDF is mutable.WrappedArray. The failure you see is the result of trying to cast this WrappedArray into the Array[Int] your function expects.

The fix is rather simple: define the function to expect a mutable.WrappedArray[Int]:
import scala.collection.mutable
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def indexer(values: Array[String]): UserDefinedFunction =
  udf((indices: mutable.WrappedArray[Int]) => indices.map(values(_)))
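With that change, the call from the question should produce the expected output:

df.withColumn("_3", indexer(sArray)($"_2")).show()
+---+---------+---------+
| _1|       _2|       _3|
+---+---------+---------+
|  0|[1, 1, 2]|[b, b, c]|
|  1|[1, 2, 0]|[b, c, a]|
+---+---------+---------+

As an aside, since WrappedArray extends Seq, declaring the parameter as Seq[Int] should work as well; this is a minimal equivalent sketch:

def indexer(values: Array[String]): UserDefinedFunction =
  udf((indices: Seq[Int]) => indices.map(values(_)))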