Tags: r, csv, parquet, partitioning, apache-arrow

Can I stream data into a partitioned parquet (arrow) dataset from a database or another file?


I work with tables that are tens or hundreds of gigabytes in size. They are in a postgres database. I have also dumped them to CSV files.

I would like to build a partitioned parquet dataset. I wrote a script that does exactly what I want for a small subset of the data, but I don't want to process the full dataset that way: 1) it would require too much memory, and 2) it would be horribly slow.

I searched the dbplyr, arrow, and duckdb docs for a way to write a lazy table directly to disk without reading it into memory. Boy would that be nice. I couldn't find one.

Maybe I could collect the data in chunks, say years, but then how do I write them to the same dataset?


Solution

  • Because arrow::write_dataset can do hive-style partitioning (by sub-directory) natively based on fields in the data, I suggest you write "as many parquet files" as you need to get the job done, and then optionally combine them.

    Note: on the question of how many partition keys to define, keep in mind that a lot of files with small numbers of rows will be inefficient. See https://arrow.apache.org/docs/r/articles/dataset.html#partitioning-performance-considerations for a discussion of partition/size performance, where the current guidelines say the following (a quick way to check an existing dataset against these numbers is sketched just after the list):

    • Avoid files smaller than 20MB and larger than 2GB.
    • Avoid partitioning layouts with more than 10,000 distinct partitions.
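
    As a rough sanity check against those numbers (a minimal base R sketch, assuming the dataset lives under "somedir" and the files use a .pq or .parquet extension):

    files <- list.files("somedir", recursive = TRUE, full.names = TRUE,
                        pattern = "\\.(pq|parquet)$")
    sizes_mb <- file.size(files) / 1024^2
    summary(sizes_mb)               # look for files well under 20MB or over 2GB
    length(unique(dirname(files)))  # number of distinct partition directories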

    That is, if you have two keys in your data (say key1 and key2), then when you use arrow::write_dataset, it inserts those into the sub-directory paths as appropriate. Because the default behavior of write_dataset when it finds existing files is to overwrite (or error, depending on the existing_data_behavior argument), we keep an external counter and include it in the basename_template so that each consecutive write adds new files instead of replacing the previous batch's.

    Here's my proposed path:

    • Pull your data in batches, as many rows as you think you can hold in memory without crashing, but give yourself some buffer. Let's say you pull 10 million rows in each batch. (The batches can come either from the database or from the CSV files; the workable size depends heavily on how many columns you have and their classes, since strings take up much more memory than numbers.)
    • Increment the counter and use it in the basename_template of a call to arrow::write_dataset.
    • Optionally pre-delete your large object and force garbage collection; I haven't benchmarked whether this is truly necessary, but rm(dat); gc() might be useful.
    • Repeat. (A full loop along these lines is sketched just after this list.)
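
    Putting those steps together against the database, a minimal sketch of the loop might look like the following. The connection details, query, and 10-million-row batch size are placeholders, and it assumes DBI/RPostgres plus the same key1/key2 columns used in the demonstration further down; adapt as needed.

    library(DBI)
    con <- dbConnect(RPostgres::Postgres(), dbname = "mydb")  # placeholder connection
    res <- dbSendQuery(con, "SELECT * FROM mytable")          # placeholder query
    
    counter <- 0L
    while (!dbHasCompleted(res)) {
      dat <- dbFetch(res, n = 1e7)        # one batch of up to 10M rows
      counter <- counter + 1L
      arrow::write_dataset(dat, "somedir", partitioning = c("key1", "key2"),
                           basename_template = sprintf("part-%s-{i}.pq", counter))
      rm(dat); gc()                       # optional, as noted above
    }
    
    dbClearResult(res)
    dbDisconnect(con)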

    Here are two such steps, faked with small data frames:

    counter <- 1L
    dat <- data.frame(key1 = c("A","A","B"), key2=c("E","F","F"), val=1:3)
    arrow::write_dataset(dat, "somedir", partitioning = c("key1", "key2"),
                         basename_template = sprintf("part-%s-{i}.pq", counter))
    list.files("somedir", recursive = TRUE, full.names = TRUE)
    # [1] "somedir/key1=A/key2=E/part-1-0.pq"
    # [2] "somedir/key1=A/key2=F/part-1-0.pq"
    # [3] "somedir/key1=B/key2=F/part-1-0.pq"
    
    counter <- counter + 1L
    dat <- data.frame(key1 = c("A","C","D"), key2=c("E","F","G"), val=2:4)
    arrow::write_dataset(dat, "somedir", partitioning = c("key1", "key2"),
                         basename_template = sprintf("part-%s-{i}.pq", counter))
    list.files("somedir", recursive = TRUE, full.names = TRUE)
    # [1] "somedir/key1=A/key2=E/part-1-0.pq"
    # [2] "somedir/key1=A/key2=E/part-2-0.pq"
    # [3] "somedir/key1=A/key2=F/part-1-0.pq"
    # [4] "somedir/key1=B/key2=F/part-1-0.pq"
    # [5] "somedir/key1=C/key2=F/part-2-0.pq"
    # [6] "somedir/key1=D/key2=G/part-2-0.pq"
    

    The "A"/"E" first row is repeated in the second batch, so we see that key1=A/key2=E now has two files in it. Just for proof, we can now read that directory to see that data from each of the two batches is available in one dataset connection:

    library(dplyr)
    arrow::open_dataset("somedir/key1=A/key2=E/") %>%
      collect()
    #   val
    # 1   1
    # 2   2
    

    (noting that key1 and key2 are implied by the path).

    Or we can read the whole thing (assuming all frames have an identical schema), filter on just the keys we need, and then collect() (which is where the data is pulled into memory):

    arrow::open_dataset("somedir/") %>%
      filter(key1 == "A", key2 == "E") %>%
      collect()
    #   val key1 key2
    # 1   1    A    E
    # 2   2    A    E
    

    (The keys are retained since we opened higher in the subdirectory hierarchy.)

    If at some point you find you have many, many files in any one of these subdirectories, I suggest you combine them manually. Depending on the number of rows in each parquet file, you may not be able to load every file in one subdir (e.g., somedir/key1=A/key2=E/) into memory at once, but you can likely batch a number of files at a time: load them into memory, save them as a single parquet file, then move the originals out of the way.
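
    For example, to compact one of the subdirectories from the example above in a single pass (a minimal sketch that assumes all files in that subdirectory fit in memory together; combined-0.pq is just an illustrative name):

    old_files <- list.files("somedir/key1=A/key2=E", full.names = TRUE)
    combined  <- dplyr::collect(arrow::open_dataset("somedir/key1=A/key2=E/"))
    arrow::write_parquet(combined, "somedir/key1=A/key2=E/combined-0.pq")
    file.remove(old_files)  # or move them aside until you've verified the new file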