I have a table with a boolean field I care about, along with the date ranges during which that field is relevant. These date ranges can overlap, can be entirely contained within other date ranges, and can have gaps between them. I only care about rows where the boolean field is true, but I need to construct non-overlapping date ranges so I can sensibly join back to my data without creating duplicate records.
For example, my data may look like:
user | is_relevant | from_date  | to_date    | description
-----|-------------|------------|------------|--------------------
   1 | true        | 2024-01-01 | 2024-02-01 | month of january
   1 | true        | 2024-01-02 | 2024-01-03 | subset of january
   1 | true        | 2024-01-15 | 2024-02-15 | mid-jan to mid-feb
   1 | true        | 2024-03-01 | 2024-04-01 | distinct date range
I need to get to a table with two rows: one from 2024-01-01 to 2024-02-15, and another from 2024-03-01 to 2024-04-01.
Any help at all would be appreciated. If it makes a difference, I'm running this in Spark SQL, but a PySpark solution is fine.
This is a classic gaps-and-islands problem. The approach: order each user's ranges by start date, compute the running maximum end date over all earlier rows, flag a row as starting a new island whenever its start falls after that running maximum, then take a running sum of those flags to assign an island id. Grouping by island id gives the merged ranges:
from pyspark.sql import SparkSession
# note: min, max, and sum here shadow the Python builtins of the same name
from pyspark.sql.functions import col, lit, when, min, max, sum
from pyspark.sql.types import DateType
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()
# Sample data
data = [
    (1, True, "2024-01-01", "2024-02-01", "month of january"),
    (1, True, "2024-01-02", "2024-01-03", "subset of january"),
    (1, True, "2024-01-15", "2024-02-15", "mid-jan to mid-feb"),
    (1, True, "2024-03-01", "2024-04-01", "distinct date range"),
]
# Create DataFrame
df = spark.createDataFrame(data, ["user", "is_relevant", "from_date", "to_date", "description"])
# Convert string dates to DateType
df = df.withColumn("start", col("from_date").cast(DateType())) \
       .withColumn("end", col("to_date").cast(DateType()))
# Filter only rows where is_relevant is True
df = df.where(col("is_relevant"))
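# Walkthrough of the intermediate columns on the sample data, ordered by
# (start, end): previous_end is the running max of all earlier end dates,
# the indicator marks rows whose start is beyond everything seen so far,
# and island_id is the running sum of that indicator:
#
#   start       end         previous_end  indicator  island_id
#   2024-01-01  2024-02-01  null          1          1
#   2024-01-02  2024-01-03  2024-02-01    0          1
#   2024-01-15  2024-02-15  2024-02-01    0          1
#   2024-03-01  2024-04-01  2024-02-15    1          2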
df = (
    df
    # Running max of every earlier end date per user; ROWS ... -1 excludes
    # the current row, so the first row in each partition gets null
    .withColumn(
        "previous_end",
        max(col("end")).over(
            Window.partitionBy("user")
            .orderBy("start", "end")
            .rowsBetween(Window.unboundedPreceding, -1)
        ),
    )
    # New island when the current range starts after everything seen so far
    # (>= also merges ranges that merely touch end-to-start)
    .withColumn("island_start_indicator", when(col("previous_end") >= col("start"), lit(0)).otherwise(lit(1)))
    # Running sum of the indicator assigns an island id; partitioning by
    # user keeps islands from leaking across users
    .withColumn("island_id", sum("island_start_indicator").over(Window.partitionBy("user").orderBy("start", "end")))
    # Each island spans its earliest start to its latest end
    .withColumn("island_min_start", min("start").over(Window.partitionBy("user", "island_id")))
    .withColumn("island_max_end", max("end").over(Window.partitionBy("user", "island_id")))
    .select("user", "is_relevant", col("island_min_start").alias("start_date"), col("island_max_end").alias("end_date"))
    .dropDuplicates()
)
df.show()
+----+-----------+----------+----------+
|user|is_relevant|start_date| end_date|
+----+-----------+----------+----------+
| 1| true|2024-01-01|2024-02-15|
| 1| true|2024-03-01|2024-04-01|
+----+-----------+----------+----------+
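Since you mentioned Spark SQL: the same logic translates directly to SQL window functions. Here is a minimal sketch, assuming the source table is registered as a temp view named ranges (the view name is my assumption; the column names match your example). It also works if from_date/to_date are 'yyyy-MM-dd' strings, since their lexicographic order matches chronological order.
WITH flagged AS (
    SELECT
        user,
        is_relevant,
        from_date,
        to_date,
        -- new island when nothing seen so far reaches the current start
        CASE
            WHEN MAX(to_date) OVER (
                     PARTITION BY user
                     ORDER BY from_date, to_date
                     ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
                 ) >= from_date
            THEN 0 ELSE 1
        END AS island_start_indicator
    FROM ranges
    WHERE is_relevant
),
islands AS (
    SELECT
        *,
        -- running sum of the indicator labels each row's island
        SUM(island_start_indicator) OVER (
            PARTITION BY user
            ORDER BY from_date, to_date
        ) AS island_id
    FROM flagged
)
SELECT
    user,
    is_relevant,
    MIN(from_date) AS start_date,
    MAX(to_date)   AS end_date
FROM islands
GROUP BY user, is_relevant, island_id
One caveat for either version: the previous_end >= start comparison also merges ranges that merely touch (one ending exactly where the next begins). If those should stay separate, use > instead.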