Search code examples
scalaapache-sparkspark-jobserver

getting timeout when submitting fat jar to spark-jobserver (akka.pattern.AskTimeoutException)


I have built my job jar using sbt assembly to have all dependencies in one jar. When I try to submit my binary to spark-jobserver I am getting akka.pattern.AskTimeoutException

I modified my configuration to be able to submit large jars (I added parsing.max-content-length = 300m to my configuration) I also increased some of timeouts in configuration but nothing helped.

After I run:

curl --data-binary  @matching-ml-assembly-1.0.jar  localhost:8090/jars/matching-ml

I am getting:

{
  "status": "ERROR",
  "result": {
    "message": "Ask timed out on [Actor[akka://JobServer/user/binary-manager#1785133213]] after [3000 ms]. Sender[null] sent message of type \"spark.jobserver.StoreBinary\".",
    "errorClass": "akka.pattern.AskTimeoutException",
    "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)", "akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)", "scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)", "scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:331)", "akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:282)", "akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:286)", "akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238)", "java.lang.Thread.run(Thread.java:745)"]
  }

My configuration:

# Template for a Spark Job Server configuration file
# When deployed these settings are loaded when job server starts
#
# Spark Cluster / Job Server configuration
spark {
  # spark.master will be passed to each job's JobContext
  master = "local[4]"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  job-number-cpus = 4

  jobserver {
    port = 8090

    context-per-jvm = false
    # Note: JobFileDAO is deprecated from v0.7.0 because of issues in
    # production and will be removed in future, now defaults to H2 file.
    jobdao = spark.jobserver.io.JobSqlDAO

    filedao {
      rootdir = /tmp/spark-jobserver/filedao/data
    }

    datadao {
      # storage directory for files that are uploaded to the server
      # via POST/data commands
      rootdir = /tmp/spark-jobserver/upload
    }

    sqldao {
      # Slick database driver, full classpath
      slick-driver = slick.driver.H2Driver

      # JDBC driver, full classpath
      jdbc-driver = org.h2.Driver

      # Directory where default H2 driver stores its data. Only needed for H2.
      rootdir = /tmp/spark-jobserver/sqldao/data

      # Full JDBC URL / init string, along with username and password.  Sorry, needs to match above.
      # Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass
      jdbc {
        url = "jdbc:h2:file:/tmp/spark-jobserver/sqldao/data/h2-db"
        user = ""
        password = ""
      }

      # DB connection pool settings
      dbcp {
        enabled = false
        maxactive = 20
        maxidle = 10
        initialsize = 10
      }
    }
    # When using chunked transfer encoding with scala Stream job results, this is the size of each chunk
    result-chunk-size = 1m
  }

  # Predefined Spark contexts
  # contexts {
  #   my-low-latency-context {
  #     num-cpu-cores = 1           # Number of cores to allocate.  Required.
  #     memory-per-node = 512m         # Executor memory per node, -Xmx style eg 512m, 1G, etc.
  #   }
  #   # define additional contexts here
  # }

  # Universal context configuration.  These settings can be overridden, see README.md
  context-settings {
    num-cpu-cores = 2           # Number of cores to allocate.  Required.
    memory-per-node = 2G         # Executor memory per node, -Xmx style eg 512m, #1G, etc.

    # In case spark distribution should be accessed from HDFS (as opposed to being installed on every Mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"

    # URIs of Jars to be loaded into the classpath for this context.
    # Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]

    # Add settings you wish to pass directly to the sparkConf as-is such as Hadoop connection
    # settings that don't use the "spark." prefix
    passthrough {
      #es.nodes = "192.1.1.1"
    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well.  Spark job configuration merges with this configuration file as defaults.
spray.can.server {
  # uncomment the next lines for making this an HTTPS example
  # ssl-encryption = on
  # path to keystore
  #keystore = "/some/path/sjs.jks"
  #keystorePW = "changeit"

  # see http://docs.oracle.com/javase/7/docs/technotes/guides/security/StandardNames.html#SSLContext for more examples
  # typical are either SSL or TLS
  encryptionType = "SSL"
  keystoreType = "JKS"
  # key manager factory provider
  provider = "SunX509"
  # ssl engine provider protocols
  enabledProtocols = ["SSLv3", "TLSv1"]
  idle-timeout = 60 s
  request-timeout = 20 s
  connecting-timeout = 5s
  pipelining-limit = 2 # for maximum performance (prevents StopReading / ResumeReading messages to the IOBridge)
  # Needed for HTTP/1.0 requests with missing Host headers
  default-host-header = "spray.io:8765"

  # Increase this in order to upload bigger job jars
  parsing.max-content-length = 300m
}


akka {
  remote.netty.tcp {
    # This controls the maximum message size, including job results, that can be sent
    # maximum-frame-size = 10 MiB
  }
}

Solution

  • I came to the similar issue. The way how to solve it is a bit tricky. First you need to add spark.jobserver.short-timeout to your configuration. Just modify your configuration like this:

    jobserver {
        port = 8090
    
        context-per-jvm = false
        short-timeout = 60s
        ...
    }
    

    The second (tricky) part is you can't fix it without modifying code of the spark-job-application. The attribute which cause timeout is in class BinaryManager:

    implicit val daoAskTimeout = Timeout(3 seconds)
    

    The default is set to 3 second which apparently for big jar is not enough. You can increase it to for example 60 second which solve problem for me.

     implicit val daoAskTimeout = Timeout(60 seconds)