Search code examples

Why Apache Spark not re-submit failed tasks?

I want to simulate fault-tolerance behavior. I wrote "hard" function, that failed from time to time. for example:

def myMap(v: String) = {
  // print task info and return "Ok" or throw exception
  val context = TaskContext.get()
  val r = scala.util.Random
  val raise = r.nextBoolean()
  println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
  if ( raise )
    throw new RuntimeException("oh ;(")

Because Spark has fault-tolerance ability, I expected that failed tasks will be re-executed automatically, but it does not with the following code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object Example {

  def main(args:Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("shuffle example")
      .set("spark.task.maxFailures", "4") // it is default value

    val sc = new SparkContext(conf)

    val l:RDD[String] = sc.parallelize(List( "a", "b", "c"), 3)

    def myMap(v: String) = {
      // print task info and return "Ok" or throw exception
      val context = TaskContext.get()
      val r = scala.util.Random
      val raise = r.nextBoolean()
      println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
      if ( raise )
        throw new Exception("oh ;(")

    println ("\n")) // failed 


What am I doing wrong?


  • Actually, spark not support fault-tolerance in local mode.

    In example above, if set mater to some master of standalone (or yarn) cluster, make jar-file, and run it via spark-submit, behavior will be as expected: some tasks will be failed, but re-submited. If the application has some singleton (object in Scala), it will be keep own state across failed tasks.