
Split Data in 30 Minute Intervals: Pyspark


I need to split my data into 15-minute intervals based on calendar time.

For example, data is like below

ID          |   rh_start_time       |   rh_end_time         |   total_duration
5421833835  |   31-12-2023 13:26:53 |   31-12-2023 13:27:03 |   10
5421833961  |   31-12-2023 13:23:50 |   31-12-2023 13:39:10 |   360

I want to split it into 15-minute intervals, like below:

ID          |   rh_start_time       |   rh_end_time         |   total_duration |   Interval Start
5421833835  |   31-12-2023 13:26:53 |   31-12-2023 13:27:03 |   10             |   31-12-2023 13:00:00
5421833961  |   31-12-2023 13:23:50 |   31-12-2023 13:39:10 |   360            |   31-12-2023 13:00:00
5421833961  |   31-12-2023 13:23:50 |   31-12-2023 13:39:10 |   360            |   31-12-2023 13:30:00

I tried using explode + sequence, but it creates 15-minute chunks anchored at each row's own start time (e.g. 2023-12-31 13:26:53, 2023-12-31 13:41:53) rather than chunks aligned to actual calendar boundaries:

intervals.withColumn(
    "rh_interval_start_ts",
    explode(expr("sequence(rh_start_time, rh_end_time, interval 30 minutes)")),
)

Solution

  • One solution is to prepare the calendar intervals as their own DataFrame and join them to the data:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
    from pyspark.sql.functions import col, expr, to_timestamp
    from datetime import datetime, timedelta
    spark = SparkSession.builder.appName("example").getOrCreate()
    # Create the DataFrame example:
    schema = StructType([
        StructField("ID", StringType(), True),
        StructField("rh_start_time", TimestampType(), True),
        StructField("rh_end_time", TimestampType(), True),
        StructField("total_duration", IntegerType(), True)
    ])
    def to_ts(date):
        return datetime.strptime(date, "%d-%m-%Y %H:%M:%S")
    data = [
        ("5421833835", to_ts("31-12-2023 13:26:53"), to_ts("31-12-2023 13:27:03"), 10),
        ("5421833961", to_ts("31-12-2023 13:23:50"), to_ts("31-12-2023 13:39:10"), 360)
    ]
    
    data = spark.createDataFrame(data, schema=schema)
    data.show()
    # Create all interval starts (if necessary, you can derive the min and max from the data):
    start_date = datetime(2023, 12, 31)
    end_date = datetime(2024, 1, 1)
    interval = timedelta(minutes=15)
    timestamps = [start_date + i * interval for i in range(int((end_date - start_date).total_seconds() // (15 * 60) + 1))]
    raw_ts = [(timestamp,) for timestamp in timestamps]
    column = "interval_start"
    intervals = spark.createDataFrame(raw_ts, [column])
    intervals = intervals.withColumn(column, col(column).cast(TimestampType()))
    intervals.show()
    # Join each row to every 15-minute slot that overlaps its [rh_start_time, rh_end_time] range:
    result = data.join(intervals, on=(
        (intervals["interval_start"] >= data["rh_start_time"] - expr("INTERVAL 15 MINUTES"))
        & (intervals["interval_start"] <= data["rh_end_time"])
    ))
    result.show()
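
    The join condition keeps every 15-minute slot whose window touches the row's [rh_start_time, rh_end_time] range. If you want the output shaped like the expected result above, a small usage sketch (it only selects and orders columns that already exist at this point) is:

    result.select(
        "ID", "rh_start_time", "rh_end_time", "total_duration", "interval_start"
    ).orderBy("ID", "interval_start").show(truncate=False)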
    

    Alternatively, you can keep the explode, but apply it to the start time floored to a 15-minute calendar boundary, so the generated sequence lands on calendar-aligned slots:

    from pyspark.sql import functions as F

    data.withColumn(
        # snap the start time down to the previous 15-minute calendar boundary
        "start_floor",
        (F.floor(F.col("rh_start_time").cast("long") / (60 * 15)) * (60 * 15)).cast("timestamp"),
    ).withColumn(
        # one row per 15-minute slot between the floored start and the end time
        "interval_start",
        F.explode(F.expr("sequence(start_floor, rh_end_time, interval 15 minutes)")),
    ).show()
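
    The floor works because the timestamp cast to epoch seconds is divided by 900 (15 minutes) and multiplied back, which snaps rh_start_time onto a :00/:15/:30/:45 calendar boundary before sequence generates the slots up to rh_end_time. A quick plain-Python illustration of that arithmetic (just for intuition, not part of the Spark pipeline):

    from datetime import datetime, timezone

    ts = int(datetime(2023, 12, 31, 13, 26, 53, tzinfo=timezone.utc).timestamp())
    floored = ts // (60 * 15) * (60 * 15)  # snap down to the nearest quarter hour
    print(datetime.fromtimestamp(floored, tz=timezone.utc))  # 2023-12-31 13:15:00+00:00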