Using PySpark on Databricks: starting from the minimum DateUpdated in the dataset, find where Comp_BB_Status changes to 1, and from that DateUpdated measure how long it took for Comp_BB_Status to change back to 0. In other words, I want to find the time windows where Comp_BB_Status was 1 and calculate the duration of each window in seconds and minutes.
Any guidance would be appreciated; I am new to PySpark.
EDIT
Code to recreate the dataframe:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize SparkSession
spark_a = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()

# Define schema for the DataFrame. DateUpdated is read as a string and
# parsed into a timestamp below, since createDataFrame will not accept
# raw strings for a TimestampType field.
schema = StructType([
    StructField("CpoSku", StringType(), True),
    StructField("DateUpdated", StringType(), True),
    StructField("CPO_BB_Status", IntegerType(), True)
])
# Define data
data = [
    ("AAGN7013005", "2024-01-24T05:02:06.898+00:00", 0),
    ("AAGN7013005", "2024-01-24T05:07:05.090+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:42:56.825+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:48:01.647+00:00", 1),
    ("AAGN7013005", "2024-01-24T07:48:18.456+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:30:22.534+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:36:04.075+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:39:04.796+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:44:01.193+00:00", 1),
    ("AAGN7013005", "2024-01-24T17:00:06.217+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:07:16.612+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:13:04.639+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:33:03.796+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:38:28.834+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:35:43.995+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:40:45.930+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:09:30.167+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:14:56.294+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:53:01.281+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:03:27.103+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:08:05.096+00:00", 1),
    ("AAGN7022205", "2024-01-24T05:53:22.652+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:04:59.031+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:04.285+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:34.285+00:01", 0)
]
# Create DataFrame and parse the ISO-8601 strings into timestamps
df_test = spark_a.createDataFrame(data, schema=schema)
df_test = df_test.withColumn(
    "DateUpdated",
    to_timestamp("DateUpdated", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
)

# Show DataFrame schema and preview data
df_test.printSchema()
df_test.show()

# Stop SparkSession
spark_a.stop()
Here's one possible solution. The steps:
1. A window starts when the previous status is 0 and the current status is 1.
2. Similarly, a window ends when the current status is 1 and the next status is 0.
3. To check these conditions we can create two extra columns: a lagged status column and a lead status column.
4. Applying the conditions to the status, lag, and lead columns, we can mark the start and end of each window in a separate column.
5. Next, create window-span markers: a group_id that identifies each span, computed as a running sum of the start markers.
6. Once the window spans have unique group ids, all that remains is to group by the common columns and take the min date as start_date and the max date as end_date.
Implementation below:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("extract start end dates").getOrCreate()

schema = T.StructType([
    T.StructField("CpoSku", T.StringType(), nullable=False),
    T.StructField("DateUpdated", T.StringType(), nullable=False),
    T.StructField("status", T.IntegerType(), nullable=False),
])
data = [
    ("AAGN7013005", "2024-01-24T05:02:06.898+00:00", 0),
    ("AAGN7013005", "2024-01-24T05:07:05.090+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:42:56.825+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:48:01.647+00:00", 1),
    ("AAGN7013005", "2024-01-24T07:48:18.456+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:30:22.534+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:36:04.075+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:39:04.796+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:44:01.193+00:00", 1),
    ("AAGN7013005", "2024-01-24T17:00:06.217+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:07:16.612+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:13:04.639+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:33:03.796+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:38:28.834+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:35:43.995+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:40:45.930+00:00", 0),
    ("AAGN7013005", "2024-01-25T01:03:50.742+00:00", 0),
    ("AAGN7013005", "2024-01-25T02:18:09.229+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:09:30.167+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:14:56.294+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:53:01.281+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:03:27.103+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:08:05.096+00:00", 1),
    ("AAGN7022205", "2024-01-24T05:53:22.652+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:04:59.031+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:04.285+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:34.285+00:00", 0),
]
df = spark.createDataFrame(data, schema=schema)

# Parse the ISO-8601 strings (with UTC offset) into proper timestamps
sample_df = df.withColumn("DateUpdated", F.to_timestamp(F.col("DateUpdated"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
sample_df.show(n=200, truncate=False)
# Per-SKU window ordered by time, used for lag/lead and the running sum
windowSpec = Window.partitionBy("CpoSku").orderBy("DateUpdated")

sample_df = sample_df.withColumn("lagged_status", F.lag(F.col("status"), 1).over(windowSpec))
sample_df = sample_df.withColumn("lead_status", F.lead(F.col("status"), 1).over(windowSpec))
# Mark window boundaries: a 0 -> 1 transition is a "start", a 1 -> 0 an "end"
sample_df = sample_df.withColumn(
    "start_end_column",
    F.when((F.col("status") == F.lit(1)) & (F.col("lagged_status") == F.lit(0)), "start")
     .when((F.col("status") == F.lit(1)) & (F.col("lead_status") == F.lit(0)), "end")
     .otherwise("nan")
)
sample_df.show(n=200, truncate=False)
# A running sum of the start markers assigns each window span a unique group_id
start_indicator = F.when(F.col("start_end_column") == F.lit("start"), 1).otherwise(0)
group_id = F.sum(start_indicator).over(windowSpec)
sample_df = sample_df.withColumn("group_id", group_id)
sample_df.show(n=200, truncate=False)

# Keep only the status = 1 rows and collapse each span to its first/last timestamp
sample_df = sample_df.filter(F.col("status") == F.lit(1)).groupBy("CpoSku", "status", "group_id").agg(
    F.min("DateUpdated").alias("start_date"),
    F.max("DateUpdated").alias("end_date"),
)
sample_df.show(n=200, truncate=False)
Output (note that show() renders timestamps in the Spark session's local timezone, here UTC+05:30, which is why the displayed times are shifted from the UTC inputs):
+-----------+------+--------+-----------------------+-----------------------+
|CpoSku     |status|group_id|start_date             |end_date               |
+-----------+------+--------+-----------------------+-----------------------+
|AAGN7013005|1 |1 |2024-01-24 10:37:05.09 |2024-01-24 23:37:16.612|
|AAGN7013005|1 |2 |2024-01-25 03:08:28.834|2024-01-25 04:05:43.995|
|AAGN7022205|1 |1 |2024-01-24 10:38:05.096|2024-01-24 12:13:04.285|
+-----------+------+--------+-----------------------+-----------------------+
Full output below:
+-----------+-----------------------+------+
|CpoSku |DateUpdated |status|
+-----------+-----------------------+------+
|AAGN7013005|2024-01-24 10:32:06.898|0 |
|AAGN7013005|2024-01-24 10:37:05.09 |1 |
|AAGN7013005|2024-01-24 12:12:56.825|1 |
|AAGN7013005|2024-01-24 12:18:01.647|1 |
|AAGN7013005|2024-01-24 13:18:18.456|1 |
|AAGN7013005|2024-01-24 15:00:22.534|1 |
|AAGN7013005|2024-01-24 15:06:04.075|1 |
|AAGN7013005|2024-01-24 16:09:04.796|1 |
|AAGN7013005|2024-01-24 16:14:01.193|1 |
|AAGN7013005|2024-01-24 22:30:06.217|1 |
|AAGN7013005|2024-01-24 23:37:16.612|1 |
|AAGN7013005|2024-01-24 23:43:04.639|0 |
|AAGN7013005|2024-01-25 03:03:03.796|0 |
|AAGN7013005|2024-01-25 03:08:28.834|1 |
|AAGN7013005|2024-01-25 04:05:43.995|1 |
|AAGN7013005|2024-01-25 04:10:45.93 |0 |
|AAGN7013005|2024-01-25 06:33:50.742|0 |
|AAGN7013005|2024-01-25 07:48:09.229|0 |
|AAGN7022205|2024-01-24 09:39:30.167|0 |
|AAGN7022205|2024-01-24 09:44:56.294|0 |
|AAGN7022205|2024-01-24 10:23:01.281|0 |
|AAGN7022205|2024-01-24 10:33:27.103|0 |
|AAGN7022205|2024-01-24 10:38:05.096|1 |
|AAGN7022205|2024-01-24 11:23:22.652|1 |
|AAGN7022205|2024-01-24 11:34:59.031|1 |
|AAGN7022205|2024-01-24 12:13:04.285|1 |
|AAGN7022205|2024-01-24 12:13:34.285|0 |
+-----------+-----------------------+------+
+-----------+-----------------------+------+-------------+-----------+----------------+
|CpoSku |DateUpdated |status|lagged_status|lead_status|start_end_column|
+-----------+-----------------------+------+-------------+-----------+----------------+
|AAGN7013005|2024-01-24 10:32:06.898|0 |NULL |1 |nan |
|AAGN7013005|2024-01-24 10:37:05.09 |1 |0 |1 |start |
|AAGN7013005|2024-01-24 12:12:56.825|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 12:18:01.647|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 13:18:18.456|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 15:00:22.534|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 15:06:04.075|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 16:09:04.796|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 16:14:01.193|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 22:30:06.217|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 23:37:16.612|1 |1 |0 |end |
|AAGN7013005|2024-01-24 23:43:04.639|0 |1 |0 |nan |
|AAGN7013005|2024-01-25 03:03:03.796|0 |0 |1 |nan |
|AAGN7013005|2024-01-25 03:08:28.834|1 |0 |1 |start |
|AAGN7013005|2024-01-25 04:05:43.995|1 |1 |0 |end |
|AAGN7013005|2024-01-25 04:10:45.93 |0 |1 |0 |nan |
|AAGN7013005|2024-01-25 06:33:50.742|0 |0 |0 |nan |
|AAGN7013005|2024-01-25 07:48:09.229|0 |0 |NULL |nan |
|AAGN7022205|2024-01-24 09:39:30.167|0 |NULL |0 |nan |
|AAGN7022205|2024-01-24 09:44:56.294|0 |0 |0 |nan |
|AAGN7022205|2024-01-24 10:23:01.281|0 |0 |0 |nan |
|AAGN7022205|2024-01-24 10:33:27.103|0 |0 |1 |nan |
|AAGN7022205|2024-01-24 10:38:05.096|1 |0 |1 |start |
|AAGN7022205|2024-01-24 11:23:22.652|1 |1 |1 |nan |
|AAGN7022205|2024-01-24 11:34:59.031|1 |1 |1 |nan |
|AAGN7022205|2024-01-24 12:13:04.285|1 |1 |0 |end |
|AAGN7022205|2024-01-24 12:13:34.285|0 |1 |NULL |nan |
+-----------+-----------------------+------+-------------+-----------+----------------+
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
|CpoSku |DateUpdated |status|lagged_status|lead_status|start_end_column|group_id|
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
|AAGN7013005|2024-01-24 10:32:06.898|0 |NULL |1 |nan |0 |
|AAGN7013005|2024-01-24 10:37:05.09 |1 |0 |1 |start |1 |
|AAGN7013005|2024-01-24 12:12:56.825|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 12:18:01.647|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 13:18:18.456|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 15:00:22.534|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 15:06:04.075|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 16:09:04.796|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 16:14:01.193|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 22:30:06.217|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 23:37:16.612|1 |1 |0 |end |1 |
|AAGN7013005|2024-01-24 23:43:04.639|0 |1 |0 |nan |1 |
|AAGN7013005|2024-01-25 03:03:03.796|0 |0 |1 |nan |1 |
|AAGN7013005|2024-01-25 03:08:28.834|1 |0 |1 |start |2 |
|AAGN7013005|2024-01-25 04:05:43.995|1 |1 |0 |end |2 |
|AAGN7013005|2024-01-25 04:10:45.93 |0 |1 |0 |nan |2 |
|AAGN7013005|2024-01-25 06:33:50.742|0 |0 |0 |nan |2 |
|AAGN7013005|2024-01-25 07:48:09.229|0 |0 |NULL |nan |2 |
|AAGN7022205|2024-01-24 09:39:30.167|0 |NULL |0 |nan |0 |
|AAGN7022205|2024-01-24 09:44:56.294|0 |0 |0 |nan |0 |
|AAGN7022205|2024-01-24 10:23:01.281|0 |0 |0 |nan |0 |
|AAGN7022205|2024-01-24 10:33:27.103|0 |0 |1 |nan |0 |
|AAGN7022205|2024-01-24 10:38:05.096|1 |0 |1 |start |1 |
|AAGN7022205|2024-01-24 11:23:22.652|1 |1 |1 |nan |1 |
|AAGN7022205|2024-01-24 11:34:59.031|1 |1 |1 |nan |1 |
|AAGN7022205|2024-01-24 12:13:04.285|1 |1 |0 |end |1 |
|AAGN7022205|2024-01-24 12:13:34.285|0 |1 |NULL |nan |1 |
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
+-----------+------+--------+-----------------------+-----------------------+
|CpoSku     |status|group_id|start_date             |end_date               |
+-----------+------+--------+-----------------------+-----------------------+
|AAGN7013005|1 |1 |2024-01-24 10:37:05.09 |2024-01-24 23:37:16.612|
|AAGN7013005|1 |2 |2024-01-25 03:08:28.834|2024-01-25 04:05:43.995|
|AAGN7022205|1 |1 |2024-01-24 10:38:05.096|2024-01-24 12:13:04.285|
+-----------+------+--------+-----------------------+-----------------------+
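To get the durations in seconds and minutes that the question asks for, a short follow-up on the aggregated result works. Casting a timestamp to double yields epoch seconds including the millisecond fraction (unix_timestamp would truncate to whole seconds), so the difference of the two casts is the window length in seconds:

# Duration of each window in seconds and minutes; the double cast keeps
# the millisecond fraction of the timestamps
result_df = sample_df.withColumn(
    "duration_seconds",
    F.col("end_date").cast("double") - F.col("start_date").cast("double")
).withColumn(
    "duration_minutes",
    F.round(F.col("duration_seconds") / 60, 2)
)
result_df.show(truncate=False)

For example, the AAGN7022205 window runs from 2024-01-24 10:38:05.096 to 2024-01-24 12:13:04.285, which is 5699.189 seconds, i.e. about 94.99 minutes.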