I am relatively new to both R and Spark. I am writing a function to calculate the exponential moving average of a set of data, using the sparklyr package on the Databricks Spark platform.
I have written a function that works on a regular R data frame, but it fails when applied to a Spark DataFrame.
For now I am not concerned with the correctness of the values (I am using dummy values; e.g., init = 10 is arbitrary). I am more interested in getting this working on a Spark DataFrame.
library(sparklyr)
library(dplyr)
library(stats)
sc <- spark_connect(method = "databricks")
set.seed(21)
#data
x <- rnorm(1e4)
#data in a dataframe
x_df <- data.frame(x)
#data in a Spark dataframe
x_sprk <- copy_to(sc, x_df, name ="x_sql", overwrite = TRUE)
#function to calculate Exponential moving average
ewma_filter <- function(df, ratio = 0.9) {
  mutate(df, ema = c(stats::filter(x * ratio, 1 - ratio, "recursive", init = 10)))
}
When I run this function on an R data frame, it works fine:
y_df <- x_df %>% ewma_filter()
Output:
x ema
1 0.6785634656 1.6107071191
2 -0.8519017349 -0.6056408495
3 -0.0362643838 -0.0932020304
4 0.2422350575 0.2086913487
5 -1.0401144499 -0.9152338701
6 1.4521621543 1.2154225519
7 -0.8531140006 -0.6462603453
8 0.4779933902 0.3655680167
9 1.0719294487 1.0012933055
10 -0.4115495580 -0.2702652716
11 2.4152301588 2.1466806157
12 -0.1045401223 0.1205819515
13 -0.1632591646 -0.1348750530
14 -2.1441820131 -1.9432513170
15 0.4672471535 0.2261973065
16 0.9362099384 0.8652086752
17 0.6494043831 0.6709848123
18 2.5609202716 2.3719267257
But when I try it on a Spark DataFrame, I do not get the intended output:
y_sprk <- x_sprk %>% ewma_filter()
Output:
x ema
1 0.679
2 -0.852
3 -0.0363
4 0.242
5 -1.04
6 1.45
7 -0.853
8 0.478
9 1.07
10 -0.412
# … with more rows
I tried using spark_apply():
y_sprk <- spark_apply(x_sprk, ewma_filter, columns = list(x = "numeric", ema = "numeric"))
I get the below error:
Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 115.0 failed 4 times, most recent failure: Lost task 0.3 in stage 115.0 (TID 8623, 10.139.64.6, executor 0): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:106)
at sparklyr.WorkerApply$$anon$2.run(workerapply.scala:116)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2240)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2828)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3440)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2795)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2795)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3424)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3419)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3419)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2795)
at sparklyr.Utils$.collect(utils.scala:204)
at sparklyr.Utils.collect(utils.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:139)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:106)
at sparklyr.WorkerApply$$anon$2.run(workerapply.scala:116)
I would be grateful for any help debugging this and getting it working on a Spark DataFrame.
You are very close! spark_apply() works on each partition of a Spark DataFrame by default, which suits what you are trying to do. The error message you got is not very informative; to see what actually happened, you have to look at stdout in the logs for the worker nodes. On Databricks you can find this in the Clusters UI under 'Spark UI - Master', and then by drilling into the worker nodes.
For your code, the underlying error message in the worker log was:
19/11/10 19:38:25 ERROR sparklyr: RScript (2719) terminated unexpectedly: could not find function "mutate"
It might seem odd that mutate can't be found, but these UDFs work by starting an R process on each worker node, so all of the code and libraries the function uses must be available on those nodes as well. Since you are running on Databricks and dplyr is included in the Databricks Runtime, it is already installed on all of the worker nodes. You just need to reference the namespace or load the full library inside the function:
library(sparklyr)
library(dplyr)
library(stats)
sc <- spark_connect(method = "databricks")
# Create R dataframe
set.seed(21)
x <- rnorm(1e4)
x_df <- data.frame(x)
# Push R dataframe to Spark
x_sprk <- copy_to(sc, x_df, name ="x_sql", overwrite = TRUE)
# Distribute the R code across each partition
spark_apply(x_sprk, function(x) {
  # Define moving average function and reference dplyr explicitly
  ewma_filter <- function(df, ratio = 0.9) {
    dplyr::mutate(df, ema = c(stats::filter(x * ratio, 1 - ratio, "recursive", init = 10)))
  }
  # Apply it to each partition of the Spark DF
  ewma_filter(x)
})
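The other option, loading the full library, could look roughly like the sketch below. It assumes dplyr is installed on the workers (as it is with the Databricks Runtime); ewma_worker is a hypothetical name, and the function deliberately takes a single argument, the data frame for one partition. The local call only checks the function on an ordinary data frame, and the spark_apply() call is left as a comment because it needs a live connection.

```r
# Hypothetical worker function: attaches dplyr inside the function body,
# so the R process started on each worker node can find mutate().
ewma_worker <- function(df) {
  library(dplyr)
  mutate(df, ema = as.numeric(stats::filter(x * 0.9, 0.1, "recursive", init = 10)))
}

# Local sanity check on a plain R data frame (no Spark needed)
local_result <- ewma_worker(data.frame(x = c(1, 2)))

# On the cluster, assuming x_sprk was created with copy_to() as in the question:
# spark_apply(x_sprk, ewma_worker)
```

Referencing the namespace (dplyr::mutate) avoids attaching a whole package on every worker, while library(dplyr) is more convenient when the function uses many dplyr verbs.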
These are the results of the spark_apply() call that uses dplyr::mutate:
# Source: spark<?> [?? x 2]
x ema
<dbl> <dbl>
1 0.793 1.71
2 0.522 0.641
3 1.75 1.64
4 -1.27 -0.981
5 2.20 1.88
6 0.433 0.578
7 -1.57 -1.36
8 -0.935 -0.977
9 0.0635 -0.0406
10 -0.00239 -0.00621
# … with more rows
This is also covered in the R User Guide for Databricks.
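One thing to keep in mind with this approach: because the function runs separately on each partition, the recursive filter starts again from init in every partition rather than carrying the previous partition's last value forward. The base-R sketch below imitates that without Spark by splitting a vector into two hypothetical "partitions" of 10 values each. The ewma helper name and the partition sizes are illustrative assumptions, not part of the original code.

```r
# Illustrative helper: same recursion as the filter used in the question
ewma <- function(x, ratio = 0.9, init = 10) {
  as.numeric(stats::filter(x * ratio, 1 - ratio, "recursive", init = init))
}

set.seed(21)
x <- rnorm(20)

# Whole series filtered in one pass
whole <- ewma(x)

# Two hypothetical "partitions" of 10 values, each filtered independently
parts <- split(x, rep(1:2, each = 10))
chunked <- unlist(lapply(parts, ewma), use.names = FALSE)

# The first partition agrees with the single pass ...
first_matches <- isTRUE(all.equal(whole[1:10], chunked[1:10]))
# ... but the second restarts from init, so its values differ
second_matches <- isTRUE(all.equal(whole[11:20], chunked[11:20]))
```

Here first_matches is TRUE and second_matches is FALSE. If a single continuous series is needed, the data would have to be processed in one partition and in a known row order, which is worth checking against the sparklyr documentation for your setup.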