scala, apache-spark, date, pyspark

Filter data between start and end days from a Delta table


I have a Delta table partitioned by year, month, and day. The partition columns are stored as strings.

I need to read the last seven days of data from the Delta table. For instance, if the job runs on a Tuesday, it should retrieve data from the previous Monday through the most recent Sunday, i.e. seven full days.

from datetime import datetime,timedelta
from pyspark.sql.functions import *

trigger_date = "2024-05-07" # yyyy-mm-dd
trigger_date_obj = datetime.strptime(trigger_date, '%Y-%m-%d')
start_date_obj = trigger_date_obj - timedelta(days=8)  # start of the 7-day window (previous Monday for a Tuesday run)
end_date_obj = trigger_date_obj - timedelta(days=2)    # end of the window (previous Sunday)

# Extract year, month and day for the start and end dates
start_day = start_date_obj.day
start_month = start_date_obj.month
start_year = start_date_obj.year

end_day = end_date_obj.day
end_month = end_date_obj.month
end_year = end_date_obj.year


condition = ((col("year").between(start_year, end_year)) & 
             (col("month").between(start_month, end_month)) & 
             (col("day").between(start_day, end_day)) 
            )

data = spark.read.format("delta").table("delta_table_name") 
data = data.filter(condition)

This approach works correctly when the start and end dates fall within the same month and year (e.g., with a trigger date of "2024-05-09"). However, it returns no data when the trigger date is "2024-05-07", because the start (2024-04-29) and end (2024-05-05) dates fall in different months.
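
For that trigger date, the condition built above effectively reduces to the following, which shows where it breaks down:

condition = (
    col("year").between(2024, 2024)   # matches
    & col("month").between(4, 5)      # matches
    & col("day").between(29, 5)       # never true: the lower bound 29 exceeds the upper bound 5
)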

How can I write a generic filter condition that works across all of these scenarios?


Solution

  • I think the simplest solution would be to create a new DateType column from month, year, and day columns, and then filter based on that column.

    # Build a real DateType column from the string partition columns and
    # filter on it; Spark's string-to-date cast accepts both "2024-5-7"
    # and "2024-05-07".
    result = data.withColumn(
        "date",
        concat_ws("-", col("year"), col("month"), col("day")).cast("date")
    ).filter(
        col("date").between(start_date_obj, end_date_obj)
    )