Tags: r, apache-spark, sparkr

spark.lapply and access to SparkDataFrame [SparkR]


I have a problem accessing a SparkDataFrame from inside a spark.lapply function. The code is as follows:

df <- data.frame(x = c(1,2), y = c("a", "b"))

Sys.setenv(SPARK_HOME = "path/spark-2.0.0-bin-hadoop2.7/")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

sparkR.session(master = "spark://host:7077", 
               appName = "SparkR", 
               sparkHome = Sys.getenv("SPARK_HOME"), 
               sparkConfig = list(spark.driver.memory = "2g"), 
               enableHiveSupport = TRUE)

spark_df <- as.DataFrame(df)
fun_to_distribute <- function(i){
    data <- take(spark_df, 1)$x
    return(data + i)
}

spark.lapply(1:2, fun_to_distribute)

sparkR.session.stop()

Unfortunately, I always receive an error:

[Stage 1:>                                                          (0 + 2) / 2]17/04/28 01:57:56 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 173.38.82.173): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/04/28 01:57:56 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 173.38.82.175): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/04/28 01:57:56 WARN TaskSetManager: Lost task 1.3 in stage 1.0 (TID 6, 173.38.82.175): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/04/28 01:57:56 ERROR TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job
17/04/28 01:57:56 ERROR RBackendHandler: collect on 19 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 6, 173.38.82.175): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
Error in callJMethod(x@sdf, "limit", as.integer(num)) : 
  Invalid jobj 14. If SparkR was restarted, Spark operations need to be re-executed.
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.s

Of course, I could pass the function a more complex argument, such as a list that already carries the data (sketched below), but I would rather upload the data to the Spark cluster once and let each executor access it at runtime.
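A rough sketch of that workaround (the args_list helper is only for illustration):

# Bundle whatever each task needs into its argument as plain R objects,
# instead of referencing spark_df inside the distributed function.
args_list <- lapply(1:2, function(i) list(i = i, x = df$x[1]))

spark.lapply(args_list, function(arg) arg$x + arg$i)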


Solution

  • Nesting distributed contexts is not permitted in Apache Spark: you cannot access distributed data structures, such as a SparkDataFrame, from inside a task running on an executor.

    Furthermore, a SparkDataFrame is not intended for cases where you need single-item access, which seems to be the desired behavior here. If you want to pass arguments to the distributed function, do it directly using standard R objects, for example as in the sketch below.
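
    A minimal sketch of that approach, assuming the value you need is small enough to live on the driver (or to collect() there first); the x_first variable is just for illustration:

    library(SparkR)

    # Keep the required value as a plain R object on the driver. If it only
    # exists as a SparkDataFrame, bring it back first, e.g.
    # x_first <- collect(select(spark_df, "x"))$x[1]
    x_first <- df$x[1]

    fun_to_distribute <- function(i) {
      # x_first is captured in the closure and shipped to the workers
      x_first + i
    }

    spark.lapply(1:2, fun_to_distribute)   # returns list(2, 3)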