I have built my job jar using sbt assembly
to have all dependencies in one jar. When I try to submit my binary to spark-jobserver I am getting akka.pattern.AskTimeoutException
I modified my configuration to be able to submit large jars (I added parsing.max-content-length = 300m
to my configuration) I also increased some of timeouts in configuration but nothing helped.
After I run:
curl --data-binary @matching-ml-assembly-1.0.jar localhost:8090/jars/matching-ml
I am getting:
{
"status": "ERROR",
"result": {
"message": "Ask timed out on [Actor[akka://JobServer/user/binary-manager#1785133213]] after [3000 ms]. Sender[null] sent message of type \"spark.jobserver.StoreBinary\".",
"errorClass": "akka.pattern.AskTimeoutException",
"stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)", "akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)", "scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)", "scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:331)", "akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:282)", "akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:286)", "akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238)", "java.lang.Thread.run(Thread.java:745)"]
}
My configuration:
# Template for a Spark Job Server configuration file
# When deployed these settings are loaded when job server starts
#
# Spark Cluster / Job Server configuration
spark {
# spark.master will be passed to each job's JobContext
master = "local[4]"
# master = "mesos://vm28-hulk-pub:5050"
# master = "yarn-client"
# Default # of CPUs for jobs to use for Spark standalone cluster
job-number-cpus = 4
jobserver {
port = 8090
context-per-jvm = false
# Note: JobFileDAO is deprecated from v0.7.0 because of issues in
# production and will be removed in future, now defaults to H2 file.
jobdao = spark.jobserver.io.JobSqlDAO
filedao {
rootdir = /tmp/spark-jobserver/filedao/data
}
datadao {
# storage directory for files that are uploaded to the server
# via POST/data commands
rootdir = /tmp/spark-jobserver/upload
}
sqldao {
# Slick database driver, full classpath
slick-driver = slick.driver.H2Driver
# JDBC driver, full classpath
jdbc-driver = org.h2.Driver
# Directory where default H2 driver stores its data. Only needed for H2.
rootdir = /tmp/spark-jobserver/sqldao/data
# Full JDBC URL / init string, along with username and password. Sorry, needs to match above.
# Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass
jdbc {
url = "jdbc:h2:file:/tmp/spark-jobserver/sqldao/data/h2-db"
user = ""
password = ""
}
# DB connection pool settings
dbcp {
enabled = false
maxactive = 20
maxidle = 10
initialsize = 10
}
}
# When using chunked transfer encoding with scala Stream job results, this is the size of each chunk
result-chunk-size = 1m
}
# Predefined Spark contexts
# contexts {
# my-low-latency-context {
# num-cpu-cores = 1 # Number of cores to allocate. Required.
# memory-per-node = 512m # Executor memory per node, -Xmx style eg 512m, 1G, etc.
# }
# # define additional contexts here
# }
# Universal context configuration. These settings can be overridden, see README.md
context-settings {
num-cpu-cores = 2 # Number of cores to allocate. Required.
memory-per-node = 2G # Executor memory per node, -Xmx style eg 512m, #1G, etc.
# In case spark distribution should be accessed from HDFS (as opposed to being installed on every Mesos slave)
# spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"
# URIs of Jars to be loaded into the classpath for this context.
# Uris is a string list, or a string separated by commas ','
# dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]
# Add settings you wish to pass directly to the sparkConf as-is such as Hadoop connection
# settings that don't use the "spark." prefix
passthrough {
#es.nodes = "192.1.1.1"
}
}
# This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
# home = "/home/spark/spark"
}
# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults.
spray.can.server {
# uncomment the next lines for making this an HTTPS example
# ssl-encryption = on
# path to keystore
#keystore = "/some/path/sjs.jks"
#keystorePW = "changeit"
# see http://docs.oracle.com/javase/7/docs/technotes/guides/security/StandardNames.html#SSLContext for more examples
# typical are either SSL or TLS
encryptionType = "SSL"
keystoreType = "JKS"
# key manager factory provider
provider = "SunX509"
# ssl engine provider protocols
enabledProtocols = ["SSLv3", "TLSv1"]
idle-timeout = 60 s
request-timeout = 20 s
connecting-timeout = 5s
pipelining-limit = 2 # for maximum performance (prevents StopReading / ResumeReading messages to the IOBridge)
# Needed for HTTP/1.0 requests with missing Host headers
default-host-header = "spray.io:8765"
# Increase this in order to upload bigger job jars
parsing.max-content-length = 300m
}
akka {
remote.netty.tcp {
# This controls the maximum message size, including job results, that can be sent
# maximum-frame-size = 10 MiB
}
}
I came to the similar issue. The way how to solve it is a bit tricky. First you need to add spark.jobserver.short-timeout
to your configuration. Just modify your configuration like this:
jobserver {
port = 8090
context-per-jvm = false
short-timeout = 60s
...
}
The second (tricky) part is you can't fix it without modifying code of the spark-job-application. The attribute which cause timeout is in class BinaryManager
:
implicit val daoAskTimeout = Timeout(3 seconds)
The default is set to 3 second which apparently for big jar is not enough. You can increase it to for example 60 second which solve problem for me.
implicit val daoAskTimeout = Timeout(60 seconds)