I have the following scenario: multiple devices send their installed OS version every day, regardless of whether the version has changed or not. For example, in Table1 below, Device_A sends the same value OS_1 for two consecutive days, until the version changes on the third day. The situation is similar for Device_B:
Table1:
| Device | TimeStamp | OSVersion |
|---|---|---|
| Device_A | 2023-01-01 00:00:00 | OS_1 |
| Device_A | 2023-01-02 00:00:00 | OS_1 |
| Device_B | 2023-05-01 00:00:00 | OS_1.1 |
| Device_B | 2023-05-02 00:00:00 | OS_1.2 |
| Device_A | 2023-01-03 00:00:00 | OS_2 |
| Device_A | 2023-01-04 00:00:00 | OS_2 |
This table holds data for millions of devices, so I would like to convert it into another structure: for each device, the intervals (from, to) within which a specific OS version is installed. Table2 below illustrates the desired schema:
Table2:
| Device | OSVersion | StartTime | EndTime |
|---|---|---|---|
| Device_A | OS_1 | 2023-01-01 00:00:00 | 2023-01-02 23:59:59 |
| Device_A | OS_2 | 2023-01-03 00:00:00 | 2999-12-31 23:59:59 |
| Device_B | OS_1.1 | 2023-05-01 00:00:00 | 2023-05-01 23:59:59 |
| Device_B | OS_1.2 | 2023-05-02 00:00:00 | 2999-12-31 23:59:59 |
Note: a device is normally upgraded, but it could also be downgraded to an older OS version.
The solution should work on Databricks Delta tables, using PySpark or Databricks SQL.
Here is one way to do it with PySpark:
>>> from datetime import datetime
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as F
>>> from pyspark.sql import Window
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> dt = lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
>>> df = spark.createDataFrame(
...     [("Device_A", dt("2023-01-01 00:00:00"), "OS_1"),
...      ("Device_A", dt("2023-01-02 00:00:00"), "OS_1"),
...      ("Device_B", dt("2023-05-01 00:00:00"), "OS_1.1"),
...      ("Device_B", dt("2023-05-02 00:00:00"), "OS_1.2"),
...      ("Device_A", dt("2023-01-03 00:00:00"), "OS_2"),
...      ("Device_A", dt("2023-01-04 00:00:00"), "OS_2"),
...      ("Device_A", dt("2023-01-05 00:00:00"), "OS_1"),
...      ("Device_A", dt("2023-01-06 00:00:00"), "OS_1")],
...     ["device", "timestamp", "os"]
... )
>>> df.show()
+--------+-------------------+------+
| device| timestamp| os|
+--------+-------------------+------+
|Device_A|2023-01-01 00:00:00| OS_1|
|Device_A|2023-01-02 00:00:00| OS_1|
|Device_B|2023-05-01 00:00:00|OS_1.1|
|Device_B|2023-05-02 00:00:00|OS_1.2|
|Device_A|2023-01-03 00:00:00| OS_2|
|Device_A|2023-01-04 00:00:00| OS_2|
|Device_A|2023-01-05 00:00:00| OS_1|
|Device_A|2023-01-06 00:00:00| OS_1|
+--------+-------------------+------+
Window functions are the way to do it:
>>> window = Window().partitionBy("device").orderBy("timestamp")
We have to split the rows into groups based on os, ordered by timestamp. First, bring the previous os value of each device onto every row with lag:
>>> df = df.withColumn("_os_lag", F.lag("os").over(window))
>>> df.show()
+--------+-------------------+------+-------+
| device| timestamp| os|_os_lag|
+--------+-------------------+------+-------+
|Device_A|2023-01-01 00:00:00| OS_1| null|
|Device_A|2023-01-02 00:00:00| OS_1| OS_1|
|Device_A|2023-01-03 00:00:00| OS_2| OS_1|
|Device_A|2023-01-04 00:00:00| OS_2| OS_2|
|Device_A|2023-01-05 00:00:00| OS_1| OS_2|
|Device_A|2023-01-06 00:00:00| OS_1| OS_1|
|Device_B|2023-05-01 00:00:00|OS_1.1| null|
|Device_B|2023-05-02 00:00:00|OS_1.2| OS_1.1|
+--------+-------------------+------+-------+
Comparing os with _os_lag, we can detect os changes:
>>> df = df.withColumn("_os_change", F.when(~df.os.eqNullSafe(df._os_lag), F.lit(1)).otherwise(F.lit(0)))
>>> df.show()
+--------+-------------------+------+-------+----------+
| device| timestamp| os|_os_lag|_os_change|
+--------+-------------------+------+-------+----------+
|Device_A|2023-01-01 00:00:00| OS_1| null| 1|
|Device_A|2023-01-02 00:00:00| OS_1| OS_1| 0|
|Device_A|2023-01-03 00:00:00| OS_2| OS_1| 1|
|Device_A|2023-01-04 00:00:00| OS_2| OS_2| 0|
|Device_A|2023-01-05 00:00:00| OS_1| OS_2| 1|
|Device_A|2023-01-06 00:00:00| OS_1| OS_1| 0|
|Device_B|2023-05-01 00:00:00|OS_1.1| null| 1|
|Device_B|2023-05-02 00:00:00|OS_1.2| OS_1.1| 1|
+--------+-------------------+------+-------+----------+
Now we take the cumulative sum of _os_change over the defined window to split each device into groups (within each group all rows have the same os, and the groups are ordered by timestamp):
>>> df = df.withColumn("_group", F.sum("_os_change").over(window))
>>> df.show()
+--------+-------------------+------+-------+----------+------+
| device| timestamp| os|_os_lag|_os_change|_group|
+--------+-------------------+------+-------+----------+------+
|Device_A|2023-01-01 00:00:00| OS_1| null| 1| 1|
|Device_A|2023-01-02 00:00:00| OS_1| OS_1| 0| 1|
|Device_A|2023-01-03 00:00:00| OS_2| OS_1| 1| 2|
|Device_A|2023-01-04 00:00:00| OS_2| OS_2| 0| 2|
|Device_A|2023-01-05 00:00:00| OS_1| OS_2| 1| 3|
|Device_A|2023-01-06 00:00:00| OS_1| OS_1| 0| 3|
|Device_B|2023-05-01 00:00:00|OS_1.1| null| 1| 1|
|Device_B|2023-05-02 00:00:00|OS_1.2| OS_1.1| 1| 2|
+--------+-------------------+------+-------+----------+------+
The last thing we have to do is get the minimum and maximum timestamp for each device group:
>>> df.groupBy("device", "os", "_group").agg(
...     F.min("timestamp").alias("start"),
...     F.max("timestamp").alias("end")
... ).show()
+--------+------+------+-------------------+-------------------+
| device| os|_group| start| end|
+--------+------+------+-------------------+-------------------+
|Device_A| OS_1| 1|2023-01-01 00:00:00|2023-01-02 00:00:00|
|Device_A| OS_2| 2|2023-01-03 00:00:00|2023-01-04 00:00:00|
|Device_A| OS_1| 3|2023-01-05 00:00:00|2023-01-06 00:00:00|
|Device_B|OS_1.1| 1|2023-05-01 00:00:00|2023-05-01 00:00:00|
|Device_B|OS_1.2| 2|2023-05-02 00:00:00|2023-05-02 00:00:00|
+--------+------+------+-------------------+-------------------+