In SparkR I have a DataFrame data that contains user, act and the time for each act. act contains numbers from 1 to 9, meaning we have 9 acts.
head(data)
then gives
user act time
21 1 2012-01-05
14 8 2013-05-04
21 1 2013-01-04
84 4 2012-02-02
For each user I want to get all acts from that user's first 60 days.
For example, user 21 (filter(data, data$user==21)) has these acts:
user act time
21 1 2012-01-05
21 1 2013-01-04
21 7 2013-01-05
Here I only want the first act, since the other two acts happen more than 60 days after the user's first act.
I can find the birth of each user (the first time an act appears) with this code:
userbirth <- groupBy(data, data$user) %>% agg(min(data$time))
but I can't figure out how to get a new dataset containing only the acts from the first 60 days of each user.
I tried to solve the problem this way:
g <- groupBy(data, data$user)
result <- agg(g, data$time < min(data$time) + 60)
But R gives me an error message: "returnstatus==0 is not True". How can I solve this?
You were close with userbirth; you just need to join this new min(time) column back to your initial DataFrame.
Here is a fully reproducible example, adding some more records to the ones you show in order to get a clear demonstration:
library(magrittr)
user <- c(21, 14, 21, 84, 21, 21, 14, 14)
act <- c(1, 8, 1, 4, 7, 9, 1, 3)
time <- c("2012-01-05", "2013-05-04", "2013-01-04", "2012-02-02", "2013-01-05", "2012-02-10", "2013-05-20", "2013-07-10")
df_local <- data.frame(user, act, time)
df_local
# user act time
# 1 21 1 2012-01-05
# 2 14 8 2013-05-04
# 3 21 1 2013-01-04
# 4 84 4 2012-02-02
# 5 21 7 2013-01-05
# 6 21 9 2012-02-10
# 7 14 1 2013-05-20
# 8 14 3 2013-07-10
df <- createDataFrame(sqlContext, df_local)
df$time <- to_date(df$time)
df$user <- cast(df$user, "integer")
df$act <- cast(df$act, "integer")
df
# DataFrame[user:int, act:int, time:date]
userbirth <- groupBy(df, df$user) %>% agg(min(df$time))
names(userbirth) <- c("user_", "min_time") # works, although undocumented!
userbirth
# DataFrame[user_:int, min_time:date]
showDF(userbirth)
# +-----+----------+
# |user_| min_time|
# +-----+----------+
# | 84|2012-02-02|
# | 14|2013-05-04|
# | 21|2012-01-05|
# +-----+----------+
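If you prefer a documented API for the renaming step, SparkR also provides withColumnRenamed. A minimal sketch, assuming the aggregated column comes out named "min(time)" (check with columns(userbirth) if in doubt; userbirth2 is just an illustrative name):
userbirth2 <- groupBy(df, df$user) %>% agg(min(df$time))
# rename the grouping key and the aggregate column one at a time
userbirth2 <- withColumnRenamed(userbirth2, "user", "user_")
userbirth2 <- withColumnRenamed(userbirth2, "min(time)", "min_time")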
df2 <- join(df, userbirth, df$user == userbirth$user_)
showDF(df2)
# +----+---+----------+-----+----------+
# |user|act| time|user_| min_time|
# +----+---+----------+-----+----------+
# | 84| 4|2012-02-02| 84|2012-02-02|
# | 14| 8|2013-05-04| 14|2013-05-04|
# | 14| 1|2013-05-20| 14|2013-05-04|
# | 14| 3|2013-07-10| 14|2013-05-04|
# | 21| 1|2012-01-05| 21|2012-01-05|
# | 21| 1|2013-01-04| 21|2012-01-05|
# | 21| 7|2013-01-05| 21|2012-01-05|
# | 21| 9|2012-02-10| 21|2012-01-05|
# +----+---+----------+-----+----------+
Before proceeding, let's check what the expected result should be, based on the df2 data above:
user 84: keeps its single record (2012-02-02)
user 14: keeps the records of 2013-05-04 and 2013-05-20, but not 2013-07-10 (more than 60 days after the first act)
user 21: keeps the records of 2012-01-05 and 2012-02-10, but not the two from January 2013 (again more than 60 days after the first act)
Let's see (we make use of the SparkR date_add function):
df3 <- filter(df2, df2$time <= date_add(df2$min_time, 60))
showDF(df3)
# +----+---+----------+-----+----------+
# |user|act| time|user_| min_time|
# +----+---+----------+-----+----------+
# | 84| 4|2012-02-02| 84|2012-02-02|
# | 14| 8|2013-05-04| 14|2013-05-04|
# | 14| 1|2013-05-20| 14|2013-05-04|
# | 21| 1|2012-01-05| 21|2012-01-05|
# | 21| 9|2012-02-10| 21|2012-01-05|
# +----+---+----------+-----+----------+
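An equivalent filter can also be written with SparkR's datediff instead of date_add (df3_alt is just an illustrative name):
# keep rows whose time is at most 60 days after the user's first act
df3_alt <- filter(df2, datediff(df2$time, df2$min_time) <= 60)
showDF(df3_alt)  # should give the same five rows as df3 above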
From this point, we can keep selected columns only, more or less as we would do in an ordinary R dataframe:
df4 <- df3[,c("user", "act","time")]
showDF(df4)
# +----+---+----------+
# |user|act| time|
# +----+---+----------+
# | 84| 4|2012-02-02|
# | 14| 8|2013-05-04|
# | 14| 1|2013-05-20|
# | 21| 1|2012-01-05|
# | 21| 9|2012-02-10|
# +----+---+----------+
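The same projection can also be written with SparkR's select, if you prefer it over bracket indexing:
df4_alt <- select(df3, "user", "act", "time")
# df4_alt should have the same schema and rows as df4 above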
Notice that, after the creation of our Spark dataframe df, all operations are SparkR ones (and not "local" R):
class(df4)
# [1] "DataFrame"
# attr(,"package")
# [1] "SparkR"
df4
# DataFrame[user:int, act:int, time:date]
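If at some point you need the result back as a local R data.frame (e.g. for plotting), you can collect it; keep in mind that this pulls all rows to the driver, so it is only advisable for small results:
local_df4 <- collect(df4)
class(local_df4)
# [1] "data.frame"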
Feel free to come back for any clarifications you may require...
sessionInfo()
# R version 3.2.2 (2015-08-14)
# Platform: i686-pc-linux-gnu (32-bit)
# Running under: Ubuntu 14.04.2 LTS
# [...]
# other attached packages:
# [1] magrittr_1.5 SparkR_1.5.1