Tags: apache-spark, pyspark, apache-spark-sql, date-range

Generating monthly timestamps between two dates in pyspark dataframe


I have some DataFrame with "date" column and I'm trying to generate a new DataFrame with all monthly timestamps between the min and max date from the "date" column.

One solution is shown below:

from pyspark.sql import functions as f
from pyspark.sql.functions import col, asc

month_step = 31 * 60 * 60 * 24  # 31 days in seconds

min_date, max_date = df.select(
    f.min("date").cast("long"), f.max("date").cast("long")
).first()

# Integer division (//) keeps the bounds as longs, as spark.range requires
df_ts = spark.range(
    (min_date // month_step) * month_step,
    ((max_date // month_step) + 1) * month_step,
    month_step
).select(col("id").cast("timestamp").alias("yearmonth"))

df_formatted_ts = df_ts.withColumn(
    "yearmonth",
    f.concat(f.year("yearmonth"), f.lit('-'), f.format_string("%02d", f.month("yearmonth")))
).select("yearmonth")

df_formatted_ts.orderBy(asc("yearmonth")).show(150, False)

The problem is that I used 31 days as month_step, which isn't really correct because some months have 30 days, or even 28. Is it possible to make this more precise?

Just as a note: later I only need the year and month values, so I will ignore the day and time. But because I'm generating timestamps across quite a large date range (2001 to 2018), the timestamps gradually shift.

That's why some months are occasionally skipped. For example, this snapshot is missing 2010-02:

|2010-01  |
|2010-03  |
|2010-04  |
|2010-05  |
|2010-06  |
|2010-07  |

I checked, and only 3 months are skipped over the whole range from 2001 through 2018.
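The drift can be reproduced outside Spark with a small sketch in plain Python (the start date here is an arbitrary assumption, so the exact months skipped will differ from the real data, but the effect is the same: 12 steps of 31 days total 372 days, about a week more than a calendar year, so the day-of-month creeps forward until a whole short month is jumped over):

```python
from datetime import datetime, timedelta, timezone

month_step = timedelta(seconds=31 * 60 * 60 * 24)  # the fixed 31-day step
t = datetime(2001, 1, 1, tzinfo=timezone.utc)      # arbitrary start date

seen = set()
while t.year <= 2018:
    seen.add((t.year, t.month))
    t += month_step

all_months = {(y, m) for y in range(2001, 2019) for m in range(1, 13)}
missing = sorted(all_months - seen)
print(missing)  # short months get skipped, e.g. when a step lands on the 31st
```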


Solution

  • Suppose you had the following DataFrame:

    data = [("2000-01-01","2002-12-01")]
    df = spark.createDataFrame(data, ["minDate", "maxDate"])
    df.show()
    #+----------+----------+
    #|   minDate|   maxDate|
    #+----------+----------+
    #|2000-01-01|2002-12-01|
    #+----------+----------+
    

    You can add a date column with all of the months between minDate and maxDate by following the same approach as my answer to this question.

    Just replace pyspark.sql.functions.datediff with pyspark.sql.functions.months_between, and use add_months instead of date_add:

    import pyspark.sql.functions as f
    
    # repeat(',', monthsDiff) builds a string of monthsDiff commas; splitting it
    # yields an array of monthsDiff + 1 elements. posexplode emits one row per
    # element, and its position column becomes the month offset for add_months.
    df.withColumn("monthsDiff", f.months_between("maxDate", "minDate"))\
        .withColumn("repeat", f.expr("split(repeat(',', monthsDiff), ',')"))\
        .select("*", f.posexplode("repeat").alias("date", "val"))\
        .withColumn("date", f.expr("add_months(minDate, date)"))\
        .select("date")\
        .show(n=50)
    #+----------+
    #|      date|
    #+----------+
    #|2000-01-01|
    #|2000-02-01|
    #|2000-03-01|
    #|2000-04-01|
    # ...skipping some rows...
    #|2002-10-01|
    #|2002-11-01|
    #|2002-12-01|
    #+----------+