Tags: r, database, arrow-functions, duckdb, r-future

Working with a folder of large .txt files using Arrow, DuckDB, and Future


I am merging a folder of 250 large .txt files, each approximately 1GB with over 6 million rows. I'm merging this folder with another .txt file outside of the folder, also about 1GB, based on a common column. I'm running into memory issues when attempting to merge the two, even when I drop as many columns as possible.

The following are 10 rows and a few columns from three of the .txt files in the folder, produced with dput():

profile_1<- structure(list(user_id = c(1e+06, 1000001, 1000002, 1000003, 
1000004, 1000006, 1000007, 1000008, 1000009, 1000010), fullname = c("Morgan Fabian", 
"Cally Dupe", "Horacio Bernal Gonzalez", "Deepika Babar", "Nathan Molle-Clark", 
"Imran Yasin", "Naveen Reddy", "Robert Coffman", "Master Tasco", 
"Brian Weatherford"), sex= c("F", "F", "M", "F", "M", 
"M", "M", "M", "e", "M"), country = c("Kannapolis, North Carolina, United States", 
"Greater Perth Area", "Mexico", "Mumbai, Maharashtra, India", 
"Columbia, Missouri, United States", "United Kingdom", "Greater Hyderabad Area", 
"Huntington Beach, California, United States", "Norway", "Greater Chicago Area"
)), row.names = c(NA, -10L), class = c("tbl_df", "tbl", "data.frame"
))
.
.
.
profile_200<-structure(list(user_id = c(908150405, 908150406, 908150407, 908150408, 
908150409, 908150410, 908150415, 908150416, 908150417, 908150418
), fullname = c("aatm prakash chaturvedi", "ab musa", "Aabida Sayed", 
"aanand paranjape.1", "abd aziz muslim", "abdalrhman mohammad", 
"Abdul Mannan Mannan", "Aashish Punmiya", "Abdelkader Elmouetamid55", 
"abd kamal sayuti mat ali"), sex = c("e", "e", "e", 
"M", "M", "M", "M", "M", "e", "M"), country = c("India", "Ethiopia", 
"India", "India", NA, NA, "Bangladesh", "India", "France", NA
)), row.names = c(NA, -10L), class = c("tbl_df", "tbl", "data.frame"
))
.
.
.
profile_250<- structure(list(user_id = c(2039727045, 2039727046, 2039727047, 
2039727049, 2039727050, 2039727051, 2039727052, 2039727053, 2039727054, 
2039727055), fullname = c("HOD(MECH) Trident Academy of Technololgy", 
"Hansi Figlmueller", "Mark Tan", "bianka soares", "Yuan Hu", 
"Luis Miguel Navia sanchez", "Andreia Cezar", "Luciana Roth", 
"Michał Zagajewski", "Cindy Wojdyla Cain"), sex = c("F", 
"e", "M", "F", "M", "M", "F", "F", "e", "F"), country = c("India", 
"Austria", "Canada", "Brazil", "United States", "United States", 
"Brazil", "Argentina", "Poland", "United States")), row.names = c(NA, 
-10L), class = c("tbl_df", "tbl", "data.frame"))

A few rows of the other_dataset that I'm merging the folder with using dput():

other_dataset<- structure(list(id = c(143, 143, 143, 379, 379, 379, 379, 379, 
419, 419), user_id = c(1309564, 2601833, 4168473, 1417698, 2991030, 
4654601, 4685011, 4748461, 1050940, 1056187), other_id = c(18564, 
18564, 18564, 26847, 26847, 26847, 26847, 26847, 30138, 30138
)), row.names = c(NA, -10L), class = c("tbl_df", "tbl", "data.frame"
))

This post provides information on how to work with a large CSV file using the arrow and duckdb packages. I'm wondering if there is a way to apply it to a folder of files. I have also heard of the future package (documentation here), which helps with parallel processing of large datasets, but I am not quite sure how to integrate it into my code.

I've tried the following, using the information in the above post:

path_to_folder <- "/Users/myname/folder/"

my_files <- arrow::open_dataset(source = path_to_folder, pattern = "*.txt") %>%
  arrow::to_duckdb()

However, there is the following error message:

Error in ParquetFragmentScanOptions$create(...) : 
  unused argument (pattern = "*.txt")

My eventual goal is to use the list of files in the following method for an inner_join, thanks to the code provided in this post.

# function to read file and just keep certain columns
read_and_pare <- \(x) {
  x |> 
    read_delim(col_select = c(user_id, fullname)) |>
    filter(!is.na(user_id)) |>
    distinct()
}


# read data files into list, stack into one table, join to other table
purrr::map(my_files, read_and_pare) |>
  setNames(my_files) |>
  bind_rows(.id = "user_file") |>
  inner_join(other_dataset,
             join_by(user_id == user_id))

Is there a way to make use of duckdb, arrow, and future in my code to achieve my goal?

Update 1

After the suggestion by @I_O, I edited the code to the following and no longer got an error message for this part:

path_to_folder <- "/Users/myname/folder/"

my_files <- arrow::open_dataset(source = path_to_folder, format= "text", delimiter= "\t") %>%
  arrow::to_duckdb()

However, when I proceeded with my merge (code provided above), the following error message appeared:

Error in `purrr::map()`:
ℹ In index: 1.
ℹ With name: src.
Caused by error in `enc2utf8()`:
! argument is not a character vector
Run `rlang::last_trace()` to see where the error occurred.

Update 2

The following is the output of head -n 10 for the datasets in a terminal session:

First, one of the files in the folder:

user_id fullname    f_prob  m_prob  white_prob  black_prob  api_prob    hispanic_prob   native_prob multiple_prob   highest_degree  sex_predicted   ethnicity_predicted linkedin_url    user_location   country updated_dt  numconnections
1000000.0   Morgan Fabian   0.835   0.165   0.963   0.008   0.002   0.026   0.001   0.0 Associate   F   White   linkedin.com/in/morgan-fabian-7466664a  Kannapolis, North Carolina, United States   United States   2023-07-17  12.0
1000001.0   Cally Dupe  1.0 0.0 0.584   0.002   0.08    0.001   0.002   0.331   Bachelor    F   White   linkedin.com/in/cally-dupe-b24b1337 Greater Perth Area  Australia   2023-07-03  500.0
1000002.0   Horacio Bernal Gonzalez 0.0 1.0 0.04    0.009   0.008   0.937   0.003   0.003       M   Hispanic    linkedin.com/in/horacio-bernal-gonzalez-849481145   Mexico  Mexico  2023-06-08  4.0
1000003.0   Deepika Babar   1.0 0.0 0.053   0.016   0.868   0.014   0.013   0.036       F   API linkedin.com/in/deepika-babar-3a653065  Mumbai, Maharashtra, India  India   2023-06-20  7.0
1000004.0   Nathan Molle-Clark  0.003   0.997   0.957   0.021   0.001   0.0 0.002   0.019   Bachelor    M   White   linkedin.com/in/nathanmolle-clark   Columbia, Missouri, United States   United States   2023-07-30  168.0
1000006.0   Imran Yasin 0.0 1.0 0.195   0.128   0.677   0.0 0.0 0.0     M   API linkedin.com/in/imran-yasin-b056366b    United Kingdom  United Kingdom  2023-06-24  65.0
1000007.0   Naveen Reddy    0.225   0.775   0.039   0.0 0.959   0.002   0.0 0.0     M   API linkedin.com/in/shivamogga  Greater Hyderabad Area  India   2023-06-21  238.0
1000008.0   Robert Coffman  0.004   0.996   0.984   0.0 0.0 0.002   0.001   0.013       M   White   linkedin.com/in/robert-coffman-854a191a    Huntington Beach, California, United States  United States   2023-07-17  4.0
1000009.0   Master Tasco    0.5 0.5 0.279   0.625   0.009   0.013   0.008   0.066       e   Black   linkedin.com/in/master-tasco-bbb85897   Norway  Norway  2023-07-20  1.0

Second, the other_dataset:

"id"    "user_id"   "gvkey"
143 1309564 18564
143 2601833 18564
143 4168473 18564
379 1417698 26847
379 2991030 26847
379 4654601 26847
379 4685011 26847
379 4748461 26847
419 1050940 30138

Solution

  • Sample data

    Let us first create some sample data

    setwd(dirname(rstudioapi::getSourceEditorContext()$path)) # set the current script's location as working directory
    library(dplyr)
    library(data.table)
    library(duckdb)
    library(arrow)
    
    
    # Function to generate random IDs
    generate_ids <- function(n) {
      sample(100000:999999, n, replace = TRUE)
    }
    
    # Function to generate random countries
    generate_countries <- function(n) {
      countries <- c("United States", "India", "China", "Brazil", "Russia", 
                     "United Kingdom", "France", "Germany", "Japan", "Canada")
      sample(countries, n, replace = TRUE)
    }
    
    # Create the other_dataset first (500,000 rows)
    set.seed(123)
    other_dataset <- data.frame(
      id = generate_ids(500000),
      user_id = generate_ids(500000),
      other_id = generate_ids(500000),
      fullname = generate_countries(500000)
    )
    
    # Optionally write other_dataset
    #write.csv(other_dataset, "other_dataset.txt",row.names = FALSE, quote = FALSE)
    
    
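    # Make sure the output folder exists, otherwise write.table() below fails
    dir.create("texts", showWarnings = FALSE)
    
    # Create 3 user files with 1 million rows each for testing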
    for(i in 1:3) {
      # Generate base data
      n_rows <- 1000000
      
      # Ensure some overlap with other_dataset
      overlap_ids <- sample(other_dataset$user_id, n_rows * 0.3)  # 30% overlap
      random_ids <- generate_ids(n_rows * 0.7)
      
      user_data <- data.frame(
        ID2 = generate_ids(n_rows),
        fullname = generate_countries(n_rows),
        user_id = c(overlap_ids, random_ids)[1:n_rows],  # Combine and take required number
        extra_col = rnorm(n_rows)
      )
      
      # Randomize the order
      user_data <- user_data[sample(1:n_rows), ]
      
      # Write to file
      filename <- paste0("texts/user_", i, ".txt")
      write.table(user_data, filename, row.names = FALSE, quote = FALSE, sep = "\t")
      
      # Print progress
      cat("Created:", filename, "\n")
      
      # Clean up
      rm(user_data)
      gc()
    }
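
To check what the loop wrote, base R's list.files() can be used. It also returns the plain character vector of file paths that per-file readers (such as the purrr::map() call in the question) expect, whereas the object returned by to_duckdb() is a lazy table rather than a list of paths. The following is only a minimal sketch on a hypothetical scratch folder; demo_dir, the two tiny files, and notes.md are made up for illustration.

```r
# Hypothetical scratch folder with two small tab-delimited files
demo_dir <- file.path(tempdir(), "list_files_demo")
dir.create(demo_dir, showWarnings = FALSE)
for (i in 1:2) {
  write.table(
    data.frame(user_id = c(i, i + 10), fullname = c("A", "B")),
    file.path(demo_dir, paste0("user_", i, ".txt")),
    row.names = FALSE, quote = FALSE, sep = "\t"
  )
}
# A file that should not be picked up by the pattern below
writeLines("ignore me", file.path(demo_dir, "notes.md"))

# pattern is a regular expression, not a shell glob such as "*.txt"
my_paths <- list.files(demo_dir, pattern = "\\.txt$", full.names = TRUE)
print(basename(my_paths))

# Peek at the first rows of the first file without reading all of it
first_rows <- read.delim(my_paths[1], nrows = 5)
print(first_rows)
```

Note that list.files() takes a regular expression in pattern, which is why a shell-style "*.txt" is not the right value there.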
    

  • How to use arrow and duckdb

    # load same libs as before
    
    # Create connection
    con <- dbConnect(duckdb())
    
    # Register the folder of text files
    text_files <- arrow::open_dataset(
      source = "texts/", # this is the path to your folder with the text documents!
      format = "text",
      delimiter = "\t"
    ) %>% 
      to_duckdb(con, "profiles")
    
    
    # Register other dataset 
    # assuming that "other_dataset" is loaded into the workspace
    duckdb::duckdb_register(con, "other_data", other_dataset)
    
    # Perform the merge in DuckDB
    result <- dbGetQuery(con, "
      SELECT p.user_id, p.fullname, o.id, o.other_id
      FROM profiles p
      INNER JOIN other_data o ON p.user_id = o.user_id
      WHERE p.user_id IS NOT NULL
    ")
    
    # Clean up
    dbDisconnect(con, shutdown = TRUE)
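
If the joined result is itself too large to pull into R with dbGetQuery(), one possible variation is to let DuckDB read the tab-delimited files directly through a glob pattern and write the join straight to disk with COPY ... TO. This is a sketch under assumptions, not a tested recipe for the full 250 GB folder. The scratch folder, the file names, and the other_demo table are hypothetical toy data. read_csv() with its delim, header and filename options is DuckDB SQL, so it is worth checking against the DuckDB version you have installed.

```r
library(DBI)
library(duckdb)

# Hypothetical scratch folder with two tiny tab-delimited "profile" files
demo_dir <- file.path(tempdir(), "duckdb_glob_demo")
dir.create(demo_dir, showWarnings = FALSE)
demo_dir <- normalizePath(demo_dir, winslash = "/")
write.table(data.frame(user_id = c(1, 2), fullname = c("A", "B")),
            file.path(demo_dir, "user_1.txt"),
            row.names = FALSE, quote = FALSE, sep = "\t")
write.table(data.frame(user_id = c(3, 4), fullname = c("D", "E")),
            file.path(demo_dir, "user_2.txt"),
            row.names = FALSE, quote = FALSE, sep = "\t")

# Hypothetical stand-in for other_dataset
other_demo <- data.frame(id = c(10, 20), user_id = c(2, 3), other_id = c(100, 200))

con <- dbConnect(duckdb())
duckdb_register(con, "other_data", other_demo)

glob     <- file.path(demo_dir, "*.txt")      # all .txt files in the folder
out_file <- file.path(demo_dir, "joined.csv") # where the join is written

# Build the statement; dbQuoteString() turns the paths into SQL string literals.
# The "\t" below becomes a literal tab character inside the SQL text.
sql <- paste0(
  "COPY (",
  " SELECT p.user_id, p.fullname, p.filename AS user_file, o.id, o.other_id",
  " FROM read_csv(", dbQuoteString(con, glob),
  ", delim = '\t', header = true, filename = true) p",
  " INNER JOIN other_data o ON p.user_id = o.user_id",
  " WHERE p.user_id IS NOT NULL",
  ") TO ", dbQuoteString(con, out_file), " (FORMAT CSV, HEADER)"
)

# The join runs inside DuckDB; only the row count comes back to R
dbExecute(con, sql)
dbDisconnect(con, shutdown = TRUE)

# Read the (small) demo result back to inspect it
joined <- read.csv(out_file)
print(joined)
```

The filename column plays the role of the user_file id in the question's bind_rows() call. For large outputs, (FORMAT PARQUET) in place of the CSV options would produce a more compact file.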