Tags: scala, apache-spark, filter, apache-spark-sql, partition

How to filter parquet partitions based on date range?


I have partitioned parquet data:

dir/batch_date=2023-02-13/batch_hour=09

I have to read the last 14 days of data through a Spark program. Currently, I'm reading the data and applying a date filter on the DataFrame (batch_date minus 14 days). Is there any way to specify a range of directories so that the read is limited to only the last 14 days' directories and not the entire dataset?

Thanks


Solution

  • What you are already doing is optimal, because of the concept of PartitionFilters in Apache Spark: when you apply a filter on a partition column, Spark applies it at the source while listing the directories, before any data is sent over the network, so only the matching partitions are read.

    For example, I have some data partitioned by Year:

    /path/
       Year=2018/
           file.parquet
       Year=2019/
           file.parquet
       Year=2020/
           file.parquet
       Year=2021/
           file.parquet
       Year=2022/
           file.parquet
       Year=2023/
           file.parquet
    

    If I apply the following code:

    import org.apache.spark.sql.functions.col

    spark.read.parquet("/path/").filter(col("Year") >= "2020").explain()
    

    I will get the following Physical Plan:

    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [Variable_name#0,Value#1,Units#2,Year#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/user/out..., PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)], PushedFilters: [], ReadSchema: struct<Variable_name:string,Value:string,Units:string>
    

    If you search for PartitionFilters you will find this:

    PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)]
    

    This means the partition filters were applied and only the desired partitions will be loaded. If you don't see PartitionFilters in the plan, something went wrong and the whole dataset will be read.
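
    Applied to your layout, the same pruning kicks in when you filter on batch_date. A minimal sketch, assuming dir/ is your dataset root and that Spark's default partition type inference reads batch_date as a date:

    import org.apache.spark.sql.functions.{col, current_date, date_sub}

    // batch_date is a partition column, so this filter shows up as a PartitionFilter
    // in the plan and only the last 14 days of directories are listed and scanned
    val last14Days = spark.read.parquet("dir/")
      .filter(col("batch_date") >= date_sub(current_date(), 14))

    You can confirm it with .explain() and look for PartitionFilters, exactly as in the Year example above.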

    If for some reason the PartitionFilters don't show up, you can always use the Hadoop FileSystem API to filter the folders that you want to load with Spark:

    import org.apache.hadoop.fs.Path
    val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
    // keep only the partition directories whose value (after "=") is >= min_date
    val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split("=")(1) >= min_date)
    

    Then read filesToRead with Spark.
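
    A hedged sketch of that last step, assuming filesToRead comes from the snippet above and path is still the dataset root; the basePath option is what keeps batch_date and batch_hour available as columns even though Spark is pointed at individual sub-directories:

    // read only the selected partition directories
    val df = spark.read
      .option("basePath", path)
      .parquet(filesToRead.map(_.getPath.toString): _*)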