This code is good: it reads only column i (notice the last line of the plan, ReadSchema: struct<i:bigint>, which shows that only i is scanned):
import org.apache.spark.sql.Dataset
import spark.implicits._

// Define the case class
case class Foo(i: Long, j: String)

// Write some Foo rows as Parquet and read them back as a typed Dataset
// (the plans below come from this Parquet-backed Dataset)
Seq(Foo(1, "Q"), Foo(10, "W"), Foo(100, "E")).toDS
  .write.mode("overwrite").parquet("dbfs:/tmp/foo")
val ds: Dataset[Foo] = spark.read.parquet("dbfs:/tmp/foo").as[Foo]

// Filter on i and select only i
val result = ds.filter($"i" === 2).select($"i")

// Explain the query plan
result.explain()
// It prints:
// == Physical Plan ==
// *(1) Filter (isnotnull(i#225L) AND (i#225L = 2))
// +- *(1) ColumnarToRow
//    +- FileScan parquet [i#225L] Batched: true, DataFilters: [isnotnull(i#225L), (i#225L = 2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [IsNotNull(i), EqualTo(i,2)], ReadSchema: struct<i:bigint>
However, if I use val result = ds.filter(_.i == 10).map(_.i), the physical plan reads all columns, including j (notice the last line, ReadSchema: struct<i:bigint,j:string>):
// == Physical Plan ==
// *(1) SerializeFromObject [input[0, bigint, false] AS value#336L]
// +- *(1) MapElements $line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8811/2079839768@1028cff, obj#335: bigint
//    +- *(1) Filter $line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8810/[email protected]
//       +- *(1) DeserializeToObject newInstance(class $line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo), obj#334: $line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo
//          +- *(1) ColumnarToRow
//             +- FileScan parquet [i#225L,j#226] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:bigint,j:string>
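For reference, here is that typed variant spelled out as a complete snippet (assuming the same ds as above):

// Typed (lambda) variant: the filter and map run on deserialized Foo objects,
// so Catalyst cannot prune column j from the scan
val typedResult = ds.filter(_.i == 10).map(_.i)
typedResult.explain()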
Why does Spark handle this differently when I use the Scala syntax _.i inside filter?
Using _ (a typed lambda) forces object-level processing, much like an RDD: Spark has to deserialize every row into a Foo instance before it can apply your function. That is why you see ColumnarToRow and DeserializeToObject in the physical plan.
Catalyst has little insight into compiled Scala lambdas; this is a long-standing limitation. Semantically ds.map(_.x) is the same as selecting col("x"), but Catalyst cannot look inside the closure to prove that, so it cannot prune columns or push the filter down.
See https://issues.apache.org/jira/browse/SPARK-14083 (analyze JVM bytecode and turn closures into Catalyst expressions). The Spark developers are aware of this, but it has not been implemented.
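If you want to keep column pruning and filter pushdown while still ending up with a typed result, one option is to do the relational part with Column expressions and only switch back to the typed API at the end. A minimal sketch, assuming the same ds as above (the .as[Long] step is just one way to get back to a typed Dataset):

// Untyped filter/select: Catalyst can push both down to the Parquet scan,
// so ReadSchema stays struct<i:bigint>
val pruned = ds
  .filter($"i" === 10)
  .select($"i")
  .as[Long]   // back to a typed Dataset[Long] only after pruning

pruned.explain()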