
Conditional loading of partitions from file-system


I am aware that there have been questions regarding wildcards in pySpark's .load() function, like here or here. However, none of the questions/answers I found dealt with my variation of it.

Context

In pySpark I want to load files directly from HDFS because I have to use the Databricks Avro library (com.databricks.spark.avro) for Spark 2.3.x. I'm doing so like this:

partition_stamp = "202104"

# The trailing * matches every partition whose value starts with the stamp
df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={partition_stamp}*") \
        .select("...")

As you can see, the partitions are derived from timestamps in the format yyyyMMdd.

Question

Currently I only get the partitions for April 2021 (partition_stamp = "202104"). However, I need all partitions from April 2021 onwards.

Written in pseudo-code, I'd need a solution something like this:

.load(f"/path/partition >= {partition_stamp}*")

Since there are actually several hundred partitions, any approach that requires hard-coding them is of no use.


So my question is: Is there a function for conditional file-loading?


Solution

  • As I learned, only the following options exist to dynamically process paths inside the .load() function:

    *: Wildcard for any sequence of characters up to the end of the path segment, i.e. the next sub-directory ('/') -> (/path/20200*)
    [1-3]: Glob-style inclusion of a defined character range -> (/path/20200[1-3]/...)
    {1,2,3}: Glob-style inclusion of a defined set of alternatives -> (/path/20200{1,2,3}/...)

    Thus, to answer my question: there is no built-in function for conditional file-loading.
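
    For illustration, the three patterns could be used like this (the paths are hypothetical):

    # * matches anything within one path segment: 202001 ... 202009 and beyond
    df = spark.read.format("com.databricks.spark.avro").load("/path/partition=20200*")

    # [1-3] matches a single character from the range: 202001, 202002, 202003
    df = spark.read.format("com.databricks.spark.avro").load("/path/partition=20200[1-3]")

    # {1,2,3} matches exactly one of the listed alternatives: 202001, 202002, 202003
    df = spark.read.format("com.databricks.spark.avro").load("/path/partition=20200{1,2,3}")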

    Anyhow, I'd like to share my solution:

    import pandas as pd  # utilize pandas' date functions

    start_date = "2021-04-01"  # example bounds; substitute your own dates
    end_date = "2021-09-30"

    # Collect the unique yyyyMM stamps between the two dates,
    # sorted so the generated path is deterministic
    partition_stamp = ",".join(sorted({
        f"{_date.year}{_date.month:02d}"
        for _date in pd.date_range(start=start_date, end=end_date, freq="D")
    }))

    # {{...}} in the f-string renders literal braces, yielding the
    # glob set /path/partition={202104,202105,...}*
    df = spark.read.format("com.databricks.spark.avro") \
            .load(f"/path/partition={{{partition_stamp}}}*") \
            .select("...")

    This way the set of allowed yyyyMM stamps is generated dynamically for a given start and end date, and the string-based .load() remains usable.
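
    If you'd rather avoid the pandas dependency, the same stamp string can be built with the standard library alone. A minimal sketch (month_stamps is a helper name of my own choosing):

    from datetime import date

    def month_stamps(start: date, end: date) -> str:
        """Build a comma-separated string of yyyyMM stamps from start to end (inclusive)."""
        stamps = []
        year, month = start.year, start.month
        while (year, month) <= (end.year, end.month):
            stamps.append(f"{year}{month:02d}")
            month += 1  # advance one month, rolling over into the next year
            if month > 12:
                year, month = year + 1, 1
        return ",".join(stamps)

    partition_stamp = month_stamps(date(2021, 4, 1), date(2021, 9, 30))
    # -> "202104,202105,202106,202107,202108,202109"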