I'm new to sparklyr and I'm trying to add random normal vectors weighted by another vector to a large number of columns of a spark df. This is an example I've tried with mtcars.
library(sparklyr)
library(dplyr)
sc1 <- spark_connect(master = "local")
mtc_df = select(mtcars, vs:carb)
mtc_sdf = sdf_copy_to(sc1, mtc_df, name = "mtc_sdf", overwrite = TRUE)
tf_df <- function(df1) {
  df1 %>%
    mutate_at(vars(am:carb), funs(. + vs * rnorm(32, 100, 1)))
}
tf_df(mtc_df) # works
mtc_sdf %>%
  spark_apply(function(d) tf_df(d), memory = TRUE) # doesn't work
I get the following error:
Error in file(con, "r") : cannot open the connection
In addition: Warning message:
In file(con, "r") :
cannot open file 'C:\....\filea54a7656c3_spark.log': Permission denied
I also tried to adapt the example on https://spark.rstudio.com/ but got the same error.
mtc_sdf %>%
  spark_apply(function(data) {
    data[2:4] + data[1] * rnorm(32 * 3, 100, 1)
  })
Any help would be much appreciated.
I'm trying to add random normal vectors weighted by another vector to a large number of columns of a spark df
I would recommend skipping spark_apply and using Spark's own randn (which gives ~N(0, 1)):
mtc_sdf %>%
  mutate_at(vars(am:carb), funs(. + vs * (randn() * 1 + 100)))
# Source: lazy query [?? x 4]
# Database: spark_connection
vs am gear carb
<dbl> <dbl> <dbl> <dbl>
1 0 1.00000 4.0000 4.00000
2 0 1.00000 4.0000 4.00000
3 1 101.36894 103.1954 98.80757
4 1 100.79066 102.6765 100.91702
5 0 0.00000 3.0000 2.00000
6 1 100.07964 103.1568 100.54303
7 0 0.00000 3.0000 4.00000
8 1 101.90050 103.0402 101.46825
9 1 99.63565 103.7781 101.65752
10 1 99.72587 102.3854 105.09205
Regarding your code:

- The "Permission denied" error on the Spark log file suggests a local Windows setup problem; make sure that winutils and the related environment variables are properly configured.
- The function used with spark_apply transforms a data frame partition into a data frame, so you cannot hard-code the number of rows. You should rather use something like rnorm(nrow(df1), 100, 1).
- sparklyr doesn't seem to properly serialize functions referenced by name, so you might have to inline the function or wrap it in a package:
mtc_sdf %>%
  spark_apply(function(df) dplyr::mutate_at(
    df, dplyr::vars(am:carb), dplyr::funs(. + vs * rnorm(nrow(df), 100, 1))))
# Source: table<sparklyr_tmp_34ce7faa2d33> [?? x 4]
# Database: spark_connection
vs am gear carb
<dbl> <dbl> <dbl> <dbl>
1 0 1.00000 4.0000 4.00000
2 0 1.00000 4.0000 4.00000
3 1 100.59678 101.9111 100.99830
4 1 98.87146 104.8058 99.20102
5 0 0.00000 3.0000 2.00000
6 1 99.38243 102.8664 100.37921
7 0 0.00000 3.0000 4.00000
8 1 98.99019 103.4996 101.69110
9 1 99.33687 102.3849 103.38833
10 1 100.02103 104.9381 102.07139
# ... with more rows
Also please note that packages from the driver are not attached automatically, so you have to attach them manually or reference library functions using fully qualified names.
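To illustrate the alternative to fully qualified names, here is a minimal sketch (assuming the same `mtc_sdf` as above and that spark_apply's default package distribution is in effect) that attaches dplyr inside the worker function:

```r
mtc_sdf %>%
  spark_apply(function(df) {
    # The driver's search path is not inherited, so attach the package
    # on the worker before calling unqualified dplyr functions.
    library(dplyr)
    mutate_at(df, vars(am:carb), funs(. + vs * rnorm(nrow(df), 100, 1)))
  })
```

Either approach works; attaching inside the closure keeps the body readable when many functions from the same package are used.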