Tags: scala, user-defined-functions, apache-spark-sql, udf

Spark SQL UDF returning a Scala immutable Map with df.withColumn()


I have a case class

case class MyCaseClass(City: String, Extras: Map[String, String])

and a user-defined function which returns a scala.collection.immutable.Map

def extrasUdf = spark.udf.register(
  "extras_udf",
  (age: Int, name: String) => Map("age" -> age.toString, "name" -> name)
)

but this breaks with an exception:

import spark.implicits._

spark.read.options(...).load(...)
      .select('City, 'Age, 'Name)
      .withColumn("Extras", extrasUdf('Age, 'Name))
      .drop('Age)
      .drop('Name)
      .as[MyCaseClass]

I assume I should use Spark SQL's MapType(DataTypes.StringType, DataTypes.IntegerType), but I can't find any working example...

And this works if I use scala.collection.Map, but I need an immutable Map.


Solution

  • There are many problems with your code:

    • You are using def extrasUdf =, which defines a method that re-registers the UDF every time it is called, as opposed to registering it once and holding on to the result. Use val extrasUdf = instead (see the first sketch after this list).

    • You are mixing value types in your map (String and Int), which makes the map a Map[String, Any], as Any is the closest common supertype of String and Int. Spark does not support Any. You can do at least two things: (a) switch to a string-valued map (using age.toString), in which case you don't need a UDF at all, as you can simply use the built-in map() function, or (b) switch to a named struct using named_struct() (again, without the need for a UDF); both are shown in the second sketch after this list. As a rule, only write a UDF if you cannot do what you need with the existing functions. I prefer to look at the Hive documentation because the Spark docs are rather sparse.

    • Also, keep in mind that type specification in a Spark schema (e.g., MapType) is completely different from Scala types (e.g., Map[_, _]) and separate from how types are represented internally and mapped between Scala and Spark data structures. In other words, this has nothing to do with mutable vs. immutable collections; the last sketch below illustrates the distinction.
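
    Here is a minimal sketch of the val-based registration, assuming a SparkSession named spark is in scope. spark.udf.register returns a UserDefinedFunction, so the val can be applied to columns directly:

    import org.apache.spark.sql.expressions.UserDefinedFunction

    // Register once; `extrasUdf` is a UserDefinedFunction value,
    // not a method that re-registers the UDF on every access.
    val extrasUdf: UserDefinedFunction = spark.udf.register(
      "extras_udf",
      (age: Int, name: String) => Map("age" -> age.toString, "name" -> name)
    )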
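
    And here is option (a) with no UDF at all, a sketch assuming df is the DataFrame from the question with City, Age, and Name columns. The built-in map() function takes alternating key and value columns:

    import org.apache.spark.sql.functions.{expr, lit, map}
    import spark.implicits._

    val withExtras = df
      .select(
        'City,
        map(                          // built-in map(): keys and values alternate
          lit("age"),  'Age.cast("string"),
          lit("name"), 'Name
        ).as("Extras")
      )
      .as[MyCaseClass]

    // Option (b), a named struct via SQL's named_struct() -- note this yields
    // a struct column, not a Map, so it would need a different case class:
    //   .withColumn("Extras", expr("named_struct('age', Age, 'name', Name)"))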
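
    Finally, a sketch of the schema-type vs. Scala-type distinction: MapType describes the column to Spark's engine, while Map[String, String] is what the encoder hands your Scala code; neither says anything about mutability.

    import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

    // What Spark's engine sees: a Catalyst schema description...
    val schema = StructType(Seq(
      StructField("City",   StringType),
      StructField("Extras", MapType(StringType, StringType))
    ))

    // ...versus what your Scala code sees after .as[MyCaseClass]:
    //   case class MyCaseClass(City: String, Extras: Map[String, String])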

    Hope this helps!