Tags: r, apache-spark, apache-spark-sql, sparklyr

Using spark_apply in sparklyr to add weighted random normal vectors to multiple DF columns


I'm new to sparklyr and I'm trying to add random normal vectors weighted by another vector to a large number of columns of a Spark DataFrame. This is an example I've tried with mtcars.

library(sparklyr)
library(dplyr)
sc1 <- spark_connect(master = "local")

mtc_df = select(mtcars, vs:carb)
mtc_sdf = sdf_copy_to(sc1, mtc_df, name = "mtc_sdf", overwrite = TRUE)

tf_df <- function(df1){
    df1 %>%
        # add vs-weighted N(100, 1) noise to each of am:carb (32 == nrow(mtcars))
        mutate_at(vars(am:carb), funs(. + vs * rnorm(32, 100, 1)))
}

tf_df(mtc_df) # works 

mtc_sdf %>%
    spark_apply(function(d) tf_df(d), memory = TRUE) # doesn't work

I get the following error:

Error in file(con, "r") : cannot open the connection
In addition: Warning message:
In file(con, "r") :
  cannot open file 'C:\....\filea54a7656c3_spark.log': Permission denied

I also tried to adapt the example on https://spark.rstudio.com/ but got the same error.

mtc_sdf %>%
    spark_apply(function(data) {
        data[2:4] + data[1]*rnorm(32*3,100,1)
    })

Any help would be much appreciated.


Solution

  • I'm trying to add random normal vectors weighted by another vector to a large number of columns of a Spark DataFrame

    I would recommend skipping spark_apply and using Spark's own randn, which draws from N(0, 1); scaling by the desired standard deviation and shifting by the mean gives N(100, 1):

    mtc_sdf %>% mutate_at(vars(am:carb), funs(. + vs * (randn() * 1 + 100)))
    
    # Source:   lazy query [?? x 4]
    # Database: spark_connection
          vs        am     gear      carb
       <dbl>     <dbl>    <dbl>     <dbl>
     1     0   1.00000   4.0000   4.00000
     2     0   1.00000   4.0000   4.00000
     3     1 101.36894 103.1954  98.80757
     4     1 100.79066 102.6765 100.91702
     5     0   0.00000   3.0000   2.00000
     6     1 100.07964 103.1568 100.54303
     7     0   0.00000   3.0000   4.00000
     8     1 101.90050 103.0402 101.46825
     9     1  99.63565 103.7781 101.65752
    10     1  99.72587 102.3854 105.09205
    # ... with more rows
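
    Note that this query is lazy, so every re-evaluation draws fresh random numbers. If you need a stable result, force the computation and cache it; a minimal sketch using dplyr's compute() (the table name is illustrative):

        mtc_rand <- mtc_sdf %>%
          mutate_at(vars(am:carb), funs(. + vs * (randn() * 1 + 100))) %>%
          compute("mtc_rand")  # materialize as a cached temporary table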
    

    Regarding your code:

    • The problem you experience looks like a permission issue. Make sure that the Spark user has all required permissions and that winutils is set up properly (see the environment sketch at the end of this answer).
    • The function used with spark_apply:

      transforms a data frame partition into a data frame.

      So you cannot hard-code the number of rows; use something like rnorm(nrow(df1), 100, 1) instead (see the corrected call below).

    • sparklyr doesn't seem to serialize functions referenced by name properly, so you may have to inline the function or wrap it in a package:

      mtc_sdf %>% 
        spark_apply(function(df) dplyr::mutate_at(
          df, dplyr::vars(am:carb), dplyr::funs(. + vs * rnorm(nrow(df), 100, 1))))
      
      # Source:   table<sparklyr_tmp_34ce7faa2d33> [?? x 4]
      # Database: spark_connection
            vs        am     gear      carb
         <dbl>     <dbl>    <dbl>     <dbl>
       1     0   1.00000   4.0000   4.00000
       2     0   1.00000   4.0000   4.00000
       3     1 100.59678 101.9111 100.99830
       4     1  98.87146 104.8058  99.20102
       5     0   0.00000   3.0000   2.00000
       6     1  99.38243 102.8664 100.37921
       7     0   0.00000   3.0000   4.00000
       8     1  98.99019 103.4996 101.69110
       9     1  99.33687 102.3849 103.38833
      10     1 100.02103 104.9381 102.07139
      # ... with more rows
      

      Also please note that packages from the driver are not attached automatically on the workers, so you have to attach them manually or reference library functions using fully qualified names, as in the sketch below.
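
      A minimal sketch of attaching a package inside the closure so the worker R sessions can use unqualified names (spark_apply() also accepts a packages argument that controls whether locally installed packages are distributed to the workers):

        mtc_sdf %>%
          spark_apply(function(df) {
            library(dplyr)  # attach on each worker; the driver's search path is not inherited
            df %>% mutate_at(vars(am:carb), funs(. + vs * rnorm(nrow(df), 100, 1)))
          })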
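
      As for the permission error itself: on Windows it is often caused by a missing or misconfigured winutils.exe, or by Spark's temp/log directory not being writable by your user. A minimal environment sketch to run before spark_connect(); the paths are illustrative and must match your installation:

        # HADOOP_HOME must contain bin/winutils.exe
        Sys.setenv(HADOOP_HOME = "C:/hadoop")
        Sys.setenv(PATH = paste("C:/hadoop/bin", Sys.getenv("PATH"), sep = ";"))

        library(sparklyr)
        sc1 <- spark_connect(master = "local")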