
Convert Repeating Values to Intervals in Databricks

I have the following scenario:

Multiple devices report their installed operating system (OS) version every day, whether or not the version has changed. For example, in Table1 below, Device_A sends the same value OS_1 on two consecutive days until the version changes on the third day. Device_B behaves similarly:


|  Device|          TimeStamp|OSVersion|
|Device_A|2023-01-01 00:00:00|     OS_1|
|Device_A|2023-01-02 00:00:00|     OS_1|
|Device_B|2023-05-01 00:00:00|   OS_1.1|
|Device_B|2023-05-02 00:00:00|   OS_1.2|
|Device_A|2023-01-03 00:00:00|     OS_2|
|Device_A|2023-01-04 00:00:00|     OS_2|

The table holds data for millions of devices, so I would like to convert it into a more compact structure: for each device, the intervals (from, to) during which a specific OS version was installed. Table2 below illustrates the desired schema:


|  Device|OSVersion|          StartTime|            EndTime|
|Device_A|     OS_1|2023-01-01 00:00:00|2023-01-02 23:59:59|
|Device_A|     OS_2|2023-01-03 00:00:00|2999-12-31 23:59:59|
|Device_B|   OS_1.1|2023-05-01 00:00:00|2023-05-01 23:59:59|
|Device_B|   OS_1.2|2023-05-02 00:00:00|2999-12-31 23:59:59|

Note: although a device is normally upgraded, it can also be downgraded to an older OS version.
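The downgrade case is what rules out a plain aggregation per (device, OS): two separate OS_1 stints would collapse into one interval that overlaps OS_2. The plain-Python sketch below (no Spark involved; the sample rows are hypothetical, extending Device_A with a downgrade on the fifth day) contrasts that naive aggregation with merging only consecutive rows:

```python
from itertools import groupby

# Hypothetical daily reports: Device_A is downgraded back to OS_1 on day 5.
rows = [
    ("Device_A", "2023-01-01", "OS_1"),
    ("Device_A", "2023-01-02", "OS_1"),
    ("Device_A", "2023-01-03", "OS_2"),
    ("Device_A", "2023-01-04", "OS_2"),
    ("Device_A", "2023-01-05", "OS_1"),
]

# Naive approach: min/max date per (device, os), ignoring row order.
naive = {}
for device, day, os_version in rows:
    lo, hi = naive.get((device, os_version), (day, day))
    naive[(device, os_version)] = (min(lo, day), max(hi, day))

# Run-based approach: sort by (device, date), then merge only
# *consecutive* rows that share the same device and OS.
runs = []
for (device, os_version), grp in groupby(sorted(rows), key=lambda r: (r[0], r[2])):
    days = [day for _, day, _ in grp]
    runs.append((device, os_version, days[0], days[-1]))

print(naive[("Device_A", "OS_1")])  # spans the OS_2 period: wrong
print(runs)                         # three separate intervals
```

The naive result for OS_1 runs from 2023-01-01 to 2023-01-05 and therefore overlaps OS_2, while the run-based version yields three distinct intervals.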

The solution should work on Databricks Delta tables, using PySpark or Databricks SQL.


  • This is one way to do it with PySpark:

    >>> from datetime import datetime
    >>> from pyspark.sql import SparkSession
    >>> from pyspark.sql import functions as F
    >>> from pyspark.sql import Window
    >>> spark = SparkSession.builder.getOrCreate()
    >>> dt = lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
    >>> df = spark.createDataFrame(
    ...   [("Device_A", dt("2023-01-01 00:00:00"), "OS_1"),
    ...   ("Device_A", dt("2023-01-02 00:00:00"), "OS_1"),
    ...   ("Device_B", dt("2023-05-01 00:00:00"), "OS_1.1"),
    ...   ("Device_B", dt("2023-05-02 00:00:00"), "OS_1.2"),
    ...   ("Device_A", dt("2023-01-03 00:00:00"), "OS_2"),
    ...   ("Device_A", dt("2023-01-04 00:00:00"), "OS_2"),
    ...   ("Device_A", dt("2023-01-05 00:00:00"), "OS_1"),
    ...   ("Device_A", dt("2023-01-06 00:00:00"), "OS_1")],
    ...   ["device", "timestamp", "os"]
    ... )
    >>> df.show()
    |  device|          timestamp|    os|
    |Device_A|2023-01-01 00:00:00|  OS_1|
    |Device_A|2023-01-02 00:00:00|  OS_1|
    |Device_B|2023-05-01 00:00:00|OS_1.1|
    |Device_B|2023-05-02 00:00:00|OS_1.2|
    |Device_A|2023-01-03 00:00:00|  OS_2|
    |Device_A|2023-01-04 00:00:00|  OS_2|
    |Device_A|2023-01-05 00:00:00|  OS_1|
    |Device_A|2023-01-06 00:00:00|  OS_1|

    Window functions are the key here. Define a window per device, ordered by timestamp:

    >>> window = Window.partitionBy("device").orderBy("timestamp")

    Each device's rows must be split into groups of consecutive rows that share the same os. The first step is to look up the previous row's os with lag:

    >>> df = df.withColumn("_os_lag", F.lag("os").over(window))
    >>> df.orderBy("device", "timestamp").show()
    |  device|          timestamp|    os|_os_lag|
    |Device_A|2023-01-01 00:00:00|  OS_1|   null|
    |Device_A|2023-01-02 00:00:00|  OS_1|   OS_1|
    |Device_A|2023-01-03 00:00:00|  OS_2|   OS_1|
    |Device_A|2023-01-04 00:00:00|  OS_2|   OS_2|
    |Device_A|2023-01-05 00:00:00|  OS_1|   OS_2|
    |Device_A|2023-01-06 00:00:00|  OS_1|   OS_1|
    |Device_B|2023-05-01 00:00:00|OS_1.1|   null|
    |Device_B|2023-05-02 00:00:00|OS_1.2| OS_1.1|
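For readers less familiar with window functions, this plain-Python sketch mimics what the window and `F.lag` compute: sorting by (device, timestamp) stands in for `partitionBy("device").orderBy("timestamp")`, and the first row of each partition gets `None` (Spark's null). The helper `with_lag` is hypothetical and only illustrates the semantics; the sample rows are a subset of the data above.

```python
from itertools import groupby
from operator import itemgetter

rows = [
    ("Device_A", "2023-01-01 00:00:00", "OS_1"),
    ("Device_B", "2023-05-01 00:00:00", "OS_1.1"),
    ("Device_A", "2023-01-02 00:00:00", "OS_1"),
    ("Device_B", "2023-05-02 00:00:00", "OS_1.2"),
    ("Device_A", "2023-01-03 00:00:00", "OS_2"),
]

def with_lag(rows):
    """Append the previous row's OS within each device, like
    F.lag("os").over(Window.partitionBy("device").orderBy("timestamp"))."""
    out = []
    # Sorting by (device, timestamp) plays the role of partitionBy + orderBy;
    # groupby on device then yields one partition at a time.
    for _, part in groupby(sorted(rows, key=itemgetter(0, 1)), key=itemgetter(0)):
        prev = None  # lag() is null for the first row of a partition
        for device, ts, os_version in part:
            out.append((device, ts, os_version, prev))
            prev = os_version
    return out

lagged = with_lag(rows)
for row in lagged:
    print(row)
```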

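    Comparing os with _os_lag detects OS changes. Because eqNullSafe treats a comparison with null as "not equal", the first row of each device (whose lag is null) is flagged as a change: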

    >>> df = df.withColumn("_os_change", F.when(~df.os.eqNullSafe(df._os_lag), F.lit(1)).otherwise(F.lit(0)))
    >>> df.orderBy("device", "timestamp").show()
    |  device|          timestamp|    os|_os_lag|_os_change|
    |Device_A|2023-01-01 00:00:00|  OS_1|   null|         1|
    |Device_A|2023-01-02 00:00:00|  OS_1|   OS_1|         0|
    |Device_A|2023-01-03 00:00:00|  OS_2|   OS_1|         1|
    |Device_A|2023-01-04 00:00:00|  OS_2|   OS_2|         0|
    |Device_A|2023-01-05 00:00:00|  OS_1|   OS_2|         1|
    |Device_A|2023-01-06 00:00:00|  OS_1|   OS_1|         0|
    |Device_B|2023-05-01 00:00:00|OS_1.1|   null|         1|
    |Device_B|2023-05-02 00:00:00|OS_1.2| OS_1.1|         1|
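The change flag hinges on null-safe equality. A small plain-Python model of `eqNullSafe` (Spark's `<=>` operator) and of the flag expression follows; the helper names `null_safe_eq` and `os_change` are hypothetical and only mirror the semantics:

```python
def null_safe_eq(a, b):
    """Model of Column.eqNullSafe: two nulls are equal, null vs. a value
    is False, and the result is never null."""
    if a is None or b is None:
        return a is None and b is None
    return a == b

def os_change(os_version, os_lag):
    """Model of F.when(~os.eqNullSafe(_os_lag), 1).otherwise(0)."""
    return 0 if null_safe_eq(os_version, os_lag) else 1

print(os_change("OS_1", None))      # first row of a device: 1
print(os_change("OS_1", "OS_1"))    # same OS as the previous day: 0
print(os_change("OS_1.2", "OS_1.1"))  # upgrade: 1
```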

    Next, a cumulative sum of _os_change over the same window assigns a group number to each run of consecutive rows (all rows in a group share the same os, and groups are numbered in timestamp order):

    >>> df = df.withColumn("_group", F.sum("_os_change").over(window))
    >>> df.orderBy("device", "timestamp").show()
    |  device|          timestamp|    os|_os_lag|_os_change|_group|
    |Device_A|2023-01-01 00:00:00|  OS_1|   null|         1|     1|
    |Device_A|2023-01-02 00:00:00|  OS_1|   OS_1|         0|     1|
    |Device_A|2023-01-03 00:00:00|  OS_2|   OS_1|         1|     2|
    |Device_A|2023-01-04 00:00:00|  OS_2|   OS_2|         0|     2|
    |Device_A|2023-01-05 00:00:00|  OS_1|   OS_2|         1|     3|
    |Device_A|2023-01-06 00:00:00|  OS_1|   OS_1|         0|     3|
    |Device_B|2023-05-01 00:00:00|OS_1.1|   null|         1|     1|
    |Device_B|2023-05-02 00:00:00|OS_1.2| OS_1.1|         1|     2|
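The cumulative-sum trick can be checked without Spark. This plain-Python sketch replays Device_A's os sequence from the example, flags changes, and uses `itertools.accumulate` as a stand-in for `F.sum("_os_change").over(window)`:

```python
from itertools import accumulate

# Device_A's os values in timestamp order (from the example above).
os_seq = ["OS_1", "OS_1", "OS_2", "OS_2", "OS_1", "OS_1"]

# 1 where the OS differs from the previous row; the first row always counts.
changes = [1 if i == 0 or os_seq[i] != os_seq[i - 1] else 0 for i in range(len(os_seq))]

# Running total = group id: every change starts a new group.
groups = list(accumulate(changes))
print(changes)  # [1, 0, 1, 0, 1, 0]
print(groups)   # [1, 1, 2, 2, 3, 3]
```

With an `orderBy` and no explicit frame, Spark's default frame runs from the start of the partition to the current row but treats rows with an identical timestamp as peers that share one value. If a device can report twice with the same timestamp, `window.rowsBetween(Window.unboundedPreceding, Window.currentRow)` makes the frame strictly row-based.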

    Finally, take the minimum and maximum timestamp of each (device, os, _group) combination:

    >>> df.groupBy("device", "os", "_group").agg(
    ...    F.min("timestamp").alias("start"),
    ...    F.max("timestamp").alias("end")
    ... ).orderBy("device", "_group").show()
    |  device|    os|_group|              start|                end|
    |Device_A|  OS_1|     1|2023-01-01 00:00:00|2023-01-02 00:00:00|
    |Device_A|  OS_2|     2|2023-01-03 00:00:00|2023-01-04 00:00:00|
    |Device_A|  OS_1|     3|2023-01-05 00:00:00|2023-01-06 00:00:00|
    |Device_B|OS_1.1|     1|2023-05-01 00:00:00|2023-05-01 00:00:00|
    |Device_B|OS_1.2|     2|2023-05-02 00:00:00|2023-05-02 00:00:00|
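The aggregation above reports the last observed timestamp as `end`, whereas Table2 in the question expects each interval to end one second before the next interval of the same device starts, with the latest interval left open until 2999-12-31 23:59:59. A plain-Python sketch of that rule follows; the helper `with_end_time` and the hard-coded input (copied from the output above) are illustrative only:

```python
from datetime import datetime, timedelta

OPEN_END = datetime(2999, 12, 31, 23, 59, 59)  # sentinel used in Table2

# (device, os, start) per group, as produced by the aggregation above.
intervals = [
    ("Device_A", "OS_1", datetime(2023, 1, 1)),
    ("Device_A", "OS_2", datetime(2023, 1, 3)),
    ("Device_A", "OS_1", datetime(2023, 1, 5)),
    ("Device_B", "OS_1.1", datetime(2023, 5, 1)),
    ("Device_B", "OS_1.2", datetime(2023, 5, 2)),
]

def with_end_time(intervals):
    """End = next start of the same device minus one second (like lead()
    over a window partitioned by device, ordered by start); the latest
    interval of each device stays open until OPEN_END."""
    ordered = sorted(intervals, key=lambda r: (r[0], r[2]))
    out = []
    for i, (device, os_version, start) in enumerate(ordered):
        nxt = ordered[i + 1] if i + 1 < len(ordered) else None
        if nxt is not None and nxt[0] == device:
            end = nxt[2] - timedelta(seconds=1)
        else:
            end = OPEN_END
        out.append((device, os_version, start, end))
    return out

table2 = with_end_time(intervals)
for row in table2:
    print(row)
```

In PySpark the same rule could plausibly be written as `F.lead("start").over(Window.partitionBy("device").orderBy("start"))` minus `F.expr("INTERVAL 1 SECOND")`, wrapped in `F.coalesce(..., F.lit("2999-12-31 23:59:59").cast("timestamp"))`; treat that as an untested sketch.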