Tags: r, foreach, parallel-processing, snow

Read many files in parallel and extract data


I have 1000 JSON files that I would like to read in parallel. My machine has 4 CPU cores.

I have a character vector that holds the names of all the files:

cik_files <- list.files("./data/", pattern = "\\.json$")

Using this vector, I load each file, extract the data, and append it to the following list:

data <- list()

Below is the code that extracts the data:

library(jsonlite)  # provides fromJSON()

for(i in seq_along(cik_files)){
  data1 <- fromJSON(paste0("./data/", cik_files[i]), flatten = TRUE)
  if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
    data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
    data1 <- data1[grep("CY20[0-9]{2}$", data1$frame), c(3, 9)]
    try({if(nrow(data1) > 0){
      data1$cik <- strtrim(cik_files[i], 13)
      data[[length(data) + 1]] <- data1
    }}, silent = TRUE)
  }
}

This, however, takes quite a lot of time, so I was wondering how I can run the body of the for loop in parallel.

Thanks in advance.


Solution

  • Here is an attempt to solve the problem in the question. Untested, since there is no data.

    Step 1

    First of all, rewrite the loop in the question as a function.

    # Returns a data frame for file i, NULL when there is nothing to extract,
    # or the condition object when reading or subsetting fails or warns.
    f <- function(i, path = "./data", cik_files){
      filename <- file.path(path, cik_files[i])
      tryCatch({
        data1 <- fromJSON(filename, flatten = TRUE)
        if("NetIncomeLoss" %in% names(data1$facts$`us-gaap`)){
          data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
          found <- grep("CY20[0-9]{2}$", data1$frame)
          if(length(found) > 0){
            out <- data1[found, c(3, 9)]
            out$cik <- strtrim(cik_files[i], 13)
            out
          } else NULL
        } else NULL
      },
      error = function(e) e,
      warning = function(w) w)
    }
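
The function above returns the error or warning object instead of signalling it, so one bad file does not stop the whole run. A minimal base R sketch of that pattern follows. The helper `safe_log` is hypothetical and only stands in for the file-reading step; it is not part of the answer's code.

```r
# Hypothetical stand-in for the body of f(): it either returns a value
# or returns the error/warning object instead of signalling it.
safe_log <- function(x){
  tryCatch(
    log(x),
    error = function(e) e,
    warning = function(w) w
  )
}

# 10 works, -1 raises a warning (NaN), "a" raises an error
res <- lapply(list(10, -1, "a"), safe_log)

# Classify each element by the kind of object that came back
is_err  <- vapply(res, inherits, logical(1), what = "error")
is_warn <- vapply(res, inherits, logical(1), what = "warning")
```

Here `is_err` is TRUE only for the third element and `is_warn` only for the second, which is the property Step 3 relies on.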
    

    Step 2

    Now load the parallel package and run one of the following, depending on the operating system.

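    library(parallel)
    library(jsonlite)  # provides fromJSON(), which f() calls
    

    # Not on Windows (mclapply forks the current session);
    # without mc.cores, mclapply defaults to getOption("mc.cores", 2L)
    json_list <- mclapply(seq_along(cik_files), f, cik_files = cik_files,
                          mc.cores = detectCores() - 1L)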
    

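    # Windows (no forking, so start a socket cluster)
    ncores <- detectCores()
    cl <- makeCluster(ncores - 1L)
    # Load jsonlite on every worker so that f() can find fromJSON();
    # cik_files needs no export because it is passed to f() as an argument.
    clusterEvalQ(cl, library(jsonlite))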
    
    json_list <- parLapply(cl, seq_along(cik_files), f, cik_files = cik_files)
    
    stopCluster(cl)
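
If the same script has to run on both kinds of system, the two variants can be wrapped in one helper that picks the backend from `.Platform$OS.type`. The sketch below is only an illustration: `par_apply` and its `ncores` argument are hypothetical names, and the toy function needs no packages on the workers.

```r
library(parallel)

# Hypothetical helper: apply fun to each element of x in parallel,
# forking where that is available and using a socket cluster on Windows.
par_apply <- function(x, fun, ..., ncores = 2L){
  if(.Platform$OS.type == "windows"){
    cl <- makeCluster(ncores)
    on.exit(stopCluster(cl), add = TRUE)  # shut workers down even on error
    parLapply(cl, x, fun, ...)
  } else {
    mclapply(x, fun, ..., mc.cores = ncores)
  }
}

# Toy usage: extra arguments are forwarded to fun on either backend
squares <- par_apply(1:8, function(i, power) i^power, power = 2)
```

For a function such as `f`, which calls `fromJSON`, the Windows branch would also need `clusterEvalQ(cl, library(jsonlite))` before `parLapply`, as in the code above.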
    

    Step 3

    Extract the data from the returned list json_list.

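    # vapply always returns a logical vector, even for an empty list
    err <- vapply(json_list, inherits, logical(1), what = "error")
    warn <- vapply(json_list, inherits, logical(1), what = "warning")
    ok <- !(err | warn)
    json_list[ok]  # correctly read in (data frames, or NULL if no rows matched)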
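
To end up with a single data frame, the NULL entries have to be dropped as well before binding the rows. A minimal sketch, assuming a made-up `json_list`: the column names `val` and `frame` and the `cik` values are invented for illustration, since the real columns depend on positions 3 and 9 of the source data.

```r
# Toy stand-in for json_list: two data frames, a NULL (no matching rows)
# and an error object (a file that failed). All values are made up.
json_list <- list(
  data.frame(val = c(1, 2), frame = c("CY2019", "CY2020"), cik = "CIK0000000001"),
  NULL,
  simpleError("could not read file"),
  data.frame(val = 3, frame = "CY2020", cik = "CIK0000000002")
)

# Keep only the elements that are data frames and stack them
is_df <- vapply(json_list, is.data.frame, logical(1))
result <- do.call(rbind, json_list[is_df])

# Positions of the files that returned an error or warning object
failed <- which(vapply(json_list, inherits, logical(1), what = "condition"))
```

With the real data, `cik_files[failed]` would then name the files worth inspecting by hand.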