Tags: python, apache-spark, pyspark, apache-spark-sql, back-testing

Create historical data from a dataframe in pyspark


I have a dataframe as follows:

date some_quantity
... ...
2021-01-01 4
2021-01-02 1
2021-01-03 6
2021-01-04 2
2021-01-05 2
2021-01-06 8
2021-01-07 9
2021-01-08 1
... ...

I would like to create the historical data for each calendar day, and in a final step do some aggregations. The intermediate dataframe should look like this:

calendar_date date some_quantity
... ... ...
2021-01-03 2021-01-01 4
2021-01-03 2021-01-02 1
2021-01-04 ... ...
2021-01-04 2021-01-01 4
2021-01-04 2021-01-02 1
2021-01-04 2021-01-03 6
2021-01-05 ... ...
2021-01-05 2021-01-01 4
2021-01-05 2021-01-02 1
2021-01-05 2021-01-03 6
2021-01-05 2021-01-04 2
2021-01-06 ... ...
2021-01-06 2021-01-01 4
2021-01-06 2021-01-02 1
2021-01-06 2021-01-03 6
2021-01-06 2021-01-04 2
2021-01-06 2021-01-05 2
2021-01-06 ... ...

With this dataframe, any aggregation on the calendar date is easy (e.g. how many quantities were sold prior to that day, the 7-day average, the 30-day average, etc.).
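For example, once that intermediate dataframe exists, the "prior to that day" total would just be a group-by (a sketch; `df_hist` is a placeholder name for the intermediate dataframe):

from pyspark.sql import functions as F

# df_hist is the intermediate dataframe sketched above (placeholder name)
prior_totals = (df_hist
    .groupBy("calendar_date")
    .agg(F.sum("some_quantity").alias("qty_prior_to_day"))
)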

I tried to run a for loop over the calendar dates:

df_output = None

for i, date in enumerate(pd.date_range("2021-01-01", "2021-03-01")):

    df_transformed = df.where(F.col("date") < str(date.date()))
    df_transformed = df_transformed.withColumn("calendar_date", F.lit(str(date.date())))

    if i == 0:
        df_output = df_transformed
    else:
        df_output = df_output.union(df_transformed)

However, this is highly inefficient and it crashes as soon as I start including more columns.

Is it possible to create a dataframe with calendar dates and do a join that recreates the dataframe I expect?

Thanks for any help.


Solution

  • You can simply join a calendar dataframe with your main dataframe, using a "less than" join condition:

    # Let's call your main dataframe `df`
    import pandas as pd
    from pyspark.sql import functions as F
    
    # Extracting first and last date
    _, min_date, max_date = (df
        .groupBy(F.lit(1))
        .agg(
            F.min('date').alias('min_date'),
            F.max('date').alias('max_date'),
        )
        .first()
    )
    
    # Then create a temporary dataframe to hold all calendar dates
    dates = [{'calendar_date': str(d.date())} for d in pd.date_range(min_date, max_date)]
    calendar_df = spark.createDataFrame(dates)
    calendar_df.show(10, False)
    # +-------------+
    # |calendar_date|
    # +-------------+
    # |2021-01-01   |
    # |2021-01-02   |
    # |2021-01-03   |
    # |2021-01-04   |
    # |2021-01-05   |
    # |2021-01-06   |
    # |2021-01-07   |
    # |2021-01-08   |
    # +-------------+
    
    # Now you can join to build your expected dataframe; note the join condition
    (calendar_df
        .join(df, on=[calendar_df.calendar_date > df.date])
        .show()
    )
    # +-------------+----------+---+
    # |calendar_date|      date|qty|
    # +-------------+----------+---+
    # |   2021-01-02|2021-01-01|  4|
    # |   2021-01-03|2021-01-01|  4|
    # |   2021-01-03|2021-01-02|  1|
    # |   2021-01-04|2021-01-01|  4|
    # |   2021-01-04|2021-01-02|  1|
    # |   2021-01-04|2021-01-03|  6|
    # |   2021-01-05|2021-01-01|  4|
    # |   2021-01-05|2021-01-02|  1|
    # |   2021-01-05|2021-01-03|  6|
    # |   2021-01-05|2021-01-04|  2|
    # |   2021-01-06|2021-01-01|  4|
    # |   2021-01-06|2021-01-02|  1|
    # |   2021-01-06|2021-01-03|  6|
    # |   2021-01-06|2021-01-04|  2|
    # |   2021-01-06|2021-01-05|  2|
    # |   2021-01-07|2021-01-01|  4|
    # |   2021-01-07|2021-01-02|  1|
    # |   2021-01-07|2021-01-03|  6|
    # |   2021-01-07|2021-01-04|  2|
    # |   2021-01-07|2021-01-05|  2|
    # +-------------+----------+---+
    # only showing top 20 rows
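  • From the joined dataframe, the aggregations mentioned in the question reduce to a plain group-by on calendar_date. A sketch (assuming the quantity column is named `some_quantity` as in the question; the sample output above happens to show it as `qty`):

    joined = calendar_df.join(df, on=[calendar_df.calendar_date > df.date])

    # Total sold before each calendar day, plus a 7-day trailing average.
    # Rows outside the 7-day window are nulled out by `when`, and `avg` ignores nulls.
    (joined
        .groupBy('calendar_date')
        .agg(
            F.sum('some_quantity').alias('total_prior_to_day'),
            F.avg(
                F.when(
                    F.datediff('calendar_date', 'date') <= 7,
                    F.col('some_quantity'),
                )
            ).alias('avg_last_7_days'),
        )
        .orderBy('calendar_date')
        .show()
    )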