Tags: scala, apache-spark, apache-spark-sql, nested

spark / scala : data type mismatch during nested values update


I have an input DataFrame like this:

+---------------------------------------------------------------------+ 
|infos                                                                |
+---------------------------------------------------------------------+ 
|[{100, 1, foo}, {103, 1, bar}, {99, 0, null}]                        | 
|[{101, 1, null}, {102, 1, null}]                                     | 
|[]                                                                   |
+---------------------------------------------------------------------+

with this schema:

root
 |-- Infos: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- val: string (nullable = true)
 |    |    |-- com: string (nullable = true)

And I would like to replace the null values with empty strings:

  val nullToEmptyString: Row => Row = { row: Row =>
    def recursifUpdate(row: Any): Any = {
      row match {
        case row: Row      => Row.fromSeq(row.toSeq.map(recursifUpdate))
        case seq: Seq[Any] => seq.map(recursifUpdate)
        case null          => ""
        case _             => row
      }
    }
    Row.fromSeq(row.toSeq.map(recursifUpdate))
  }

val outputDataSchema: StructType = StructType(Seq(
  StructField("Infos",
    ArrayType(
      StructType(Seq(
        StructField("id", StringType, nullable = true),
        StructField("val", StringType, nullable = true),
        StructField("com", StringType, nullable = true)
      ))
    ),
    nullable = false)
))

val outputDf = spark.createDataFrame(inputDf.rdd.map{nullToEmptyString}, outputDataSchema)
outputDf.show(false)
outputDf.coalesce(1).write.format("parquet").save("/usr/samples/output/")

The show works perfectly:

+---------------------------------------------------------------------+ 
|infos                                                                |
+---------------------------------------------------------------------+ 
|[{100, 1, foo}, {103, 1, bar}, {99, 0, }]                            | 
|[{101, 1, }, {102, 1, }]                                             | 
|[]                                                                   |
+---------------------------------------------------------------------+

but when I try to write outputDf, I get this data type mismatch error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<struct<id:string,val:string,com:string>>

I don't know exactly why. Is there a better way to write the nullToEmptyString function so that it handles null/empty struct values?
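A likely cause (an assumption, not confirmed in the post): recursifUpdate replaces every null with "", including a null that stands for a whole struct or array value, and a String is not a valid external type where array<struct<...>> is expected. A schema-aware sketch (nullToEmpty is a hypothetical name) that only blanks string-typed slots might look like this:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Walk the value together with its DataType: replace null by "" only when
// the schema says the slot holds a StringType; a null struct or a null
// array element stays null instead of becoming a String.
def nullToEmpty(value: Any, dt: DataType): Any = (value, dt) match {
  case (null, StringType)         => ""
  case (null, _)                  => null
  case (r: Row, st: StructType)   =>
    Row.fromSeq(r.toSeq.zip(st.fields.toSeq).map { case (v, f) => nullToEmpty(v, f.dataType) })
  case (s: Seq[_], at: ArrayType) => s.map(nullToEmpty(_, at.elementType))
  case (other, _)                 => other
}
```

Each row would then be rewritten with nullToEmpty(row, inputDf.schema).asInstanceOf[Row] instead of nullToEmptyString.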


Solution

  • SPECIFIC KEY

I am not really sure how to answer your question directly, but I can offer another way to do this without dropping down to RDDs, which is better:

Assume your dataset is called main and looks as below:

    +---------------------------------+
    |col                              |
    +---------------------------------+
    |[{null, 100, 100}, {foo, 100, 1}]|
    |[{null, 100, 1}, {foo, 100, 2}]  |
    +---------------------------------+
    

    with the following schema

    root
     |-- col: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- com: string (nullable = true)
     |    |    |-- id: string (nullable = true)
     |    |    |-- val: string (nullable = true)
    

What we can do is overwrite col with a transformation that replaces null with "" and otherwise leaves the struct as is. The following line helps us do that:

main = main.withColumn("col", expr("transform(col, x -> case when (x.com is null) then named_struct('com', '', 'id', x.id, 'val', x.val) else x end)"))
    

To elaborate: if x.com is null, we create a named struct with the same attributes, but with com overwritten with ""; otherwise we keep x unchanged.
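An equivalent and slightly shorter expression replaces the case when with coalesce. This is a sketch on a self-built one-row sample (the sample and the local SparkSession are assumptions, standing in for the main dataset above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Build a one-row sample shaped like `main` (com is null in the first struct).
var main = spark.sql(
  "select array(named_struct('com', cast(null as string), 'id', '100', 'val', '100'), " +
  "named_struct('com', 'foo', 'id', '100', 'val', '1')) as col")

// coalesce(x.com, '') yields '' when com is null and keeps non-null values.
main = main.withColumn(
  "col",
  expr("transform(col, x -> named_struct('com', coalesce(x.com, ''), 'id', x.id, 'val', x.val))"))

main.show(false)  // [{, 100, 100}, {foo, 100, 1}]
```

Either form produces the same result as the case when version above.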

    The final output looks as below:

    +-----------------------------+
    |col                          |
    +-----------------------------+
    |[{, 100, 100}, {foo, 100, 1}]|
    |[{, 100, 1}, {foo, 100, 2}]  |
    +-----------------------------+
    

    which is I hope what you want! Good luck!

    MULTIPLE KEYS

Assume we have this dataset, called df1:

    +------------------------------------+
    |col                                 |
    +------------------------------------+
    |[{1, 50, 100}, {2, null, 150}]      |
    |[{null, null, null}, {2, null, 150}]|
    +------------------------------------+
    

Now we will build an array of the struct's keys, map each one to a case when, and concatenate them into a single expression:

    val keys = Array("id", "val", "com").map(key => s"'$key', case when (x.$key is null) then '' else x.$key end").mkString(", ")
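To see what that builds, here is a self-contained check (plain Scala, no Spark needed) that prints the generated fragment:

```scala
// Mirrors the snippet above: one ('key', case when ...) pair per field.
val keys = Array("id", "val", "com")
  .map(key => s"'$key', case when (x.$key is null) then '' else x.$key end")
  .mkString(", ")

println(keys)
// 'id', case when (x.id is null) then '' else x.id end, 'val', case when (x.val is null) then '' else x.val end, 'com', case when (x.com is null) then '' else x.com end
```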
    

    Then finally, we can put that into transform as follows:

    df1 = df1.withColumn("correct", expr(s"transform(col, x -> named_struct($keys))"))
    

    and we get:

    +------------------------------------+--------------------------+
    |col                                 |correct                   |
    +------------------------------------+--------------------------+
    |[{1, 50, 100}, {2, null, 150}]      |[{1, 100, 50}, {2, 150, }]|
    |[{null, null, null}, {2, null, 150}]|[{, , }, {2, 150, }]      |
    +------------------------------------+--------------------------+
    

    which is I hope what you need!