Tags: r, apache-spark, sparkr

How do I group by hour in SparkR?


I am trying to summarize some dates by their hour, using SparkR and Spark 2.1.0. My data looks like:

                       created_at
1  Sun Jul 31 22:25:01 +0000 2016
2  Sun Jul 31 22:25:01 +0000 2016
3  Fri Jun 03 10:16:57 +0000 2016
4  Mon May 30 19:23:55 +0000 2016
5  Sat Jun 11 21:00:07 +0000 2016
6  Tue Jul 12 16:31:46 +0000 2016
7  Sun May 29 19:12:26 +0000 2016
8  Sat Aug 06 11:04:29 +0000 2016
9  Sat Aug 06 11:04:29 +0000 2016
10 Sat Aug 06 11:04:29 +0000 2016

and I want the output to be:

Hour      Count
22         2
10         1
19         1
11         3
....

I tried:

sumdf <- summarize(groupBy(df, df$created_at), count = n(df$created_at))
head(select(sumdf, "created_at", "count"),10)

but that groups by the full timestamp, down to the second:

                       created_at count
1  Sun Jun 12 10:24:54 +0000 2016     1
2  Tue Aug 09 14:12:35 +0000 2016     2
3  Fri Jul 29 19:22:03 +0000 2016     2
4  Mon Jul 25 21:05:05 +0000 2016     2

I tried:

sumdf <- summarize(groupBy(df, hr=hour(df$created_at)), count = n(hour(df$created_at)))
head(select(sumdf, "hour(created_at)", "count"),20)

but that gives:

  hour(created_at) count
1               NA     0

I tried:

sumdf <- summarize(groupBy(df, df$created_at), count = n(hour(df$created_at)))
head(select(sumdf, "created_at", "count"),10)

but that gives:

                       created_at count
1  Sun Jun 12 10:24:54 +0000 2016     0
2  Tue Aug 09 14:12:35 +0000 2016     0
3  Fri Jul 29 19:22:03 +0000 2016     0
4  Mon Jul 25 21:05:05 +0000 2016     0
...

How can I use the hour function to achieve this, or is there a better way?


Solution

  • Assuming your local data.frame is df, the real problem here is to extract the hour from your created_at column and then apply your grouping code. hour() returns NA because created_at is a plain string in a format Spark cannot implicitly cast to a timestamp. One way to pull the hour out is with dapply:

    library(SparkR)
    sparkR.session()
    df2 <- createDataFrame(df)

    #with dapply you need to specify the schema, i.e. the structure of the
    #data.frame that will come out of the applied function - substringDF in our case
    schema <- structType(structField('created_at', 'string'), structField('time', 'string'))

    #a function that will be applied to each partition of the Spark DataFrame;
    #remember that each partition is a data.frame itself. substr() picks out the
    #two hour digits at positions 15-16, which is where they sit in these
    #created_at strings - adjust the positions if your strings are laid out differently.
    substringDF <- function(DF) {
      DF$time <- substr(DF$created_at, 15, 16)
      DF
    }

    #and then we use the above in dapply
    df3 <- dapply(df2, substringDF, schema)
    head(df3)
    #                        created_at time
    #1 1  Sun Jul 31 22:25:01 +0000 2016   22
    #2 2  Sun Jul 31 22:25:01 +0000 2016   22
    #3 3  Fri Jun 03 10:16:57 +0000 2016   10
    #4 4  Mon May 30 19:23:55 +0000 2016   19
    #5 5  Sat Jun 11 21:00:07 +0000 2016   21
    #6 6  Tue Jul 12 16:31:46 +0000 2016   16
    

    Then just apply your normal grouping code:

    sumdf <- summarize(groupBy(df3, df3$time), count = n(df3$time))
    head(select(sumdf, "time", "count"))
    #  time count
    #1   11     3
    #2   22     2
    #3   16     1
    #4   19     2
    #5   10     1
    #6   21     1
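
    If you would rather avoid dapply, a possible alternative is to parse created_at into a proper timestamp and let the built-in hour() function do the work. The sketch below is not from the original answer: it assumes created_at holds just the timestamp text (e.g. "Sun Jul 31 22:25:01 +0000 2016"), uses the Spark 2.x SimpleDateFormat pattern "EEE MMM dd HH:mm:ss Z yyyy", and the ts and hr column names are made up for the example.

    #parse the string into a timestamp with unix_timestamp()/from_unixtime(),
    #then extract the hour with hour() and group on it
    df2$ts <- cast(from_unixtime(unix_timestamp(df2$created_at,
                                                "EEE MMM dd HH:mm:ss Z yyyy")),
                   "timestamp")
    df2$hr <- hour(df2$ts)

    sumdf2 <- summarize(groupBy(df2, df2$hr), count = n(df2$hr))
    head(select(sumdf2, "hr", "count"))

    Keep in mind that hour() reports the hour after converting the +0000 offset into the JVM's time zone, so it can differ from the literal hour printed in the string unless that time zone is UTC; the substring approach above always keeps the printed hour.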