
arrow R duration/difftime casting to float


I am working with a large collection of datasets containing time series. Each time series has an ID and a value for each day over several years (about 90 GB in total). I am trying to merge the time series with a small dataset (3 million rows) using a non-equi join on the ID and a date interval, and then compute an average based on a variable called group.
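To make the task concrete, here is a minimal base-R sketch of a "non-equi join, then average" on small hypothetical data (the `intervals` and `series` tables and their values are invented for illustration and are not the data used below). It joins on the ID only and then keeps the rows whose date falls inside the interval, which is the same idea as the `left_join()` plus `filter()` used later.

```r
# Hypothetical toy data: one table of intervals and one daily series
intervals <- data.frame(
  Visitor = c("A", "A", "B"),
  Start   = as.Date(c("2021-09-03", "2021-09-10", "2021-09-05")),
  End     = as.Date(c("2021-09-05", "2021-09-11", "2021-09-06"))
)
series <- data.frame(
  Visitor = rep(c("A", "B"), each = 12),
  date    = rep(seq(as.Date("2021-09-01"), by = "day", length.out = 12), 2),
  rain    = c(1:12, 101:112)
)

# Equi-join on the ID, then keep only rows whose date lies in [Start, End]
joined <- merge(series, intervals, by = "Visitor")
joined <- joined[joined$date >= joined$Start & joined$date <= joined$End, ]

# Average the value within each interval (identified by Visitor and End)
res <- aggregate(rain ~ Visitor + End, data = joined, FUN = mean)
print(res)
```

On 90 GB of data the intermediate `joined` table is the expensive part, which is why the question below is about doing these steps out of memory.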

Here, I created a reproducible example using a very small toy dataset (adapted from this tutorial).

First, I create the data:

#LOAD PACKAGES
library(data.table)
library(arrow)
library(duckdb)
library(dplyr)

##############################################################
set.seed(123)

#CREATE START AND END DATASET
fenway <- fread(
  "Visitor, Start, End
    Cleveland, 2021-09-03, 2021-09-05
    Cleveland, 2021-10-07, 2021-10-09
    Tampa Bay, 2021-09-06, 2021-09-08
    Baltimore, 2021-09-17, 2021-09-19
    NY Mets, 2021-09-21, 2021-09-22
    NY Mets, 2021-10-23, 2021-10-25
    NY Yankees, 2021-09-24, 2021-09-26")

#SIMULATE TIME-SERIES FOR THE MONTHS OF SEPTEMBER AND OCTOBER
september <- data.table(Visitor = rep(c("Cleveland", "Tampa Bay", "Baltimore",
                             "NY Mets", "NY Yankees"),each=30),
             date = rep(seq(as.Date('2021-09-01'), 
                            as.Date('2021-09-30'), 'days'),5),
             rain = rnorm(150))
october <- data.table(Visitor = rep(c("Cleveland", "Tampa Bay", "Baltimore",
                             "NY Mets", "NY Yankees"),each=30),
             date = rep(seq(as.Date('2021-10-01'), 
                            as.Date('2021-10-30'), 'days'),5),
             rain = rnorm(150))
#SAVE THE DATA (THE TIME-SERIES ARE SAVED IN A SUBFOLDER)
dir.create("new_exp", showWarnings = FALSE) #fwrite() does not create missing folders
fwrite(fenway, "fenway.csv")
fwrite(september, "new_exp/2021_09.csv")
fwrite(october, "new_exp/2021_10.csv")
#####################################################################

This is the query for arrow (and duckdb):

##Open rain datasets (exp) and load fenway dataset
exp <- open_dataset("new_exp/", format="csv")
fenway<- fread("fenway.csv") |> as_arrow_table()

#Description of query
#STEP 1: left_join and filter mimic a non-equi join as 
#the function join_by() is not implemented
#STEP 2: keep only columns I need
#STEP 3: create a column called "group" (THE ISSUE IS HERE)
#STEP 4: group by Visitor, End and group
#STEP 5: compute average of rain by group

#My query
avefenway <- exp |> 
  left_join(fenway, by="Visitor") |> #STEP 1
  filter(date >= Start, date <= End) |> #STEP 1
  select(Visitor, date, End, rain) |> #STEP 2
  mutate(group = floor((End-date)/2)) |> #STEP 3
  group_by(Visitor,End,group) |> #STEP 4
  summarise(averain = mean(rain, na.rm=TRUE), .groups = "keep")|> #STEP 5
  collect()
# 
# Error in `map()`:
# ℹ In index: 5.
# ℹ With name: group.
# Caused by error:
# ! NotImplemented: Unsupported cast from duration[s] to double using function cast_double

I understand that the error above is due to the unimplemented cast from duration (difftime) to a double. As a workaround, I tried duckdb:

avefenway <- exp |> 
  left_join(fenway, by="Visitor") |> #STEP 1
  filter(date >= Start, date <= End) |> #STEP 1
  select(Visitor, date, End, rain) |> #STEP 2
  to_duckdb() |> #NEW LINE
  mutate(group = floor((End-date)/2)) |> #STEP 3
  group_by(Visitor,End,group) |> #STEP 4
  summarise(averain = mean(rain, na.rm=TRUE), .groups = "keep")|> #STEP 5
  collect()
#IT WORKS

Output:

# A tibble: 13 × 4
# Groups:   Visitor, End, group [13]
   Visitor    End        group averain
   <chr>      <date>     <dbl>   <dbl>
 1 Cleveland  2021-09-05     1  1.56  
 2 Cleveland  2021-09-05     0  0.0999
 3 Tampa Bay  2021-09-08     1  0.689 
 4 Tampa Bay  2021-09-08     0  0.246 
 5 Baltimore  2021-09-19     1 -0.285 
 6 Baltimore  2021-09-19     0 -0.520 
 7 NY Mets    2021-09-22     0  0.0163
 8 NY Yankees 2021-09-26     1 -1.51  
 9 NY Yankees 2021-09-26     0 -1.07  
10 Cleveland  2021-10-09     1  0.563 
11 Cleveland  2021-10-09     0  0.302 
12 NY Mets    2021-10-25     1  0.0472
13 NY Mets    2021-10-25     0  1.80 

PROBLEM: Converting the arrow object to a duckdb one works perfectly for this example, but it is not feasible for very large data because, for now (see this thread), duckdb only supports in-memory aggregation, and my data may not fit in memory. My desktop keeps crashing because it runs out of RAM. Is there another workaround that uses only arrow, without duckdb?

Does anyone have a hint or a possible solution? I am new to arrow, but it seems an extremely promising tool, and I am trying to learn as much as possible about getting the best out of it for my daily data management and analysis.

Thanks in advance.


Solution

  • We can use as.integer inline to convert the dates to integers (days since 1970-01-01), so the subtraction no longer produces a duration and the grouping works.

    library(arrow)
    library(dplyr)
    
    fenway <- open_csv_dataset("fenway.csv")
    months <- open_csv_dataset("new_exp/")
    
    left_join(months, fenway, by = "Visitor") |>
      filter(between(date, Start, End)) |>
      select(Visitor, date, End, rain) |>
      mutate(group = floor((as.integer(End) - as.integer(date))/2)) |>
      group_by(Visitor, End, group) |>
      summarize(averain = mean(rain, na.rm = TRUE), .groups = "keep") |>
      collect()
    # # A tibble: 13 × 4
    # # Groups:   Visitor, End, group [13]
    #    Visitor    End        group averain
    #    <chr>      <date>     <dbl>   <dbl>
    #  1 Cleveland  2021-09-05     1  1.56  
    #  2 Cleveland  2021-09-05     0  0.0999
    #  3 Tampa Bay  2021-09-08     1  0.689 
    #  4 Tampa Bay  2021-09-08     0  0.246 
    #  5 Baltimore  2021-09-19     1 -0.285 
    #  6 Baltimore  2021-09-19     0 -0.520 
    #  7 NY Mets    2021-09-22     0  0.0163
    #  8 NY Yankees 2021-09-26     1 -1.51  
    #  9 NY Yankees 2021-09-26     0 -1.07  
    # 10 Cleveland  2021-10-09     1  0.563 
    # 11 Cleveland  2021-10-09     0  0.302 
    # 12 NY Mets    2021-10-25     1  0.0472
    # 13 NY Mets    2021-10-25     0  1.80  
    

    I'm not sure why as.numeric does not work directly, but if you really need doubles there, you can use as.numeric(as.integer(End)).
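To try the as.integer workaround without the CSV files, here is a small in-memory sketch. It assumes the arrow and dplyr packages are installed and that arrow's dplyr bindings translate `as.integer()` on a date32 column into a cast to int32, which is what the pipeline above relies on. The three-row table is hypothetical and built with `arrow_table()`, so every step up to `collect()` runs in Arrow rather than in R.

```r
library(arrow)
library(dplyr)

# Hypothetical toy data; R Date columns become Arrow date32 columns
tbl <- arrow_table(
  Visitor = rep("Cleveland", 3),
  date    = seq(as.Date("2021-09-03"), as.Date("2021-09-05"), by = "day"),
  End     = rep(as.Date("2021-09-05"), 3),
  rain    = c(1, 2, 3)
)

res <- tbl |>
  # integer day counts, so the subtraction never produces a duration
  mutate(group = floor((as.integer(End) - as.integer(date)) / 2)) |>
  group_by(Visitor, End, group) |>
  summarize(averain = mean(rain, na.rm = TRUE), .groups = "drop") |>
  arrange(group) |>
  collect()

res
```

Here 2021-09-03 lands in group 1 on its own, while 2021-09-04 and 2021-09-05 share group 0, so the averages are 1 and 2.5.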