I have 40+ CSV files, each around 400 MB. I need to read these files, do some manipulation and formatting on them (such as commonizing date formats and separating dates into month, day, etc.), and combine them into a single data frame. Previous posts suggest fread as the quickest way to read CSV files, but even with fread it took approx. 14 seconds to read each file, which leaves me with a pretty significant runtime. I tried using SQLite through RSQLite for a single csv file:
library(RSQLite)
setwd("raw_data/sqldatabase")
db <- dbConnect(SQLite(), dbname = "test_db.sqlite") # creates the file if not present
dbWriteTable(conn = db, name = "your_table", value = "testdata.csv", row.names = FALSE, header = TRUE)
However, even using SQLite took a considerable amount of time. What can be used to quickly read 40+ big csv files into a "space" where manipulation on them is very fast?
If I were to upload the data once to a database that made manipulation very fast from then on, I would still be fine, but the final dataset (once the merge is complete) is expected to be 25+ GB. So I am trying to find the most efficient way to manipulate the data.
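For reference, the brute-force approach I'm trying to speed up looks roughly like this (paths are hypothetical):

library(data.table)
files <- list.files("raw_data", pattern = "\\.csv$", full.names = TRUE)
# ~14 seconds per file adds up quickly over 40+ files
combined <- rbindlist(lapply(files, fread))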
One alternative might be a "parquet datamart". The premise here:

- Convert each CSV file to a .parquet file, or to multiple .parquet files after splitting on one or more indexable (categorical/ordinal) columns.
- Use arrow::open_dataset, dplyr, and their lazy evaluation. While this does not allow you to read the entire dataset into memory at one time, it does give you access to all of the data in smaller chunks, as if it were one big dataset.
- This is still compatible with data.table for the in-memory work, using arrow to access the data lazily. While my example below uses data.table, it is not required, and in fact introduces an extra step or two to convert the sample data from tibble to data.table. I suggest it due to the large size of the data and because you tagged it, not because it is required.
Some things to note for both examples:

- ds reflects all 336,776 rows of data, though the object itself is rather small (just an environment with references to the files and their metadata).
- Realizing the data into memory is done with collect(). Don't try to do this on the whole data unless you know that it can fit into memory.
- The data was saved as a data.table (from fread), and write_parquet keeps several of the frame's attributes (including that one), so when we realize the data below, it will be a data.table.
- Though the collected data is a data.table, changes to that collected data will not migrate back to the parquet files themselves. This means that if you do something like collect(ds)[, newcol := 1], doing another collect(ds) will not have newcol in it. It's worth noting that parquet files are immutable once written: they cannot be updated or appended-to.

Practical example: nycflights13::flights. The data contains 336,776 rows of "Airline on-time data for all flights departing NYC in 2013". For the sake of this example, I'll split the data randomly into 5 frames and save them into CSV files.
library(data.table)

set.seed(42)
ind <- sample(1:5, size = nrow(nycflights13::flights), replace = TRUE)
head(ind)
dir.create("csv")
i <- 1L
for (dat in split(nycflights13::flights, ind)) {
  fwrite(dat, sprintf("csv/%s.csv", i))
  i <- i + 1L
}
file.info(Sys.glob("csv/*"))
# size isdir mode mtime ctime atime exe
# csv/1.csv 6274623 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:54 no
# csv/2.csv 6265804 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:52 no
# csv/3.csv 6261533 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:43 no
# csv/4.csv 6260298 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:49 no
# csv/5.csv 6235815 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:46 no
For the two examples below, I'm not going to do any data filtering/formatting/augmentation; other than highlighting where one would do it, I'll assume you know what you need to do to each individual CSV file before saving.

Example 1: convert each CSV file to a single .parquet file

dir.create("datamart")
for (fn in Sys.glob("csv/*.csv")) {
  X <- fread(fn)
  # ... this is where your per-file formatting/manipulation would go ...
  arrow::write_parquet(X, file.path("datamart", paste0(basename(fn), ".parquet")))
  rm(X)
  gc() # optional, might help
}
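For instance, if the dates need to be commonized and split before saving, that work goes between the fread and the write_parquet. A sketch, assuming (hypothetically) that the raw dates arrive as m/d/Y strings in a column named date_str:

for (fn in Sys.glob("csv/*.csv")) {
  X <- fread(fn)
  # hypothetical cleanup: commonize the date format, then derive month/day
  X[, date := as.IDate(date_str, format = "%m/%d/%Y")]
  X[, `:=`(month = month(date), day = mday(date))]
  arrow::write_parquet(X, file.path("datamart", paste0(basename(fn), ".parquet")))
  rm(X)
  gc()
}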
file.info(Sys.glob("datamart/*"))
# size isdir mode mtime ctime atime exe
# datamart/1.csv.parquet 1251629 FALSE 666 2022-09-19 05:26:28 2022-09-19 05:26:28 2022-09-19 05:35:59 no
# datamart/2.csv.parquet 1249485 FALSE 666 2022-09-19 05:26:45 2022-09-19 05:26:45 2022-09-19 05:35:59 no
# datamart/3.csv.parquet 1249652 FALSE 666 2022-09-19 05:26:47 2022-09-19 05:26:47 2022-09-19 05:35:59 no
# datamart/4.csv.parquet 1249772 FALSE 666 2022-09-19 05:26:48 2022-09-19 05:26:48 2022-09-19 05:35:59 no
# datamart/5.csv.parquet 1245022 FALSE 666 2022-09-19 05:26:49 2022-09-19 05:26:49 2022-09-19 05:35:59 no
Reading in the data:
library(dplyr)
library(arrow)
ds <- open_dataset("datamart")
nrow(ds)
# [1] 336776
object.size(ds) # environment
# 504 bytes
with(ls.objects(envir = ds), sum(Size)) # ls.objects: a non-base helper that sizes objects in an environment
# [1] 145888
ds %>%
  filter(month == 1, between(day, 1, 10))
# FileSystemDataset (query)
# year: int32
# month: int32
# day: int32
# dep_time: int32
# sched_dep_time: int32
# dep_delay: int32
# arr_time: int32
# sched_arr_time: int32
# arr_delay: int32
# carrier: string
# flight: int32
# tailnum: string
# origin: string
# dest: string
# air_time: int32
# distance: int32
# hour: int32
# minute: int32
# time_hour: timestamp[us, tz=UTC]
# * Filter: ((month == 1) and ((day >= 1) and (day <= 10)))
# See $.data for the source Arrow object
ds %>%
  filter(month == 1, between(day, 1, 10)) %>%
  collect()
# year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay carrier flight tailnum origin dest
# <int> <int> <int> <int> <int> <int> <int> <int> <int> <char> <int> <char> <char> <char>
# 1: 2013 1 1 554 600 -6 812 837 -25 DL 461 N668DN LGA ATL
# 2: 2013 1 1 555 600 -5 913 854 19 B6 507 N516JB EWR FLL
# 3: 2013 1 1 557 600 -3 709 723 -14 EV 5708 N829AS LGA IAD
# 4: 2013 1 1 558 600 -2 923 937 -14 UA 1124 N53441 EWR SFO
# 5: 2013 1 1 559 600 -1 941 910 31 AA 707 N3DUAA LGA DFW
# 6: 2013 1 1 607 607 0 858 915 -17 UA 1077 N53442 EWR MIA
# 7: 2013 1 1 613 610 3 925 921 4 B6 135 N635JB JFK RSW
# 8: 2013 1 1 615 615 0 833 842 -9 DL 575 N326NB EWR ATL
# 9: 2013 1 1 623 610 13 920 915 5 AA 1837 N3EMAA LGA MIA
# 10: 2013 1 1 624 630 -6 840 830 10 MQ 4599 N518MQ LGA MSP
# ---
# 8823: 2013 1 10 2038 2045 -7 2140 2154 -14 B6 1178 N640JB EWR BOS
# 8824: 2013 1 10 2040 2040 0 2351 2357 -6 B6 677 N809JB JFK LAX
# 8825: 2013 1 10 2054 2100 -6 2202 2207 -5 US 2144 N952UW LGA BOS
# 8826: 2013 1 10 2058 2100 -2 2229 2225 4 WN 530 N443WN LGA MDW
# 8827: 2013 1 10 2104 2110 -6 2337 2355 -18 B6 529 N507JB EWR MCO
# 8828: 2013 1 10 2129 2130 -1 148 218 -30 B6 701 N193JB JFK SJU
# 8829: 2013 1 10 2159 2159 0 2247 2300 -13 EV 4519 N13124 EWR BWI
# 8830: 2013 1 10 2320 2250 30 16 2354 22 B6 1018 N612JB JFK BOS
# 8831: 2013 1 10 NA 635 NA NA 940 NA AA 711 N3CDAA LGA DFW
# 8832: 2013 1 10 NA 700 NA NA 1007 NA UA 719 EWR DFW
# 5 variables not shown: [air_time <int>, distance <int>, hour <int>, minute <int>, time_hour <POSc>]
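Filtering is not the only operation arrow can do lazily: recent arrow versions translate many dplyr verbs (group_by, summarize, mutate, arrange), so aggregation can happen before anything is pulled into memory. A small sketch:

ds %>%
  group_by(month) %>%
  summarize(mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect()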
Example 2: split each CSV file by year and month as nested subdirectories

In your data, an indexable field might be a categorical or ordinal column, such as the year and month components you derive from your dates. There is a balance between convenience and usability: if a categorical variable has 20,000 possible values, that might be too many, and one will lose much of the efficiency. The more directories/files found within the subdirectories, the longer it will take to call open_dataset before you can do something with it. I don't have easy metrics for this.
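One quick way to gauge whether a candidate column partitions too finely is to count its distinct values in a sample file before committing to a layout (a sketch; the candidate columns here are just examples):

X <- fread("csv/1.csv")
sapply(X[, .(year, month, origin)], uniqueN) # uniqueN is from data.table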
Note: one might be able to use write_dataset, the counterpart to open_dataset above; it handles partitioning in the same fashion (via its partitioning= argument). However, if you are not certain that each level within the partition fields is unique to one file (for instance, in my sample data month == 1 appears in all five CSV files), then each CSV file written would overwrite some data from the previous write. Because of that, as I'll demonstrate here, I'll write to the subdirectories manually.
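(As an aside, it may also be possible to avoid the collisions by giving each CSV file its own basename_template, so successive write_dataset calls never produce the same file names. A sketch under that assumption, writing to a hypothetical separate directory datamart2b and requiring a reasonably recent arrow release:)

for (fn in Sys.glob("csv/*.csv")) {
  X <- fread(fn)
  arrow::write_dataset(
    X, "datamart2b", format = "parquet",
    partitioning = c("year", "month"),
    # per-CSV basename, so later writes don't replace earlier ones
    basename_template = paste0(basename(fn), "-{i}.parquet")
  )
  rm(X)
}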
# ensures .parquet files are additive
addfile <- function(dat, base, by) {
  # build e.g. "datamart2/year=2013/month=1" from the grouping values
  thisdir <- do.call(file.path, as.list(c(base, paste(names(by), unname(by), sep = "="))))
  dir.create(thisdir, recursive = TRUE, showWarnings = FALSE)
  # name the new file after the count of files already present
  existing <- list.files(thisdir)
  thisfile <- sprintf("%i.parquet", length(existing) + 1)
  arrow::write_parquet(dat, file.path(thisdir, thisfile))
}
dir.create("datamart2")
for (fn in Sys.glob("csv/*.csv")) {
  X <- fread(fn)
  X[, addfile(.SD, "datamart2", by = .BY), by = .(year, month)]
  rm(X)
  gc() # optional
}
file.info(Sys.glob("datamart2/*/*/*"))
# size isdir mode mtime ctime atime exe
# datamart2/year=2013/month=1/1.parquet 133469 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=1/2.parquet 132760 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=1/3.parquet 134069 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=1/4.parquet 132404 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=1/5.parquet 136424 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=10/1.parquet 140490 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=10/2.parquet 139362 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=10/3.parquet 138570 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=10/4.parquet 137501 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=10/5.parquet 137426 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=11/1.parquet 133714 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=11/2.parquet 134291 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=11/3.parquet 133199 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=11/4.parquet 136152 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=11/5.parquet 133310 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=12/1.parquet 141743 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=12/2.parquet 142030 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=12/3.parquet 139573 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=12/4.parquet 140515 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=12/5.parquet 140059 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=2/1.parquet 126203 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=2/2.parquet 126481 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=2/3.parquet 126348 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=2/4.parquet 126618 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=2/5.parquet 123947 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=3/1.parquet 140691 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=3/2.parquet 142811 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=3/3.parquet 142415 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=3/4.parquet 140573 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=3/5.parquet 138510 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=4/1.parquet 140734 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=4/2.parquet 140707 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=4/3.parquet 140507 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=4/4.parquet 141896 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=4/5.parquet 141182 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=5/1.parquet 139517 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=5/2.parquet 140546 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=5/3.parquet 143193 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=5/4.parquet 139979 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=5/5.parquet 141259 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=6/1.parquet 143405 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=6/2.parquet 142591 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=6/3.parquet 142106 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=6/4.parquet 143012 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=6/5.parquet 141489 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=7/1.parquet 145064 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=7/2.parquet 143898 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=7/3.parquet 144104 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=7/4.parquet 146099 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=7/5.parquet 146616 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=8/1.parquet 145155 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=8/2.parquet 143314 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=8/3.parquet 145334 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=8/4.parquet 144581 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=8/5.parquet 145998 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=9/1.parquet 135902 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=9/2.parquet 135525 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=9/3.parquet 136012 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=9/4.parquet 137506 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=9/5.parquet 133894 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
Reading the data is the same as in example 1, though note that the indexes year and month are now the last two columns instead of the first two:
library(dplyr)
library(arrow)
ds <- open_dataset("datamart2")
nrow(ds)
# [1] 336776
object.size(ds) # environment
# 504 bytes
with(ls.objects(envir = ds), sum(Size)) # ls.objects: a non-base helper that sizes objects in an environment
# [1] 155896
ds %>%
  filter(month == 1, between(day, 1, 10))
# FileSystemDataset (query)
# day: int32
# dep_time: int32
# sched_dep_time: int32
# dep_delay: int32
# arr_time: int32
# sched_arr_time: int32
# arr_delay: int32
# carrier: string
# flight: int32
# tailnum: string
# origin: string
# dest: string
# air_time: int32
# distance: int32
# hour: int32
# minute: int32
# time_hour: timestamp[us, tz=UTC]
# year: int32
# month: int32
# * Filter: ((month == 1) and ((day >= 1) and (day <= 10)))
# See $.data for the source Arrow object
ds %>%
  filter(month == 1, between(day, 1, 10)) %>%
  collect()
# day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay carrier flight tailnum origin dest air_time
# <int> <int> <int> <int> <int> <int> <int> <char> <int> <char> <char> <char> <int>
# 1: 1 517 515 2 830 819 11 UA 1545 N14228 EWR IAH 227
# 2: 1 542 540 2 923 850 33 AA 1141 N619AA JFK MIA 160
# 3: 1 544 545 -1 1004 1022 -18 B6 725 N804JB JFK BQN 183
# 4: 1 557 600 -3 838 846 -8 B6 79 N593JB JFK MCO 140
# 5: 1 558 600 -2 849 851 -2 B6 49 N793JB JFK PBI 149
# 6: 1 559 600 -1 854 902 -8 UA 1187 N76515 EWR LAS 337
# 7: 1 600 600 0 851 858 -7 B6 371 N595JB LGA FLL 152
# 8: 1 615 615 0 1039 1100 -21 B6 709 N794JB JFK SJU 182
# 9: 1 635 635 0 1028 940 48 AA 711 N3GKAA LGA DFW 248
# 10: 1 655 655 0 1021 1030 -9 DL 1415 N3763D JFK SLC 294
# ---
# 8823: 10 2038 2045 -7 2140 2154 -14 B6 1178 N640JB EWR BOS 40
# 8824: 10 2040 2040 0 2351 2357 -6 B6 677 N809JB JFK LAX 343
# 8825: 10 2054 2100 -6 2202 2207 -5 US 2144 N952UW LGA BOS 34
# 8826: 10 2058 2100 -2 2229 2225 4 WN 530 N443WN LGA MDW 117
# 8827: 10 2104 2110 -6 2337 2355 -18 B6 529 N507JB EWR MCO 127
# 8828: 10 2129 2130 -1 148 218 -30 B6 701 N193JB JFK SJU 186
# 8829: 10 2159 2159 0 2247 2300 -13 EV 4519 N13124 EWR BWI 33
# 8830: 10 2320 2250 30 16 2354 22 B6 1018 N612JB JFK BOS 35
# 8831: 10 NA 635 NA NA 940 NA AA 711 N3CDAA LGA DFW NA
# 8832: 10 NA 700 NA NA 1007 NA UA 719 EWR DFW NA
# 6 variables not shown: [distance <int>, hour <int>, minute <int>, time_hour <POSc>, year <int>, month <int>]
Notes:

- After all CSV files are processed, you could (optionally) read each year=?/month=? subdirectory and combine all of its data into one file. Remember my mention of "many files, slow open_dataset"? Combining sibling .parquet files might help solve this. Again, this is only possible if you can read all of one subdirectory's files into memory at a time. It might not be necessary; that's up to you. See the sketch after these notes.
- The year=?/month=? subdirectory naming implicitly creates the indexable columns. (If you read one of the 1.parquet files directly, you'll note that it does not have year or month in it.) If this is not desired, you can create plainer path names (e.g., 2013/12/1.parquet) and use open_dataset("datamart2", partitioning = c("year", "month")).
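A sketch of the optional compaction from the first note, assuming each year=?/month=? subdirectory fits in memory at once:

for (d in list.dirs("datamart2", recursive = TRUE)) {
  files <- list.files(d, pattern = "\\.parquet$", full.names = TRUE)
  if (length(files) < 2) next # nothing to combine here
  combined <- rbindlist(lapply(files, arrow::read_parquet))
  file.remove(files)
  # one consolidated file per subdirectory keeps open_dataset snappy
  arrow::write_parquet(combined, file.path(d, "1.parquet"))
}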