Search code examples
date datetime pyspark explode

Fetch start and end between two dates inclusive in pyspark


I have been trying to split the range between two given dates into one interval per calendar month (with both the start and end dates included), but it's not working as expected.

e.g.

  • start_date (dd-mm-yyyy) = 12-01-2022
  • end_date (dd-mm-yyyy) = 03-06-2022

Expected output:

Valid_From Valid_To
2022-01-12 2022-01-31
2022-02-01 2022-02-28
2022-03-01 2022-03-31
2022-04-01 2022-04-30
2022-05-01 2022-05-31
2022-06-01 2022-06-03

My code:

# Forecast window that should be split into one interval per month.
# NOTE(review): the end date here is 2 June 2022, while the example above
# states end_date = 03-06-2022 — confirm which one is intended.
var_forecast_start_date = datetime.datetime(2022, 1, 12)
var_forecast_end_date = datetime.datetime(2022, 6, 2)

# `pandas_to_spark` and the `df_datetime(...)` helper are not shown here;
# presumably together they yield a Spark DataFrame with a `Date` column
# covering the window — TODO confirm. Note that the name `df_datetime` is
# rebound from the helper to the resulting DataFrame on this line.
df_datetime = pandas_to_spark(
    df_datetime(start=var_forecast_start_date, end=var_forecast_end_date)
)


# Add an integer yyyyMMdd key and a yyyy-MM-dd string derived from Date.
# Neither column is carried through the selectExpr below.
df_datetime = df_datetime.withColumn(
    "DateID", date_format(df_datetime.Date, "yyyyMMdd").cast(IntegerType())
).withColumn("FiscalDate", date_format(df_datetime.Date, "yyyy-MM-dd"))

# Valid_From = first day of Date's month (day after the month end, minus one
# month); Valid_To = last day of Date's month. distinct() then leaves one row
# per month. Both values are whole-calendar-month boundaries computed from
# Date alone, so nothing clips them to the start or end date: a January date
# maps to the 1st rather than the 12th, and a June date maps to the 30th.
df_datetime = df_datetime.selectExpr(
    "add_months(date_add(last_day(Date),1),-1) AS Valid_From",
    "last_day(Date) AS Valid_To",
).distinct()

Solution

  • Try the following:

    import findspark
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F
    
    findspark.init()
    spark = SparkSession.builder.appName("local").getOrCreate()
    columns = ["start_date", "end_date"]
    data = [("12-01-2022", "03-06-2022")]

    # Parse the dd-MM-yyyy strings into DATE columns.
    df = (
        spark.createDataFrame(data)
        .toDF(*columns)
        .withColumn("start_date", F.to_date(F.col("start_date"), "dd-MM-yyyy"))
        .withColumn("end_date", F.to_date(F.col("end_date"), "dd-MM-yyyy"))
        # A range whose start is after its end (or whose dates are null) has
        # no months to emit; dropping it here also avoids handing sequence()
        # a start that lies after its stop.
        .where(F.col("start_date") <= F.col("end_date"))
    )

    # Emit one row per calendar month touched by the range.
    #
    # The months are generated from the first day of the start month to the
    # first day of the end month, so the count never depends on rounding
    # months_between(): a range such as 28-01-2022 .. 02-03-2022 rounds to a
    # single month although it touches three, and a range inside one month
    # rounds to zero. Each month is then clipped to the requested range:
    #   Valid_From = later of (start_date, first day of the month)
    #   Valid_To   = earlier of (end_date, last day of the month)
    # Everything is computed per input row, so several ranges in `df` do not
    # interfere with each other.
    join_sdf = (
        df.withColumn(
            "month_start",
            F.explode(
                F.expr(
                    """
                        SEQUENCE(
                            TRUNC(start_date, 'MM'),
                            TRUNC(end_date, 'MM'),
                            INTERVAL 1 MONTH
                        )
                    """
                )
            ),
        )
        .select(
            F.greatest(F.col("start_date"), F.col("month_start")).alias(
                "Valid_From"
            ),
            F.least(
                F.col("end_date"), F.last_day(F.col("month_start"))
            ).alias("Valid_To"),
        )
        .orderBy("Valid_From")
    )
    join_sdf.show()
    

    It returns:

    +----------+----------+
    |Valid_From|  Valid_To|
    +----------+----------+
    |2022-01-12|2022-01-31|
    |2022-02-01|2022-02-28|
    |2022-03-01|2022-03-31|
    |2022-04-01|2022-04-30|
    |2022-05-01|2022-05-31|
    |2022-06-01|2022-06-03|
    +----------+----------+