Search code examples
rhpcffffbase

how to speed up checking duplication for huge ffdf


I have a list of ffdf, it takes up about 76GB of RAM if it is loaded to RAM instead of using ff package. The following is their respective dim()

> ffdfs |> sapply(dim)
         [,1]     [,2]     [,3]      [,4]      [,5]      [,6]      [,7]
[1,] 11478746 12854627 10398332 404567958 490530023 540375993 913792256
[2,]        3        3        3         3         3         3         3
         [,8]     [,9]     [,10]     [,11]    [,12]     [,13]     [,14]
[1,] 15296863 11588739 547337574 306972654 11544523 255644408 556900805
[2,]        3        3         3         3        3         3         3
        [,15]     [,16]    [,17]
[1,] 13409223 900436690 15184264
[2,]        3         3        3

I want to check the number of duplication in each ffdf, so I did the following:

check_duplication <- sample_cols |> sapply(function(df) {
    df[c("chr","pos")] |> duplicated() |> sum()
})

It works but it is extremely slow.

I am on a HPC, I have about 110GB RAM and 18CPU.

Will there be any other option or setting I could adjust to speed up the process? Thank you.


Solution

  • Parallelization is a natural way to speed this up. It can be done at C level via data.table:

    library("data.table")
    
    data.table 1.14.2 using 4 threads (see ?getDTthreads).  Latest news: r-datatable.com
    
    set.seed(1L)
    x <- as.data.frame(replicate(2L, sample.int(100L, size = 1e+06L, replace = TRUE), simplify = FALSE))
    y <- as.data.table(x)
    microbenchmark::microbenchmark(duplicated(x), duplicated(y), times = 1000L)
    
    Unit: milliseconds
              expr       min         lq       mean     median         uq       max neval
     duplicated(x) 449.27693 596.242890 622.160423 625.610267 644.682319 734.39741  1000
     duplicated(y)   5.75722   6.347518   7.413925   6.874593   7.407695  58.12131  1000
    

    The benchmark here shows that duplicated is much faster when applied to a data.table instead of an equivalent data frame. Of course, how much faster depends on the number of CPUs that you make available to data.table (see ?setDTthreads).

    If you go the data.table route, then you would process your 17 data frames like so:

    nduped <- function(ffd) {
      x <- as.data.frame(ffd[c("chr", "pos")])
      setDT(x)
      n <- sum(duplicated(x))
      rm(x)
      gc(FALSE)
      n
    }
    vapply(list_of_ffd, nduped, 0L)
    

    Here, we are using setDT rather than as.data.table to perform an in-place coercion from data frame to data.table, and we are using rm and gc to free the memory occupied by x before reading another data frame into memory.

    If, for whatever reason, data.table is not an option, then you can stick to using the duplicated method for data frames, namely duplicated.data.frame. It is not parallelized at C level, so you would need to parallelize at R level, using, e.g., mclapply to assign your 17 data frames to batches and process those batches in parallel:

    nduped <- function(ffd) {
      x <- as.data.frame(ffd[c("chr", "pos")])
      n <- sum(duplicated(x))
      rm(x)
      gc(FALSE)
      n
    }
    unlist(parallel::mclapply(list_of_ffd, nduped, ...))
    

    This option is slower and consumes more memory than you might expect. Fortunately, there is room for optimization. The rest of this answer highlights some of the main issues and ways to get around them. Feel free to stop reading if you've already settled on data.table.

    • Since you have 18 CPUs, you can try to process all 17 data frames simultaneously, but you might encounter out-of-memory issues as a result of reading all 17 data frames into memory at once. Increasing the batch size (i.e., distributing the 17 jobs across fewer than 17 CPUs) should help.

    • Since your 17 data frames vary widely in length (number of rows), randomly assigning them to roughly equally sized batches is probably not a good strategy. You could decrease the overall run time by batching shorter data frames together and not batching longer data frames together. mclapply has an affinity.list argument giving you this control. Ideally, each batch should require the same amount of processing time.

    • The amount of memory that each job uses is actually at least two times greater than the amount needed to store the data frame x, because duplicated.data.frame copies its argument:

      x <- data.frame(chr = rep(1:2, times = 5L), pos = rep(1:2, each = 5L))
      tracemem(x)
      
      [1] "<0x14babad48>"
      
      invisible(duplicated(x))
      
      tracemem[0x14babad48 -> 0x14babc088]: as.list.data.frame as.list vapply duplicated.data.frame duplicated
      

      The copy happens inside of the vapply call in the body of the method:

      duplicated.data.frame
      
      function (x, incomparables = FALSE, fromLast = FALSE, ...) 
      {
          if (!isFALSE(incomparables)) 
              .NotYetUsed("incomparables != FALSE")
          if (length(x) != 1L) {
              if (any(i <- vapply(x, is.factor, NA))) 
                  x[i] <- lapply(x[i], as.numeric)
              duplicated(do.call(Map, `names<-`(c(list, x), NULL)), 
                  fromLast = fromLast)
          }
          else duplicated(x[[1L]], fromLast = fromLast, ...)
      }
      <bytecode: 0x15b44f0f0>
      <environment: namespace:base>
      

      That vapply call is completely avoidable: you should already know whether chr and pos are factors. I would suggest defining a replacement for duplicated.data.frame that does only what is necessary given your use case. For example, if you know that chr and pos are not factors, then you might assign

      duped <- function(x) {
        duplicated.default(do.call(Map, `names<-`(c(list, x), NULL)))
      }
      

      and compute sum(duped(x)) instead of sum(duplicated(x)). In fact, you could do slightly better by replacing list with c:

      fastduped <- function(x) {
        duplicated.default(do.call(Map, `names<-`(c(c, x), NULL)))
      }
      

      Using c here causes rows of the data frame x to be stored and compared as atomic vectors rather than as lists. In other words, fastduped(x) is doing

      duplicated.default(<length-'m' list of length-'n' atomic vectors>)
      

      whereas duped(x) is doing

      duplicated.default(<length-'m' list of length-'n' lists of length-1 atomic vectors>)
      

      where m = nrow(x) and n = length(x). The latter is slower and consumes more memory, and there is a warning in ?duplicated saying as much:

      Using this for lists is potentially slow, especially if the elements are not atomic vectors (see ‘vector’) or differ only in their attributes. In the worst case it is O(n^2).

      Computing sum(fastduped(x)) instead of sum(duplicated(x)) should increase the number of data frames that you can process simultaneously without running out of memory. FWIW, here is a benchmark comparing the run times of duplicated, duped, fastduped (saying nothing about memory usage):

      set.seed(1L)
      x <- as.data.frame(replicate(2L, sample.int(100L, size = 1e+06L, replace = TRUE), simplify = FALSE))
      microbenchmark::microbenchmark(duplicated(x), duped(x), fastduped(x), times = 1000L)
      
      Unit: milliseconds
                expr      min       lq     mean   median       uq      max neval
      duplicated(x) 521.7263 598.9353 688.7286 628.8813 769.6100 1324.458  1000
            duped(x) 521.3863 598.7390 682.1298 627.1445 764.7331 1373.712  1000
        fastduped(x) 431.0359 528.6613 594.1534 553.7739 609.6241 1123.542  1000