I have a Delta table partitioned by year, month, and day. The partition columns are stored as strings.
I need to read data for the last seven days from the Delta table. For instance, if the job runs on a Tuesday, it should retrieve data from the previous Monday to Sunday, totaling seven days.
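For reference, the Monday-to-Sunday window can be derived from the trigger date's weekday rather than from hard-coded offsets, so the job is not tied to a Tuesday run (a small sketch; `previous_week` is a name I made up):

```python
from datetime import date, timedelta

def previous_week(trigger):
    """Return (Monday, Sunday) of the full week before the trigger date."""
    # weekday(): Monday == 0, so this steps back to the previous week's Monday.
    start = trigger - timedelta(days=trigger.weekday() + 7)
    return start, start + timedelta(days=6)

print(previous_week(date(2024, 5, 7)))  # Monday 2024-04-29 .. Sunday 2024-05-05
```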
from datetime import datetime,timedelta
from pyspark.sql.functions import *
trigger_date = "2024-05-07" # yyyy-mm-dd
trigger_date_obj = datetime.strptime(trigger_date, '%Y-%m-%d')
start_date_obj = trigger_date_obj - timedelta(days=8)
end_date_obj = trigger_date_obj - timedelta(days=2)
# Extract day, month and year for the start and end of the window
start_day = start_date_obj.day
start_month = start_date_obj.month
start_year = start_date_obj.year
end_day = end_date_obj.day
end_month = end_date_obj.month
end_year = end_date_obj.year
condition = ((col("year").between(start_year, end_year)) &
(col("month").between(start_month, end_month)) &
(col("day").between(start_day, end_day))
)
data = spark.read.format("delta").table("delta_table_name")
data = data.filter(condition)
This approach works correctly when all seven days fall within the same month and year (e.g., when the trigger date is "2024-05-09"). However, it fails (returns no data) when the trigger date is "2024-05-07", because the start date (2024-04-29) and end date (2024-05-05) fall in different months.
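The failure is easy to reproduce in plain Python: applying the same three per-column ranges to every date in the window shows that the day clause can never match once the window crosses a month boundary.

```python
from datetime import date, timedelta

start, end = date(2024, 4, 29), date(2024, 5, 5)
window = [start + timedelta(days=i) for i in range((end - start).days + 1)]

# Same per-column logic as the Spark filter above.
matches = [
    d for d in window
    if start.year <= d.year <= end.year
    and start.month <= d.month <= end.month
    and start.day <= d.day <= end.day  # 29 <= day <= 5: never true
]
print(matches)  # [] -- every day in the window is rejected
```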
How can we have a generic filter condition if possible which works across all scenarios?
I think the simplest solution is to build a single DateType column from the year, month, and day columns, and then filter on that column.
data = data.withColumn(
    "date",
    # concat_ws yields e.g. "2024-5-5"; Spark's cast to date accepts
    # unpadded month/day values
    concat_ws("-", col("year"), col("month"), col("day")).cast("date")
).filter(
    # compare against date objects, not datetimes
    col("date").between(start_date_obj.date(), end_date_obj.date())
)
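One caveat with the derived column: filtering on an expression computed from the partition columns can defeat partition pruning, so Spark may still scan the whole table. A sketch of an alternative that keeps the filter on the raw string partition columns by OR-ing one equality clause per day in the window (the helper names are mine; adjust the string formatting if your partition values are zero-padded):

```python
from datetime import date, timedelta
from functools import reduce

def window_partitions(start, end):
    """One (year, month, day) string tuple per date in [start, end]."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    # The question's partition values are unpadded strings ("5", not "05");
    # use f"{d.month:02d}" etc. if yours are zero-padded.
    return [(str(d.year), str(d.month), str(d.day)) for d in days]

def last_week_filter(df, start, end):
    """Filter directly on the partition columns so Delta can prune partitions."""
    from pyspark.sql.functions import col  # local so the helper above stays Spark-free
    clauses = [
        (col("year") == y) & (col("month") == m) & (col("day") == d)
        for y, m, d in window_partitions(start, end)
    ]
    return df.filter(reduce(lambda a, b: a | b, clauses))
```

Usage would look like `last_week_filter(data, start_date_obj.date(), end_date_obj.date())`; with only seven clauses the resulting predicate stays small.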