I am trying to calculate the number of unique devices seen in the last 10 minutes for each timestamp (rounded off to the minute). I can do that in data.table, but I have no idea how to replicate the same in sparklyr in R. The 540 below is the window width in seconds: each window spans from 540 seconds before a timestamp up to the timestamp itself, i.e. the last 10 minutes at one-minute resolution. An example is provided below to illustrate my problem.
library(data.table)

df <- data.frame(
  device_subscriber_id = c("x", "a", "z", "x", "a", "z", "x", "y", "a", "z"),
  start_timestamp = c("2020-12-11 14:21:00", "2020-12-11 14:22:00", "2020-12-11 14:23:00",
                      "2020-12-11 14:26:00", "2020-12-11 14:24:00", "2020-12-11 14:25:00",
                      "2020-12-11 14:26:00", "2020-12-11 14:28:00", "2020-12-11 14:31:00",
                      "2020-12-11 14:38:00"))
df$start_timestamp <- as.POSIXct(df$start_timestamp, format = "%Y-%m-%d %H:%M:%S")
dt <- setDT(df)
# Self non-equi join: for each row, match all rows whose timestamp falls in
# [start_timestamp - 540, start_timestamp], then count distinct devices per window
expected_dt <- dt[dt[, .(start_timestamp3 = start_timestamp,
                         start_timestamp2 = start_timestamp - 540,
                         device_subscriber_id)],
                  on = .(start_timestamp >= start_timestamp2,
                         start_timestamp <= start_timestamp3),
                  allow.cartesian = TRUE
                  ][, .(unique_devices_seen = uniqueN(device_subscriber_id)),
                    by = .(start_timestamp = start_timestamp + 540)]
expected_dt
start_timestamp      unique_devices_seen
2020-12-11 14:21:00                    1
2020-12-11 14:22:00                    2
2020-12-11 14:23:00                    3
2020-12-11 14:26:00                    3
2020-12-11 14:24:00                    3
2020-12-11 14:25:00                    3
2020-12-11 14:28:00                    4
2020-12-11 14:31:00                    4
2020-12-11 14:38:00                    2
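For readability, here is the same data.table logic unrolled into intermediate steps; it is exactly the computation above, with windows and joined as illustrative names:

# Each row defines a 10-minute window ending at its own timestamp
windows <- dt[, .(start_timestamp3 = start_timestamp,
                  start_timestamp2 = start_timestamp - 540,
                  device_subscriber_id)]

# Non-equi self join: match every observation falling inside each window
joined <- dt[windows,
             on = .(start_timestamp >= start_timestamp2,
                    start_timestamp <= start_timestamp3),
             allow.cartesian = TRUE]

# Count distinct devices per window; adding 540 recovers the window-end timestamp
expected_dt <- joined[, .(unique_devices_seen = uniqueN(device_subscriber_id)),
                      by = .(start_timestamp = start_timestamp + 540)]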
I suggest using the SQL window function OVER with a range between the current row and the preceding 540 seconds. count(distinct device_subscriber_id) throws Error: org.apache.spark.sql.AnalysisException: Distinct window functions are not supported (the failing form is shown after the result below). A workaround is to collect the set of unique ids and return the size of the array. The timestamps are converted to epoch seconds so the range bounds can be expressed in seconds.
library(sparklyr)
library(tidyverse)
sc <- spark_connect(master="local[4]", version = "3.0.1")
sdf <- copy_to(sc, df, name = "df", overwrite = TRUE)
sdf_sql(sc, "
SELECT
start_timestamp,
size(collect_set(device_subscriber_id)
OVER (ORDER BY start_ts_epoch ASC
RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen
FROM (SELECT *, unix_timestamp(start_timestamp) as start_ts_epoch FROM `df`)")
Result:
# Source: spark<?> [?? x 2]
start_timestamp unique_devices_seen
<dttm> <int>
1 2020-12-11 13:21:00 1
2 2020-12-11 13:22:00 2
3 2020-12-11 13:23:00 3
4 2020-12-11 13:24:00 3
5 2020-12-11 13:25:00 3
6 2020-12-11 13:26:00 3
7 2020-12-11 13:26:00 3
8 2020-12-11 13:28:00 4
9 2020-12-11 13:31:00 4
10 2020-12-11 13:38:00 2
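For reference, a direct count(DISTINCT ...) attempt like the following sketch, against the same df table, is what raises the AnalysisException quoted above:

# Fails with: org.apache.spark.sql.AnalysisException:
#   Distinct window functions are not supported
sdf_sql(sc, "
SELECT
  start_timestamp,
  count(DISTINCT device_subscriber_id)
    OVER (ORDER BY start_ts_epoch ASC
          RANGE BETWEEN 540 PRECEDING AND CURRENT ROW) as unique_devices_seen
FROM (SELECT *, unix_timestamp(start_timestamp) as start_ts_epoch FROM `df`)")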
Reference: Spark SQL Window Functions API
Bonus: if rows for the missing timestamps are desired, you need to join the device data with a table containing all possible timestamps. The missing timestamps will have NULL device ids, which do not contribute to the count.
# One row per minute covering the full time range of the data
df_ts <- data.frame(start_timestamp = seq(min(df$start_timestamp), max(df$start_timestamp), by = "min"))
sdf_ts <- copy_to(sc, df_ts, name = "df_ts", overwrite = TRUE)
sdf_sql(sc, "
SELECT DISTINCT
start_timestamp
, size(collect_set(device_subscriber_id)
OVER (ORDER BY start_ts_epoch ASC
RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen
, concat_ws(',', collect_set(device_subscriber_id)
OVER (ORDER BY start_ts_epoch ASC
RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen_csv
FROM (SELECT
device_subscriber_id
, df_ts.start_timestamp
, unix_timestamp(df_ts.start_timestamp) as start_ts_epoch
FROM df
FULL JOIN df_ts ON (df.start_timestamp = df_ts.start_timestamp))") %>% print(n=30)
Note that I added unique_devices_seen_csv to show what is going on behind the scenes: it concatenates the device ids in the sliding window.
Result:
# Source: spark<?> [?? x 3]
start_timestamp unique_devices_seen unique_devices_seen_csv
<dttm> <int> <chr>
1 2020-12-11 13:21:00 1 x
2 2020-12-11 13:22:00 2 x,a
3 2020-12-11 13:23:00 3 z,x,a
4 2020-12-11 13:24:00 3 z,x,a
5 2020-12-11 13:25:00 3 z,x,a
6 2020-12-11 13:26:00 3 z,x,a
7 2020-12-11 13:27:00 3 z,x,a
8 2020-12-11 13:28:00 4 z,y,x,a
9 2020-12-11 13:29:00 4 z,y,x,a
10 2020-12-11 13:30:00 4 z,y,x,a
11 2020-12-11 13:31:00 4 z,y,x,a
12 2020-12-11 13:32:00 4 z,y,x,a
13 2020-12-11 13:33:00 4 z,y,x,a
14 2020-12-11 13:34:00 4 z,y,x,a
15 2020-12-11 13:35:00 3 y,x,a
16 2020-12-11 13:36:00 2 y,a
17 2020-12-11 13:37:00 2 y,a
18 2020-12-11 13:38:00 2 z,a
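If you need the counts back in R as a plain data frame, you can collect() the result and disconnect when done; local_counts is just an illustrative name:

# Materialize the windowed counts locally as a tibble
local_counts <- sdf_sql(sc, "
SELECT
  start_timestamp,
  size(collect_set(device_subscriber_id)
    OVER (ORDER BY start_ts_epoch ASC
          RANGE BETWEEN 540 PRECEDING AND CURRENT ROW)) as unique_devices_seen
FROM (SELECT *, unix_timestamp(start_timestamp) as start_ts_epoch FROM `df`)") %>%
  collect()

# Close the Spark connection
spark_disconnect(sc)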