I have a dataset with a structure similar to the df you get from this:
dates <- base::seq.POSIXt(from = as.POSIXlt(as.Date("2018-01-01"), format = "%Y-%m-%d"),
                          to = as.POSIXlt(as.Date("2018-01-03"), format = "%Y-%m-%d"),
                          by = "hour")
possible_statuses <- c('moving', 'stopped')
statuses4demo <- base::sample(possible_statuses, size = 98, replace = TRUE, prob = c(.75, .25))
hours_back <- 5
hours_back_milliseconds <- hours_back * 3600 * 1000
# Generate dataframe
df <- data.frame(date = rep(dates, 2),
                 user_id = c(rep("user_1", 49), rep("user_2", 49)),
                 status = statuses4demo)
df$row_id <- seq(from = 1, to = nrow(df), by = 1)
# Epoch milliseconds ("%s" is platform-dependent; as.numeric(df$date) * 1000 also works)
df$eventTimestamp <- as.numeric(format(df$date, "%s")) * 1000
df$hours_back_timestamp <- df$eventTimestamp - hours_back_milliseconds
df$num_stops_within_past_5_hours <- 0
I would like to get a dataframe with a rolling count of the number of observations with a status of "stopped" for each row. To do this in R, I just wrote a couple of nested loops, i.e., ran this:
for (i in 1:length(unique(df$user_id))) {
  the_user <- unique(df$user_id)[i]
  filtered_data <- df[which(df$user_id == the_user), ]
  for (j in 1:nrow(filtered_data)) {
    the_row_id <- filtered_data$row_id[j]
    the_time <- filtered_data$eventTimestamp[j]
    the_past_time <- filtered_data$hours_back_timestamp[j]
    # Count "stopped" rows for this user with timestamps in [t - 5h, t)
    num_stops_in_past_interval <- base::nrow(filtered_data[filtered_data$eventTimestamp >= the_past_time &
                                                             filtered_data$eventTimestamp < the_time &
                                                             filtered_data$status == "stopped", ])
    df$num_stops_within_past_5_hours[which(df$row_id == the_row_id)] <- num_stops_in_past_interval
  }
}
View(df)
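For reference, the same count can also be computed without the explicit nested loops; this base R snippet (the check column name is just illustrative) is a compact sanity check of the loop output:

# Same per-user [t - 5h, t) window, computed row by row without nested loops
df$check <- sapply(seq_len(nrow(df)), function(k) {
  sum(df$user_id == df$user_id[k] &
        df$eventTimestamp >= df$hours_back_timestamp[k] &
        df$eventTimestamp < df$eventTimestamp[k] &
        df$status == "stopped")
})
all(df$check == df$num_stops_within_past_5_hours)  # should be TRUE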
I am trying to do the same thing, but either by using the built-in functions in SparkR or (I think more likely) an SQL statement. Does anyone know how I could reproduce the output from the df above, but inside a Spark context? Any help is much appreciated. Thank you in advance. --Nate
Start with this data:
sdf <- SparkR::createDataFrame(df[, c("date", "eventTimestamp", "status", "user_id", "row_id")])
This first solution works for the sample data as you have it set up (regular hourly observations), but it isn't a general solution for observations with arbitrary timestamps.
ddf <- as.DataFrame(df)
ddf$count <- ifelse(ddf$status == "stopped", 1, 0)
# Create a WindowSpec partitioned by user_id and ordered by date
ws <- orderBy(windowPartitionBy("user_id"), "date")
# Get the cumulative sum of the count variable by user_id
ddf$count <- over(sum(ddf$count), ws)
# Get the lagged value of the cumulative sum from 5 rows (= 5 hours here) earlier
ddf$lag_count <- over(lag(ddf$count, offset = 5, defaultValue = 0), ws)
# The count of stops in the last 5 hours is the difference between the two
ddf$num_stops_within_past_5_hours <- ddf$count - ddf$lag_count
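To eyeball the result against the loop output, you can collect a sorted copy back into R (purely for inspection, not part of the computation):

# Pull the Spark result back into a local data.frame
res <- collect(orderBy(ddf, "user_id", "date"))
head(res[, c("date", "user_id", "status", "num_stops_within_past_5_hours")])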
Edited to add a more general solution that can handle inconsistent time breaks:
# Using a sampled version of the original df to create inconsistent time breaks
ddf <- as.DataFrame(df[base::sample(nrow(df), nrow(df) - 20), ])
ddf$count <- ifelse(ddf$status == "stopped", 1, 0)
# Note: %>% assumes the magrittr pipe is loaded
to_join <- ddf %>%
  select("count", "eventTimestamp", "user_id") %>%
  rename(eventTimestamp_ = .$eventTimestamp, user_id_ = .$user_id)
ddf$count <- NULL
# Join in each row of to_join whose event timestamp falls within the
# 5-hour interval for the same user
ddf_new <- join(ddf, to_join,
                ddf$hours_back_timestamp <= to_join$eventTimestamp_ &
                  ddf$eventTimestamp >= to_join$eventTimestamp_ &
                  ddf$user_id == to_join$user_id_,
                joinType = "left")
ddf_new <- ddf_new %>%
  groupBy(
    'date',
    'eventTimestamp',
    'user_id',
    'status',
    'row_id',
    'hours_back_timestamp') %>%
  agg(num_stops_within_past_5_hours = sum(ddf_new$count))
# Note: rows with no "stopped" observations in their window come back as
# NULL rather than 0, since the left join contributes no count values for them
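Since the question also mentions SQL: a rolling count with a time-range window frame is another option, and it handles irregular timestamps directly because the frame is defined in milliseconds rather than rows. This is a sketch, assuming a Spark version that supports numeric RANGE frames; the view name "events" is just illustrative. The 1 PRECEDING upper bound reproduces the loop's half-open [t - 5h, t) interval (use CURRENT ROW instead if the current observation should count), and COALESCE turns empty-frame NULLs into 0s to match the loop.

createOrReplaceTempView(sdf, "events")  # "events" is an arbitrary view name
result <- sql("
  SELECT *,
         COALESCE(SUM(CASE WHEN status = 'stopped' THEN 1 ELSE 0 END) OVER (
           PARTITION BY user_id
           ORDER BY eventTimestamp
           RANGE BETWEEN 18000000 PRECEDING AND 1 PRECEDING
         ), 0) AS num_stops_within_past_5_hours
  FROM events")  -- 18000000 ms = 5 hours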