Tags: scala, apache-spark, apache-spark-sql, avro, spark-avro

Explode Spark DataFrame Avro Map into flat format


I am using the Spark shell, v1.6.1.5.

I have the following Spark Scala Dataframe:

import com.databricks.spark.avro._  // spark-avro package; adds .avro(...) to the reader

val data = sqlContext.read.avro("/my/location/*.avro")
data.printSchema
root
 |-- id: long (nullable = true)
 |-- stuff: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = false)
 |    |    |-- created: long (nullable = false)
 |    |    |-- lastModified: long (nullable = false)
 |    |    |-- . . .

What is the exact syntax to explode it into the following flat format (discarding possible null values): [id, key, value]?


Solution

  • This can be accomplished using a udf and explode.

    However, we do not know the class of the map's values, because that class is inferred by Spark and is not available explicitly. To get around this, we can "shadow" the inferred class by defining a case class with an identical signature. Spark then treats the two classes interchangeably, because both the inferred class and our shadow class are converted into the same StructType.

    Here is an example (the case class value is a stand-in for the inferred class that we do not know).

    scala> import org.apache.spark.sql.functions.{explode, udf}
    import org.apache.spark.sql.functions.{explode, udf}
    
    scala> case class value(created: Long, lastModified: Long)
    defined class value
    
    scala> val myDF = Seq((1, Map("a" -> value(1L,2L), "b" -> value(3L,4L))), (2, Map("c" -> value(5L,6L), "d" -> value(6L,7L)))).toDF("id", "stuff")
    myDF: org.apache.spark.sql.DataFrame = [id: int, stuff: map<string,struct<created:bigint,lastModified:bigint>>]
    
    scala> myDF.show
    +---+--------------------+
    | id|               stuff|
    +---+--------------------+
    |  1|Map(a -> [1,2], b...|
    |  2|Map(c -> [5,6], d...|
    +---+--------------------+
    
    
    scala> myDF.printSchema
    root
     |-- id: integer (nullable = false)
     |-- stuff: map (nullable = true)
     |    |-- key: string
     |    |-- value: struct (valueContainsNull = true)
     |    |    |-- created: long (nullable = false)
     |    |    |-- lastModified: long (nullable = false)
    
    
    scala> case class shadowOfValue(created: Long, lastModified: Long)
    defined class shadowOfValue
    
    scala> val explodeUDF = udf( (map: Map[String, shadowOfValue]) => map.toVector)
    explodeUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true)),true),Some(List(MapType(StringType,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true))))
    
    scala> var newDF = myDF.withColumn("TMP", explode(explodeUDF($"stuff"))).drop("stuff")
    newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>>]
    
    scala> newDF = newDF.withColumn("key", $"TMP".apply("_1")).withColumn("value", $"TMP".apply("_2"))
    newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>> ... 2 more fields]
    
    scala> newDF = newDF.drop("TMP")
    newDF: org.apache.spark.sql.DataFrame = [id: int, key: string ... 1 more field]
    
    scala> newDF.show
    +---+---+-----+
    | id|key|value|
    +---+---+-----+
    |  1|  a|[1,2]|
    |  1|  b|[3,4]|
    |  2|  c|[5,6]|
    |  2|  d|[6,7]|
    +---+---+-----+
    
    
    scala> newDF.printSchema
    root
     |-- id: integer (nullable = false)
     |-- key: string (nullable = true)
     |-- value: struct (nullable = true)
     |    |-- created: long (nullable = false)
     |    |-- lastModified: long (nullable = false)
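
    As a footnote: on newer Spark versions (2.0 and later), the built-in explode can be applied directly to a MapType column, which yields key and value columns without the UDF or the shadow case class. A minimal sketch under that assumption (data is the question's DataFrame, and spark.implicits._ is in scope, as it is in the 2.x shell):

    import org.apache.spark.sql.functions.explode

    // explode on a map column produces one row per entry,
    // as two columns named "key" and "value"
    val flatDF = data.select($"id", explode($"stuff"))

    // discard rows whose value struct is null, per the question's requirement
    val cleanDF = flatDF.na.drop(Seq("value"))

    If a fully flat layout is wanted rather than a struct column, the nested fields can then be selected directly, e.g. cleanDF.select($"id", $"key", $"value.created", $"value.lastModified").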