Tags: sql, pyspark, apache-spark-sql, gaps-and-islands

How to construct distinct date ranges from a set of ranges in SQL


I have a table with a boolean field I care about, along with the date range over which each row's value applies. These date ranges can overlap, can sit entirely inside other date ranges, and can have gaps between them.

I only care about rows where the boolean field is true, but I need to construct non-overlapping date ranges so I can sensibly join back to my data without creating duplicate records.

For example, my data may look like:

user | is_relevant | from_date  | to_date    | description
-----|-------------|------------|------------|-------------
1    | true        | 2024-01-01 | 2024-02-01 | month of january
1    | true        | 2024-01-02 | 2024-01-03 | subset of january
1    | true        | 2024-01-15 | 2024-02-15 | mid-jan to mid-feb
1    | true        | 2024-03-01 | 2024-04-01 | distinct date range

I need to get to a table with two rows, one from 2024-01-01 to 2024-02-15 and another from 2024-03-01 to 2024-04-01.

Any help at all would be appreciated. If it makes a difference, I'm running this in Spark SQL, but a PySpark solution is fine.


Solution

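  • This is a classic gaps-and-islands problem. Sort each user's ranges by start date, flag a row as the start of a new island when no earlier range reaches its start date, take a running sum of those flags to number the islands, and then take the earliest start and latest end within each island.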

    from pyspark.sql import SparkSession
    # Note: min, max and sum below shadow the Python built-ins in this script.
    from pyspark.sql.functions import (
        col,
        row_number,
        min,
        max,
        when,
        lit,
        sum,
    )
    from pyspark.sql.types import DateType
    from pyspark.sql.window import Window
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()
    
    # Sample data
    data = [
        (1, True, "2024-01-01", "2024-02-01", "month of january"),
        (1, True, "2024-01-02", "2024-01-03", "subset of january"),
        (1, True, "2024-01-15", "2024-02-15", "mid-jan to mid-feb"),
        (1, True, "2024-03-01", "2024-04-01", "distinct date range")
    ]
    
    # Create DataFrame
    df = spark.createDataFrame(data, ["user", "is_relevant", "from_date", "to_date", "description"])
    
    # Convert string dates to DateType
    df = df.withColumn("start", col("from_date").cast(DateType())) \
           .withColumn("end", col("to_date").cast(DateType()))
    
    # Filter only rows where is_relevant is True
    df = df.where(col("is_relevant"))
    
    # Order each user's ranges by start date; every window is partitioned by
    # user so that ranges belonging to different users are never merged.
    user_window = Window.partitionBy("user").orderBy("start", "end")
    
    df = (
        df.withColumn(
            "previous_end",
            # Latest end date among all earlier rows for this user (null on the first row)
            max(col("end")).over(user_window.rowsBetween(Window.unboundedPreceding, -1)),
        )
        # A row starts a new island unless an earlier range already reaches its start date
        .withColumn("island_start_indicator", when(col("previous_end") >= col("start"), lit(0)).otherwise(lit(1)))
        # Running sum of the flags numbers the islands within each user
        .withColumn("island_id", sum("island_start_indicator").over(user_window))
        .withColumn("island_min_start", min("start").over(Window.partitionBy("user", "island_id")))
        .withColumn("island_max_end", max("end").over(Window.partitionBy("user", "island_id")))
        .select('user', 'is_relevant', col('island_min_start').alias('start_date'), col('island_max_end').alias('end_date'))
        # Every row of an island now carries the same values, so keep one per island
        .dropDuplicates()
    )
    
    df.show()
    
    +----+-----------+----------+----------+
    |user|is_relevant|start_date|  end_date|
    +----+-----------+----------+----------+
    |   1|       true|2024-01-01|2024-02-15|
    |   1|       true|2024-03-01|2024-04-01|
    +----+-----------+----------+----------+
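
Since the question mentions Spark SQL, here is a rough sketch of the same island logic written as a single query using only standard window functions. It is run against SQLite through Python's built-in sqlite3 module purely so that the example is self-contained. I have not run it on Spark, so treat portability to Spark SQL as an assumption to verify. The table name `ranges`, the column name `user_id` (renamed from `user`, which is a reserved word in standard SQL), the helper `merge_ranges`, and the extra row for user 2 are all made up for illustration.

```python
import sqlite3

# Flag island starts, number the islands with a running sum, then aggregate.
MERGE_RANGES_SQL = """
WITH flagged AS (
    SELECT
        user_id,
        from_date,
        to_date,
        -- 0 when an earlier range for this user reaches this row's start, else 1
        CASE
            WHEN MAX(to_date) OVER (
                PARTITION BY user_id
                ORDER BY from_date, to_date
                ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
            ) >= from_date THEN 0
            ELSE 1
        END AS is_island_start
    FROM ranges
    WHERE is_relevant
),
numbered AS (
    SELECT
        user_id,
        from_date,
        to_date,
        -- Running sum of the flags; the default frame keeps exact duplicate
        -- rows (ties in the ordering) in the same island
        SUM(is_island_start) OVER (
            PARTITION BY user_id
            ORDER BY from_date, to_date
        ) AS island_id
    FROM flagged
)
SELECT user_id, MIN(from_date) AS start_date, MAX(to_date) AS end_date
FROM numbered
GROUP BY user_id, island_id
ORDER BY user_id, start_date
"""


def merge_ranges(rows):
    """Load (user_id, is_relevant, from_date, to_date) rows and merge overlaps."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE ranges ("
            "user_id INTEGER, is_relevant INTEGER, from_date TEXT, to_date TEXT)"
        )
        conn.executemany("INSERT INTO ranges VALUES (?, ?, ?, ?)", rows)
        return conn.execute(MERGE_RANGES_SQL).fetchall()
    finally:
        conn.close()


sample = [
    (1, 1, "2024-01-01", "2024-02-01"),
    (1, 1, "2024-01-02", "2024-01-03"),
    (1, 1, "2024-01-15", "2024-02-15"),
    (1, 1, "2024-03-01", "2024-04-01"),
    # Hypothetical second user, added only to show that users stay separate
    (2, 1, "2024-02-10", "2024-03-05"),
]

merged = merge_ranges(sample)
for row in merged:
    print(row)
```

Dates are stored as ISO-8601 strings here, which compare correctly as text. In Spark you would more likely keep them as real DATE columns. Because the comparison is `>=`, ranges that merely touch (one ends on the day the next begins) are merged, which matches the PySpark version above.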