Tags: r, apache-spark, apache-spark-sql, data.table, sparklyr

How to count unique values over time in sparklyr (example given)?


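I am trying to count the unique devices seen in the last 10 minutes as of each timestamp (rounded to the minute). I can do this in data.table, but I have no idea how to replicate it with sparklyr in R. The 540 in the code is the window length in seconds: 9 minutes back from the current timestamp, which covers 10 one-minute marks when both ends are included.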

Example is provided below to explain my problem.

Given Data

library(data.table)

df <- data.frame(
  device_subscriber_id = c("x", "a", "z", "x", "a", "z", "x", "y", "a", "z"),
  start_timestamp = c("2020-12-11 14:21:00", "2020-12-11 14:22:00", "2020-12-11 14:23:00",
                      "2020-12-11 14:26:00", "2020-12-11 14:24:00", "2020-12-11 14:25:00",
                      "2020-12-11 14:26:00", "2020-12-11 14:28:00", "2020-12-11 14:31:00",
                      "2020-12-11 14:38:00"))

# Parse the strings as timestamps (in the session time zone).
df$start_timestamp <- as.POSIXct(df$start_timestamp, format = "%Y-%m-%d %H:%M:%S")
dt <- setDT(df)  # setDT() converts df to a data.table by reference

Expected Data

expected_dt<-dt[dt[ , .(start_timestamp3=start_timestamp, start_timestamp2 = start_timestamp - 540, device_subscriber_id)], 
             on = .(start_timestamp >= start_timestamp2, start_timestamp<=start_timestamp3), 
             allow.cartesian = TRUE][ , .(unique_devices_seen = uniqueN(device_subscriber_id)),by = .(start_timestamp = start_timestamp + 540)]

expected_dt

   start_timestamp unique_devices_seen
   2020-12-11 14:21:00                   1
   2020-12-11 14:22:00                   2
   2020-12-11 14:23:00                   3
   2020-12-11 14:26:00                   3
   2020-12-11 14:24:00                   3
   2020-12-11 14:25:00                   3
   2020-12-11 14:28:00                   4
   2020-12-11 14:31:00                   4
   2020-12-11 14:38:00                   2

Solution

  • I suggest using a SQL window function (OVER) that ranges from 540 seconds preceding to the current row. count(distinct device_subscriber_id) cannot be used over a window; it throws Error: org.apache.spark.sql.AnalysisException: Distinct window functions are not supported. A workaround is to collect the set of unique ids with collect_set and return the size of the resulting array. The timestamps are converted to epoch seconds so that the RANGE bounds can be expressed in seconds.

    library(sparklyr)
    library(tidyverse)
    sc <- spark_connect(master="local[4]", version = "3.0.1")
    
    sdf <- copy_to(sc, df, name = "df", overwrite = TRUE)
    
    sdf_sql(sc, "
    SELECT 
      start_timestamp,
      size(collect_set(device_subscriber_id) 
           OVER (ORDER BY start_ts_epoch ASC 
                 RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen
    FROM (SELECT *, unix_timestamp(start_timestamp) as start_ts_epoch FROM `df`)")
    
    

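    Result (the timestamps print one hour earlier than the input, most likely because the R session and Spark use different time zones; the counts are unaffected):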

    # Source: spark<?> [?? x 2]
       start_timestamp     unique_devices_seen
       <dttm>                            <int>
     1 2020-12-11 13:21:00                   1
     2 2020-12-11 13:22:00                   2
     3 2020-12-11 13:23:00                   3
     4 2020-12-11 13:24:00                   3
     5 2020-12-11 13:25:00                   3
     6 2020-12-11 13:26:00                   3
     7 2020-12-11 13:26:00                   3
     8 2020-12-11 13:28:00                   4
     9 2020-12-11 13:31:00                   4
    10 2020-12-11 13:38:00                   2
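
The window logic above can also be checked locally without a Spark connection. The following is only a base R sketch for verification, not part of the sparklyr solution: it assumes the sample data from the question, fixes the time zone to UTC for reproducibility, and uses `length(unique(...))` as the local counterpart of `size(collect_set(...))`. The names `epoch`, `window_ends` and `local_check` are made up for this illustration.

```r
# Sample data from the question; tz = "UTC" is an assumption made here
# so that the result does not depend on the session time zone.
df <- data.frame(
  device_subscriber_id = c("x", "a", "z", "x", "a", "z", "x", "y", "a", "z"),
  start_timestamp = as.POSIXct(
    c("2020-12-11 14:21:00", "2020-12-11 14:22:00", "2020-12-11 14:23:00",
      "2020-12-11 14:26:00", "2020-12-11 14:24:00", "2020-12-11 14:25:00",
      "2020-12-11 14:26:00", "2020-12-11 14:28:00", "2020-12-11 14:31:00",
      "2020-12-11 14:38:00"),
    tz = "UTC"
  )
)

# Epoch seconds, mirroring unix_timestamp() in the SQL query.
epoch <- as.numeric(df$start_timestamp)

# One window per distinct timestamp, in ascending order.
window_ends <- sort(unique(epoch))

# For each window end t, count distinct ids with epoch in [t - 540, t],
# i.e. RANGE BETWEEN 540 PRECEDING AND CURRENT ROW.
unique_devices_seen <- vapply(window_ends, function(t) {
  in_window <- epoch >= t - 540 & epoch <= t
  length(unique(df$device_subscriber_id[in_window]))
}, integer(1))

local_check <- data.frame(
  start_timestamp = as.POSIXct(window_ends, origin = "1970-01-01", tz = "UTC"),
  unique_devices_seen = unique_devices_seen
)
print(local_check)
```

This loop is quadratic in the number of rows, so it is only meant as a sanity check on small samples; the Spark window function remains the approach for real data.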
    

    Reference: Spark SQL Window Functions API


    Bonus: if rows for the missing timestamps are also wanted, join the device data with a table containing every possible timestamp. For the missing timestamps the device id is null, so those rows do not contribute to the count.

    df_ts <- data.frame(start_timestamp=seq(min(df$start_timestamp), max(df$start_timestamp), by = "min"))
    sdf_ts <- copy_to(sc, df_ts, name = "df_ts", overwrite = TRUE)
    
    sdf_sql(sc, "
    SELECT DISTINCT
      start_timestamp
      , size(collect_set(device_subscriber_id) 
             OVER (ORDER BY start_ts_epoch ASC 
                   RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen
      , concat_ws(',', collect_set(device_subscriber_id)
                       OVER (ORDER BY start_ts_epoch ASC 
                       RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen_csv
    FROM (SELECT 
            device_subscriber_id
            , df_ts.start_timestamp
            , unix_timestamp(df_ts.start_timestamp) as start_ts_epoch
          FROM df
          FULL JOIN df_ts ON (df.start_timestamp = df_ts.start_timestamp))") %>% print(n=30)
    

    Note that I added unique_devices_seen_csv to show what is going on behind the scenes: it concatenates the device ids in the sliding window.

    Result:

    # Source: spark<?> [?? x 3]
       start_timestamp     unique_devices_seen unique_devices_seen_csv
       <dttm>                            <int> <chr>                  
     1 2020-12-11 13:21:00                   1 x                      
     2 2020-12-11 13:22:00                   2 x,a                    
     3 2020-12-11 13:23:00                   3 z,x,a                  
     4 2020-12-11 13:24:00                   3 z,x,a                  
     5 2020-12-11 13:25:00                   3 z,x,a                  
     6 2020-12-11 13:26:00                   3 z,x,a                  
     7 2020-12-11 13:27:00                   3 z,x,a                  
     8 2020-12-11 13:28:00                   4 z,y,x,a                
     9 2020-12-11 13:29:00                   4 z,y,x,a                
    10 2020-12-11 13:30:00                   4 z,y,x,a                
    11 2020-12-11 13:31:00                   4 z,y,x,a                
    12 2020-12-11 13:32:00                   4 z,y,x,a                
    13 2020-12-11 13:33:00                   4 z,y,x,a                
    14 2020-12-11 13:34:00                   4 z,y,x,a                
    15 2020-12-11 13:35:00                   3 y,x,a                  
    16 2020-12-11 13:36:00                   2 y,a                    
    17 2020-12-11 13:37:00                   2 y,a                    
    18 2020-12-11 13:38:00                   2 z,a
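
To see why the filled-in minutes still get a count, the same idea can be sketched in base R: evaluate the 540-second window at every minute of a grid instead of only at the observed timestamps. This is a local illustration under assumptions (sample data from the question, UTC time zone), and `count_unique_in_window` is a hypothetical helper written for this sketch, not a sparklyr or Spark function.

```r
# Hypothetical helper: for each window end, count distinct ids whose
# epoch falls in [end - width, end].
count_unique_in_window <- function(ids, epoch, window_ends, width = 540) {
  vapply(window_ends, function(t) {
    length(unique(ids[epoch >= t - width & epoch <= t]))
  }, integer(1))
}

# Sample data from the question; tz = "UTC" is an assumption for reproducibility.
df <- data.frame(
  device_subscriber_id = c("x", "a", "z", "x", "a", "z", "x", "y", "a", "z"),
  start_timestamp = as.POSIXct(
    c("2020-12-11 14:21:00", "2020-12-11 14:22:00", "2020-12-11 14:23:00",
      "2020-12-11 14:26:00", "2020-12-11 14:24:00", "2020-12-11 14:25:00",
      "2020-12-11 14:26:00", "2020-12-11 14:28:00", "2020-12-11 14:31:00",
      "2020-12-11 14:38:00"),
    tz = "UTC"
  )
)

# Every minute between the first and last observation, like df_ts above.
grid <- seq(min(df$start_timestamp), max(df$start_timestamp), by = "min")

grid_counts <- data.frame(
  start_timestamp = grid,
  unique_devices_seen = count_unique_in_window(
    ids = df$device_subscriber_id,
    epoch = as.numeric(df$start_timestamp),
    window_ends = as.numeric(grid)
  )
)
print(grid_counts)
```

Grid minutes with no observation of their own (for example 14:27) contribute no device id, so their count comes entirely from earlier rows that are still inside the window.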