I am working with a large set of datasets containing time series. The time-series data include an ID and a value for each day over several years (about 90 GB in total).
What I am trying to do is merge (non-equi join) the time series with a small dataset (3 million rows) by ID and within an interval of dates, and then compute an average based on a variable called group.
Here, I created a reproducible example using a very small toy dataset (adapted from this tutorial).
First, I create the data:
#LOAD PACKAGES
library(data.table)
library(arrow)
library(duckdb)
library(dplyr)
##############################################################
set.seed(123)
#CREATE START AND END DATASET
fenway <- fread(
"Visitor, Start, End
Cleveland, 2021-09-03, 2021-09-05
Cleveland, 2021-10-07, 2021-10-09
Tampa Bay, 2021-09-06, 2021-09-08
Baltimore, 2021-09-17, 2021-09-19
NY Mets, 2021-09-21, 2021-09-22
NY Mets, 2021-10-23, 2021-10-25
NY Yankees, 2021-09-24, 2021-09-26")
#SIMULATE TIME-SERIES FOR THE MONTHS OF SEPTEMBER AND OCTOBER
september <- data.table(
  Visitor = rep(c("Cleveland", "Tampa Bay", "Baltimore",
                  "NY Mets", "NY Yankees"), each = 30),
  date = rep(seq(as.Date("2021-09-01"), as.Date("2021-09-30"), "days"), 5),
  rain = rnorm(150))
october <- data.table(
  Visitor = rep(c("Cleveland", "Tampa Bay", "Baltimore",
                  "NY Mets", "NY Yankees"), each = 30),
  date = rep(seq(as.Date("2021-10-01"), as.Date("2021-10-30"), "days"), 5),
  rain = rnorm(150))
#SAVE THE DATA (THE TIME-SERIES ARE SAVED IN A SUBFOLDER)
fwrite(fenway, "fenway.csv")
dir.create("new_exp", showWarnings = FALSE) #create the subfolder if it does not exist
fwrite(september, "new_exp/2021_09.csv")
fwrite(october, "new_exp/2021_10.csv")
#####################################################################
This is the query for arrow (and duckdb):
##Open rain datasets (exp) and load fenway dataset
exp <- open_dataset("new_exp/", format="csv")
fenway <- fread("fenway.csv") |> as_arrow_table()
#Description of query
#STEP 1: left_join and filter mimic a non-equi join as
#the function join_by() is not implemented
#STEP 2: keep only columns I need
#STEP 3: create a column called "group" (THE ISSUE IS HERE)
#STEP 4: group by Visitor, End and group
#STEP 5: compute average of rain by group
#My query
avefenway <- exp |>
  left_join(fenway, by = "Visitor") |>       #STEP 1
  filter(date >= Start, date <= End) |>      #STEP 1
  select(Visitor, date, End, rain) |>        #STEP 2
  mutate(group = floor((End - date) / 2)) |> #STEP 3
  group_by(Visitor, End, group) |>           #STEP 4
  summarise(averain = mean(rain, na.rm = TRUE), .groups = "keep") |> #STEP 5
  collect()
#
# Error in `map()`:
# ℹ In index: 5.
# ℹ With name: group.
# Caused by error:
# ! NotImplemented: Unsupported cast from duration[s] to double using function cast_double
I understand that the error above is due to an unimplemented cast from duration (or difftime) to a double data type.
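For context, here is a minimal base-R illustration (not arrow) of why a duration shows up there at all:
# Date minus Date yields a "difftime" (a duration), not a plain number
d <- as.Date("2021-09-05") - as.Date("2021-09-03")
class(d)      # "difftime"
as.numeric(d) # 2 -- base R can do this cast, but arrow's engine cannot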
Then, I tried a workaround using duckdb:
avefenway <- exp |>
  left_join(fenway, by = "Visitor") |>       #STEP 1
  filter(date >= Start, date <= End) |>      #STEP 1
  select(Visitor, date, End, rain) |>        #STEP 2
  to_duckdb() |>                             #NEW ROW
  mutate(group = floor((End - date) / 2)) |> #STEP 3
  group_by(Visitor, End, group) |>           #STEP 4
  summarise(averain = mean(rain, na.rm = TRUE), .groups = "keep") |> #STEP 5
  collect()
#IT WORKS
Output:
# A tibble: 13 × 4
# Groups: Visitor, End, group [13]
Visitor End group averain
<chr> <date> <dbl> <dbl>
1 Cleveland 2021-09-05 1 1.56
2 Cleveland 2021-09-05 0 0.0999
3 Tampa Bay 2021-09-08 1 0.689
4 Tampa Bay 2021-09-08 0 0.246
5 Baltimore 2021-09-19 1 -0.285
6 Baltimore 2021-09-19 0 -0.520
7 NY Mets 2021-09-22 0 0.0163
8 NY Yankees 2021-09-26 1 -1.51
9 NY Yankees 2021-09-26 0 -1.07
10 Cleveland 2021-10-09 1 0.563
11 Cleveland 2021-10-09 0 0.302
12 NY Mets 2021-10-25 1 0.0472
13 NY Mets 2021-10-25 0 1.80
PROBLEM: Converting the arrow object to a duckdb one works perfectly for this example, but it is infeasible for very large ones because, for now (see this thread), duckdb supports only in-memory aggregation, which may exceed the available memory. So my desktop crashes all the time because the RAM is not enough! I am wondering if there is another workaround using only arrow, without duckdb.
Does anyone have a hint or a possible solution for this? I am very inexperienced with arrow, but it seems an extremely promising tool, and I am trying to learn as much as possible about how to get the best out of it for my daily data management/analysis.
Thanks in advance.
We can use as.integer inline to convert the dates to numbers, which allows the grouping to proceed.
library(arrow)
library(dplyr)
fenway <- open_csv_dataset("fenway.csv")
months <- open_csv_dataset("new_exp/")
left_join(months, fenway, by = "Visitor") |>
  filter(between(date, Start, End)) |>
  select(Visitor, date, End, rain) |>
  mutate(group = floor((as.integer(End) - as.integer(date)) / 2)) |>
  group_by(Visitor, End, group) |>
  summarize(averain = mean(rain, na.rm = TRUE), .groups = "keep") |>
  collect()
# # A tibble: 13 × 4
# # Groups: Visitor, End, group [13]
# Visitor End group averain
# <chr> <date> <dbl> <dbl>
# 1 Cleveland 2021-09-05 1 1.56
# 2 Cleveland 2021-09-05 0 0.0999
# 3 Tampa Bay 2021-09-08 1 0.689
# 4 Tampa Bay 2021-09-08 0 0.246
# 5 Baltimore 2021-09-19 1 -0.285
# 6 Baltimore 2021-09-19 0 -0.520
# 7 NY Mets 2021-09-22 0 0.0163
# 8 NY Yankees 2021-09-26 1 -1.51
# 9 NY Yankees 2021-09-26 0 -1.07
# 10 Cleveland 2021-10-09 1 0.563
# 11 Cleveland 2021-10-09 0 0.302
# 12 NY Mets 2021-10-25 1 0.0472
# 13 NY Mets 2021-10-25 0 1.80
I'm not sure why as.numeric does not work directly here, but if you feel you really need doubles there, you can use as.numeric(as.integer(End)).
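As an aside, if even the aggregated result were too big to collect() into RAM, a minimal sketch (assuming a hypothetical output directory "avefenway_out/") would stream the result to disk with arrow's write_dataset() instead. The join and aggregation state still need memory, but the result never has to be materialised in R:
library(arrow)
library(dplyr)
fenway <- open_csv_dataset("fenway.csv")
months <- open_csv_dataset("new_exp/")
left_join(months, fenway, by = "Visitor") |>
  filter(between(date, Start, End)) |>
  select(Visitor, date, End, rain) |>
  mutate(group = floor((as.integer(End) - as.integer(date)) / 2)) |>
  group_by(Visitor, End, group) |>
  # drop the groups so write_dataset() does not partition the output by them
  summarize(averain = mean(rain, na.rm = TRUE), .groups = "drop") |>
  # "avefenway_out/" is a hypothetical output directory
  write_dataset("avefenway_out/", format = "parquet")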