
How to filter a SparkR DataFrame


In SparkR I have a DataFrame data that contains user, act and the time for each act. act contains numbers from 1 to 9, meaning we have 9 acts.

head(data)

then gives

user  act  time
21      1  2012-01-05
14      8  2013-05-04
21      1  2013-01-04
84      4  2012-02-02

For each user I want to get all acts from that user's first 60 days.

For example, user 21 (filter(data, data$user==21)) has these acts:

user   act   time
21     1     2012-01-05
21     1     2013-01-04
21     7     2013-01-05

Here I only want the first act, since the other two acts occur more than 60 days after the first one.

I can find the birth of each user (the first time an act appears) by this code

userbirth <- groupBy(data, data$user) %>% agg(min(data$time))

but I can't work out how to get a new dataset containing only the acts from each user's first 60 days.

I tried to solve the problem this way:

g <- groupBy(data, data$user)
result <- agg(g, data$time < min(data$time) +60 )

But R gives me the error message "returnstatus==0 is not True." How can I solve this?


Solution

  • You were close with userbirth; you just need to add this new min(time) column to your initial DataFrame using join.

    Here is a fully reproducible example, with a few more records added to the ones you show so that the demonstration is clearer:

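    # Assumes a sparkR shell session (SparkR 1.5.x), where sqlContext is already defined
    library(magrittr)  # provides the %>% pipe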
    
    user <- c(21, 14, 21, 84, 21, 21, 14, 14)
    act <- c(1, 8, 1, 4, 7, 9, 1, 3)
    time <- c("2012-01-05", "2013-05-04", "2013-01-04", "2012-02-02", "2013-01-05", "2012-02-10", "2013-05-20", "2013-07-10")
    
    df_local <- data.frame(user, act, time)
    df_local
    #   user act       time
    # 1   21   1 2012-01-05
    # 2   14   8 2013-05-04
    # 3   21   1 2013-01-04
    # 4   84   4 2012-02-02
    # 5   21   7 2013-01-05
    # 6   21   9 2012-02-10
    # 7   14   1 2013-05-20
    # 8   14   3 2013-07-10
    
    df <- createDataFrame(sqlContext, df_local)
    
    df$time <- to_date(df$time)
    df$user <- cast(df$user, "integer")
    df$act <- cast(df$act, "integer")
    df
    # DataFrame[user:int, act:int, time:date]
    
    userbirth <- groupBy(df, df$user) %>% agg(min(df$time))
    names(userbirth) <- c("user_", "min_time")  # works, although undocumented!
    userbirth
    # DataFrame[user_:int, min_time:date]
    showDF(userbirth)
    # +-----+----------+
    # |user_|  min_time|
    # +-----+----------+
    # |   84|2012-02-02|
    # |   14|2013-05-04|
    # |   21|2012-01-05|
    # +-----+----------+    
    
    df2 <- join(df, userbirth, df$user == userbirth$user_) 
    showDF(df2)
    # +----+---+----------+-----+----------+
    # |user|act|      time|user_|  min_time|
    # +----+---+----------+-----+----------+
    # |  84|  4|2012-02-02|   84|2012-02-02|
    # |  14|  8|2013-05-04|   14|2013-05-04|
    # |  14|  1|2013-05-20|   14|2013-05-04|
    # |  14|  3|2013-07-10|   14|2013-05-04|
    # |  21|  1|2012-01-05|   21|2012-01-05|
    # |  21|  1|2013-01-04|   21|2012-01-05|
    # |  21|  7|2013-01-05|   21|2012-01-05|
    # |  21|  9|2012-02-10|   21|2012-01-05|
    # +----+---+----------+-----+----------+
    

    Before proceeding, let's check what the expected result should be, based on the df2 data above:

    • The single record of user 84
    • The two records of May 2013 for user 14
    • The two records of 2012 for user 21
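
As a rough local cross-check of this expectation (plain base R, not SparkR), the cutoff dates can be worked out by adding 60 days to each user's first act date. The names birth and cutoff are illustrative only and do not appear in the original code; the dates are copied from the userbirth output above.

```r
# Local sketch (no Spark needed): first act date per user, taken from userbirth above
birth <- data.frame(
  user     = c(84, 14, 21),
  min_time = as.Date(c("2012-02-02", "2013-05-04", "2012-01-05"))
)

# Adding a number to a Date in base R shifts it by that many days
birth$cutoff <- birth$min_time + 60

print(birth)
```

Any act dated on or before a user's cutoff counts as falling within the first 60 days, which is why the July 2013 record of user 14 and the 2013 records of user 21 are expected to drop out.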

    Now apply the filter, using SparkR's date_add function:

    df3 <- filter(df2, df2$time <= date_add(df2$min_time, 60))
    showDF(df3)
    # +----+---+----------+-----+----------+
    # |user|act|      time|user_|  min_time|
    # +----+---+----------+-----+----------+
    # |  84|  4|2012-02-02|   84|2012-02-02|
    # |  14|  8|2013-05-04|   14|2013-05-04|
    # |  14|  1|2013-05-20|   14|2013-05-04|
    # |  21|  1|2012-01-05|   21|2012-01-05|
    # |  21|  9|2012-02-10|   21|2012-01-05|
    # +----+---+----------+-----+----------+
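
For a sanity check outside Spark, the same "within 60 days of the first act" rule can be sketched in base R on a local copy of the eight sample records. This is only an illustration, assuming the data fits in memory; local, days, first_day and keep are hypothetical names, and base R's ave stands in for the Spark groupBy/join steps.

```r
# Local copy of the sample records, with time parsed as Date
local <- data.frame(
  user = c(21, 14, 21, 84, 21, 21, 14, 14),
  act  = c(1, 8, 1, 4, 7, 9, 1, 3),
  time = as.Date(c("2012-01-05", "2013-05-04", "2013-01-04", "2012-02-02",
                   "2013-01-05", "2012-02-10", "2013-05-20", "2013-07-10"))
)

# Dates as day counts, so the per-user minimum is plain numeric arithmetic
days <- as.numeric(local$time)

# ave() returns each user's minimum repeated on every row of that user
first_day <- ave(days, local$user, FUN = min)

# Keep rows no later than 60 days after the user's first act (same rule as date_add above)
keep <- local[days <= first_day + 60, ]
print(keep)
```

The five rows kept locally correspond to the five rows of df3, although the row order differs because Spark does not preserve the input order after a join.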
    

    From here, we can keep only the columns we need, much as we would with an ordinary R data frame:

    df4 <- df3[,c("user", "act","time")]
    showDF(df4)
    # +----+---+----------+
    # |user|act|      time|
    # +----+---+----------+
    # |  84|  4|2012-02-02|
    # |  14|  8|2013-05-04|
    # |  14|  1|2013-05-20|
    # |  21|  1|2012-01-05|
    # |  21|  9|2012-02-10|
    # +----+---+----------+
    

    Note that once the Spark DataFrame df has been created, every operation is a SparkR one (not "local" R):

    class(df4)
    # [1] "DataFrame"
    # attr(,"package")
    # [1] "SparkR"
    df4
    # DataFrame[user:int, act:int, time:date]
    

    Feel free to come back for any clarifications you may require...

    sessionInfo()
    # R version 3.2.2 (2015-08-14)
    # Platform: i686-pc-linux-gnu (32-bit)
    # Running under: Ubuntu 14.04.2 LTS
    # [...]
    # other attached packages:
    # [1] magrittr_1.5 SparkR_1.5.1