Search code examples
apache-sparkapache-spark-datasetcatalyst-optimizerframeless

Spark reads all columns in filtering when using scala syntax


This code is good, it only reads the column i (notice the last line ReadSchema: struct<i:bigint>, which only reads i):

import org.apache.spark.sql.Dataset

// Define the case class
case class Foo(i: Long, j: String)

// Create a Dataset of Foo
val ds: Dataset[Foo] = spark.createDataset(Seq(
  Foo(1, "Q"),
  Foo(10, "W"),
  Foo(100, "E")
))

// Filter and cast the column
val result = ds.filter($"i" === 2).select($"i")

// Explain the query plan
result.explain()

// It prints:
//== Physical Plan ==
//*(1) Filter (isnotnull(i#225L) AND (i#225L = 2))
//+- *(1) ColumnarToRow
//   +- FileScan parquet [i#225L] Batched: true, DataFilters: [isnotnull(i#225L), (i#225L = 2)], //Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], //PushedFilters: [IsNotNull(i), EqualTo(i,2)], ReadSchema: struct<i:bigint>

However, if i use val result = ds.filter(_.i == 10).map(_.i), the physical plan will read all columns including j (notice the last line ReadSchema: struct<i:bigint,j:string>):

//= Physical Plan ==
//*(1) SerializeFromObject [input[0, bigint, false] AS value#336L]
//+- *(1) MapElements //$line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8811/2079//839768@1028cff, obj#335: bigint
//   +- *(1) Filter //$line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8810/7014//[email protected]
//      +- *(1) DeserializeToObject newInstance(class //$line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo), obj#334: //$line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo
//         +- *(1) ColumnarToRow
//            +- FileScan parquet [i#225L,j#226] Batched: true, DataFilters: [], Format: Parquet, //Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [], //ReadSchema: struct<i:bigint,j:string>

Why does spark handle differently when i use the scala syntax _.i inside filter?


Solution

  • _ forces collection, tuple processing like rdd due to scala map. You see ColumnarToRow & DeserilaizeToObject in physical plan.

    Catalyst has little insight into Scala code and this is an old aspect if you google. ds.map(_.x) is same as col("x") but Spark Catalyst still does nothing in this regard.

    See https://issues.apache.org/jira/browse/SPARK-14083. Note that they are aware of this but it has not been catered for.