Tags: apache-spark, optimization, filter, parquet, partitioning

Does partitioning help when filter-reading key columns using a function?


I have this dataframe:

+------+
|my_col|
+------+
|202101|
|202209|
+------+

When writing it as a parquet file, I partition it by column 'my_col', so I should get two partitions (two parquet files).
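
For concreteness, a minimal sketch of that write (the output path is assumed); partitionBy creates one directory per distinct value of my_col:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("202101",), ("202209",)], ["my_col"])

# Writes one directory per partition value:
#   my_col_data/my_col=202101/part-*.parquet
#   my_col_data/my_col=202209/part-*.parquet
df.write.partitionBy("my_col").mode("overwrite").parquet("my_col_data")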

Then I will read the saved dataset back, applying a filter.

  1. Am I correct that Spark will not load data into memory from the file containing my_col=202101 when applying the filter .filter("my_col >= 202201")?
  2. Will the data from the parquet file containing my_col=202101 be loaded into memory when applying the filter .filter("substring(my_col, 1, 4) >= 2022")?

In the latter case, I do not filter directly on the key column's values; instead, a function is applied to the column. I wonder whether, in this case, partitioning still helps save on read time.


Solution

  • So I tried the example below to find out whether the function used in the filter makes any difference. Example:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    data = [
        ("A ", "202201"),
        ("B ", "202209"),
        ("C ", "202210"),
        ("D ", "202301"),
    ]

    columns = ["name", "dob"]
    df = spark.createDataFrame(data, columns)

    # Write one parquet directory per distinct value of 'dob'
    df.write.partitionBy("dob").mode("overwrite").parquet("people.parquet")

    people = spark.read.parquet("people.parquet")
    people_filtered_without_function = people.filter("dob >= 202201")
    people_filtered_with_function = people.filter("substring(dob, 1, 4) >= 2022")

    # Compare the physical plans to see how each filter is handled
    people_filtered_without_function.explain()
    people_filtered_with_function.explain()
    

    Here are the results of the physical plans:

    1. Without function

    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [name#7,dob#8] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/opt/spark/work-dir/people.parquet], PartitionFilters: [isnotnull(dob#8), (dob#8 >= 202201)], PushedFilters: [], ReadSchema: struct<name:string>

    2. With function

    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [name#7,dob#8] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/opt/spark/work-dir/people.parquet], PartitionFilters: [isnotnull(dob#8), (cast(substring(cast(dob#8 as string), 1, 4) as int) >= 2022)], PushedFilters: [], ReadSchema: struct<name:string>

    Note that the main difference above is in the partition filter:

    Partition Filter (without function):

    (dob#8 >= 202201)

    Partition Filter (with function):

    (cast(substring(cast(dob#8 as string), 1, 4) as int) >= 2022)

    As you can see, the two plans differ only in the partition filter expression. I would say this is what is happening:

    • Yes, indeed the data will be filtered by Spark and only the relevant files are read. Partition filters are evaluated against the partition values encoded in the directory names (the file index), so pruning happens before any data files are opened, even when a function is applied to the partition column.
    • The plan also reflects the use of the function in the filter, so I would assume the main performance difference here to be the overhead of the type casting and function application, not the actual filtering logic.
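
    For contrast, a quick sketch (assuming the example above has just been run) of filtering on a non-partition column: such a filter appears under DataFilters/PushedFilters rather than PartitionFilters, so every partition directory is still scanned and partitioning saves nothing on read.

    # Filtering on 'name' (not a partition column) cannot prune
    # partition directories; the plan lists it as a data filter instead.
    people.filter("name = 'A '").explain()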