Tags: sql, apache-spark, sparkr

SparkR. SQL. Count records satisfying criteria within rolling time window using timestamps


I have a dataset with a structure similar to the df you get from this:

dates<- base::seq.POSIXt(from=as.POSIXlt(as.Date("2018-01-01"), 
format="%Y-%m-%d"), to=as.POSIXlt(as.Date("2018-01-03"), format="%Y-%m-%d"), by = "hour")
possible_statuses<- c('moving', 'stopped')
statuses4demo<- base::sample(possible_statuses, size=98, replace = TRUE, prob = c(.75, .25))
hours_back<- 5
hours_back_milliseconds<- hours_back*3600 * 1000

# Generate dataframe
df<- data.frame(date=rep(dates,2), user_id=c(rep("user_1", 49), rep("user_2", 49)), status=statuses4demo)
df$row_id<- seq(from=1,to=nrow(df), by=1)
df$eventTimestamp<- as.numeric(format(df$date, "%s"))*1000
df$hours_back_timestamp<- df$eventTimestamp - hours_back_milliseconds
df$num_stops_within_past_5_hours<- 0

I would like to get a dataframe with rolling counts of the number of observations with a status of "stopped" for each row. To do this in R, I just wrote a couple of nested loops:

for(i in 1:length(unique(df$user_id))){
  the_user<- unique(df$user_id)[i]
  filtered_data<- df[which(df$user_id == the_user),]

  for(j in 1:nrow(filtered_data)){
    the_row_id<- filtered_data$row_id[j]
    the_time<- filtered_data$eventTimestamp[j]
    the_past_time<- filtered_data$hours_back_timestamp[j]
    num_stops_in_past_interval<- base::nrow(filtered_data[filtered_data$eventTimestamp >= the_past_time & filtered_data$eventTimestamp < the_time & filtered_data$status == "stopped",])
    df$num_stops_within_past_5_hours[which(df$row_id==the_row_id)]<- num_stops_in_past_interval
  }
}
View(df)

I am trying to do the same thing, either with the built-in functions in SparkR or (I think more likely) with an SQL statement. Does anyone know how I could reproduce the output from the df, but inside a Spark context? Any help is much appreciated. Thank you in advance. --Nate

Start with this data:

sdf<- SparkR::createDataFrame(df[, c("date", "eventTimestamp", "status", "user_id", "row_id")])

Solution

  • This solution works for the sample data as you have it set up (regular hourly observations), but isn't a general solution for observations with arbitrary timestamps.

    ddf <- as.DataFrame(df)
    ddf$count <- ifelse(ddf$status == "stopped", 1, 0)
    
    # Create a windowSpec partitioning by user_id and ordered by date
    ws <- orderBy(windowPartitionBy("user_id"), "date")
    
    # Get the cumulative sum of the count variable by user id
    ddf$count <- over(sum(ddf$count), ws)
    
    # Get the cumulative sum from 5 rows back (5 hours ago, since the data is hourly)
    ddf$lag_count <- over(lag(ddf$count, offset = 5, default = 0), ws)
    
    # The count of stops in the last 5hrs is the difference between the two
    ddf$num_stops_within_past_5_hours <- ddf$count - ddf$lag_count
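
    One way to sanity-check this against the base-R loop above is to sort the
    Spark result and pull it back into a local data.frame with collect()
    (a minimal sketch; the column selection is just for readability):

    # Bring the Spark result back to R, ordered the same way as the local df
    checked <- collect(arrange(ddf, "user_id", "date"))
    head(checked[, c("user_id", "date", "status", "num_stops_within_past_5_hours")])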
    

    Edited to add a more general solution that can handle inconsistent time breaks

    # Use a sampled version of the original df to create inconsistent time breaks
    ddf <- as.DataFrame(df[base::sample(nrow(df), nrow(df) - 20), ])
    ddf$count <- ifelse(ddf$status == "stopped", 1, 0)
    
    # Select the columns needed for the interval join and rename them to avoid
    # name clashes after the join (%>% requires the magrittr package)
    to_join <- ddf %>%
      select("count", "eventTimestamp", "user_id") %>%
      rename(eventTimestamp_ = .$eventTimestamp, user_id_ = .$user_id)
    
    ddf$count <- NULL
    
    # join in each row where the event timestamp is within the interval
    ddf_new <- join(
      ddf, to_join,
      ddf$hours_back_timestamp <= to_join$eventTimestamp_ &
        ddf$eventTimestamp >= to_join$eventTimestamp_ &
        ddf$user_id == to_join$user_id_,
      joinType = "left")
    
    ddf_new <- ddf_new %>% groupBy(
       'date', 
       'eventTimestamp',
       'user_id', 
       'status', 
       'row_id', 
       'hours_back_timestamp') %>% 
       agg(num_stops_within_past_5_hours = sum(ddf_new$count))
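
    Since the question also mentions doing this with an SQL statement, here is a
    minimal Spark SQL sketch using a value-based (RANGE) window frame, which also
    handles irregular timestamps. The temp view name "df_view" is illustrative,
    18000000 is 5 hours in milliseconds, and COALESCE returns 0 when no prior
    rows fall inside the window:

    # Register the data as a temp view and, for each row, count "stopped" rows
    # whose eventTimestamp falls in [t - 5h, t) for the same user
    createOrReplaceTempView(as.DataFrame(df), "df_view")
    result <- SparkR::sql("
      SELECT *,
             COALESCE(SUM(CASE WHEN status = 'stopped' THEN 1 ELSE 0 END) OVER (
               PARTITION BY user_id
               ORDER BY eventTimestamp
               RANGE BETWEEN 18000000 PRECEDING AND 1 PRECEDING
             ), 0) AS num_stops_within_past_5_hours
      FROM df_view")
    head(result)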