I am using Spark shell v1.6.1.5.
I have the following Spark Scala DataFrame:
import com.databricks.spark.avro._  // spark-avro adds the .avro reader method
val data = sqlContext.read.avro("/my/location/*.avro")
data.printSchema
root
|-- id: long (nullable = true)
|-- stuff: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = false)
| | |-- created: long (nullable = false)
| | |-- lastModified: long (nullable = false)
| | |-- . . .
What's the exact syntax to explode it into the following flat format, discarding possible null values: [id, key, value]?
This can be accomplished with a udf combined with explode.
However, we do not know the class of the values in the map, because that information is inferred at read time and is not available as an explicit class. To work around this, we can "shadow" the inferred class by defining a case class with an identical signature. Spark then treats the two classes interchangeably, because both the inferred class and our shadow class are converted to the same StructType.
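To convince yourself that a shadow class really does reduce to the same schema, you can inspect the StructType that Spark derives from it. A minimal sketch using Spark's internal ScalaReflection helper (an internal API, here for inspection only; shadowOfValue is the class defined further down):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class shadowOfValue(created: Long, lastModified: Long)

// Reflect over the case class and extract the equivalent Catalyst schema.
val schema = ScalaReflection.schemaFor[shadowOfValue].dataType.asInstanceOf[StructType]
// StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false))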
Here is an example (the case class value is a stand-in for the inferred class that we do not know).
scala> case class value(created: Long, lastModified: Long)
defined class value
scala> val myDF = Seq((1, Map("a" -> value(1L,2L), "b" -> value(3L,4L))), (2, Map("c" -> value(5L,6L), "d" -> value(6L,7L)))).toDF("id", "stuff")
myDF: org.apache.spark.sql.DataFrame = [id: int, stuff: map<string,struct<created:bigint,lastModified:bigint>>]
scala> myDF.show
+---+--------------------+
| id| stuff|
+---+--------------------+
| 1|Map(a -> [1,2], b...|
| 2|Map(c -> [5,6], d...|
+---+--------------------+
scala> myDF.printSchema
root
|-- id: integer (nullable = false)
|-- stuff: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- created: long (nullable = false)
| | |-- lastModified: long (nullable = false)
scala> case class shadowOfValue(created: Long, lastModified: Long)
defined class shadowOfValue
scala> val explodeUDF = udf( (map: Map[String, shadowOfValue]) => map.toVector)
explodeUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true)),true),Some(List(MapType(StringType,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true))))
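Note the inferred return type above: the Vector of (String, shadowOfValue) pairs comes back as an array of structs with fields _1 and _2, which is exactly the shape that explode unpacks into one row per element. (Outside the shell you would also need import org.apache.spark.sql.functions.{udf, explode}; the spark-shell brings these into scope for you.)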
scala> var newDF = myDF.withColumn("TMP", explode(explodeUDF($"stuff"))).drop("stuff")
newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>>]
scala> newDF = newDF.withColumn("key", $"TMP".apply("_1")).withColumn("value", $"TMP".apply("_2"))
newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>> ... 2 more fields]
scala> newDF = newDF.drop("TMP")
newDF: org.apache.spark.sql.DataFrame = [id: int, key: string ... 1 more field]
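If you prefer, the intermediate withColumn/drop steps can be collapsed into a single select. This is only a restatement of the same approach, reusing the explodeUDF defined above:

val flatDF = myDF
  .select($"id", explode(explodeUDF($"stuff")).as("TMP"))      // one row per map entry
  .select($"id", $"TMP._1".as("key"), $"TMP._2".as("value"))   // unpack the tuple struct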
scala> newDF.show
+---+---+-----+
| id|key|value|
+---+---+-----+
| 1| a|[1,2]|
| 1| b|[3,4]|
| 2| c|[5,6]|
| 2| d|[6,7]|
+---+---+-----+
scala> newDF.printSchema
root
|-- id: integer (nullable = false)
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- created: long (nullable = false)
| |-- lastModified: long (nullable = false)
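Finally, the question asks to discard possible null values. If your map allows them (valueContainsNull = true, as in this example), a filter after the explode takes care of it; a minimal sketch:

val nonNullDF = newDF.filter($"value".isNotNull)  // drop rows whose map value was null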