
How to return a Row from Spark UDF?


I have a use case where I want to add another entry to a Map column. Here's the setup (with Scala 2.13.13 + Spark 3.3.1):

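    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.spark.sql.types._

    val json =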
      """
         [
           {
              "info" : {
                "1234" : {
                  "name": "John Smith",
                  "age": 29,
                  "gender": "male"
                }
              }
           }
         ]
      """

    val personSchema = new StructType()
      .add("name", StringType)
      .add("age", IntegerType)
      .add("gender", StringType)
    val schema = new StructType().add("info", MapType(StringType, personSchema))

    val spark = SparkSession.builder()
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    val df = spark.read.schema(schema).json(Seq(json).toDS)
    df.show(false)

In this example, I have an info object which is a Map of String to a Person struct. I want to dynamically generate another Person and add it to the info object. So in JSON, I would end up with:

    {
      "info": {
        "1234": {
          "name": "John Smith",
          "age": 29,
          "gender": "male"
        },
        "456": {
          "name": "Robert Jones",
          "age": 35,
          "gender": "male"
        }
      }
    }

For this, I added the following UDF:

    val addPersonUDF = udf((infoMap: Map[String, Row]) => {
      infoMap + ("456" -> new GenericRowWithSchema(Array("Robert Jones", 35, "male"), personSchema))
    })
    df.select(col("*"), addPersonUDF(col("info"))).show(false)

But I keep getting the error:

    Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
    at org.apache.spark.sql.errors.QueryExecutionErrors$.schemaForTypeUnsupportedError(QueryExecutionErrors.scala:1193)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:802)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:744)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
    at org.apache.spark.sql.functions$.$anonfun$udf$6(functions.scala:5124)
    at scala.Option.getOrElse(Option.scala:201)
    at org.apache.spark.sql.functions$.udf(functions.scala:5124)

What am I doing wrong?


Solution

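  • The typed udf(...) call derives the UDF's schema from the function's Scala types by reflection (the ScalaReflection.schemaFor frames in the stack trace), and Row carries no static schema, which is what the error reports. Use a Scala case class for the person's data instead, so that Spark can infer the schema automatically. This way you can avoid using a generic row.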

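    import org.apache.spark.sql.catalyst.ScalaReflection

    // Define Person at top level (not inside a method) so a TypeTag is available for it.
    case class Person(name: String, age: Integer, gender: String)

    // Derive the map schema from the case class instead of writing it by hand.
    val schema = new StructType().add("info",
      ScalaReflection.schemaFor[Map[String, Person]].dataType)
    val df = spark.read.schema(schema).json(Seq(json).toDS)

    // Both the parameter type and the return type can now be mapped to a Spark schema.
    val addPersonUDF = udf((infoMap: Map[String, Person]) => {
      infoMap + ("456" -> Person("Robert Jones", 35, "male"))
    })

    df.select(col("*"), addPersonUDF(col("info"))).show(false)
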

    Output:

    +--------------------------------+-----------------------------------------------------------------+
    |info                            |UDF(info)                                                        |
    +--------------------------------+-----------------------------------------------------------------+
    |{1234 -> {John Smith, 29, male}}|{1234 -> {John Smith, 29, male}, 456 -> {Robert Jones, 35, male}}|
    +--------------------------------+-----------------------------------------------------------------+
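
As a side check on the "infer the schema automatically" part, here is a minimal sketch that derives the struct type from the case class with the public `Encoders.product` API and compares it with the hand-written `personSchema` from the question. The object name `InferredSchemaCheck` and the vals inside it are illustrative only, and it is an assumption on my part that `Encoders.product[Person].schema` can stand in for the `ScalaReflection.schemaFor` call for this particular case class.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

// Same shape as the Person case class used in the answer.
case class Person(name: String, age: Integer, gender: String)

// Hypothetical helper object, not part of the original answer.
object InferredSchemaCheck {
  // Schema written by hand, as in the question.
  val handWritten: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)
    .add("gender", StringType)

  // Schema derived from the case class through the public Encoders API.
  val inferred: StructType = Encoders.product[Person].schema

  // Schema for the DataFrame: an `info` column mapping String to the inferred struct.
  val infoSchema: StructType =
    new StructType().add("info", MapType(StringType, inferred))

  def main(args: Array[String]): Unit = {
    inferred.printTreeString()
    println(s"matches hand-written schema: ${inferred == handWritten}")
  }
}
```

If the two schemas match on your Spark version, `InferredSchemaCheck.infoSchema` could be passed to `spark.read.schema(...)` in place of the `ScalaReflection`-based one, which avoids depending on an internal Catalyst object.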