Search code examples
scalaapache-sparkapache-spark-sqlscala-option

Spark 2 Datasets of Options


I have a Dataset of Strings that I parse into a Dataset of a case class using a function that can fail ( for instance if the data I try to parse is not usable). For that reason that function returns an Option (Scala). So I end up with a Dataset of Option[MyCaseClass].

Spark seem to accept that Dataset and process it but instead of returning a None if a parsing fails it instead returns me a Some(MyCaseClass(null, null...)) .

Here is an example of code doing just that :

recordsDs
  .map { record =>
    val maybeArticle = unmarshallArticle(record)
    if (maybeArticle.isEmpty) {
      println(s"Could not parse record $record into an article.")
    }
    maybeArticle
  }
  .filter(_.isDefined)
  .map(_.get)
  .collect().toList // Always returns a List(Some(Article(null, null), Some(Article...

And here is a notebook illustrating the case https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4480125715694487/1289561535151709/7956941984681624/latest.html

My guess is that when serializing then deserializing the Option value Spark uses the Some() constructor instead of checking if the Option is a Some or a None.

I could obviously create a wrapper around my object, something like MaybeArticle(article: Option[Article]) , but I would like to know if Spark can handle datasets of Options properly ?


Solution

  • I think the solution would be to use a flatMap. Here is a really silly example:

    scala> val ds = Seq(("a1"), ("a2"), ("a4"), ("b1"), ("b2")).toDS
    ds: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> ds.show 
    +-----+        
    |value|        
    +-----+        
    |   a1|        
    |   a2|        
    |   a4|        
    |   b1|        
    |   b2|        
    +-----+        
    
    scala> val ds2 = ds.flatMap{x => if (x.contains("a")) Some(x) else None}
    ds2: org.apache.spark.sql.Dataset[String] = [value: string]
    
    scala> ds2.show
    +-----+
    |value|
    +-----+
    |   a1|
    |   a2|
    |   a4|
    +-----+
    

    The reason this works is because Some and None act like collections which can be unpacked using flatMap (where None elements are just ommitted).