Tags: scala, apache-spark, fault-tolerance

Why does Apache Spark not re-submit failed tasks?


I want to simulate fault-tolerance behavior. I wrote a "hard" function that fails from time to time, for example:

def myMap(v: String) = {
  // print task info and return "Ok" or throw exception
  val context = TaskContext.get()
  val r = scala.util.Random
  val raise = r.nextBoolean()
  println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
  if ( raise )
    throw new RuntimeException("oh ;(")
  "Ok"
}

Because Spark has fault-tolerance abilities, I expected that failed tasks would be re-executed automatically, but they are not with the following code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object Example {

  def main(args:Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName("shuffle example")
      .setMaster("local[*]")
      .set("spark.task.maxFailures", "4") // this is the default value

    val sc = new SparkContext(conf)


    val l:RDD[String] = sc.parallelize(List( "a", "b", "c"), 3)

    def myMap(v: String) = {
      // print task info and return "Ok" or throw exception
      val context = TaskContext.get()
      val r = scala.util.Random
      val raise = r.nextBoolean()
      println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
      if ( raise )
        throw new RuntimeException("oh ;(")
      "Ok"
    }

    println(l.map(myMap).collect().mkString("\n")) // fails

    sc.stop()
  }
}

What am I doing wrong?


Solution

  • Actually, Spark does not retry failed tasks in local mode by default.

    In the example above, if you set the master to the master of a standalone (or YARN) cluster, package the application as a jar file, and run it via spark-submit, the behavior is as expected: some tasks fail, but they are re-submitted. Note that if the application contains a singleton (an object in Scala), that singleton keeps its own state across failed tasks on the same executor.
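As a side note, local mode can be asked to retry failed tasks without moving to a cluster: Spark's documented `local[N, F]` master URL runs locally with `N` worker threads and allows up to `F` task failures. A minimal sketch of the change to the example above (the app name is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: in local mode the "spark.task.maxFailures" setting is not
// what controls retries; instead, the master URL form "local[N, F]"
// sets the allowed number of task failures to F. With F = 4, each
// failed task is re-submitted up to 3 more times before the job fails.
val conf = new SparkConf()
  .setAppName("retry example")
  .setMaster("local[*, 4]") // all cores, up to 4 failures per task

val sc = new SparkContext(conf)
```

With this master URL, the random exceptions thrown by `myMap` should trigger visible task re-submissions even without a standalone or YARN cluster.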