
Difficulty with unifying schemas when trying to open arrow dataset with two different file formats in R


I'm trying to open a FileSystemDataset using arrow::open_dataset() on a directory that contains two different file formats (csv & parquet). The single parquet file also has an additional field (age_group). The approach needs to be generalisable, as both the field names and the file formats might change between projects.

My initial plan for dealing with more than one file format was to create a FileSystemDataset for each format and then open a single UnionDataset from all FileSystemDatasets.

However, this approach errors because one of the fields (horizon) is parsed as int64() in the csv FileSystemDataset but stored as int32() in the parquet FileSystemDataset, so the schemas cannot be unified.

To get around this in a flexible and general way, I created a unified schema by keeping the schema from the first FileSystemDataset (csv) and appending any additional fields from the other FileSystemDatasets. I then used it to create an appropriate schema subset for each FileSystemDataset. I tried to replace each dataset's schema through assignment, but that threw the same error as before.

Finally, I tried to reopen the FileSystemDatasets using the appropriate schema for each format, but now the csv FileSystemDataset reads 0 csv files. I'm really confused, because the schema of the original csv FileSystemDataset, the schema subset derived from the unified schema, and the schema of the reopened csv FileSystemDataset are all identical.

I'm not sure what I'm doing wrong, and I'm very open to more elegant approaches to the overall problem.

Reprex follows:

tmpdir <- tempdir()
usethis::create_from_github("annakrystalli/debugging-arrow", destdir = tmpdir,
                            fork = FALSE, open = FALSE)
#> ℹ Defaulting to 'https' Git protocol
#> ✔ Creating '/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/RtmpS0B1ZB/debugging-arrow/'
#> ✔ Cloning repo from 'https://github.com/annakrystalli/debugging-arrow.git' into '/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/RtmpS0B1ZB/debugging-arrow'
#> ✔ Setting active project to '/private/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/RtmpS0B1ZB/debugging-arrow'
#> ℹ Default branch is 'main'
#> ✔ Setting active project to '<no active project>'

library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> 
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> 
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
origin_path <- file.path(tmpdir, "debugging-arrow/simple/model-output/")

fs::dir_tree(origin_path)
#> /var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T//RtmpS0B1ZB/debugging-arrow/simple/model-output/
#> ├── simple_hub-baseline
#> │   ├── 2022-10-01-simple_hub-baseline.csv
#> │   ├── 2022-10-08-simple_hub-baseline.csv
#> │   └── 2022-10-15-simple_hub-baseline.parquet
#> └── team1-goodmodel
#>     └── 2022-10-08-team1-goodmodel.csv

# Open one dataset for each format, excluding invalid files
formats <- c("csv", "parquet")

conns <- purrr::map(purrr::set_names(formats),
                    ~arrow::open_dataset(
                        origin_path, format = .x,
                        partitioning = "team",
                        factory_options = list(exclude_invalid_files = TRUE)))


arrow::open_dataset(conns, unify_schemas = FALSE)
#> Error: Type error: fields had matching names but differing types. From: horizon: int32 To: horizon: int64
# The problem arises from mismatched integer fields between the two datasets
conns
#> $csv
#> FileSystemDataset with 3 csv files
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
#> 
#> $parquet
#> FileSystemDataset with 1 Parquet file
#> origin_date: date32[day]
#> target: string
#> horizon: int32
#> location: string
#> age_group: string
#> type: string
#> type_id: double
#> value: int32
#> team: string


# Functions ----
# Unify two dataset schemas: keep the schema of x and append any fields
# present only in y
unify_conn_schema <- function(x, y) {
    setdiff(y$schema$names, x$schema$names) %>%
        purrr::map(~y$schema$GetFieldByName(.x)) %>%
        arrow::schema() %>%
        arrow::unify_schemas(x$schema)
}

# Subset the unified schema to the fields present in a dataset connection
get_unified_schema <- function(x, unified_schema) {
    x$schema$names %>%
        purrr::map(~unified_schema$GetFieldByName(.x)) %>%
        arrow::schema()
}

# Get unified schema across all datasets
unified_schema <- purrr::reduce(conns, unify_conn_schema)
unified_schema
#> Schema
#> age_group: string
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string

# Get schema for each connection from unified schema
conn_schema <- purrr::map(conns, ~get_unified_schema(.x,
                                                     unified_schema))

conn_schema
#> $csv
#> Schema
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
#> 
#> $parquet
#> Schema
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> age_group: string
#> type: string
#> type_id: double
#> value: int64
#> team: string


# Replacing the active binding via assignment doesn't work either: per the
# backtrace below it delegates to Dataset$WithSchema(), which can add or drop
# fields but cannot change an existing field's type
purrr::map2(conns, conn_schema,
            function(x, y){
                x$schema <- y})
#> Error in `purrr::map2()`:
#> ℹ In index: 2.
#> ℹ With name: parquet.
#> Caused by error:
#> ! Type error: fields had matching names but differing types. From: horizon: int32 To: horizon: int64

#> Backtrace:
#>      ▆
#>   1. ├─purrr::map2(...)
#>   2. │ └─purrr:::map2_("list", .x, .y, .f, ..., .progress = .progress)
#>   3. │   ├─purrr:::with_indexed_errors(...)
#>   4. │   │ └─base::withCallingHandlers(...)
#>   5. │   ├─purrr:::call_with_cleanup(...)
#>   6. │   └─global .f(.x[[i]], .y[[i]], ...)
#>   7. ├─arrow (local) `<fn>`(base::quote(`<Schema>`))
#>   8. │ └─self$WithSchema(schema)
#>   9. │   └─arrow:::dataset___Dataset__ReplaceSchema(self, schema)
#>  10. └─base::.handleSimpleError(...)
#>  11.   └─purrr (local) h(simpleError(msg, call))
#>  12.     └─cli::cli_abort(...)
#>  13.       └─rlang::abort(...)

# Reopen each dataset using the appropriate schema for its format
conns_unified <- purrr::map2(names(conns), conn_schema,
                     ~arrow::open_dataset(
                         origin_path, format = .x,
                         partitioning = "team",
                         factory_options = list(exclude_invalid_files = TRUE),
                         schema = .y))

conns_unified
#> [[1]]
#> FileSystemDataset with 0 csv files
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
#> 
#> [[2]]
#> FileSystemDataset with 1 Parquet file
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> age_group: string
#> type: string
#> type_id: double
#> value: int64
#> team: string

# All schemas are equal!
all.equal(conns[[1]]$schema, conns_unified[[1]]$schema, conn_schema[[1]])
#> [1] TRUE

Created on 2023-03-06 with reprex v2.0.2

Session info
sessioninfo::session_info()
#> ─ Session info ───────────────────────────────────────────────────────────────
#>  setting  value
#>  version  R version 4.2.1 (2022-06-23)
#>  os       macOS Ventura 13.2.1
#>  system   aarch64, darwin20
#>  ui       X11
#>  language (EN)
#>  collate  en_US.UTF-8
#>  ctype    en_US.UTF-8
#>  tz       Europe/Athens
#>  date     2023-03-06
#>  pandoc   2.19.2 @ /Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via rmarkdown)
#> 
#> ─ Packages ───────────────────────────────────────────────────────────────────
#>  package     * version  date (UTC) lib source
#>  arrow         11.0.0.2 2023-02-12 [1] CRAN (R 4.2.0)
#>  askpass       1.1      2019-01-13 [1] CRAN (R 4.2.0)
#>  assertthat    0.2.1    2019-03-21 [1] CRAN (R 4.2.0)
#>  bit           4.0.5    2022-11-15 [1] CRAN (R 4.2.0)
#>  bit64         4.0.5    2020-08-30 [1] CRAN (R 4.2.0)
#>  cli           3.6.0    2023-01-09 [1] CRAN (R 4.2.0)
#>  crayon        1.5.2    2022-09-29 [1] CRAN (R 4.2.0)
#>  credentials   1.3.2    2021-11-29 [1] CRAN (R 4.2.1)
#>  curl          5.0.0    2023-01-12 [1] CRAN (R 4.2.0)
#>  digest        0.6.31   2022-12-11 [1] CRAN (R 4.2.0)
#>  dplyr       * 1.1.0    2023-01-29 [1] CRAN (R 4.2.0)
#>  evaluate      0.20     2023-01-17 [1] CRAN (R 4.2.0)
#>  fansi         1.0.4    2023-01-22 [1] CRAN (R 4.2.0)
#>  fastmap       1.1.1    2023-02-24 [1] CRAN (R 4.2.0)
#>  fs            1.6.1    2023-02-06 [1] CRAN (R 4.2.0)
#>  generics      0.1.3    2022-07-05 [1] CRAN (R 4.2.1)
#>  gert          1.9.2    2022-12-05 [1] CRAN (R 4.2.0)
#>  gh            1.3.1    2022-09-08 [1] CRAN (R 4.2.1)
#>  gitcreds      0.1.2    2022-09-08 [1] CRAN (R 4.2.1)
#>  glue          1.6.2    2022-02-24 [1] CRAN (R 4.2.0)
#>  htmltools     0.5.4    2022-12-07 [1] CRAN (R 4.2.0)
#>  httr          1.4.5    2023-02-24 [1] CRAN (R 4.2.0)
#>  jsonlite      1.8.4    2022-12-06 [1] CRAN (R 4.2.0)
#>  knitr         1.42     2023-01-25 [1] CRAN (R 4.2.0)
#>  lifecycle     1.0.3    2022-10-07 [1] CRAN (R 4.2.0)
#>  magrittr      2.0.3    2022-03-30 [1] CRAN (R 4.2.0)
#>  openssl       2.0.5    2022-12-06 [1] CRAN (R 4.2.0)
#>  pillar        1.8.1    2022-08-19 [1] CRAN (R 4.2.0)
#>  pkgconfig     2.0.3    2019-09-22 [1] CRAN (R 4.2.0)
#>  purrr         1.0.1    2023-01-10 [1] CRAN (R 4.2.0)
#>  R.cache       0.16.0   2022-07-21 [1] CRAN (R 4.2.0)
#>  R.methodsS3   1.8.2    2022-06-13 [1] CRAN (R 4.2.0)
#>  R.oo          1.25.0   2022-06-12 [1] CRAN (R 4.2.0)
#>  R.utils       2.12.0   2022-06-28 [1] CRAN (R 4.2.0)
#>  R6            2.5.1    2021-08-19 [1] CRAN (R 4.2.0)
#>  reprex        2.0.2    2022-08-17 [3] CRAN (R 4.2.0)
#>  rlang         1.0.6    2022-09-24 [1] CRAN (R 4.2.0)
#>  rmarkdown     2.20     2023-01-19 [1] CRAN (R 4.2.0)
#>  rprojroot     2.0.3    2022-04-02 [1] CRAN (R 4.2.1)
#>  rstudioapi    0.14     2022-08-22 [1] CRAN (R 4.2.1)
#>  sessioninfo   1.2.2    2021-12-06 [3] CRAN (R 4.2.0)
#>  styler        1.7.0    2022-03-13 [1] CRAN (R 4.2.0)
#>  sys           3.4.1    2022-10-18 [1] CRAN (R 4.2.0)
#>  tibble        3.1.8    2022-07-22 [1] CRAN (R 4.2.0)
#>  tidyselect    1.2.0    2022-10-10 [1] CRAN (R 4.2.0)
#>  usethis       2.1.6    2022-05-25 [1] CRAN (R 4.2.0)
#>  utf8          1.2.3    2023-01-31 [1] CRAN (R 4.2.0)
#>  vctrs         0.5.2    2023-01-23 [1] CRAN (R 4.2.0)
#>  withr         2.5.0    2022-03-03 [1] CRAN (R 4.2.0)
#>  xfun          0.37     2023-01-31 [1] CRAN (R 4.2.0)
#>  yaml          2.3.7    2023-01-23 [1] CRAN (R 4.2.0)
#> 
#>  [1] /Users/Anna/Library/R/arm64/4.2/library
#>  [2] /Library/Frameworks/R.framework/Versions/4.2-arm64/Resources/site-library
#>  [3] /Library/Frameworks/R.framework/Versions/4.2-arm64/Resources/library
#> 
#> ──────────────────────────────────────────────────────────────────────────────
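
A possible explanation for the 0-csv-files result above, which I haven't verified: when a full schema is supplied for a csv dataset, arrow no longer infers column types and (if I read the open_dataset docs correctly) the header row is then treated as data. The header line fails to parse as the schema's types, and with factory_options = list(exclude_invalid_files = TRUE) every csv file is silently excluded. If that's the cause, explicitly skipping the header row may recover the files:

conns_unified$csv <- arrow::open_dataset(
    origin_path,
    format = "csv",
    partitioning = "team",
    schema = conn_schema$csv,
    skip = 1,  # with an explicit schema, the header row is read as data, so skip it
    factory_options = list(exclude_invalid_files = TRUE)
)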

Solution

  • I found what I think is a nice-looking and even tidyverse-ish solution.

    Instead of working with schemas directly, you can specify col_types when you call open_dataset with format = "csv". I call this tidyverse-ish because it's also how you'd do the same with readr: you only need to specify the column types that should differ from what is guessed automatically. One downside is that you don't get to use your map, or at least I don't know how to make this work with a map (a possible map-based variant is sketched at the end of this answer).

    library(arrow)

    conns <- list()
    
    conns$csv <- open_dataset(
      origin_path, 
      format = "csv", 
      col_types = schema(
        "horizon" = int32(),
        "value" = int32()
      ),
      factory_options = list(
        exclude_invalid_files = TRUE
      )
    )
    
    conns$parquet <- open_dataset(
      origin_path, 
      format = "parquet", 
      factory_options = list(
        exclude_invalid_files = TRUE
      )
    )
    
    ds <- arrow::open_dataset(conns)
    
    ds$schema
    #> Schema
    #> origin_date: date32[day]
    #> target: string
    #> horizon: int32
    #> location: string
    #> type: string
    #> type_id: double
    #> value: int32
    #> age_group: string
    

    Initial exploration of the dataset suggests it was read in correctly, but please let me know.
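
    For completeness, here is one way the map could be kept while still passing csv-only arguments. This is an untested sketch: it splices per-format options with rlang::exec, so only the csv call receives col_types (field names and types as above).

    # Per-format extra arguments; only csv needs the col_types overrides
    # (assumption: parquet already stores these columns as int32, as shown above)
    format_opts <- list(
      csv = list(col_types = schema(horizon = int32(), value = int32())),
      parquet = list()
    )

    conns <- purrr::imap(format_opts, function(opts, fmt) {
      rlang::exec(
        open_dataset,
        origin_path,
        format = fmt,
        factory_options = list(exclude_invalid_files = TRUE),
        !!!opts  # csv gets col_types; parquet gets nothing extra
      )
    })

    ds <- arrow::open_dataset(conns)

    A quick sanity check that the union scans rows from both formats (dplyr verbs such as count() are supported on Arrow datasets):

    ds %>% dplyr::count(horizon) %>% dplyr::collect()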