Tags: r, parquet, apache-arrow

Given that the arrow package's write_parquet does not support append, is there any alternative?


I have to generate a parquet file by writing parts of the data at a time, given memory constraints. The arrow package's write_parquet does not support this functionality. I understand that I can write to multiple parquet files and re-read them as a "dataset", but downstream users demand a single parquet file, so I am wondering whether there are alternative packages or solutions. Python supports appending, but it substantially mangles date fields and floating point columns, so it is not an option.
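
For reference, the multi-file workaround mentioned above looks roughly like this (a sketch only; the parts directory, file naming, and chunk size are illustrative, not from the original question):

    library(arrow)
    
    # write each chunk as its own parquet file under one directory
    dir.create("parts", showWarnings = FALSE)
    for(i in seq(1, 1e7, 1e6)) {
      chunk <- data.frame(x = as.numeric(seq(i, length = 1e6)))
      write_parquet(chunk,
        file.path("parts", sprintf("part-%03d.parquet", as.integer((i - 1) / 1e6))))
    }
    
    # re-read the directory of parts as a single dataset
    ds <- open_dataset("parts") # the parts behave as one table downstream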


Solution

  • Writing out the parquet file in chunks does not require that the data all be in RAM at once.

    To demonstrate this, first define gen_data(i, size), which creates a data frame with size rows of consecutive values starting from i, in order to simulate the process discussed in the question.

    Now, using the peakRAM package, we check how much RAM is required to create a data frame with 1e7 rows all at once (76.3 MiB total, 152.6 MiB peak). Then we generate one tenth of the data at a time and write each piece out to parquet as a chunk, in which case we use only 0.0 MiB total, 26.7 MiB peak. This shows that we did not need to have the entire data frame in RAM at once.

    Below we have adapted the internals of the arrow package's write_parquet, using the lower-level FileOutputStream and ParquetFileWriter classes, so that the data can be written out in chunks.

    library(arrow)
    library(peakRAM)
    
    invisible(gc())
    
    gen_data <- function(i, size) {
      data.frame(x = as.numeric(seq(i, length = size)))
    }
    
    f <- function() {
      # define inputs
      path <- "test.parquet"
    
      # open
      schema <- arrow_table(gen_data(1, 1))$schema # gen 1 row to get schema
      sink <- FileOutputStream$create(path) 
      writer <- ParquetFileWriter$create(schema = schema, sink = sink,
        properties = ParquetWriterProperties$create(names(schema)))
    
      # writes - one chunk of 1e6 rows at a time
      for(i in seq(1, 1e7, 1e6)) {
        dat <- gen_data(i, 1e6)
        writer$WriteTable(arrow_table(dat), chunk_size = 1e6)
        gc() # free the previous chunk before generating the next
      }
    
      # close
      writer$Close()
      sink$close()
    
    }
    

    Now test it out:

    if (file.exists("test.parquet")) unlink("test.parquet")
    invisible(gc())
    peakRAM(gen_data(1, 1e7), f())
    ##       Function_Call Elapsed_Time_sec Total_RAM_Used_MiB Peak_RAM_Used_MiB
    ## 1 gen_data(1,1e+07)             0.37               76.3             152.6
    ## 2               f()             1.78                0.0              26.7
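
    As a quick sanity check (an extra step, not part of the measurements above), we can read test.parquet back as an Arrow Table and confirm that the chunked writes produced a single file with all the rows:

    tbl <- read_parquet("test.parquet", as_data_frame = FALSE) # Arrow Table, not a data frame
    nrow(tbl) # should be 1e7: 10 chunks of 1e6 rows each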