I have a folder on HDFS which contains 10 CSV files. Each CSV file contains 10000 rows and 17 columns.
Objective
Reactively read a folder on HDFS.
If the folder contains files, read one file at a time (oldest to newest) from the folder.
Plot some parameters in Shiny.
Update plot as new files are added to the folder or read from the folder.
Status
Currently, with sparklyr, I am able to reactively read all files at once and generate a ggplot containing 100000 points. If I add an 11th file (containing 10000 rows) after starting the app, the plot gets updated to show 110000 points.
library(sparklyr)
library(shiny)
library(ggplot2)
# Connect to local Spark with extra driver memory
conf <- spark_config()
conf$spark.driver.memory <- "50g"
sc <- spark_connect(master = "local[*]", config = conf)
# Structured-streaming read of every CSV file in the HDFS folder
read_folder <- stream_read_csv(sc, "hdfs://localhost:9000/nik_ml/")
ui <- fluidPage(
  plotOutput("plot")
)

server <- function(input, output, session){
  # reactiveSpark() turns the stream into a reactive polled every 10 ms
  ps <- reactiveSpark(read_folder, intervalMillis = 10)
  output$plot <- renderPlot({
    df2 <- ps()
    ggplot(data = df2, aes(x = Time, y = outletN2)) +
      geom_point() +
      ggtitle(nrow(df2)) +
      theme_bw()
  })
}

shinyApp(ui, server)
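When the app is stopped, the Spark connection (and with it the stream) can be closed; a one-line cleanup, assuming the `sc` connection above:

spark_disconnect(sc)  # close the Spark connection and stop the stream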
sessionInfo()
# R version 3.5.1 (2018-07-02)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows Server >= 2012 x64 (build 9200)
#
# Matrix products: default
#
# locale:
# [1] LC_COLLATE=English_United States.1252 LC_CTYPE=English_United States.1252
# [3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C
# [5] LC_TIME=English_United States.1252
#
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
#
# other attached packages:
# [1] shinyFiles_0.7.2 bindrcpp_0.2.2 dplyr_0.7.8 shiny_1.2.0 ggplot2_3.1.0
# [6] future_1.10.0 sparklyr_0.9.3.9000
#
# loaded via a namespace (and not attached):
# [1] tidyselect_0.2.5 forge_0.1.9002 purrr_0.2.5 listenv_0.7.0 lattice_0.20-38 colorspace_1.3-2
# [7] generics_0.0.2 htmltools_0.3.6 yaml_2.2.0 base64enc_0.1-3 rlang_0.3.0.1 later_0.7.5
# [13] pillar_1.3.0 glue_1.3.0 withr_2.1.2 DBI_1.0.0 dbplyr_1.2.2 bindr_0.1.1
# [19] plyr_1.8.4 munsell_0.5.0 gtable_0.2.0 htmlwidgets_1.3 codetools_0.2-15 labeling_0.3
# [25] httpuv_1.4.5 parallel_3.5.1 broom_0.5.1 r2d3_0.2.2 Rcpp_1.0.0 xtable_1.8-3
# [31] openssl_1.1 promises_1.0.1 backports_1.1.2 scales_1.0.0 jsonlite_1.6 config_0.3
# [37] fs_1.2.6 mime_0.6 digest_0.6.18 grid_3.5.1 rprojroot_1.3-2 tools_3.5.1
# [43] magrittr_1.5 lazyeval_0.2.1 tibble_1.4.2 crayon_1.3.4 tidyr_0.8.2 pkgconfig_2.0.2
# [49] rsconnect_0.8.12 assertthat_0.2.0 httr_1.4.0 rstudioapi_0.8 R6_2.3.0 globals_0.12.4
# [55] nlme_3.1-137 compiler_3.5.1
But what I really want is to reactively read a single file at a time and make a ggplot. This is similar to Spark Streaming, but Spark Streaming (from what I understand) reads ALL text files into a single RDD. From Spark's documentation, there exists a function in Python called SparkContext.wholeTextFiles that lets you read a directory containing multiple small text files and returns each of them as (filename, content) pairs (link). I haven't tested it, as I want to keep everything in R for now. I looked into shinyFiles but couldn't find any function that does this (https://github.com/thomasp85/shinyFiles).
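For what it's worth, that same JVM method looks reachable from R through sparklyr's low-level invoke() API. The sketch below is untested; the minPartitions argument (1L) and the count call are assumptions for illustration only.

# Untested sketch: call SparkContext.wholeTextFiles() via invoke()
jsc <- spark_context(sc)                       # underlying JVM SparkContext
pairs_rdd <- invoke(jsc, "wholeTextFiles",     # RDD of (filename, content) pairs
                    "hdfs://localhost:9000/nik_ml/", 1L)
invoke(pairs_rdd, "count")                     # number of files read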
Is there anything similar in R/sparklyr? Does what I am trying to do sound silly? If you think there is a more efficient way of achieving it in R, I am all ears!
Thanks.
I found a way, with some help from @tricky. The full solution is below. It's dirty, but it works for now.
library(sparklyr)
library(dplyr)
library(stringr)
library(shiny)
library(ggplot2)

# Get the list of files currently in HDFS (assumes the Spark connection `sc` from above)
files <- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
# Extract the file names: everything after the last "/" on each listing line;
# the "Found N items" header has no "/", so na.omit() drops it
fileNames <- na.omit(str_extract(files, "(?<=/)[^/]*$"))

# checkFunc for reactivePoll: returns the current file names, so the poll
# invalidates whenever the folder's contents change
listFiles <- function(){
  files <<- system("hadoop fs -ls /nik_ml", show.output.on.console = FALSE, intern = TRUE)
  fileNames <<- na.omit(str_extract(files, "(?<=/)[^/]*$"))
  fileNames
}

# valueFunc for reactivePoll: returns a vector of full HDFS file paths
ReadHdfsData <- function(){
  paste0("hdfs://localhost:9000/nik_ml/", fileNames)
}
ui3 <- fluidPage(
  plotOutput("plot")
)

server3 <- function(input, output, session){
  # Poll HDFS every 5 seconds; invalidates only when the file list changes
  allFiles <- reactivePoll(5 * 1000, session, listFiles, ReadHdfsData)
  output$plot <- renderPlot({
    # Find file paths added to HDFS since the last run, then mark them as seen.
    # Keeping a running set avoids re-reading old files once a batch has passed.
    newFiles <- setdiff(allFiles(), seenFiles)
    seenFiles <<- union(seenFiles, newFiles)
    # Do something with each new file.
    # I am plotting currently, but I will end up using it for ML predictions.
    for(temp in newFiles){
      df <- spark_read_csv(sc, "name", temp) %>%
        select(Time, outletN2) %>%
        collect()
      p1 <- ggplot(data = df, aes(x = Time, y = outletN2)) +
        geom_point() +
        ggtitle(paste("File =", temp)) +
        theme_bw()
      print(p1)
    }
  })
}

# Initialise the set of already-processed files before running the app
seenFiles <- character(0)
shinyApp(ui3, server3)
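One caveat the objective calls out but the listing above does not handle: oldest-to-newest order. `hadoop fs -ls` prints a modification date and time per file, so a possible drop-in sort is sketched below. It assumes the standard 8-column listing format (permissions, replication, owner, group, size, date, time, path) and is untested; listFilesByAge is a hypothetical helper name.

# Untested sketch: HDFS paths sorted oldest to newest by modification time
listFilesByAge <- function(dir = "/nik_ml"){
  lines <- system(paste("hadoop fs -ls", dir),
                  show.output.on.console = FALSE, intern = TRUE)
  lines <- lines[grepl("^[-d]", lines)]      # drop the "Found N items" header
  parts <- strsplit(lines, "\\s+")           # split the 8-column listing
  mtime <- vapply(parts, function(x) paste(x[6], x[7]), character(1))
  paths <- vapply(parts, function(x) x[8], character(1))
  paths[order(as.POSIXct(mtime, format = "%Y-%m-%d %H:%M"))]
}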