Tags: r, function, dplyr, sparkr, sparklyr

Making Spark functions accessible from within a bespoke function in mutate


While working with Spark RDDs via sparklyr, I would like to wrap some of the common transformations so that I can pass them more conveniently to mutate() syntax.

Example

For instance, suppose I am working with data containing the following timestamps:

2000-01-01 00:00:00.0
2000-02-02 00:00:00.0

I can convert those to a more useful YYYY-MM-dd format using the syntax:

mutate(nice_date = from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))

Challenge

As I do this frequently, I would like to wrap the from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd') call and use the syntax:

mutate(nice_date = from_unix_to_nice(bad_date))

A conventional approach would be to write a function:

from_unix_to_nice <- function(x) {
    from_unixtime(unix_timestamp(x), 'YYYY-MM-dd')
}

Problem

When applied, the function fails:

> Error: org.apache.spark.sql.AnalysisException: undefined function
> from_unix_to_nice; line 2 pos 62  at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
>   at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
>   at scala.Option.getOrElse(Option.scala:120)

How can I conveniently write wrappers for common mutate() operations so that I can pass them to a sparklyr pipeline?


Solution

  • The issue is that the function call needs to be passed unevaluated to the mutate() function. The rlang package can be used to accomplish this; here is an example:

    library(rlang)
    library(sparklyr) 
    library(nycflights13)
    library(dplyr)
    
    sc <- spark_connect(master = "local")
    
    just_time <- flights %>%
         select(time_hour) %>%
         mutate(time_hour = as.character(time_hour)) %>%
         head(100)
    
    spark_flights <- copy_to(sc, just_time, "flights")
    
    
    from_unix_to_nice <- function(x) {
      x <- enexpr(x)
      expr(from_unixtime(unix_timestamp(!!x), 'YYYY-MM-dd'))
    }
    
    # Inspect the unevaluated expression the wrapper builds
    from_unix_to_nice(test)
    
    
    spark_flights %>%
      mutate(new_field =  !!from_unix_to_nice(time_hour))
    

    The from_unix_to_nice() function now passes from_unixtime(unix_timestamp(test), "YYYY-MM-dd") to mutate() as if you had typed that exact syntax yourself.
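    As a possible extension (a sketch, not part of the original answer): because the wrapper only builds an unevaluated expression, it can be generalized and tested without a Spark connection. The `from_unix_to_fmt()` helper and its default format string below are hypothetical names introduced for illustration.

    ```r
    library(rlang)

    # Sketch: same quoting technique as above, but with the Hive format
    # string as a parameter (defaulting to 'YYYY-MM-dd' as in the answer).
    from_unix_to_fmt <- function(x, fmt = "YYYY-MM-dd") {
      x <- enexpr(x)
      expr(from_unixtime(unix_timestamp(!!x), !!fmt))
    }

    # No Spark needed to check what will be sent to mutate():
    from_unix_to_fmt(bad_timestamp)
    #> from_unixtime(unix_timestamp(bad_timestamp), "YYYY-MM-dd")
    ```

    Inside a sparklyr pipeline it would be used the same way as before, e.g. `mutate(new_field = !!from_unix_to_fmt(time_hour))`.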