Tags: r, apache-spark, dplyr, sparklyr

Adding name of file when using sparklyr::spark_read_json


I have millions of JSON files, each containing the same columns, let's say x and y. Note that x and y have equal length within a single file, but their length can differ between two different files.

The problem is that the only thing separating the data sets is the name of the file, so when combining the files I'd like the file name included as a third column. Is this possible using sparklyr::spark_read_json, i.e. when reading with a wildcard?

MWE:

library(sparklyr)

## Spark connection
sc <- spark_connect(master = "local", version = "2.1.0")

## Create data
data_dir <- tempdir()
tbl_json1 <- data.frame(x = 1:3, y = 1:3)
tbl_json2 <- data.frame(x = 1:10, y = 1:10)

## Write data to disk
write(jsonlite::toJSON(tbl_json1), sprintf("%s/tab1.json", data_dir))
write(jsonlite::toJSON(tbl_json2), sprintf("%s/tab2.json", data_dir))

## Read both files using wildcard 
combined_table <- spark_read_json(
    sc, 
    name = "combined_table", 
    path = sprintf("%s/*.json", data_dir)
)

## Transfer results to R
library(dplyr)
dt <- combined_table %>% collect()

# # A tibble: 13 x 2
#       x     y
#     <dbl> <dbl>
#  1    1.    1.
#  2    2.    2.
#  3    3.    3.
#  4    4.    4.
#  5    5.    5.
#  6    6.    6.
#  7    7.    7.
#  8    8.    8.
#  9    9.    9.
# 10   10.   10.
# 11    1.    1.
# 12    2.    2.
# 13    3.    3.

Wanted output

# # A tibble: 13 x 3
#       x     y     id
#     <dbl> <dbl> <chr>
#  1    1.    1.    tab2
#  2    2.    2.    tab2
#  3    3.    3.    tab2
#  4    4.    4.    tab2
#  5    5.    5.    tab2
#  6    6.    6.    tab2
#  7    7.    7.    tab2
#  8    8.    8.    tab2
#  9    9.    9.    tab2
# 10   10.   10.    tab2
# 11    1.    1.    tab1
# 12    2.    2.    tab1
# 13    3.    3.    tab1

Solution

  • You can disable eager caching (recommended anyway for data of this size):

    combined_table <- spark_read_json(
      sc, 
      name = "combined_table", 
      path = sprintf("%s/*.json", data_dir),
      memory = FALSE
    )
    

    and use the input_file_name function:

    combined_table %>% mutate(id = input_file_name())
    
    # Source:   lazy query [?? x 3]
    # Database: spark_connection
           x     y id                              
       <dbl> <dbl> <chr>                           
     1     1     1 file:///tmp/RtmpnIAUek/tab2.json
     2     2     2 file:///tmp/RtmpnIAUek/tab2.json
     3     3     3 file:///tmp/RtmpnIAUek/tab2.json
     4     4     4 file:///tmp/RtmpnIAUek/tab2.json
     5     5     5 file:///tmp/RtmpnIAUek/tab2.json
     6     6     6 file:///tmp/RtmpnIAUek/tab2.json
     7     7     7 file:///tmp/RtmpnIAUek/tab2.json
     8     8     8 file:///tmp/RtmpnIAUek/tab2.json
     9     9     9 file:///tmp/RtmpnIAUek/tab2.json
    10    10    10 file:///tmp/RtmpnIAUek/tab2.json
    # ... with more rows
    

    If needed, this can be combined with Hive's parse_url UDF to strip the file URI scheme:

    combined_table %>% mutate(id = parse_url(input_file_name(), "FILE"))
    
    # Source:   lazy query [?? x 3]
    # Database: spark_connection
           x     y id                       
       <dbl> <dbl> <chr>                    
     1     1     1 /tmp/RtmpnIAUek/tab2.json
     2     2     2 /tmp/RtmpnIAUek/tab2.json
     3     3     3 /tmp/RtmpnIAUek/tab2.json
     4     4     4 /tmp/RtmpnIAUek/tab2.json
     5     5     5 /tmp/RtmpnIAUek/tab2.json
     6     6     6 /tmp/RtmpnIAUek/tab2.json
     7     7     7 /tmp/RtmpnIAUek/tab2.json
     8     8     8 /tmp/RtmpnIAUek/tab2.json
     9     9     9 /tmp/RtmpnIAUek/tab2.json
    10    10    10 /tmp/RtmpnIAUek/tab2.json
    # ... with more rows
    

    and you can use other string-processing functions to extract the individual bits of information you need.
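
    For example, to match the wanted output exactly (just tab1 / tab2 rather than the full path), one option is to pull out the file stem with Spark SQL's regexp_extract function. This is a sketch: regexp_extract is not an R function — sparklyr passes it through to Spark SQL unchanged, so it is evaluated on the cluster.

    ## Sketch: extract the bare file name, dropping directory and extension.
    ## Capture group 1 of the pattern matches everything between the last "/"
    ## and the ".json" suffix, e.g. "tab2" from ".../RtmpnIAUek/tab2.json".
    combined_table %>%
      mutate(id = regexp_extract(input_file_name(), "([^/]+)\\.json$", 1))

    Since the query is lazy, the id column is only computed when the result is collected.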