Tags: sql, scala, apache-spark, udf

How to create a UDF to find index in an array column


I have a table as below:

val question = sqlContext.createDataFrame(Seq((1, Seq("d11","d12","d13")), (2, Seq("d21", "d22", "")))).toDF("Id", "Dates")
+---+---------------+
| Id|          Dates|
+---+---------------+
|  1|[d11, d12, d13]|
|  2|   [d21, d22, ]|
+---+---------------+

The "Dates" Column contains an array of strings. I am wanting to create an udf that can return the index if the array contains the target string. I tried to write an udf like this:

def indexOf(s: String) = udf((n: Array[String]) =>
  if (n.contains(s)) n.indexOf(s) else -1)

question.withColumn("index", indexOf("d11")(question("Dates"))).show()

However, I got an error message like this:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$indexOf$1: (array<string>) => int)

Is something going horribly wrong here?

Update: I also found an error message like this:

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;

So I modified my UDF as follows:

def indexOf(s: String) = udf((n: Seq[String]) => if (n.contains(s)) n.indexOf(s) else -1)

Changed the "Array[String]" to "Seq[String]", and it is working now~ Hi, Nader Hadji Ghanbari, thanks for your suggestion~


Solution

  • In Spark, an array column is passed to a Scala UDF as a WrappedArray, which wraps the underlying array rather than exposing it as Array[String]. To make the UDF work, change the parameter type in the UDF signature to Seq, WrappedArray, or List.

    def indexOf(s: String) = udf((n: Seq[String]) => 
        if (n.contains(s)) n.indexOf(s) else -1)
    

    Or

    import scala.collection.mutable.WrappedArray

    def indexOf(s: String) = udf((n: WrappedArray[String]) =>
        if (n.contains(s)) n.indexOf(s) else -1)
    

    Or

    def indexOf(s: String) = udf((n: List[String]) => 
        if (n.contains(s)) n.indexOf(s) else -1)
    

    Hope this helps!
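
    For reference, below is a minimal end-to-end sketch of the Seq-based variant, assuming a SparkSession named spark (the question uses the older sqlContext API, but the UDF itself is unchanged):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    val spark = SparkSession.builder().appName("IndexOfExample").getOrCreate()
    import spark.implicits._

    // Sample data from the question.
    val question = Seq(
      (1, Seq("d11", "d12", "d13")),
      (2, Seq("d21", "d22", ""))
    ).toDF("Id", "Dates")

    // Declaring the parameter as Seq[String] lets Spark pass in its
    // WrappedArray without a ClassCastException.
    def indexOf(s: String) = udf((n: Seq[String]) =>
      if (n.contains(s)) n.indexOf(s) else -1)

    // Expect index 0 for Id 1 ("d11" is the first element) and -1 for Id 2.
    question.withColumn("index", indexOf("d11")(question("Dates"))).show()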