
Apply modulo %% operation to an Arrow Dataset column and filter by the result


When working with large time-series data, I often need to down-sample. When the time index is stored as numeric or integer epoch time, it is very simple to sample by minute or by day with time %% 60 == 0L and time %% 86400 == 0L, and with some multiplication and rounding it's easy to extract data at sub-second sample rates as well.
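A minimal base-R sketch of that down-sampling idea. The vectors `epoch` (integer epoch seconds at 1 Hz) and `t_sec` (double seconds at 10 ms resolution) are hypothetical stand-ins, not data from the question. The sub-second case scales to integer hundredths and rounds before taking the modulo, because values such as 0.29 * 100 are not exact integers in floating point.

```r
# Hypothetical integer epoch-second index: one sample per second over two days
epoch <- seq(0L, 2L * 86400L - 1L, by = 1L)

# Keep one sample per minute, and one per day
per_minute <- epoch[epoch %% 60L == 0L]
per_day    <- epoch[epoch %% 86400L == 0L]

# Hypothetical sub-second index: seconds as doubles at 10 ms resolution
t_sec <- seq(0, 2, by = 0.01)

# Scale to integer hundredths, round away floating-point error,
# then keep every 25th hundredth (i.e. one sample per 0.25 s)
per_quarter_sec <- t_sec[round(t_sec * 100) %% 25 == 0]
```

Without the `round()`, some multiples of 0.25 could be missed whenever the scaled value lands just below an integer.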

As a very contrived example using the built-in beaver1 dataset (where time is stored as hhmm, e.g. 930 for 9:30), I might use a base data.frame operation like the following to keep only the measurements captured at the top of each hour:

beaver1[beaver1$time %% 100 == 0, ] |> head()
#   day time  temp activ
# 3  346  900 36.35     0
# 9  346 1000 36.81     0
# 15 346 1100 36.89     0
# 21 346 1200 36.78     0
# 27 346 1300 36.89     0
# 33 346 1400 36.77     0

I would like to do a similar operation on an arrow::Dataset object. However, when I attempt to use the modulo operator, I get the error: Error: !is.null(schema) is not TRUE.

library(arrow)

## Write a toy dataset
arrow::write_csv_arrow(beaver1, "beaver1.csv")

DS <- arrow::open_dataset(sources = c("beaver1.csv"), format = "csv")

DS$NewScan()$UseThreads()$Project(
  list(time2 = Expression$field_ref("time") %% 100L,
       day = Expression$field_ref("day"),
       time = Expression$field_ref("time"),
       temp = Expression$field_ref("temp"),
       activ = Expression$field_ref("activ"))
  )$Filter(Expression$field_ref("time2") == 0L)$Finish()$ToTable() |> 
  as.data.frame() |> 
  head()

# Error: !is.null(schema) is not TRUE

What is the proper syntax for applying modulo operations to an arrow::Dataset?

Can I use new columns defined by a Project() operation within a subsequent Filter() operation?


Solution

  • The modulo operator (%%) is implemented in the arrow package's dplyr bindings, so you could do something like:

    library(dplyr)
    tf <- tempfile()
    dir.create(tf)
    arrow::write_dataset(beaver1, tf, format = "csv")
    arrow::open_dataset(tf, format = "csv") %>%
      filter(time %% 100L == 0L) %>%
      collect()
    #> # A tibble: 19 × 4
    #>      day  time  temp activ
    #>    <int> <int> <dbl> <int>
    #>  1   346   900  36.4     0
    #>  2   346  1000  36.8     0
    #>  3   346  1100  36.9     0
    #>  4   346  1200  36.8     0
    #>  5   346  1300  36.9     0
    #>  6   346  1400  36.8     0
    #>  7   346  1500  36.7     0
    #>  8   346  1600  36.8     0
    #>  9   346  1700  37.0     0
    #> 10   346  1800  37.0     0
    #> 11   346  1900  37.0     0
    #> 12   346  2000  37.1     0
    #> 13   346  2100  36.8     0
    #> 14   346  2200  37.2     0
    #> 15   346  2300  37.2     1
    #> 16   347     0  36.9     0
    #> 17   347   100  36.8     0
    #> 18   347   200  36.9     0
    #> 19   347   300  36.9     0
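For the second question, a sketch using the same dplyr interface: a column created with mutate() can be referenced in a later filter(), and both steps are evaluated lazily until collect(). This assumes a recent arrow release that includes the %% binding; time2 is a hypothetical helper column that is dropped before collecting.

```r
library(arrow)
library(dplyr)

# Write beaver1 to a temporary CSV dataset, as in the answer above
tf <- tempfile()
dir.create(tf)
write_dataset(beaver1, tf, format = "csv")

res <- open_dataset(tf, format = "csv") %>%
  # Define the derived column first...
  mutate(time2 = time %% 100L) %>%
  # ...then filter on it in a later step
  filter(time2 == 0L) %>%
  # Drop the helper column before pulling the result into R
  select(-time2) %>%
  collect()
```

This should return the same 19 rows as the single filter() call; splitting it into mutate() and filter() is mainly useful when the derived value is needed again later in the pipeline.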