Tags: dataframe, apache-spark, apache-spark-sql, parquet, catalyst-optimizer

Is a select after casting a DataFrame to a Dataset optimized?


I have the following scenario:

 import spark.implicits._ // already in scope inside spark-shell

 case class A(name: String, age: Int)
 val df = List(A("s", 2)).toDF
 df.write.parquet("filePath")
 val result = spark.read.parquet("filePath").as[A].select("age")

Is the above optimized to select only age? Upon running result.explain(true) I see the following:

== Parsed Logical Plan ==
'Project [unresolvedalias('age, None)]
+- Relation[name#48,age#49] parquet

== Analyzed Logical Plan ==
age: int
Project [age#49]
+- Relation[name#48,age#49] parquet

== Optimized Logical Plan ==
Project [age#49]
+- Relation[name#48,age#49] parquet

== Physical Plan ==
*(1) FileScan parquet [age#49] Batched: true, Format: Parquet, Location:    InMemoryFileIndex[file:/Volumes/Unix/workplace/Reconciliation/src/TFSReconciliationCore/~/Downloa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:int>

It seems only age is read. But then what purpose does as serve? Am I correct in reading the physical plan?


Solution

  • Yes, you are reading it right. The Parquet file has two columns, name and age:

     Relation[name#48,age#49] parquet
    

    But in fact, only age is going to be read:

     Project [age#49]
    

    But then what purpose does as serve?

    For optimizations, like the one above, Spark needs to create an internal schema.

    In some cases, like Parquet files, there is a footer that contains metadata with the schema, though Spark may have to read several footers and merge possibly different schemas.
    In others (CSV, JSON, etc.), if the user doesn't provide a schema, Spark has to scan the data and infer one.
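
    For example, a quick way to see which schema Spark ends up with is printSchema; the file paths below are just placeholders:

     // Parquet: the schema is read from the file footer, no data scan needed
     spark.read.parquet("somePath").printSchema()

     // CSV without a user-provided schema: Spark scans the data to infer one
     spark.read.option("header", "true").option("inferSchema", "true").csv("someCsvPath").printSchema()

     // CSV with an explicit schema: no inference pass over the data
     import org.apache.spark.sql.types._
     val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
     spark.read.option("header", "true").schema(schema).csv("someCsvPath").printSchema()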

    We also need some generic container that gives us access to the values, and we have one: it is called Row.

    Row is a generic row object with an ordered collection of fields that can be accessed by ordinal / index (aka generic access by ordinal), by name (aka native primitive access), or using Scala's pattern matching.
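
    As a small illustration of those access styles, here is a hand-built Row with made-up values:

     import org.apache.spark.sql.Row

     val row = Row("s", 2)

     // by ordinal / index
     val ageByIndex = row.getInt(1)

     // by name -- this needs a Row that carries a schema, e.g. rows obtained from a DataFrame
     // val ageByName = row.getAs[Int]("age")

     // with Scala pattern matching
     val ageByMatch = row match { case Row(_: String, age: Int) => age }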

    In your example, it is perfectly fine to write the following code:

    spark.read.parquet("filePath").select("age")
    

    The read method returns a DataFrame, which is in fact just a Dataset of Rows (Dataset[Row]).
    When we use as, we convert that Dataset[Row] into a Dataset[A], where A can be almost any case class.
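
    To convince yourself that the typed version keeps the column pruning, you can compare the physical plans of both variants (a small sketch reusing the path and case class from the question):

     // untyped: Dataset[Row]
     spark.read.parquet("filePath").select("age").explain()

     // typed: Dataset[A], then the same select
     spark.read.parquet("filePath").as[A].select("age").explain()

     // both plans should show a FileScan with ReadSchema: struct<age:int>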

    From my point of view, it makes the code cleaner and more readable. It doesn't make much difference when working in a SQL-like style, but when we need to add map/flatMap or custom aggregations into the mix, the code becomes much easier to follow.
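
    For instance, here is a minimal sketch of that difference, assuming import spark.implicits._ is in scope (it is in spark-shell) and reusing the case class A from the question:

     // typed Dataset[A]: the lambda works with plain fields
     val typed = spark.read.parquet("filePath").as[A]
     val agesTyped = typed.map(a => a.age + 1)

     // untyped DataFrame: the same logic goes through Row accessors
     val untyped = spark.read.parquet("filePath")
     val agesUntyped = untyped.map(row => row.getAs[Int]("age") + 1)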