On every slave node through Marathon we run Mesos External Shuffle Service . When we submit spark job via dcos CLI in coarse grained mode without dynamic allocation everything working as expected. But when we submit the same job with dynamic allocation it fails.
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file:/tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)
When I submit job as below everything is working as expected:
./dcos spark run --submit-args="--properties-file coarse-grained.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
Run job succeeded. Submission id: driver-20161208185927-0043
cqlsh:sp> select count(*) from product_model_per_alerts_by_date ;
count
-------
476
coarse-grained.conf:
spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.executor.instances 2
spark.submit.deployMode cluster
spark.cores.max 4
portal.spark.cassandra.app.ProductModelPerNrOfAlerts:
package portal.spark.cassandra.app
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
object ProductModelPerNrOfAlerts {
def main(args: Array[String]): Unit = {
val conf = new SparkConf(true)
.setAppName("cassandraSpark-ProductModelPerNrOfAlerts")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "asset_history", "keyspace" -> "sp"))
.load()
.select("datestamp","product_model","nr_of_alerts")
val dr = df
.groupBy("datestamp","product_model")
.avg("nr_of_alerts")
.toDF("datestamp","product_model","nr_of_alerts")
dr.write
.mode(SaveMode.Overwrite)
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "product_model_per_alerts_by_date", "keyspace" -> "sp"))
.save()
sc.stop()
}
}
Through Marathon we run Mesos External Shuffle Service:
{
"id": "spark-mesos-external-shuffle-service-tt",
"container": {
"type": "DOCKER",
"docker": {
"image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
"network": "BRIDGE",
"portMappings": [
{ "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
],
"forcePullImage":true,
"volumes": [
{
"containerPath": "/tmp",
"hostPath": "/tmp",
"mode": "RW"
}
]
}
},
"instances": 9,
"cpus": 0.2,
"mem": 512,
"constraints": [["hostname", "UNIQUE"]]
}
Dockerfile for jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1:
FROM mesosphere/spark:1.0.4-2.0.1
WORKDIR /opt/spark/dist
ENTRYPOINT ["./bin/spark-class", "org.apache.spark.deploy.mesos.MesosExternalShuffleService"]
Now when I submit job with dynamic allocation it fails:
./dcos spark run --submit-args="--properties-file dynamic-allocation.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
Run job succeeded. Submission id: driver-20161208191958-0047
select count(*) from product_model_per_alerts_by_date ;
count
-------
5
dynamic-allocation.conf:
spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.submit.deployMode cluster
spark.cores.max 4
spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 5
spark.dynamicAllocation.cachedExecutorIdleTimeout 120s
spark.dynamicAllocation.schedulerBacklogTimeout 10s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 20s
spark.mesos.executor.docker.volumes /tmp:/tmp:rw
spark.local.dir /tmp
logs from mesos:
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.0 KB, free 366.0 MB)
16/12/08 19:20:42 INFO TorrentBroadcast: Reading broadcast variable 7 took 21 ms
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 38.6 KB, free 366.0 MB)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.32.0.4:45422)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Got the output locations
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 58 blocks
16/12/08 19:20:42 INFO TransportClientFactory: Successfully created connection to /10.32.0.11:7337 after 2 ms (0 ms spent in bootstraps)
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 13 ms
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)
logs from marathon spark-mesos-external-shuffle-service-tt:
...
16/12/08 19:20:29 INFO MesosExternalShuffleBlockHandler: Received registration request from app 704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047 (remote address /10.32.0.4:49710, heartbeat timeout 120000 ms).
16/12/08 19:20:31 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047, execId=2} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
16/12/08 19:20:38 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8157825166903585542
java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index (No such file or directory)
...
but file exists on given slave box:
$ ls -l /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
-rw-r--r-- 1 root root 1608 Dec 8 19:20 /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
stat shuffle_0_55_0.index
File: 'shuffle_0_55_0.index'
Size: 1608 Blocks: 8 IO Block: 4096 regular file
Device: 801h/2049d Inode: 1805493 Links: 1
Access: (0644/-rw-r--r--) Uid: ( 0/ root) Gid: ( 0/ root)
Access: 2016-12-08 19:20:38.163188836 +0000
Modify: 2016-12-08 19:20:38.163188836 +0000
Change: 2016-12-08 19:20:38.163188836 +0000
Birth: -
There was error in marathon external shuffle service config instead of path container.docker.volumes
we should use container.volumes
path.
Correct configuration:
{
"id": "mesos-external-shuffle-service-simple",
"container": {
"type": "DOCKER",
"docker": {
"image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
"network": "BRIDGE",
"portMappings": [
{ "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
],
"forcePullImage":true
},
"volumes": [
{
"containerPath": "/tmp",
"hostPath": "/tmp",
"mode": "RW"
}
]
},
"instances": 9,
"cpus": 0.2,
"mem": 512,
"constraints": [["hostname", "UNIQUE"]]
}