I have a use-case where I want to add another entry into a Map object. Here's the setup (with Scala 2.13.13 + Spark 3.3.1):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, MapType, StringType, StructType}

val json =
  """
  [
    {
      "info" : {
        "1234" : {
          "name": "John Smith",
          "age": 29,
          "gender": "male"
        }
      }
    }
  ]
  """

val personSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
  .add("gender", StringType)
val schema = new StructType().add("info", MapType(StringType, personSchema))

val spark = SparkSession.builder()
  .master("local[*]")
  .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val df = spark.read.schema(schema).json(Seq(json).toDS)
df.show(false)
In this example, I have an info object which holds a Map of String to a Person struct. I want to dynamically generate another Person and add it to the info object. So in JSON, I would end up with:
{
  "info": {
    "1234": {
      "name": "John Smith",
      "age": 29,
      "gender": "male"
    },
    "456": {
      "name": "Robert Jones",
      "age": 35,
      "gender": "male"
    }
  }
}
For this, I added the following UDF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{col, udf}

val addPersonUDF = udf((infoMap: Map[String, Row]) => {
  infoMap + ("456" -> new GenericRowWithSchema(Array("Robert Jones", 35, "male"), personSchema))
})
df.select(col("*"), addPersonUDF(col("info"))).show(false)
But I keep getting the error:
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
at org.apache.spark.sql.errors.QueryExecutionErrors$.schemaForTypeUnsupportedError(QueryExecutionErrors.scala:1193)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:802)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:744)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
at org.apache.spark.sql.functions$.$anonfun$udf$6(functions.scala:5124)
at scala.Option.getOrElse(Option.scala:201)
at org.apache.spark.sql.functions$.udf(functions.scala:5124)
What am I doing wrong?
The udf helper derives the input and output schemas from the Scala types of your function via reflection, and org.apache.spark.sql.Row carries no compile-time schema, which is why ScalaReflection.schemaFor throws. Use a Scala case class for the person's data instead, so that Spark can infer the schema automatically and you avoid the generic row altogether.
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StructType

case class Person(name: String, age: Integer, gender: String)

val schema = new StructType().add("info",
  ScalaReflection.schemaFor[Map[String, Person]].dataType)
val df = spark.read.schema(schema).json(Seq(json).toDS)

val addPersonUDF = udf((infoMap: Map[String, Person]) => {
  infoMap + ("456" -> Person("Robert Jones", 35, "male"))
})
df.select(col("*"), addPersonUDF(col("info"))).show(false)
Output:
+--------------------------------+-----------------------------------------------------------------+
|info |UDF(info) |
+--------------------------------+-----------------------------------------------------------------+
|{1234 -> {John Smith, 29, male}}|{1234 -> {John Smith, 29, male}, 456 -> {Robert Jones, 35, male}}|
+--------------------------------+-----------------------------------------------------------------+
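If you prefer not to define a case class at all, the same entry can also be appended with the built-in map_concat, map, and struct functions, avoiding the UDF entirely. A minimal sketch, assuming the df from above (the new_info alias is just for illustration):

import org.apache.spark.sql.functions.{col, lit, map, map_concat, struct}

// Build a single-entry map literal whose value struct matches personSchema
// (same field names and order), then merge it with the existing info column.
df.select(
  col("*"),
  map_concat(
    col("info"),
    map(lit("456"), struct(
      lit("Robert Jones").as("name"),
      lit(35).as("age"),
      lit("male").as("gender")
    ))
  ).as("new_info")
).show(false)

This keeps the whole operation in built-in expressions, so Spark does not have to deserialize the map into JVM objects for a UDF call.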