apache-spark, date, pyspark, timestamp, duration

Calculate timestamp duration using UNIX timestamps


I want to calculate the duration within groups of the same subs_no and year.

Note: this is slightly different from that question because the column new_ts is a UNIX timestamp, not a string.

Here's my code:

from pyspark.sql import functions as F, Window as W

df_subs_loc_movmnt_ts = df_subs_loc_movmnt.withColumn("new_ts", F.from_unixtime(F.unix_timestamp(F.col("ts"), "HH:mm:ss"), "HH:mm:ss"))
w = W.partitionBy('subs_no', 'year').orderBy('new_ts')
df_subs_loc_movmnt_duration = df_subs_loc_movmnt_ts.withColumn('duration', F.regexp_extract('new_ts' - F.min('new_ts').over(w), "HH:mm:ss", 0))

But the duration column in df_subs_loc_movmnt_duration is always null:

+--------+---------------+--------+---------------+-------------+----+-----+---+--------+--------+
| date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|  new_ts|duration|
+--------+---------------+--------+---------------+-------------+----+-----+---+--------+--------+
|20200801|17:00:10.000000|10100559|510-11-610104-9|3729882521647|2022|    6|  1|17:00:10|    null|
|20200801|17:09:39.000000|10100559|510-11-610104-9|3729882521647|2022|    6|  1|17:09:39|    null|
|20200801|06:44:55.000000|10100559|510-11-610034-6|3729882521647|2022|    6|  1|06:44:55|    null|
|20200801|17:45:35.000000|10100559|510-11-610079-5|3729882521647|2022|    6|  1|17:45:35|    null|
|20200801|17:48:05.000000|10100559|510-11-610660-4|3729882521647|2022|    6|  1|17:48:05|    null|
|20200801|18:07:25.000000|10100559|510-11-610127-6|3729882521647|2022|    6|  1|18:07:25|    null|
+--------+---------------+--------+---------------+-------------+----+-----+---+--------+--------+

Solution

  • Don't use from_unixtime in the first line; use it later instead. That function converts your timestamp into a string, but the subtraction afterwards needs a timestamp/long value, not a string. You also don't need regexp_extract: from_unixtime with the "HH:mm:ss" format already returns the duration as a formatted string.

    from pyspark.sql import functions as F, Window as W
    df_subs_loc_movmnt = spark.createDataFrame(
        [('17:00:10.000000', '10100559', 2022, '17:00:10'),
         ('17:09:39.000000', '10100559', 2022, '17:09:39'),
         ('06:44:55.000000', '10100559', 2022, '06:44:55'),
         ('17:45:35.000000', '10100559', 2022, '17:45:35'),
         ('17:48:05.000000', '10100559', 2022, '17:48:05'),
         ('18:07:25.000000', '10100559', 2022, '18:07:25')],
        ['ts', 'subs_no', 'year', 'new_ts'])
    

    Script:

    # Keep new_ts as a long (seconds parsed from the HH:mm:ss time) so it can be used in arithmetic
    df_subs_loc_movmnt_ts = df_subs_loc_movmnt.withColumn("new_ts", F.unix_timestamp(F.col("ts"), "HH:mm:ss"))
    w = W.partitionBy('subs_no', 'year').orderBy('new_ts')
    # Subtract the group's earliest new_ts, then format the difference as HH:mm:ss
    df_subs_loc_movmnt_duration = df_subs_loc_movmnt_ts.withColumn('duration', F.from_unixtime(F.col('new_ts') - F.min('new_ts').over(w), "HH:mm:ss"))
    
    df_subs_loc_movmnt_duration.show()
    # +---------------+--------+----+------+--------+
    # |             ts| subs_no|year|new_ts|duration|
    # +---------------+--------+----+------+--------+
    # |06:44:55.000000|10100559|2022| 24295|00:00:00|
    # |17:00:10.000000|10100559|2022| 61210|10:15:15|
    # |17:09:39.000000|10100559|2022| 61779|10:24:44|
    # |17:45:35.000000|10100559|2022| 63935|11:00:40|
    # |17:48:05.000000|10100559|2022| 64085|11:03:10|
    # |18:07:25.000000|10100559|2022| 65245|11:22:30|
    # +---------------+--------+----+------+--------+
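
    Note that from_unixtime interprets the difference as an epoch timestamp, so formatting it this way assumes the durations stay under 24 hours and a UTC session timezone (as in the output above). If that may not hold, a minimal alternative sketch is to keep the duration as raw seconds instead (duration_sec below is just an illustrative column name):

    # Same window and new_ts as above, but leave the difference as a long number of seconds
    df_subs_loc_movmnt_duration_sec = df_subs_loc_movmnt_ts.withColumn(
        'duration_sec', F.col('new_ts') - F.min('new_ts').over(w))
    # duration_sec holds the elapsed time in whole seconds, e.g. 36915 rather than 10:15:15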