
Calculate uptime per day from timestamps spanning several days


I have a list of incidents/downtime that may or may not span multiple days, and I'd like to calculate the daily uptime per device using PySpark. Example dataset:

+--------+-----------------------+-----------------------+
|deviceId|startDate              |endDate                |
+--------+-----------------------+-----------------------+
|11615   |2022-06-11 13:48:11.6  |2022-06-13 18:05:44.2  |
|11618   |2022-07-11 22:17:24.401|2022-07-11 23:33:05.307|
|11618   |2022-07-28 02:29:14.6  |2022-08-08 23:33:05.103|
+--------+-----------------------+-----------------------+

I would like to compute daily availability rates/uptime, per device, for an arbitrary number of days, to end up with something like:

+───────────+─────────────+─────────+
| deviceId  | day         | uptime  |
+───────────+─────────────+─────────+
...
| 11615     | 2022-06-10  | 100%    |
| 11615     | 2022-06-11  | 88.3%   |
| 11615     | 2022-06-12  | 0%      |
| 11615     | 2022-06-13  | 76%     |
| 11618     | 2022-06-10  | 100%    |
...
+───────────+─────────────+─────────+
(Numbers are approximations here)

I'm not sure if this is doable without user-defined functions - any recommendation?


Solution

  • You should create a day column (using sequence) and calculate the uptime from there.

    Input:

    from pyspark.sql import functions as F

    # assumes an active SparkSession available as `spark`
    df = spark.createDataFrame(
        [(11615, '2022-06-11 13:48:11.6', '2022-06-13 18:05:44.2'),
         (11618, '2022-07-11 22:17:24.401', '2022-07-11 23:33:05.307'),
         (11618, '2022-07-28 02:29:14.6', '2022-08-08 23:33:05.103')],
        ['deviceId', 'startDate', 'endDate'])
    min_date = F.lit("2022-06-10")
    max_date = F.lit("2022-06-13")
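
    The reporting window is hardcoded above; if you'd rather derive it from the data, something along these lines should work (a sketch, assuming the window should cover every incident in the data):

    # derive the reporting window from the incidents instead of hardcoding it
    bounds = df.agg(
        F.min(F.to_date('startDate')).alias('min_d'),
        F.max(F.to_date('endDate')).alias('max_d')).first()
    min_date = F.lit(str(bounds['min_d']))
    max_date = F.lit(str(bounds['max_d']))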
    

    Script:

    # one row per incident and per day in the reporting window
    df = df.withColumn('day', F.explode(F.sequence(F.to_date(min_date), F.to_date(max_date))))
    # clip the incident to the boundaries of each day, in epoch seconds
    secs_upper_bound = F.least(F.date_add('day', 1), F.to_timestamp('endDate')).cast('long')
    secs_lower_bound = F.greatest('day', F.to_timestamp('startDate')).cast('long')
    # downtime = overlap between the incident and the day (0 if they don't overlap)
    secs_downtime = F.greatest(F.lit(0), secs_upper_bound - secs_lower_bound)
    # 86400 seconds per day / 100 % = 864, so this yields a percentage
    percent_uptime = F.round(100 - secs_downtime / 864, 1)
    df = df.select("deviceId", "day", percent_uptime.alias("uptime"))
    
    df.show()
    # +--------+----------+------+
    # |deviceId|       day|uptime|
    # +--------+----------+------+
    # |   11615|2022-06-10| 100.0|
    # |   11615|2022-06-11|  57.5|
    # |   11615|2022-06-12|   0.0|
    # |   11615|2022-06-13|  24.6|
    # |   11618|2022-06-10| 100.0|
    # |   11618|2022-06-11| 100.0|
    # |   11618|2022-06-12| 100.0|
    # |   11618|2022-06-13| 100.0|
    # |   11618|2022-06-10| 100.0|
    # |   11618|2022-06-11| 100.0|
    # |   11618|2022-06-12| 100.0|
    # |   11618|2022-06-13| 100.0|
    # +--------+----------+------+
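
    Note that the output above has one row per incident and day, which is why device 11618 appears twice for every day (it has two incident rows). If you need a single row per device and day, you can sum the downtime per (deviceId, day) before turning it into a percentage. A minimal sketch, applied instead of the final select above (df being the exploded frame, and assuming a device's incidents don't overlap each other):

    # one row per (deviceId, day), summing downtime over all of the device's incidents
    daily_uptime = (
        df.groupBy('deviceId', 'day')
          .agg(F.round(100 - F.sum(secs_downtime) / 864, 1).alias('uptime'))
          .orderBy('deviceId', 'day'))

    With the sample data this collapses the duplicated 11618 rows into a single 100.0 row per day, while the 11615 rows stay the same.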