I have time-series data that I am summarizing with run-length encoding plus some additional summary statistics. The problem is that the data is at least 40 million rows and I only have 16 GB of RAM, so at the moment I run the same query on batches of the data and append the results together. The whole process currently takes over a day. I know for loops are slow, but running my current query all at once crashes my RStudio session...
I am hoping someone can help me rewrite my for loop as a function and run it with the parallel package in R (a rough sketch of that approach follows the loop below). Or just optimize my original query???
The for loop queries a subset of customers at a time, so I'll do my best to create a reproducible example.
library(DBI)
library(dbplyr)
library(dplyr)
library(data.table)
library(lubridate)  # for ymd_hms()
customers <- data.frame(
  customer.number = c(12345, 23456, 34567, 45678, 56789)
)

# split the customers into batches of n for querying
n <- 2
nr <- nrow(customers)
X <- split(customers, rep(1:ceiling(nr/n), each = n, length.out = nr))
consumption <- data.frame(
customer.number = c(12345, 12345, 12345,
23456, 23456, 23456,
34567, 34567, 34567,
45678, 45678, 45678,
56789, 56789, 56789),
consumption = c(1,2,3,
0,0,1,
1,0,1,
2,2,0,
0,0,0),
datetime = c("2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
"2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
"2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
"2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
"2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00")
)
# time window to query
beginning <- ymd_hms("2022-01-01 00:00:00")
ending <- ymd_hms("2022-02-01 00:00:00")
for(i in seq_along(X)){
  rle.batch <- tbl(connection, "consumption") %>%
    select(customer.number, consumption, datetime) %>%
    mutate(flag = consumption >= 1) %>%
    filter(customer.number %in% !!X[[i]]$customer.number,
           datetime >= !!beginning,
           datetime < !!ending) %>%
    collect() %>%
    arrange(customer.number, datetime) %>%
    group_by(customer.number,
             Run = data.table::rleid(customer.number, flag),
             flag) %>%
    summarize(Start = min(datetime),
              End = max(datetime),
              Length = n(),
              Min.Consumption = min(consumption),
              Avg.Consumption = mean(consumption),
              Max.Consumption = max(consumption),
              .groups = "drop") %>%
    filter(flag)
  # append this batch to the results checkpointed on disk so far
  if(names(X)[i] == "1"){
    results <- rle.batch
  } else {
    results <- readRDS("results.rds") %>%
      rbind(rle.batch)
  }
  saveRDS(results, file = "results.rds")
  remove(results, rle.batch)
  print(names(X)[i])
}
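For reference, here is a minimal sketch of the loop body wrapped in a function and dispatched with parallel::mclapply. It is untested against the real database: the RPostgres::Postgres() backend and its credentials are placeholders (substitute your own driver and connection arguments), and each worker opens its own connection because DBI connections cannot be shared across forked processes.

library(parallel)

summarize_batch <- function(batch) {
  # each worker needs its own connection; a DBI connection
  # cannot be shared across forked processes
  con <- DBI::dbConnect(RPostgres::Postgres())  # placeholder backend/credentials
  on.exit(DBI::dbDisconnect(con))
  tbl(con, "consumption") %>%
    select(customer.number, consumption, datetime) %>%
    mutate(flag = consumption >= 1) %>%
    filter(customer.number %in% !!batch$customer.number,
           datetime >= !!beginning,
           datetime < !!ending) %>%
    collect() %>%
    arrange(customer.number, datetime) %>%
    group_by(customer.number,
             Run = data.table::rleid(customer.number, flag),
             flag) %>%
    summarize(Start = min(datetime),
              End = max(datetime),
              Length = n(),
              Min.Consumption = min(consumption),
              Avg.Consumption = mean(consumption),
              Max.Consumption = max(consumption),
              .groups = "drop") %>%
    filter(flag)
}

# forking via mc.cores > 1 works on Linux/macOS but not on Windows
results <- bind_rows(mclapply(X, summarize_batch, mc.cores = 4))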
Here is a way to do the entire thing on the database, so only the summarized runs ever come back to R. Note that there is no need for a loop, and the collect() statement comes at the end. The Run id is built with the classic gaps-and-islands trick: take the difference between a row's overall row number and its row number within its flag group; that difference is constant across consecutive rows sharing the same flag, so it labels each run, and grouping by customer_number in the summarize step keeps runs from different customers separate.
tbl(connection, "consumption" ) %>%
mutate(flag = if_else(consumption>0,1,0)) %>%
filter(datetime >= !!beginning, datetime < !!ending) %>%
group_by(flag) %>%
window_order(customer_number, datetime) %>%
mutate(num2 = row_number()) %>%
ungroup() %>%
mutate(Run = row_number()-num2) %>%
select(-num2) %>%
group_by(customer_number, Run, flag) %>%
summarize(Start = min(datetime),
End = max(datetime),
Length = count(Run),
Min.Consumption = min(consumption),
Avg.Consumption = mean(consumption),
Max.Consumption = max(consumption), .groups="drop") %>%
filter(flag==1) %>%
collect()
Output:
customer_number Run flag Start End Length Min.Consumption Avg.Consumption Max.Consumption
<chr> <int64> <dbl> <dttm> <dttm> <int> <int> <int> <int>
1 12345 0 1 2022-01-01 00:00:00 2022-01-01 02:00:00 3 1 2 3
2 23456 2 1 2022-01-01 02:00:00 2022-01-01 02:00:00 1 1 1 1
3 34567 2 1 2022-01-01 00:00:00 2022-01-01 00:00:00 1 1 1 1
4 34567 3 1 2022-01-01 02:00:00 2022-01-01 02:00:00 1 1 1 1
5 45678 3 1 2022-01-01 00:00:00 2022-01-01 01:00:00 2 2 2 2
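As a side note, if you want to sanity-check what dbplyr is sending to the database before pulling anything back, swap the final collect() for show_query(). This is plain dbplyr, nothing specific to this query; a shortened example:

query <- tbl(connection, "consumption") %>%
  mutate(flag = if_else(consumption > 0, 1, 0)) %>%
  filter(datetime >= !!beginning, datetime < !!ending)

show_query(query)  # print the SQL that dbplyr generates
collect(query)     # the query only runs on the database when collected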