
Using R Targets to Append New Data to Existing Data


I'm using a targets workflow pipeline. Part of this pipeline is to monitor a directory of csv files for updates. There are more than 10,000 csv files in this directory, and new files are added weekly. I want to be able to identify the newly added files and append them to an existing set of *.rds files. The easy thing would be to re-run the process that creates the 5 subsets of *.rds files each week, but that takes time. The efficient thing would be to identify the newly added files, and simply bind_rows with the proper rds file.

I can do this easily enough with ordinary R code using dir() and setdiff(), storing a snapshot of the csv file paths from the previous day. But I'm struggling to accomplish this within the targets framework.
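To show what I mean, here is a minimal base-R sketch of the snapshot approach outside of targets (the paths and file names below are throwaway examples under tempdir(), not my real data):

```r
# Plain base-R version of the snapshot-and-setdiff approach (outside targets).
# All paths and file names here are illustrative.
csv_dir       <- file.path(tempdir(), "csv_demo")
snapshot_path <- file.path(tempdir(), "csv_snapshot.rds")
dir.create(csv_dir, showWarnings = FALSE)

# Simulate two existing files, take yesterday's snapshot, then add a new file.
write.csv(data.frame(x = 1), file.path(csv_dir, "a.csv"), row.names = FALSE)
write.csv(data.frame(x = 2), file.path(csv_dir, "b.csv"), row.names = FALSE)
saveRDS(dir(csv_dir), snapshot_path)          # snapshot of known file names
write.csv(data.frame(x = 3), file.path(csv_dir, "c.csv"), row.names = FALSE)

# Identify files added since the snapshot and read only those.
previous  <- readRDS(snapshot_path)
new_files <- setdiff(dir(csv_dir), previous)  # just "c.csv" here
new_rows  <- do.call(rbind, lapply(file.path(csv_dir, new_files), read.csv))

saveRDS(dir(csv_dir), snapshot_path)          # refresh the snapshot
```

The new_rows data frame could then be row-bound onto the appropriate existing *.rds subset.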

Here is an attempt that doesn't seem to work. I think I want to monitor the intermediate results in the _targets/ data store, but I'm not sure how to go about that. Also, the targets documentation recommends against calling tar_load() inside the pipeline definition itself.

tar_script({
  list(
    tar_target(csv_directory, "/csv/"),
    tar_target(csv_snapshot, dir(csv_directory)),
    tar_target(
      append_action,
      if (length(setdiff(dir(csv_directory), csv_snapshot)) > 0) {
        ...
      }
    )
  )
})


Solution

  • A few components that may help:

    1. File targets: https://books.ropensci.org/targets/files.html. With tar_target(format = "file"), the package watches the input and/or output files a target returns and reruns the affected targets when those files change.
    2. Alternative storage formats: https://docs.ropensci.org/targets/reference/tar_target.html#storage-formats. Instead of aggregating your CSV files into external RDS files, it could be more efficient to use something like tar_target(format = "feather") so targets automatically compresses your output data and ensures you do not have to worry about micromanaging files.
    3. Dynamic branching: https://books.ropensci.org/targets/dynamic.html. Dynamic branching is a way to define large collections of new targets while the pipeline is running. This lets you, for example, create a new target for each file or batch of existing files.
    4. Batching: https://books.ropensci.org/targets/dynamic.html#batching. 10,000 targets is a lot, and the targets package can slow down with that many because each target carries some overhead.

    So I recommend that you organize your CSV files into batches (say, by week) and dynamically branch over batches to process them. Another batch structure may be more appropriate, depending on the specifics of your use case.

    csv/
    ├── week1/
    │   ├── data1.csv
    │   ├── data2.csv
    │   ├── ...
    ├── week2/
    │   ├── data1.csv
    │   ├── data2.csv
    │   ├── ...
    ...
    

    Sketch of the pipeline:

    # _targets.R
    process_csv_dir <- function(csv_dir) {...} # custom user-defined function
    list(
      tar_target(csv_dir, list.files("csv", full.names = TRUE)),
      tar_target(
        processed_data,
        process_csv_dir(csv_dir),
        pattern = map(csv_dir), # dynamic branching
        format = "feather" # from the arrow package
      )
    )
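
    The process_csv_dir() function is left up to you. As one possible shape (an assumption about your data layout, not part of the sketch above), a minimal version could read every CSV in one weekly directory and stack the rows:

    ```r
    # Hypothetical minimal process_csv_dir(): read every CSV in one weekly
    # directory and row-bind them into a single data frame.
    process_csv_dir <- function(csv_dir) {
      files <- list.files(csv_dir, pattern = "\\.csv$", full.names = TRUE)
      do.call(rbind, lapply(files, read.csv))
    }

    # Example with a fake "week1" directory containing two CSV files.
    wk <- file.path(tempdir(), "week1")
    dir.create(wk, showWarnings = FALSE)
    write.csv(data.frame(x = 1:2), file.path(wk, "data1.csv"), row.names = FALSE)
    write.csv(data.frame(x = 3:4), file.path(wk, "data2.csv"), row.names = FALSE)
    res <- process_csv_dir(wk)
    ```

    Each dynamic branch then stores its combined data frame in the feather format, and only branches whose weekly directory changed will rerun.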