I am trying to run spark_apply from the sparklyr package to perform k-means clustering on data hosted on a Spark cluster, but I am receiving a Spark error that I am having difficulty understanding. The data is shown below. The features column is a vector assembled from the latitude and longitude columns, but it is not used in this case.
> samplog1
# Source:   table<sparklyr_tmp_64d4941e1a2> [?? x 6]
# Database: spark_connection
   id                                   timestamp              hr latitude longitude features
   <chr>                                <chr>               <int>    <dbl>     <dbl> <list>
 1 fffc68e3-866e-4be5-b1bc-5d21b89622ae 2017-10-30 04:29:59     4 1.373545  104.1265 <dbl [2]>
 2 fffc7412-deb1-4587-9c22-29ca833865ed 2017-10-30 02:49:47     2 5.701320  117.4892 <dbl [2]>
 3 fffd16d5-83f1-4ea1-95de-34b1fcad392b 2017-10-30 04:25:44     4 5.334012  100.2172 <dbl [2]>
 4 fffc68e3-866e-4be5-b1bc-5d21b89622ae 2017-10-30 04:29:44     4 1.373545  104.1265 <dbl [2]>
 5 fffd16d5-83f1-4ea1-95de-34b1fcad392b 2017-10-30 02:58:30     2 5.334061  100.2173 <dbl [2]>
 6 fffd16d5-83f1-4ea1-95de-34b1fcad392b 2017-10-30 04:55:41     4 5.334012  100.2172 <dbl [2]>
 7 fffc7412-deb1-4587-9c22-29ca833865ed 2017-10-30 04:49:07     4 5.729879  117.5787 <dbl [2]>
 8 fffc68e3-866e-4be5-b1bc-5d21b89622ae 2017-10-30 05:02:08     5 1.373545  104.1265 <dbl [2]>
 9 fffc7412-deb1-4587-9c22-29ca833865ed 2017-10-30 00:53:12     0 5.701320  117.4892 <dbl [2]>
10 fffc7412-deb1-4587-9c22-29ca833865ed 2017-10-30 04:08:12     4 5.670300  117.4990 <dbl [2]>
# ... with more rows
The R code is as follows:
kms <- function(idLogs){
  tryCatch({
    #idLogs <- sparklyr::ft_vector_assembler(idLogs, input_cols= c("latitude", "longitude"), output_col = "features")
    km <- sparklyr::ml_kmeans(x = idLogs, centers = 3, features = c("latitude","longitude"))
    km1 <- copy_to(sc, km$centers, overwrite = T)
    cluster <- sdf_predict(km)
    clustCounts <- cluster %>% group_by(prediction) %>%
      tally %>%
    clustCounts <- merge(clustCounts, km$centers, by.x=3, by.y=0)
    clustCounts <- clustCounts %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)
    #clustCounts <- cbind.data.frame(id, hr, clustCounts)
    #clustCounts1 <- copy_to(sc, clustCounts, overwrite = T)
  }, error = function(e) {
    data.frame(string_categories = c(substr(e, 1, 20)))
  })
}
It is called like so:
likelyLocs <- spark_apply(samplog, kms)
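A side note on the error handler in kms: the argument e passed to the handler is a condition object rather than a string, so conditionMessage(e) is the usual way to get at its text. Below is a minimal sketch in plain R; safe_apply is a hypothetical helper name for illustration and is not part of sparklyr.

```r
# Hypothetical helper (not part of sparklyr): run `f` on a data frame and,
# if `f` raises an error, return the error text as a one-row data frame.
safe_apply <- function(df, f) {
  tryCatch(
    f(df),
    error = function(e) {
      # `e` is a condition object; conditionMessage() extracts its message.
      data.frame(error = substr(conditionMessage(e), 1, 200),
                 stringsAsFactors = FALSE)
    }
  )
}

# Usage: a function that always fails yields a data frame holding the message.
res <- safe_apply(data.frame(x = 1:3), function(df) stop("boom"))
print(res)
```

This only catches errors raised inside the function body. It would not help if the R process on the worker fails before the function is ever called.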
The error I am receiving in RStudio is:
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 187.0 failed 4 times, most recent failure: Lost task 0.3 in stage 187.0 (TID 250, spark-1.c.halogen-order-184815.internal, executor 2): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:67)
at sparklyr.WorkerRDD$$anon$2.run(rdd.scala:92)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1445)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1444)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1444)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1627)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1616)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1862)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1875)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1888)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:67)
at sparklyr.WorkerRDD$$anon$2.run(rdd.scala:92)
As directed in the error details, I checked the Spark log and got the following:
> spark_log(sc)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 402
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_237_piece0 on in memory (size: 250.0 B, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_237_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 250.0 B, free: 530.2 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 401
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_236_piece0 on in memory (size: 1658.0 B, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_236_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 1658.0 B, free: 530.2 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 400
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_235_piece0 on in memory (size: 9.4 KB, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_235_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 9.4 KB, free: 530.2 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 399
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned shuffle 31
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_234_piece0 on in memory (size: 202.0 B, free: 530.0 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 398
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_233_piece0 on in memory (size: 1550.0 B, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_233_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 1550.0 B, free: 530.2 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 397
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_232_piece0 on in memory (size: 9.3 KB, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_232_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 9.3 KB, free: 530.2 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 396
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned shuffle 30
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_231_piece0 on in memory (size: 421.0 B, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_230_piece0 on in memory (size: 9.5 KB, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_230_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 9.5 KB, free: 530.2 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 395
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_229_piece0 on in memory (size: 9.4 KB, free: 530.0 MB)
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_229_piece0 on spark-1.c.halogen-order-184815.internal:35671 in memory (size: 9.4 KB, free: 530.3 MB)
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned accumulator 394
17/11/09 21:48:05 INFO storage.BlockManager: Removing RDD 515
17/11/09 21:48:05 INFO spark.ContextCleaner: Cleaned RDD 515
17/11/09 21:48:05 INFO storage.BlockManagerInfo: Removed broadcast_228_piece0 on in memory (size: 175.0 B, free: 530.0 MB)
17/11/13 12:11:09 INFO spark.SparkContext: Starting job: collect at utils.scala:196
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Got job 153 (collect at utils.scala:196) with 1 output partitions
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 185 (collect at utils.scala:196)
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Missing parents: List()
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Submitting ResultStage 185 (MapPartitionsRDD[536] at collect at utils.scala:196), which has no missing parents
17/11/13 12:11:09 INFO storage.MemoryStore: Block broadcast_241 stored as values in memory (estimated size 1968.0 B, free 530.0 MB)
17/11/13 12:11:09 INFO storage.MemoryStore: Block broadcast_241_piece0 stored as bytes in memory (estimated size 1206.0 B, free 530.0 MB)
17/11/13 12:11:09 INFO storage.BlockManagerInfo: Added broadcast_241_piece0 in memory on (size: 1206.0 B, free: 530.0 MB)
17/11/13 12:11:09 INFO spark.SparkContext: Created broadcast 241 from broadcast at DAGScheduler.scala:1004
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 185 (MapPartitionsRDD[536] at collect at utils.scala:196) (first 15 tasks are for partitions Vector(0))
17/11/13 12:11:09 INFO cluster.YarnScheduler: Adding task set 185.0 with 1 tasks
17/11/13 12:11:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 185.0 (TID 245, spark-1.c.halogen-order-184815.internal, executor 2, partition 0, PROCESS_LOCAL, 3699 bytes)
17/11/13 12:11:09 INFO storage.BlockManagerInfo: Added broadcast_241_piece0 in memory on spark-1.c.halogen-order-184815.internal:35671 (size: 1206.0 B, free: 530.3 MB)
17/11/13 12:11:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 185.0 (TID 245) in 94 ms on spark-1.c.halogen-order-184815.internal (executor 2) (1/1)
17/11/13 12:11:09 INFO scheduler.DAGScheduler: ResultStage 185 (collect at utils.scala:196) finished in 0.096 s
17/11/13 12:11:09 INFO cluster.YarnScheduler: Removed TaskSet 185.0, whose tasks have all completed, from pool
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Job 153 finished: collect at utils.scala:196, took 0.111329 s
17/11/13 12:11:09 INFO spark.SparkContext: Starting job: collect at utils.scala:196
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Got job 154 (collect at utils.scala:196) with 1 output partitions
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 186 (collect at utils.scala:196)
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Missing parents: List()
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Submitting ResultStage 186 (MapPartitionsRDD[538] at collect at utils.scala:196), which has no missing parents
17/11/13 12:11:09 INFO storage.MemoryStore: Block broadcast_242 stored as values in memory (estimated size 1968.0 B, free 530.0 MB)
17/11/13 12:11:09 INFO storage.MemoryStore: Block broadcast_242_piece0 stored as bytes in memory (estimated size 1207.0 B, free 530.0 MB)
17/11/13 12:11:09 INFO storage.BlockManagerInfo: Added broadcast_242_piece0 in memory on (size: 1207.0 B, free: 530.0 MB)
17/11/13 12:11:09 INFO spark.SparkContext: Created broadcast 242 from broadcast at DAGScheduler.scala:1004
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 186 (MapPartitionsRDD[538] at collect at utils.scala:196) (first 15 tasks are for partitions Vector(0))
17/11/13 12:11:09 INFO cluster.YarnScheduler: Adding task set 186.0 with 1 tasks
17/11/13 12:11:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 186.0 (TID 246, spark-1.c.halogen-order-184815.internal, executor 2, partition 0, PROCESS_LOCAL, 3699 bytes)
17/11/13 12:11:09 INFO storage.BlockManagerInfo: Added broadcast_242_piece0 in memory on spark-1.c.halogen-order-184815.internal:35671 (size: 1207.0 B, free: 530.3 MB)
17/11/13 12:11:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 186.0 (TID 246) in 22 ms on spark-1.c.halogen-order-184815.internal (executor 2) (1/1)
17/11/13 12:11:09 INFO scheduler.DAGScheduler: ResultStage 186 (collect at utils.scala:196) finished in 0.022 s
17/11/13 12:11:09 INFO cluster.YarnScheduler: Removed TaskSet 186.0, whose tasks have all completed, from pool
17/11/13 12:11:09 INFO scheduler.DAGScheduler: Job 154 finished: collect at utils.scala:196, took 0.031006 s
17/11/13 12:11:22 INFO spark.SparkContext: Starting job: take at NativeMethodAccessorImpl.java:-2
17/11/13 12:11:22 INFO scheduler.DAGScheduler: Got job 155 (take at NativeMethodAccessorImpl.java:-2) with 1 output partitions
17/11/13 12:11:22 INFO scheduler.DAGScheduler: Final stage: ResultStage 187 (take at NativeMethodAccessorImpl.java:-2)
17/11/13 12:11:22 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/11/13 12:11:22 INFO scheduler.DAGScheduler: Missing parents: List()
17/11/13 12:11:22 INFO scheduler.DAGScheduler: Submitting ResultStage 187 (WorkerRDD[542] at RDD at rdd.scala:7), which has no missing parents
17/11/13 12:11:22 INFO storage.MemoryStore: Block broadcast_243 stored as values in memory (estimated size 35.2 KB, free 530.0 MB)
17/11/13 12:11:22 INFO storage.MemoryStore: Block broadcast_243_piece0 stored as bytes in memory (estimated size 14.4 KB, free 530.0 MB)
17/11/13 12:11:22 INFO storage.BlockManagerInfo: Added broadcast_243_piece0 in memory on (size: 14.4 KB, free: 530.0 MB)
17/11/13 12:11:22 INFO spark.SparkContext: Created broadcast 243 from broadcast at DAGScheduler.scala:1004
17/11/13 12:11:22 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 187 (WorkerRDD[542] at RDD at rdd.scala:7) (first 15 tasks are for partitions Vector(0))
17/11/13 12:11:22 INFO cluster.YarnScheduler: Adding task set 187.0 with 1 tasks
17/11/13 12:11:22 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 187.0 (TID 247, spark-1.c.halogen-order-184815.internal, executor 2, partition 0, PROCESS_LOCAL, 3488 bytes)
17/11/13 12:11:22 INFO storage.BlockManagerInfo: Added broadcast_243_piece0 in memory on spark-1.c.halogen-order-184815.internal:35671 (size: 14.4 KB, free: 530.2 MB)
17/11/13 12:11:23 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 187.0 (TID 247, spark-1.c.halogen-order-184815.internal, executor 2): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:67)
at sparklyr.WorkerRDD$$anon$2.run(rdd.scala:92)
17/11/13 12:11:23 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 187.0 (TID 248, spark-1.c.halogen-order-184815.internal, executor 2, partition 0, PROCESS_LOCAL, 3488 bytes)
17/11/13 12:11:24 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 187.0 (TID 248) on spark-1.c.halogen-order-184815.internal, executor 2: java.lang.Exception (sparklyr worker rscript failure with status 255, check worker logs for details.) [duplicate 1]
17/11/13 12:11:24 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 187.0 (TID 249, spark-1.c.halogen-order-184815.internal, executor 2, partition 0, PROCESS_LOCAL, 3488 bytes)
17/11/13 12:11:25 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 187.0 (TID 249) on spark-1.c.halogen-order-184815.internal, executor 2: java.lang.Exception (sparklyr worker rscript failure with status 255, check worker logs for details.) [duplicate 2]
17/11/13 12:11:25 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 187.0 (TID 250, spark-1.c.halogen-order-184815.internal, executor 2, partition 0, PROCESS_LOCAL, 3488 bytes)
17/11/13 12:11:25 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 187.0 (TID 250) on spark-1.c.halogen-order-184815.internal, executor 2: java.lang.Exception (sparklyr worker rscript failure with status 255, check worker logs for details.) [duplicate 3]
17/11/13 12:11:25 ERROR scheduler.TaskSetManager: Task 0 in stage 187.0 failed 4 times; aborting job
17/11/13 12:11:25 INFO cluster.YarnScheduler: Removed TaskSet 187.0, whose tasks have all completed, from pool
17/11/13 12:11:25 INFO cluster.YarnScheduler: Cancelling stage 187
17/11/13 12:11:25 INFO scheduler.DAGScheduler: ResultStage 187 (take at NativeMethodAccessorImpl.java:-2) failed in 3.496 s due to Job aborted due to stage failure: Task 0 in stage 187.0 failed 4 times, most recent failure: Lost task 0.3 in stage 187.0 (TID 250, spark-1.c.halogen-order-184815.internal, executor 2): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:67)
at sparklyr.WorkerRDD$$anon$2.run(rdd.scala:92)
Driver stacktrace:
17/11/13 12:11:25 INFO scheduler.DAGScheduler: Job 155 failed: take at NativeMethodAccessorImpl.java:-2, took 3.506663 s
17/11/13 12:11:25 ERROR sparklyr: Gateway (37351) failed calling take on 699
All I can fathom is that the Spark job is failing somewhere in the last stage, so perhaps while merging the output from the different workers? Can anyone help find what the problem may be?
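The output of spark_log(sc) above appears to be the driver-side log, whereas the message "check worker logs for details" points at the executor logs. On YARN these are normally retrieved with the yarn logs command. Here is a rough sketch, assuming the yarn CLI is on the PATH of the machine running R and that log aggregation is enabled; yarn_logs_args and fetch_worker_logs are hypothetical helper names.

```r
# Build the argument vector for `yarn logs -applicationId <id>`.
yarn_logs_args <- function(app_id) {
  c("logs", "-applicationId", app_id)
}

# Hypothetical helper: look up the YARN application id behind the sparklyr
# connection and return the aggregated container logs as a character vector.
fetch_worker_logs <- function(sc) {
  app_id <- sparklyr::invoke(sparklyr::spark_context(sc), "applicationId")
  system2("yarn", yarn_logs_args(app_id), stdout = TRUE, stderr = TRUE)
}

# Usage (requires a live connection `sc`):
# logs <- fetch_worker_logs(sc)
# grep("sparklyr", logs, value = TRUE)
```

Depending on the Hadoop version and configuration, the aggregated logs may only be available once the application has finished.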
Solved with assistance on the package's GitHub issues page: https://github.com/rstudio/sparklyr/issues/1121
The relevant part:
Still not sure why but adding
did it. The idea came from the question "Can sparklyr be used with spark deployed on yarn-managed hadoop cluster?".
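Independently of the fix above, it may be worth keeping in mind that the function passed to spark_apply runs in a plain R process on each worker and receives its partition as an ordinary R data frame. There is no Spark connection sc there, so Spark-side calls such as ml_kmeans, sdf_predict or copy_to(sc, ...) are not expected to work inside it. Below is a minimal sketch of a worker-side variant using base R's stats::kmeans. The name kms_local is hypothetical, and defining conf as the share of rows in each cluster is an assumption, since the original definition is not shown in the question.

```r
# Hypothetical worker-side function: takes a plain R data frame and
# returns the centre of the most populated cluster.
kms_local <- function(df) {
  coords <- df[, c("latitude", "longitude")]
  km <- stats::kmeans(coords, centers = 3)
  # Assumption: `conf` is the fraction of rows assigned to each cluster.
  conf <- km$size / sum(km$size)
  best <- which.max(conf)
  data.frame(
    latitude  = km$centers[best, "latitude"],
    longitude = km$centers[best, "longitude"],
    conf      = conf[best],
    row.names = NULL
  )
}

# Local check on a small data frame built from the sample rows in the question.
sample_df <- data.frame(
  latitude  = c(1.373545, 5.701320, 5.334012, 1.373545, 5.334061,
                5.334012, 5.729879, 1.373545, 5.701320, 5.670300),
  longitude = c(104.1265, 117.4892, 100.2172, 104.1265, 100.2173,
                100.2172, 117.5787, 104.1265, 117.4892, 117.4990)
)
set.seed(1)
print(kms_local(sample_df))

# On the cluster this might then be called as (requires a connection):
# likelyLocs <- sparklyr::spark_apply(samplog, kms_local)
```

Note that spark_apply applies the function to each partition separately, so this would cluster within partitions rather than over the whole table.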