I'm trying to open a FileSystemDataset using arrow::open_dataset() from a directory that contains two different file formats (csv and parquet). The single parquet file also has an additional field (age_group). The approach needs to be generalisable, as the field names and file formats might change between projects.
My initial plan for dealing with more than one file format was to create a FileSystemDataset for each format and then open a single UnionDataset from all FileSystemDatasets.
However, this approach errors because one of the fields (horizon) is parsed as int64() in the csv FileSystemDataset and int32() in the parquet FileSystemDataset, which prevents the schemas from being unified.
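The type clash can be reproduced in isolation. The following is a minimal sketch, assuming two hand-built schemas; csv_like and parquet_like are illustrative names and not objects from the reprex. It suggests that the failure comes from schema unification itself rather than from anything specific to the files.

```r
library(arrow)

# Hypothetical schemas mimicking the conflicting field in the two datasets
csv_like     <- schema(horizon = int64())
parquet_like <- schema(horizon = int32(), age_group = string())

# Same-named fields with different types are not promoted by default,
# so unification is expected to fail; the error is caught and flagged here
outcome <- tryCatch(
  unify_schemas(csv_like, parquet_like),
  error = function(e) "type clash"
)

# Once the types agree, the extra field (age_group) is simply appended
merged <- unify_schemas(schema(horizon = int32()), parquet_like)
merged$names
```

This is why any fix has to make the integer types agree before the datasets are combined; an extra field on one side is not a problem in itself.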
To get around this in a flexible and general way, I created a unified schema by keeping the schema from the first FileSystemDataset (csv) and adding any additional fields from other FileSystemDatasets. I then used that to create appropriate schema subsets for each FileSystemDataset. I tried to replace each dataset's schema through assignment but that threw the same initial error.
Finally, I tried to reopen the FileSystemDatasets using the appropriate schema for each format, but now 0 csv files are read into the csv FileSystemDataset. I'm really confused, as the schema in the original csv FileSystemDataset is exactly the same as both the one created from the unified schema and the one in the FileSystemDataset opened by explicitly specifying the schema.
I'm not sure what I'm doing wrong. I'm also very open to more elegant approaches to tackling the overall problem.
Reprex follows:
tmpdir <- tempdir()
usethis::create_from_github("annakrystalli/debugging-arrow", destdir = tmpdir,
                            fork = FALSE, open = FALSE)
#> ℹ Defaulting to 'https' Git protocol
#> ✔ Creating '/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/RtmpS0B1ZB/debugging-arrow/'
#> ✔ Cloning repo from 'https://github.com/annakrystalli/debugging-arrow.git' into '/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/RtmpS0B1ZB/debugging-arrow'
#> ✔ Setting active project to '/private/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/RtmpS0B1ZB/debugging-arrow'
#> ℹ Default branch is 'main'
#> ✔ Setting active project to '<no active project>'
library(dplyr)
#>
#> Attaching package: 'dplyr'
#>
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#>
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
origin_path <- file.path(tmpdir, "debugging-arrow/simple/model-output/")
fs::dir_tree(origin_path)
#> /var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T//RtmpS0B1ZB/debugging-arrow/simple/model-output/
#> ├── simple_hub-baseline
#> │   ├── 2022-10-01-simple_hub-baseline.csv
#> │   ├── 2022-10-08-simple_hub-baseline.csv
#> │   └── 2022-10-15-simple_hub-baseline.parquet
#> └── team1-goodmodel
#>     └── 2022-10-08-team1-goodmodel.csv
# Open one dataset for each format, excluding invalid files
formats <- c("csv", "parquet")
conns <- purrr::map(purrr::set_names(formats),
                    ~arrow::open_dataset(
                      origin_path, format = .x,
                      partitioning = "team",
                      factory_options = list(exclude_invalid_files = TRUE)))
arrow::open_dataset(conns, unify_schemas = FALSE)
#> Error: Type error: fields had matching names but differing types. From: horizon: int32 To: horizon: int64
# Problem arising from mismatched int fields between the two datasets
conns
#> $csv
#> FileSystemDataset with 3 csv files
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
#>
#> $parquet
#> FileSystemDataset with 1 Parquet file
#> origin_date: date32[day]
#> target: string
#> horizon: int32
#> location: string
#> age_group: string
#> type: string
#> type_id: double
#> value: int32
#> team: string
# Functions ----
# Function to get schema for fields in y not present in x
# and append them to the schema of x
unify_conn_schema <- function(x, y) {
  setdiff(y$schema$names, x$schema$names) %>%
    purrr::map(~y$schema$GetFieldByName(.x)) %>%
    arrow::schema() %>%
    arrow::unify_schemas(x$schema)
}
# Get schema for fields in a dataset connection from a unified schema
get_unified_schema <- function(x, unified_schema) {
  x$schema$names %>%
    purrr::map(~unified_schema$GetFieldByName(.x)) %>%
    arrow::schema()
}
# Get unified schema across all datasets
unified_schema <- purrr::reduce(conns, unify_conn_schema)
unified_schema
#> Schema
#> age_group: string
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
# Get schema for each connection from unified schema
conn_schema <- purrr::map(conns, ~get_unified_schema(.x, unified_schema))
conn_schema
#> $csv
#> Schema
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
#>
#> $parquet
#> Schema
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> age_group: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
# Replacing the active binding via assignment doesn't seem to work
purrr::map2(conns, conn_schema,
            function(x, y) {
              x$schema <- y
            })
#> Error in `purrr::map2()`:
#> ℹ In index: 2.
#> ℹ With name: parquet.
#> Caused by error:
#> ! Type error: fields had matching names but differing types. From: horizon: int32 To: horizon: int64
#> Backtrace:
#> ▆
#> 1. ├─purrr::map2(...)
#> 2. │ └─purrr:::map2_("list", .x, .y, .f, ..., .progress = .progress)
#> 3. │ ├─purrr:::with_indexed_errors(...)
#> 4. │ │ └─base::withCallingHandlers(...)
#> 5. │ ├─purrr:::call_with_cleanup(...)
#> 6. │ └─global .f(.x[[i]], .y[[i]], ...)
#> 7. ├─arrow (local) `<fn>`(base::quote(`<Schema>`))
#> 8. │ └─self$WithSchema(schema)
#> 9. │ └─arrow:::dataset___Dataset__ReplaceSchema(self, schema)
#> 10. └─base::.handleSimpleError(...)
#> 11. └─purrr (local) h(simpleError(msg, call))
#> 12. └─cli::cli_abort(...)
#> 13. └─rlang::abort(...)
# Reconnect using the appropriate schema for each connection
conns_unified <- purrr::map2(names(conns), conn_schema,
                             ~arrow::open_dataset(
                               origin_path, format = .x,
                               partitioning = "team",
                               factory_options = list(exclude_invalid_files = TRUE),
                               schema = .y))
conns_unified
#> [[1]]
#> FileSystemDataset with 0 csv files
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
#>
#> [[2]]
#> FileSystemDataset with 1 Parquet file
#> origin_date: date32[day]
#> target: string
#> horizon: int64
#> location: string
#> age_group: string
#> type: string
#> type_id: double
#> value: int64
#> team: string
# All schemas are equal!
all.equal(conns[[1]]$schema, conns_unified[[1]]$schema, conn_schema[[1]])
#> [1] TRUE
Created on 2023-03-06 with reprex v2.0.2
Session info
sessioninfo::session_info()
#> ─ Session info ───────────────────────────────────────────────────────────────
#> setting value
#> version R version 4.2.1 (2022-06-23)
#> os macOS Ventura 13.2.1
#> system aarch64, darwin20
#> ui X11
#> language (EN)
#> collate en_US.UTF-8
#> ctype en_US.UTF-8
#> tz Europe/Athens
#> date 2023-03-06
#> pandoc 2.19.2 @ /Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via rmarkdown)
#>
#> ─ Packages ───────────────────────────────────────────────────────────────────
#> package * version date (UTC) lib source
#> arrow 11.0.0.2 2023-02-12 [1] CRAN (R 4.2.0)
#> askpass 1.1 2019-01-13 [1] CRAN (R 4.2.0)
#> assertthat 0.2.1 2019-03-21 [1] CRAN (R 4.2.0)
#> bit 4.0.5 2022-11-15 [1] CRAN (R 4.2.0)
#> bit64 4.0.5 2020-08-30 [1] CRAN (R 4.2.0)
#> cli 3.6.0 2023-01-09 [1] CRAN (R 4.2.0)
#> crayon 1.5.2 2022-09-29 [1] CRAN (R 4.2.0)
#> credentials 1.3.2 2021-11-29 [1] CRAN (R 4.2.1)
#> curl 5.0.0 2023-01-12 [1] CRAN (R 4.2.0)
#> digest 0.6.31 2022-12-11 [1] CRAN (R 4.2.0)
#> dplyr * 1.1.0 2023-01-29 [1] CRAN (R 4.2.0)
#> evaluate 0.20 2023-01-17 [1] CRAN (R 4.2.0)
#> fansi 1.0.4 2023-01-22 [1] CRAN (R 4.2.0)
#> fastmap 1.1.1 2023-02-24 [1] CRAN (R 4.2.0)
#> fs 1.6.1 2023-02-06 [1] CRAN (R 4.2.0)
#> generics 0.1.3 2022-07-05 [1] CRAN (R 4.2.1)
#> gert 1.9.2 2022-12-05 [1] CRAN (R 4.2.0)
#> gh 1.3.1 2022-09-08 [1] CRAN (R 4.2.1)
#> gitcreds 0.1.2 2022-09-08 [1] CRAN (R 4.2.1)
#> glue 1.6.2 2022-02-24 [1] CRAN (R 4.2.0)
#> htmltools 0.5.4 2022-12-07 [1] CRAN (R 4.2.0)
#> httr 1.4.5 2023-02-24 [1] CRAN (R 4.2.0)
#> jsonlite 1.8.4 2022-12-06 [1] CRAN (R 4.2.0)
#> knitr 1.42 2023-01-25 [1] CRAN (R 4.2.0)
#> lifecycle 1.0.3 2022-10-07 [1] CRAN (R 4.2.0)
#> magrittr 2.0.3 2022-03-30 [1] CRAN (R 4.2.0)
#> openssl 2.0.5 2022-12-06 [1] CRAN (R 4.2.0)
#> pillar 1.8.1 2022-08-19 [1] CRAN (R 4.2.0)
#> pkgconfig 2.0.3 2019-09-22 [1] CRAN (R 4.2.0)
#> purrr 1.0.1 2023-01-10 [1] CRAN (R 4.2.0)
#> R.cache 0.16.0 2022-07-21 [1] CRAN (R 4.2.0)
#> R.methodsS3 1.8.2 2022-06-13 [1] CRAN (R 4.2.0)
#> R.oo 1.25.0 2022-06-12 [1] CRAN (R 4.2.0)
#> R.utils 2.12.0 2022-06-28 [1] CRAN (R 4.2.0)
#> R6 2.5.1 2021-08-19 [1] CRAN (R 4.2.0)
#> reprex 2.0.2 2022-08-17 [3] CRAN (R 4.2.0)
#> rlang 1.0.6 2022-09-24 [1] CRAN (R 4.2.0)
#> rmarkdown 2.20 2023-01-19 [1] CRAN (R 4.2.0)
#> rprojroot 2.0.3 2022-04-02 [1] CRAN (R 4.2.1)
#> rstudioapi 0.14 2022-08-22 [1] CRAN (R 4.2.1)
#> sessioninfo 1.2.2 2021-12-06 [3] CRAN (R 4.2.0)
#> styler 1.7.0 2022-03-13 [1] CRAN (R 4.2.0)
#> sys 3.4.1 2022-10-18 [1] CRAN (R 4.2.0)
#> tibble 3.1.8 2022-07-22 [1] CRAN (R 4.2.0)
#> tidyselect 1.2.0 2022-10-10 [1] CRAN (R 4.2.0)
#> usethis 2.1.6 2022-05-25 [1] CRAN (R 4.2.0)
#> utf8 1.2.3 2023-01-31 [1] CRAN (R 4.2.0)
#> vctrs 0.5.2 2023-01-23 [1] CRAN (R 4.2.0)
#> withr 2.5.0 2022-03-03 [1] CRAN (R 4.2.0)
#> xfun 0.37 2023-01-31 [1] CRAN (R 4.2.0)
#> yaml 2.3.7 2023-01-23 [1] CRAN (R 4.2.0)
#>
#> [1] /Users/Anna/Library/R/arm64/4.2/library
#> [2] /Library/Frameworks/R.framework/Versions/4.2-arm64/Resources/site-library
#> [3] /Library/Frameworks/R.framework/Versions/4.2-arm64/Resources/library
#>
#> ──────────────────────────────────────────────────────────────────────────────
I found what I think is a nice-looking and even tidyverse-ish solution.
Instead of working with schemas directly, you can specify col_types when you call open_dataset with format = "csv". I call this tidyverse-ish because this is also how you'd do the same with readr, and you only need to specify the column types you want to differ from what is automatically guessed. One downside is that you don't get to use your map, or at least I don't know how to make this work with a map.
library(arrow)
conns <- list()
# Force the csv integer columns to int32 so they match the parquet file
conns$csv <- open_dataset(
  origin_path,
  format = "csv",
  col_types = schema(
    "horizon" = int32(),
    "value" = int32()
  ),
  factory_options = list(
    exclude_invalid_files = TRUE
  )
)
conns$parquet <- open_dataset(
  origin_path,
  format = "parquet",
  factory_options = list(
    exclude_invalid_files = TRUE
  )
)
ds <- arrow::open_dataset(conns)
> ds$schema
Schema
origin_date: date32[day]
target: string
horizon: int32
location: string
type: string
type_id: double
value: int32
age_group: string
Initial exploration of the dataset suggests it was read in correctly, but please let me know if you spot any problems.
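On the map question, one possible way to keep the purrr::map() pattern is to build the argument list per format and only add col_types for csv. This is a sketch under assumptions and not tested against the original repository: demo_dir and its two toy files are made up for illustration, open_format() is a hypothetical helper, and files are selected by extension with list.files() rather than with exclude_invalid_files, so the partitioning = "team" step from the question is left out.

```r
library(arrow)

# Hypothetical toy data standing in for the model-output directory
demo_dir <- file.path(tempdir(), "mixed-format-demo")
dir.create(demo_dir, showWarnings = FALSE)
write.csv(data.frame(horizon = 1:2, value = c(10L, 20L)),
          file.path(demo_dir, "a.csv"), row.names = FALSE)
write_parquet(data.frame(horizon = 3L, value = 30L, age_group = "65+"),
              file.path(demo_dir, "b.parquet"))

# Types to force on the csv side so that they match the parquet file
csv_types <- schema(horizon = int32(), value = int32())

# Hypothetical helper: collects the files of one format and passes
# col_types to open_dataset() only when the format is csv
open_format <- function(format, path, csv_col_types) {
  files <- list.files(path, pattern = paste0("\\.", format, "$"),
                      recursive = TRUE, full.names = TRUE)
  args <- list(sources = files, format = format)
  if (format == "csv") args$col_types <- csv_col_types
  do.call(open_dataset, args)
}

formats <- c("csv", "parquet")
conns <- purrr::map(purrr::set_names(formats),
                    function(fmt) open_format(fmt, demo_dir, csv_types))

# Combine the per-format datasets into a single dataset
ds <- open_dataset(conns)
ds$schema
```

Building the arguments as a list and calling do.call() keeps the format-specific option in one place, so further formats or per-project column types could be added without changing the map call.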