Tags: r, apache-spark-sql, sparklyr

Run length ID in sparklyr


data.table provides an rleid function that I find invaluable: it acts as a counter that increments each time a watched variable (or set of variables) changes, with the rows ordered by some other variable(s).

library(dplyr)


tbl = tibble(time = as.integer(c(1, 2, 3, 4, 5, 6, 7, 8)), 
             var  = c("A", "A", "A", "B", "B", "A", "A", "A"))

> tbl
# A tibble: 8 × 2
   time   var
  <int> <chr>
1     1     A
2     2     A
3     3     A
4     4     B
5     5     B
6     6     A
7     7     A
8     8     A

The desired result is:

> tbl %>% mutate(rleid = data.table::rleid(var))
# A tibble: 8 × 3
   time   var rleid
  <int> <chr> <int>
1     1     A     1
2     2     A     1
3     3     A     1
4     4     B     2
5     5     B     2
6     6     A     3
7     7     A     3
8     8     A     3
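
For reference, the logic behind the rleid column can be sketched in base R as "flag each change, then take a running sum of the flags". The helper name run_length_id below is hypothetical (it is not part of data.table or sparklyr), and the sketch assumes a single vector with no NA values:

```r
# Hypothetical helper illustrating the idea behind data.table::rleid
# for one vector; assumes x contains no NA values.
run_length_id <- function(x) {
  if (length(x) == 0L) return(integer(0))
  # TRUE for the first element and wherever the value differs
  # from the element immediately before it
  changed <- c(TRUE, x[-1L] != x[-length(x)])
  # The running count of change points is the run ID
  cumsum(changed)
}

var <- c("A", "A", "A", "B", "B", "A", "A", "A")
ids <- run_length_id(var)
```

Seen this way, the problem in Spark reduces to a lag comparison followed by a cumulative sum, both of which are window operations.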

I was wondering whether I could reproduce something similar using the tools provided by sparklyr. In testing, the furthest I got was flagging each point where the variable changes; the next step would be a forward fill, which I couldn't work out how to do.

library(sparklyr)

spark_install(version = "2.0.2")
sc <- spark_connect(master = "local", 
                    spark_home = spark_home_dir())


spk_tbl = copy_to(sc, tbl, overwrite = TRUE)

spk_tbl %>% 
  mutate(var2 = (var != lag(var, 1L, order = time))) %>%  # Thanks @JaimeCaffarel
  mutate(var3 = if(var2) { paste0(time, var) } else { NA })

Source:   query [8 x 4]
Database: spark connection master=local[4] app=sparklyr local=TRUE

   time   var  var2  var3
  <int> <chr> <lgl> <chr>
1     1     A  TRUE    1A
2     2     A FALSE  <NA>
3     3     A FALSE  <NA>
4     4     B  TRUE    4B
5     5     B FALSE  <NA>
6     6     A  TRUE    6A
7     7     A FALSE  <NA>
8     8     A FALSE  <NA>

I've tried SparkR, but I much prefer the sparklyr interface and its ease of use, so ideally I'd like to do this in Spark SQL.

I can, of course, already do this by partitioning the data into small enough chunks, collecting each one, running a function on it, and sending it back.

For context, the reason I find the rleid to be useful is that I work with a lot of train data, and it's useful to be able to index what run it's on.

Thanks for any help, Akhil


Solution

  • A working solution in sparklyr would be this:

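    spk_tbl %>% 
      dplyr::arrange(time) %>% 
      # Flag rows where var differs from the previous row (ordered by time)
      dplyr::mutate(rleid = (var != lag(var, 1, order = time, default = FALSE))) %>% 
      # The running sum of the flags gives the run ID
      dplyr::mutate(rleid = cumsum(as.numeric(rleid)))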
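
The same flag-then-cumulative-sum logic can be checked locally on the original tibble, without a Spark connection. This is only a sketch of the idea, not the Spark pipeline itself: it uses plain dplyr on in-memory data, the column name changed is introduced here for illustration, and the is.na() test assumes var itself contains no NA values.

```r
library(dplyr)

tbl <- tibble(
  time = 1:8,
  var  = c("A", "A", "A", "B", "B", "A", "A", "A")
)

local_rleid <- tbl %>%
  arrange(time) %>%
  # TRUE on the first row (no previous value) and wherever var
  # differs from the previous row
  mutate(changed = is.na(lag(var)) | var != lag(var)) %>%
  # The running count of change points is the run ID
  mutate(rleid = cumsum(as.integer(changed)))
```

Comparing local_rleid$rleid with the data.table::rleid(var) column shown in the question is a quick way to confirm the approach before running it on a Spark table.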