Tags: r, data.table, subset, parallel-foreach

Export named variable in foreach loop


I have a large data.table (12M+ rows) that I need to transform as follows:
collapse all rows that share the same first-column value (let's call it BookId) into one row, and merge the other columns into a single large "data" field. The table contains 2.7M unique BookIds.

For example:

BookId    Col1      Col2     ...      ColN
B001      Author    Bob      ...      ...
B002      Author    Marc     ...      ...
B002      Editor    Bob Inc  ...      ...
B001      Editor    MyBooks  ...      ...

Expected result:

BookId    data
B001      Bob,MyBooks, ...
B002      Marc,Bob Inc, ...
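For the toy table above, the expected result only shows the Col2 values, so this minimal sketch assumes that only Col2 goes into data. It performs the whole transformation in one grouped data.table call instead of building one subset per BookId:

```r
library(data.table)

# The toy table from the example above (the remaining columns are omitted)
books <- data.table(
  BookId = c("B001", "B002", "B002", "B001"),
  Col1   = c("Author", "Author", "Editor", "Editor"),
  Col2   = c("Bob", "Marc", "Bob Inc", "MyBooks")
)

# One grouped pass: for each BookId, join its Col2 values with ","
res <- books[, .(data = paste(Col2, collapse = ",")), by = BookId]
res
```

Because by = handles all groups in a single pass, no per-BookId subset has to be built, and that per-book subsetting is likely where most of the time goes.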

So far I have been able to build this structure using subsets, but it is very slow: building one row takes up to 300 ms, so processing all 2.7M BookIds would take up to 9 days.

So I decided to use a parallel foreach loop to speed up the process.
My first approach was to loop over the BookId list, but that would only divide the overall time by the number of cores, which is not good enough (with 8 cores it would still take more than a day). It also means that every worker process automatically receives a very large amount of data, since they all need the whole data.table object.
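A quick back-of-the-envelope check of those figures (2.7M BookIds, 300 ms each, 8 cores), assuming a perfect speed-up with no parallel overhead, gives about 9.4 days serially and about 1.2 days on 8 cores:

```r
# Runtime estimate from the figures quoted in the question
n.books <- 2.7e6        # unique BookIds
sec.per.book <- 0.3     # up to 300 ms per output row
cores <- 8

serial.days <- n.books * sec.per.book / 86400   # 86400 seconds per day
parallel.days <- serial.days / cores            # ideal speed-up, no overhead

c(serial = serial.days, parallel = parallel.days)
```

This is why parallelizing the same per-row algorithm can at best divide the time by the core count; the per-row cost itself has to drop.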

I found another way to improve this: split the main data.table into independent subsets based on the BookId list, then have each worker process one subset (fewer rows means faster subsetting). Unfortunately, I can't export my subsets to the workers because they have "dynamic" names (books1, books2, ...). I tried the .export parameter, but I guess it isn't aware of the current i value when it is evaluated. How can I achieve this? Is it even possible?
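One way around the dynamic names, sketched under assumptions (a toy table, 2 workers, only Col2 collapsed): keep the subsets in a list and let foreach iterate over that list. With doParallel's cluster backend, each task then carries only its own list element, so neither .export nor get() is needed. The chunk column name is hypothetical; split() here is data.table's split method with its by and keep.by arguments.

```r
library(data.table)
library(doParallel)  # also attaches foreach and parallel

# Toy stand-in for books (the real table would come from fread("books.tab"))
books <- data.table(
  BookId = c("B001", "B002", "B002", "B001", "B003"),
  Col1   = c("Author", "Author", "Editor", "Editor", "Author"),
  Col2   = c("Bob", "Marc", "Bob Inc", "MyBooks", "Ann")
)

n.chunks <- 2
ids <- unique(books$BookId)
# Assign each unique BookId to one of n.chunks contiguous chunks
chunk.of.id <- ceiling(seq_along(ids) / ceiling(length(ids) / n.chunks))
books[, chunk := chunk.of.id[match(BookId, ids)]]

# A list of subsets instead of dynamically named variables (books1, books2, ...)
subsets <- split(books, by = "chunk", keep.by = FALSE)

cl <- makeCluster(n.chunks)
registerDoParallel(cl)

# Iterating over the list hands each task only its own subset
res <- foreach(BookSubset = subsets, .combine = rbind,
               .packages = "data.table") %dopar% {
  # Grouped collapse inside the chunk instead of one subset per BookId
  BookSubset[, .(data = paste(Col2, collapse = ",")), by = BookId]
}
stopCluster(cl)
res
```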

I'm new to R, and I have been told there are always many ways to do the same thing. Did I choose the best approach?

Here is my code:

# Packages: data.table for fread/subsetting, doParallel for the cluster backend
library(data.table)
library(parallel)
library(doParallel)   # also attaches foreach

# Create cluster based on available cores
cores <- detectCores()
cl <- makeCluster(cores)
registerDoParallel(cl)

# Load data and generate BookId lists
books <- fread("books.tab")
bookId.unique.list <- unique(books$BookId)
bookId.list <- books$BookId

# Split the data.table into roughly equal subsets of unique BookIds
subset.length <- ceiling(length(bookId.unique.list) / cores)
for (i in 1:cores) {
    start <- (i - 1) * subset.length + 1
    end <- min(i * subset.length, length(bookId.unique.list))
    id.list <- bookId.unique.list[start:end]
    assign(paste0("books", i), books[BookId %in% id.list])
    assign(paste0("book.list", i), id.list)
}

# Prepare resulting DT
res <- data.table(BookId = character(0), data = character(0))

# Parallel loop
# Problem: .export is evaluated once, when foreach() is called, so `i` here is
# the value left over from the loop above (cores), not the current iteration
res <- foreach(i = 1:cores, .combine = rbind, .export = paste0("books", i),
               .packages = c("data.table")) %dopar% {

    # Try to get the named subset for the current iteration (i),
    # i.e. books1, books2, ...
    BookSubset <- get(paste0("books", i))
    Book.list.subset <- unique(BookSubset$BookId)

    temp <- data.table(BookId = character(0), data = character(0))

    # Use j so the outer iteration variable i is not overwritten
    for (j in seq_along(Book.list.subset)) {
        bookId <- Book.list.subset[j]

        subset <- BookSubset[BookId == bookId]
        output <- capture.output(write.table(subset, stdout(), quote = FALSE,
                                             row.names = FALSE, col.names = FALSE))

        # One row per book: join this book's lines into a single string
        temp <- rbind(temp, data.table(BookId = bookId,
                                       data = paste(output, collapse = ";")))
    }
    temp
}
stopCluster(cl)
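The hand-computed start/end bounds in the splitting loop are easy to get off by one. splitIndices() from R's bundled parallel package returns contiguous, near-equal chunks of positions; a minimal sketch, with a hypothetical ten-element ID vector standing in for bookId.unique.list:

```r
library(parallel)

# Hypothetical stand-in for bookId.unique.list
ids <- sprintf("B%03d", 1:10)
cores <- 3

# splitIndices(n, k) splits positions 1..n into k contiguous, near-equal chunks
idx <- splitIndices(length(ids), cores)
id.chunks <- lapply(idx, function(k) ids[k])

# Number of IDs in each chunk
lengths(id.chunks)
```

The resulting list can be used directly to build a list of subsets rather than numbered variables.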

Here is the result of dput(head(books)):

structure(list(
    BookId = c("BOOKXXXX774051532082", "BOOKXXXX776514515608",
               "BOOKXXXX776287821289", "BOOKXXXX776514515608",
               "BOOKXXXX774051532082", "BOOKXXXX774051532082"),
    V2 = c("ZUSRXXXX842901236553", "ZUSRXXXX371255229634",
           "ZUSRXXXX656080986411", "ZUSRXXXX371255229634",
           "ZUSRXXXX842901236553", "ZUSRXXXX842901236553"),
    V3 = c("BOOKEVTX776757835463", "BOOKEVTX776762775464",
           "BOOKEVTX776772854465", "BOOKEVTX776773643466", "",
           "BOOKEVTX776995487467"),
    V4 = c("ZACTIONX215229995154", "ZACTIONX533300043134",
           "ZACTIONX533300043134", "ZACTIONX533300043134", "",
           "ZACTIONX215229995154"),
    V5 = c("", "", "", "", "", ""),
    V6 = c("", "", "", "", "MAILOUTX776774376684", ""),
    V7 = c("", "", "", "", "", ""),
    V8 = c("", "", "", "", "", ""),
    V9 = c("", "", "", "", "", ""),
    V10 = c("", "", "", "", "", ""),
    V11 = c("", "", "", "", "", "")),
  .Names = c("zkf_BOOK", "V2", "V3", "V4", "V5", "V6", "V7", "V8", "V9",
             "V10", "V11"),
  class = c("data.table", "data.frame"), row.names = c(NA, -6L))

Here is a sample of my "real" data input:

BOOKXXXX774051532082    ZUSRXXXX842901236553    BOOKEVTX776757835463    ZACTIONX215229995154                            
BOOKXXXX776514515608    ZUSRXXXX371255229634    BOOKEVTX776762775464    ZACTIONX533300043134                            
BOOKXXXX776287821289    ZUSRXXXX656080986411    BOOKEVTX776772854465    ZACTIONX533300043134                            
BOOKXXXX776514515608    ZUSRXXXX371255229634    BOOKEVTX776773643466    ZACTIONX533300043134                            
BOOKXXXX774051532082    ZUSRXXXX842901236553                MAILOUTX776774376684                    
BOOKXXXX774051532082    ZUSRXXXX842901236553    BOOKEVTX776995487467    ZACTIONX215229995154                            
BOOKXXXX776287821289    ZUSRXXXX656080986411    BOOKEVTX777107387468    ZACTIONX533300043134    

And the expected output:

BOOKXXXX774051532082    ZUSRXXXX842901236553|BOOKEVTX776757835463|ZACTIONX215229995154|||||||;ZUSRXXXX842901236553||||MAILOUTX776774376684|||||;ZUSRXXXX842901236553|BOOKEVTX776995487467|ZACTIONX215229995154|||||||
BOOKXXXX776514515608    ZUSRXXXX371255229634|BOOKEVTX776762775464|ZACTIONX533300043134|||||||;ZUSRXXXX371255229634|BOOKEVTX776773643466|ZACTIONX533300043134|||||||
BOOKXXXX776287821289    ZUSRXXXX656080986411|BOOKEVTX776772854465|ZACTIONX533300043134|||||||;ZUSRXXXX656080986411|BOOKEVTX777107387468|ZACTIONX533300043134|||||||

Solution

  • The OP has requested two collapse operations:

    1. For each row, collapse all columns (except the id column zkf_BOOK) into one data field separated by |.
    2. For each zkf_BOOK group, collapse the rows separated by ;

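    Collapsing the columns of each row is done by Reduce(), which folds paste(..., sep = "|") over the columns of .SD, while collapsing the rows of each group is done with paste(..., collapse = ";"). In data.table, the grouping columns given in by = are not included in .SD, so zkf_BOOK does not end up in the pasted string.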
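To make the two steps concrete, here is a small sketch on made-up vectors (the names V2 to V4 and their values are hypothetical). Reduce() pastes the columns together pairwise, left to right; do.call(paste, ...) is an equivalent single call that is often used for the same job:

```r
# A list of column vectors, like .SD inside a group
cols <- list(V2 = c("a1", "b1"), V3 = c("a2", ""), V4 = c("a3", "b3"))

# Step 1: collapse the columns of each row with "|"
row.strings <- Reduce(function(x, y) paste(x, y, sep = "|"), cols)

# Same result in one call: paste(cols$V2, cols$V3, cols$V4, sep = "|")
row.strings2 <- do.call(paste, c(cols, sep = "|"))

# Step 2: collapse the rows of the group with ";"
group.string <- paste(row.strings, collapse = ";")
group.string
```

Empty columns still contribute a separator, which is why the expected output contains runs like "|||||||".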

    library(data.table)
    setDT(books)[, paste(Reduce(function(x, y) paste(x, y, sep = "|"), .SD), collapse = ";"), 
                 by = zkf_BOOK]
    
                   zkf_BOOK
    1: BOOKXXXX774051532082
    2: BOOKXXXX776514515608
    3: BOOKXXXX776287821289
                                                                                                                                                                                                  V1
    1: ZUSRXXXX842901236553|BOOKEVTX776757835463|ZACTIONX215229995154|||||||;ZUSRXXXX842901236553||||MAILOUTX776774376684|||||;ZUSRXXXX842901236553|BOOKEVTX776995487467|ZACTIONX215229995154|||||||
    2:                                                   ZUSRXXXX371255229634|BOOKEVTX776762775464|ZACTIONX533300043134|||||||;ZUSRXXXX371255229634|BOOKEVTX776773643466|ZACTIONX533300043134|||||||
    3:                                                                                                                         ZUSRXXXX656080986411|BOOKEVTX776772854465|ZACTIONX533300043134|||||||
    

    Note that the difference from the expected result arises because dput(head(books)) returns only 6 rows, while the printed data input and expected output are based on 7 rows (or more).
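A self-contained check of the same approach on the six dput rows, rebuilt with data.table() instead of structure() (length-1 columns are recycled). This variant swaps Reduce() for the equivalent do.call(paste, c(.SD, sep = "|")) idiom and names the result column data, as in the question:

```r
library(data.table)

# The six rows from dput(head(books))
books <- data.table(
  zkf_BOOK = c("BOOKXXXX774051532082", "BOOKXXXX776514515608", "BOOKXXXX776287821289",
               "BOOKXXXX776514515608", "BOOKXXXX774051532082", "BOOKXXXX774051532082"),
  V2 = c("ZUSRXXXX842901236553", "ZUSRXXXX371255229634", "ZUSRXXXX656080986411",
         "ZUSRXXXX371255229634", "ZUSRXXXX842901236553", "ZUSRXXXX842901236553"),
  V3 = c("BOOKEVTX776757835463", "BOOKEVTX776762775464", "BOOKEVTX776772854465",
         "BOOKEVTX776773643466", "", "BOOKEVTX776995487467"),
  V4 = c("ZACTIONX215229995154", "ZACTIONX533300043134", "ZACTIONX533300043134",
         "ZACTIONX533300043134", "", "ZACTIONX215229995154"),
  V5 = "", V6 = c("", "", "", "", "MAILOUTX776774376684", ""),
  V7 = "", V8 = "", V9 = "", V10 = "", V11 = ""
)

# Columns joined with "|" per row, rows joined with ";" per zkf_BOOK group
res <- books[, .(data = paste(do.call(paste, c(.SD, sep = "|")), collapse = ";")),
             by = zkf_BOOK]
res
```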