
Calculate duration inside groups based on timestamp


I have a dataframe looking like this:

| id | device   | x  | y  | z  | timestamp                    |
  1    device_1   22   8    23   2020-10-30T16:00:00.000+0000
  1    device_1   21   88   65   2020-10-30T16:01:00.000+0000
  1    device_1   33   34   64   2020-10-30T16:02:00.000+0000
  2    device_2   12   6    97   2019-11-30T13:00:00.000+0000
  2    device_2   44   77   13   2019-11-30T13:00:30.000+0000
  1    device_1   22   11   30   2022-10-30T08:00:00.000+0000
  1    device_1   22   11   30   2022-10-30T08:01:00.000+0000

The data represents events for an "id" at a certain point in time. I would like to see how the values develop over a period of time, for instance to plot a time series.

I'm thinking of adding a column 'duration' which is 0 for the first entry and then, for each subsequent entry, the time elapsed since the first entry for the same id on the same day (there might be multiple separate event streams for the same id on different days).

I would ideally want a dataframe looking something like this:

| id | device   | x  | y  | z  | timestamp                    | duration     |
  1    device_1   22   8    23   2020-10-30T16:00:00.000+0000   00:00:00.000
  1    device_1   21   88   65   2020-10-30T16:01:00.000+0000   00:01:00.000
  1    device_1   33   34   64   2020-10-30T16:02:00.000+0000   00:02:00.000
  2    device_2   12   6    97   2019-11-30T13:00:00.000+0000   00:00:00.000
  2    device_2   44   77   13   2019-11-30T13:00:30.000+0000   00:00:30.000
  1    device_1   22   11   30   2022-10-30T08:00:00.000+0000   00:00:00.000
  1    device_1   22   11   30   2022-10-30T08:01:00.000+0000   00:01:00.000

I have no idea where to begin to achieve this, so either a good explanation or a code example would be very helpful!

Any other suggestions on how to plot development over time (in general, not tied to a specific date or time of day) based on this dataframe are also very welcome.

Note: It has to be in PySpark (not pandas) since the dataset is extremely large.


Solution

  • You will need to use window functions (functions that operate within partitions defined by an over clause). The code below does the same thing as the other answer, but I wanted to show a more streamlined version, fully in PySpark, as opposed to PySpark + SQL with subqueries.

    Initially, the column "duration" will be of type interval, so it's up to you to transform it into whatever data type you need. Here I have just extracted the time part using regexp_extract, which stores it as a string (a numeric alternative is sketched after the output below).

    Input (I assume your "timestamp" column is of type timestamp):

    from pyspark.sql import functions as F, Window as W
    df = spark.createDataFrame(
        [(1, 'device_1', 22, 8, 23, '2020-10-30T16:00:00.000+0000'),
         (1, 'device_1', 21, 88, 65, '2020-10-30T16:01:00.000+0000'),
         (1, 'device_1', 33, 34, 64, '2020-10-30T16:02:00.000+0000'),
         (2, 'device_2', 12, 6, 97, '2019-11-30T13:00:00.000+0000'),
         (2, 'device_2', 44, 77, 13, '2019-11-30T13:00:30.000+0000'),
         (1, 'device_1', 22, 11, 30, '2022-10-30T08:00:00.000+0000'),
         (1, 'device_1', 22, 11, 30, '2022-10-30T08:01:00.000+0000')],
        ["id", "device", "x", "y", "z", "timestamp"]
    ).withColumn("timestamp", F.to_timestamp("timestamp"))
    

    Script:

    # Window: one partition per id per calendar day, ordered by time
    w = W.partitionBy('id', F.to_date('timestamp')).orderBy('timestamp')
    # Subtracting the partition's first timestamp yields an interval column
    df = df.withColumn('duration', F.col('timestamp') - F.min('timestamp').over(w))
    # Pull the HH:mm:ss part out of the interval's string representation
    df = df.withColumn('duration', F.regexp_extract('duration', r'\d\d:\d\d:\d\d', 0))
    
    df.show(truncate=0)
    # +---+--------+---+---+---+-------------------+--------+
    # |id |device  |x  |y  |z  |timestamp          |duration|
    # +---+--------+---+---+---+-------------------+--------+
    # |1  |device_1|22 |8  |23 |2020-10-30 16:00:00|00:00:00|
    # |1  |device_1|21 |88 |65 |2020-10-30 16:01:00|00:01:00|
    # |1  |device_1|33 |34 |64 |2020-10-30 16:02:00|00:02:00|
    # |1  |device_1|22 |11 |30 |2022-10-30 08:00:00|00:00:00|
    # |1  |device_1|22 |11 |30 |2022-10-30 08:01:00|00:01:00|
    # |2  |device_2|12 |6  |97 |2019-11-30 13:00:00|00:00:00|
    # |2  |device_2|44 |77 |13 |2019-11-30 13:00:30|00:00:30|
    # +---+--------+---+---+---+-------------------+--------+
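
    If you need the duration as a number instead of a string (e.g. to plot values against elapsed time, as asked above), a minimal variant is to cast the timestamps to epoch seconds before subtracting; that skips the interval-to-string round trip. The column name "duration_sec" is just my own choice here:

    w = W.partitionBy('id', F.to_date('timestamp')).orderBy('timestamp')
    # Casting a timestamp to long yields seconds since the Unix epoch,
    # so the difference below is the elapsed time in whole seconds.
    df = df.withColumn(
        'duration_sec',
        F.col('timestamp').cast('long') - F.min('timestamp').over(w).cast('long')
    )

    Since it is numeric, "duration_sec" can be used directly as the x-axis when plotting each day's event stream, independent of the calendar date.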