Tags: apache-spark, dataframe, apache-spark-sql

Accessing nested data in Spark


I have a collection of nested case classes. I've got a job that generates a dataset using these case classes, and writes the output to parquet.

I was pretty annoyed to discover that I have to do a load of manual faffing around to load this data and convert it back to case classes in order to work with it in subsequent jobs. Anyway, that's what I'm now trying to do.

My case classes are like:

case class Person(userId: String, tech: Option[Tech])
case class Tech(browsers: Seq[Browser], platforms: Seq[Platform])
case class Browser(family: String, version: Int)

So I'm loading my parquet data. I can get the tech data as a Row with:

val df = sqlContext.load("part-r-00716.gz.parquet")
val x = df.head
val tech = x.getStruct(x.fieldIndex("tech"))

But now I can't work out how to actually iterate over the browsers. If I try val browsers = tech.getStruct(tech.fieldIndex("browsers")) I get an exception:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row

How can I iterate over my nested browser data using Spark 1.5.2?
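
For what it's worth, the exception suggests the field comes back as a WrappedArray of Rows rather than a single Row, so I suspect something like Row.getSeq is what's needed (untested sketch):

import org.apache.spark.sql.Row

// Untested: read browsers as a Seq[Row] instead of a single Row
val browsers = tech.getSeq[Row](tech.fieldIndex("browsers"))
browsers.foreach(b => println(b.getString(b.fieldIndex("family"))))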

Update: In fact, my case classes contain optional values, so Browser is actually:

case class Browser(family: String,
    major: Option[String] = None,
    minor: Option[String] = None,
    patch: Option[String] = None,
    language: String,
    timesSeen: Long = 1,
    firstSeenAt: Long,
    lastSeenAt: Long)

I also have a similar class for Os:

case class Os(family: String,
    major: Option[String] = None,
    minor: Option[String] = None,
    patch: Option[String] = None,
    patchMinor: Option[String],
    override val timesSeen: Long = 1,
    override val firstSeenAt: Long,
    override val lastSeenAt: Long)

And so Tech is really:

case class Technographic(browsers: Seq[Browser], 
    devices: Seq[Device],
    oss: Seq[Os])

Now, given that some values are optional, I need a solution that allows me to reconstruct my case classes correctly. The current solution doesn't support None values, so, for example, given the input data:

Tech(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en"), timesSeen=3),
    Browser(family=None, major=None, language=Some("en-us"), timesSeen=1),
    Browser(family=Some("Firefox), major=None, language=None, timesSeen=1)
  )
)

I need it to load the data as follows:

family=IE, major=7, language=en, timesSeen=3,
family=None, major=None, language=en-us, timesSeen=1,
family=Firefox, major=None, language=None, timesSeen=1

Because the current solution doesn't support None values, each nested field in fact ends up with an arbitrary number of values per list, i.e.:

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
browsers.timesSeen = [3, 1, 1]

As you can see, there's no way of converting the final data (returned by Spark) back into the case classes that generated it.

How can I work around this insanity?


Solution

  • Some examples
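
    Before selecting anything, it helps to print the schema to see how the data is nested. For the simplified case classes at the top of the question, the output would look roughly like this (platforms elided):

    // Inspect the nested structure of the loaded DataFrame
    df.printSchema()
    // root
    //  |-- userId: string (nullable = true)
    //  |-- tech: struct (nullable = true)
    //  |    |-- browsers: array (nullable = true)
    //  |    |    |-- element: struct (containsNull = true)
    //  |    |    |    |-- family: string (nullable = true)
    //  |    |    |    |-- version: integer (nullable = false)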

    // Imports for col() and explode() used below
    import org.apache.spark.sql.functions.{col, explode}

    // Select two columns
    df.select("userId", "tech.browsers").show()
    
    // Select the nested values only
    df.select("tech.browsers").show(truncate = false)
    +-------------------------+
    |browsers                 |
    +-------------------------+
    |[[Firefox,4], [Chrome,2]]|
    |[[Firefox,4], [Chrome,2]]|
    |[[IE,25]]                |
    |[]                       |
    |null                     |
    +-------------------------+
    
    // Extract the family (nested value)
    // This way you can iterate over the persons, and get their browsers
    // Family values are nested
    df.select("tech.browsers.family").show()
    +-----------------+
    |           family|
    +-----------------+
    |[Firefox, Chrome]|
    |[Firefox, Chrome]|
    |             [IE]|
    |               []|
    |             null|
    +-----------------+
    
    // Normalize the family: one row for each family
    // Then you can iterate over all families
    // Family values are un-nested; rows with empty or null arrays are dropped by explode()
    df.select(explode(col("tech.browsers.family")).alias("family")).show()
    +-------+
    | family|
    +-------+
    |Firefox|
    | Chrome|
    |Firefox|
    | Chrome|
    |     IE|
    +-------+
    

    Based on the last example:

    val families = df.select(explode(col("tech.browsers.family")))
      .map(r => r.getString(0)).distinct().collect().toList
    println(families)
    

    gives the unique list of browsers as a "normal" local Scala list:

    List(IE, Firefox, Chrome)
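
    Finally, to get all the way back to case classes rather than exploded columns, here is a minimal, untested sketch against the simplified Browser(family, version) schema from the top of the question (the full Option-heavy schema would need the same pattern plus a null check per field):

    import org.apache.spark.sql.Row

    // Untested sketch: rebuild Browser instances per user.
    // df.map on Spark 1.x operates on Rows and returns an RDD.
    val browsersByUser = df.map { row =>
      val userId = row.getString(row.fieldIndex("userId"))
      // tech and browsers are both nullable, so wrap each in an Option before descending
      val browsers = Option(row.getStruct(row.fieldIndex("tech"))).toSeq.flatMap { tech =>
        Option(tech.getSeq[Row](tech.fieldIndex("browsers"))).getOrElse(Seq.empty).map { b =>
          Browser(b.getString(b.fieldIndex("family")), b.getInt(b.fieldIndex("version")))
        }
      }
      (userId, browsers)
    }
    browsersByUser.take(3).foreach(println)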