I want to simulate fault-tolerance behavior. I wrote a "hard" function that fails from time to time. For example:
def myMap(v: String) = {
  // print task info and return "Ok" or throw exception
  val context = TaskContext.get()
  val r = scala.util.Random
  val raise = r.nextBoolean()
  println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
  if (raise)
    throw new RuntimeException("oh ;(")
  "Ok"
}
Because Spark is fault-tolerant, I expected failed tasks to be re-executed automatically, but that does not happen with the following code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object Example {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName("shuffle example")
      .setMaster("local[*]")
      .set("spark.task.maxFailures", "4") // 4 is the default value
    val sc = new SparkContext(conf)

    val l: RDD[String] = sc.parallelize(List("a", "b", "c"), 3)

    def myMap(v: String) = {
      // print task info and return "Ok" or throw exception
      val context = TaskContext.get()
      val r = scala.util.Random
      val raise = r.nextBoolean()
      println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
      if (raise)
        throw new Exception("oh ;(")
      "Ok"
    }

    println(l.map(myMap).collect().mkString("\n")) // the job fails here
    sc.stop()
  }
}
What am I doing wrong?
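Actually, Spark does not retry failed tasks in local mode by default: with a master of local[*], each task gets a single attempt and spark.task.maxFailures has no effect.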
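If you only want to watch retries without setting up a cluster, the local master URL also accepts a second number, local[K,F], where F is the maximum number of failures per task. Below is a minimal sketch of that, not a definitive setup. The names LocalRetryExample, flakyMap and run are made up for illustration, and failing on attemptNumber() == 0 instead of at random is my own assumption, chosen so that the run is repeatable:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical example object; the names are illustrative only.
object LocalRetryExample {
  // Fails on the first attempt of every task and succeeds on any retry.
  def flakyMap(v: String): String = {
    val context = TaskContext.get()
    val attempt = context.attemptNumber()
    println(s"--- map $v in partition ${context.partitionId()} attempt = $attempt")
    if (attempt == 0)
      throw new RuntimeException("oh ;(")
    "Ok"
  }

  def run(): Array[String] = {
    val conf = new SparkConf()
      .setAppName("local retry example")
      .setMaster("local[*,4]") // all cores, up to 4 failures per task
    val sc = new SparkContext(conf)
    try sc.parallelize(List("a", "b", "c"), 3).map(flakyMap).collect()
    finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString("\n"))
}
```

With this master URL each of the three tasks should log one failed attempt followed by a successful one, and the job should complete instead of aborting on the first exception.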
In the example above, if you set the master to the master of a standalone (or YARN) cluster, build a jar file, and run it via spark-submit, the behavior is as expected: some tasks fail but are resubmitted. If the application has a singleton (an object in Scala), it keeps its own state across failed tasks, because the executor JVM survives a task failure.
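To illustrate the point about singletons, here is a small sketch under stated assumptions. AttemptCounter, SingletonStateExample, countingMap and run are hypothetical names, and the master is set to local[*,4] only so the snippet runs without a cluster. In local mode the driver and all tasks share one JVM, whereas on a real cluster every executor JVM would hold its own copy of the object:

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical singleton: one instance per JVM, so it outlives any single task attempt.
object AttemptCounter {
  private val calls = new AtomicInteger(0)
  def next(): Int = calls.incrementAndGet()
}

object SingletonStateExample {
  // Counts every call in this JVM, fails on the first attempt of each task,
  // and on a retry reports how many calls the JVM has seen so far.
  def countingMap(v: String): String = {
    val seen = AttemptCounter.next()
    if (TaskContext.get().attemptNumber() == 0)
      throw new RuntimeException(s"oh ;( (call $seen)")
    s"$v: Ok after $seen call(s) in this JVM"
  }

  def run(): Array[String] = {
    val conf = new SparkConf()
      .setAppName("singleton state example")
      .setMaster("local[*,4]") // assumption: local run with up to 4 failures per task
    val sc = new SparkContext(conf)
    try sc.parallelize(List("a", "b", "c"), 3).map(countingMap).collect()
    finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Every successful result reports at least two calls, since the failed first attempt of the same task already incremented the counter. That is the state surviving the failure.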