apache-spark, failover, self-healing

Apache Spark job failed immediately without retry, setting maxFailures doesn't work


I was testing a web crawling/scraping program on Apache Spark locally on my computer.

The program uses a few RDD transformations that take a volatile function which sporadically fails. (The function's purpose is to transform URL links into web pages; sometimes the headless browser it invokes simply blacks out or gets overloaded - I can't avoid that.)

I heard that Apache Spark has a powerful failover and retry feature: any unsuccessful transformation or lost data can be recalculated from scratch from whatever resources it can find (sounds like magic, right?), so I didn't put any failover or try-catch in my code.
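
(To be explicit about what I left out: an application-level retry around the volatile call would look roughly like the sketch below, where retry and fetchPage are made-up placeholder names rather than code from my project - I skipped anything like this on the assumption that Spark's own task retries would cover it.)

// Hypothetical sketch of an application-level retry wrapper (not in my actual code):
// re-run a volatile block up to maxAttempts times before giving up.
def retry[T](maxAttempts: Int)(block: => T): T =
  try block
  catch {
    case _: Exception if maxAttempts > 1 => retry(maxAttempts - 1)(block)
  }

// e.g. inside an RDD transformation, with fetchPage standing in for the real function:
// urls.map(url => retry(3) { fetchPage(url) })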

This is my Spark configuration:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

Unfortunately the job failed after the majority of stages and individual tasks had succeeded. The latest log in the console shows:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

It looks like Spark just gives up cowardly after failing once. How do I configure it properly to make it more tenacious?

(My program can be downloaded from https://github.com/tribbloid/spookystuff - sorry for the scarce and disorganized code/documentation, I only started it a few days ago.)

ADD: if you want to try it yourself, the following code demonstrates the problem:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Spark Pi")
  conf.setMaster("local[*]")
  conf.setSparkHome(System.getenv("SPARK_HOME"))
  conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
  conf.set("spark.task.maxFailures", "400000")
  val sc = new SparkContext(conf)
  val slices = if (args.length > 0) args(0).toInt else 8
  val n = 100000 * slices
  // each element has roughly a 10% chance of throwing, which should exercise task retries
  val count = sc.parallelize(1 to n, slices).map { i =>
    val x = java.lang.Math.random()
    if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
    x
  }.reduce(_ + _)
  println("count: " + count)
  sc.stop()
  println("finished")
}

It should be noted that the same IllegalStateException got retried 32 times in this post: Apache Spark Throws java.lang.IllegalStateException: unread block data


Solution

  • Let me forward the most authoritative answer:

    If this is a useful feature for local mode, we should open a JIRA to document the setting or improve it (I’d prefer to add a spark.local.retries property instead of a special URL format). We initially disabled it for everything except unit tests because 90% of the time an exception in local mode means a problem in the application, and we’d rather let the user debug that right away rather than retrying the task several times and having them worry about why they get so many errors.

    Matei
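
  • In practice, the "special URL format" Matei refers to is the local[N, maxFailures] master string: in local mode the per-task retry count comes from the second parameter of the master URL rather than from spark.task.maxFailures, which is why the setting above appeared to have no effect. A minimal sketch, assuming 4 retries per task is enough for the flaky page fetches (depending on the Spark version, the wildcard form local[*, 4] may also be accepted):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("MoreLinkedIn")
      .setMaster("local[4, 4]") // 4 worker threads, allow each task to fail up to 4 times
    val sc = new SparkContext(conf)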