I need to split my data into 15-minute intervals based on calendar time.
For example, the data looks like this:
ID | rh_start_time | rh_end_time | total_duration
5421833835 | 31-12-2023 13:26:53 | 31-12-2023 13:27:03 | 10
5421833961 | 31-12-2023 13:23:50 | 31-12-2023 13:39:10 | 360
I want to split it into 15-minute intervals like below:
ID | rh_start_time | rh_end_time | total_duration | Interval Start
5421833835 | 31-12-2023 13:26:53 | 31-12-2023 13:27:03 | 10 | 31-12-2023 13:15:00
5421833961 | 31-12-2023 13:23:50 | 31-12-2023 13:39:10 | 360 | 31-12-2023 13:15:00
5421833961 | 31-12-2023 13:23:50 | 31-12-2023 13:39:10 | 360 | 31-12-2023 13:30:00
I tried using explode + sequence, but it creates 15-minute chunks relative to the start time (e.g. 2023-12-31 13:26:53, 2023-12-31 13:41:53), not actual calendar intervals:
df.withColumn(
    "rh_interval_start_ts",
    explode(expr("sequence(rh_start_time, rh_end_time, interval 15 minutes)")),
)
One solution is to prepare a DataFrame of interval start times and join it to the data:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, expr
from datetime import datetime, timedelta
spark = SparkSession.builder.appName("example").getOrCreate()
# Create the example DataFrame:
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("rh_start_time", TimestampType(), True),
    StructField("rh_end_time", TimestampType(), True),
    StructField("total_duration", IntegerType(), True)
])

def to_ts(date):
    # Parse the dd-mm-yyyy timestamps used in the example
    return datetime.strptime(date, "%d-%m-%Y %H:%M:%S")

data = [
    ("5421833835", to_ts("31-12-2023 13:26:53"), to_ts("31-12-2023 13:27:03"), 10),
    ("5421833961", to_ts("31-12-2023 13:23:50"), to_ts("31-12-2023 13:39:10"), 360)
]
data = spark.createDataFrame(data, schema=schema)
data.show()
# Create all interval start timestamps (if necessary, you can derive the min and max from the data; see the sketch after this solution):
start_date = datetime(2023, 12, 31)
end_date = datetime(2024, 1, 1)
interval = timedelta(minutes=15)
n_intervals = int((end_date - start_date) / interval) + 1
timestamps = [start_date + i * interval for i in range(n_intervals)]
raw_ts = [(timestamp,) for timestamp in timestamps]
column = "interval_start"
intervals = spark.createDataFrame(raw_ts, [column])
intervals = intervals.withColumn(column, col(column).cast(TimestampType()))
intervals.show()
# Join each record to every interval [interval_start, interval_start + 15 min) it overlaps:
result = data.join(intervals, on=(
    (intervals["interval_start"] > data["rh_start_time"] - expr("INTERVAL 15 MINUTES"))
    & (intervals["interval_start"] <= data["rh_end_time"])
))
result.show()
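If you would rather not hard-code start_date and end_date, you can derive them from the data first, as the comment above suggests. A minimal sketch, assuming the data DataFrame defined above (bounds, min_start and max_end are just illustrative names):
from pyspark.sql import functions as F

# Earliest start and latest end in the data
bounds = data.agg(
    F.min("rh_start_time").alias("min_start"),
    F.max("rh_end_time").alias("max_end"),
).first()

# Round the lower bound down to a 15-minute boundary so the generated
# interval starts stay aligned to the calendar
start_date = bounds["min_start"].replace(
    minute=(bounds["min_start"].minute // 15) * 15, second=0, microsecond=0
)
end_date = bounds["max_end"]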
Alternatively, you can do the explode, but on a floor of the start time:
from pyspark.sql import functions as F

data.withColumn(
    # floor the start time to the previous 15-minute (900-second) boundary
    'start_floor',
    (F.floor(F.col('rh_start_time').cast('long') / (60 * 15)) * (60 * 15)).cast('timestamp')
).withColumn(
    # one row per 15-minute interval between the floored start and the end time
    "interval_start",
    F.explode(F.expr("sequence(start_floor, rh_end_time, interval 15 minutes)")),
).show()