While working with Spark DataFrames via sparklyr, I would like to wrap some of the common transformations so I can pass them more conveniently to mutate().
For instance, while working with data containing the following timestamps:
2000-01-01 00:00:00.0
2000-02-02 00:00:00.0
I can convert those to a more useful YYYY-MM-dd
format using the syntax:
mutate(nice_date= from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))
As I do this frequently, I would like to wrap the from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd') call and use the syntax:
mutate(nice_date = from_unix_to_nice(bad_date))
A conventional approach would be to write a function:
from_unix_to_nice <- function(x) {
  from_unixtime(unix_timestamp(x), 'YYYY-MM-dd')
}
When applied, the function fails:
> Error: org.apache.spark.sql.AnalysisException: undefined function
> from_unix_to_nice; line 2 pos 62 at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
> at scala.Option.getOrElse(Option.scala:120)
How can I conveniently develop wrappers for common mutate() operations so I can pass them to a sparklyr pipeline?
The issue is that the expression needs to be passed unevaluated to mutate(): sparklyr translates the code inside mutate() into Spark SQL, so a regular R function such as from_unix_to_nice() is sent to Spark verbatim, where it is an undefined function. The rlang package can be used to build the expression and pass it along unevaluated; here is an example:
library(rlang)
library(sparklyr)
library(nycflights13)
library(dplyr)

sc <- spark_connect(master = "local")

just_time <- flights %>%
  select(time_hour) %>%
  mutate(time_hour = as.character(time_hour)) %>%
  head(100)

spark_flights <- copy_to(sc, just_time, "flights")
from_unix_to_nice <- function(x) {
  x <- enexpr(x)                                          # capture the argument unevaluated
  expr(from_unixtime(unix_timestamp(!!x), 'YYYY-MM-dd'))  # splice it into the larger call
}

from_unix_to_nice(test)
#> from_unixtime(unix_timestamp(test), "YYYY-MM-dd")
spark_flights %>%
  mutate(new_field = !!from_unix_to_nice(time_hour))
The from_unix_to_nice() function now passes from_unixtime(unix_timestamp(test), "YYYY-MM-dd") to mutate() as if you had typed that exact syntax.
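You can confirm what will actually be sent to Spark with show_query(), a standard dplyr verb that renders the translated SQL for a remote table (the exact SQL text depends on your dplyr/sparklyr versions):

spark_flights %>%
  mutate(new_field = !!from_unix_to_nice(time_hour)) %>%
  show_query()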
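If you need several wrappers of this shape, the same pattern generalizes to a small function factory. This is a sketch under the same assumptions as above; make_nice_formatter is a hypothetical helper name, not part of sparklyr or rlang:

# Hypothetical helper: builds a quoting wrapper for any date format string
make_nice_formatter <- function(fmt) {
  function(x) {
    x <- enexpr(x)                                   # capture the column unevaluated
    expr(from_unixtime(unix_timestamp(!!x), !!fmt))  # splice column and format in
  }
}

from_unix_to_nice <- make_nice_formatter('YYYY-MM-dd')

spark_flights %>%
  mutate(new_field = !!from_unix_to_nice(time_hour))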