Search code examples
dataframeapache-sparkdatasetscala-2.12

Spark struct to case class conversion issues with getAs[T]


I often use the map function on spark Dataset rows to do transformations in Scala on typed objects. My usual pattern is to convert intermediate results created from data frame transformations (withColumn, groupBy, etc.) and create a typed Dataset of the intermediate result so I can use map.

This works well but leads to a lot of 'temporary' case classes for intermediate results or unwieldy tuple types.

An alternative would be to run map on a data frame and retrieve typed fields from the row using getAs[T] but this doesn't seem to work with spark.implicits if T is a case class.

E.g. this gives the error ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window

import spark.implicits._

final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
               .select($"p.name", struct($"p.*").alias("person"))
val ds = df.map(row => {
  val name = row.getAs[String]("name")
  val person = row.getAs[Person]("person")
  (name, person)
})

display(ds)

whereas this works fine:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window

import spark.implicits._

final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
               .select($"p.name", struct($"p.*").alias("person"))
               .as[Tuple2[String, Person]]
val ds = df.map(row => {
  val name = row._1
  val person = row._2
  (name, person)
})

display(ds)

So spark is happily converting the dataframe person struct to the Person case class in the second example but won't do it in the first example. Does anyone know a simple way to fix this?

Thanks,

David


Solution

  • "Simple", possibly :), but very much using internal api's that are subject to change (and have done). This code won't work as-is on Spark 4 either (tested on 3.5.1).

    As an approach it's also likely slower than the second example you provide using tuples as the Spark code translates from InternalRow to the user land Row before entering your map code. The below code then converts back to InternalRow before calling the decoder.

    resolveAndBind is typically ok in this kind of example but it's also not guaranteed to work in all cases as resolution of field names etc. typically needs to happen as part of the full analysis of the query plan.

    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    
    import spark.implicits._
    
    implicit val pEnc = implicitly[Encoder[Person]].asInstanceOf[ExpressionEncoder[Person]]
    val decoder = pEnc.resolveAndBind().objDeserializer
    
    val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS
    
    val df = people.alias("p")
      .select($"p.name", struct($"p.*").alias("person"))
    val ds = df.map(row => {
      val name = row.getAs[String]("name")
      val personRow = row.getAs[Row]("person")
      val person = decoder.eval(CatalystTypeConverters.convertToCatalyst(personRow).asInstanceOf[InternalRow]).asInstanceOf[Person]
      (name, person)
    })
    
    ds.show
    

    in summary, you are better off using a tuple wrapper and the inbuilt encoding wherever possible, it's faster and is designed and tested to work that way.