Search code examples
apache-sparkapache-spark-mllibapache-spark-ml

Spark 1.6.0 executor dies because of ClassCastException and causes timeout


I'm trying to fit a Spark ML pipeline but my executor dies. The project is also on GitHub. Here's the script which doesn't work (a bit simplified):

// Prepare data sets
logInfo("Getting datasets")
val emoTrainingData = sqlc.read.parquet("/tw/sentiment/emo/parsed/data.parquet")
val trainingData = emoTrainingData

// Configure the pipeline
val pipeline = new Pipeline().setStages(Array(
  new FeatureReducer().setInputCol("raw_text").setOutputCol("reduced_text"),
  new StringSanitizer().setInputCol("reduced_text").setOutputCol("text"),
  new Tokenizer().setInputCol("text").setOutputCol("raw_words"),
  new StopWordsRemover().setInputCol("raw_words").setOutputCol("words"),
  new HashingTF().setInputCol("words").setOutputCol("features"),
  new NaiveBayes().setSmoothing(0.5).setFeaturesCol("features"),
  new ColumnDropper().setDropColumns("raw_text", "reduced_text", "text", "raw_words", "words", "features")
))

// Fit the pipeline
logInfo(s"Training model on ${trainingData.count()} rows")
val model = pipeline.fit(trainingData)

It executes up to the last line. It prints "Training model on xx rows", then it starts fitting, the executor dies, the drivers doesn't receive heartbeats from the executor and it times out, then the script exits. It doesn't get past that line.

This is the exception that kills the executor:

java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
  at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
  at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
  at scala.collection.Iterator$class.foreach(Iterator.scala:742)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
  at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
  at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
  at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
  at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
  at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
  ... 32 more

Which, later on, causes a timeout:

ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 142918 ms

I uploaded the INFO-level log file here. The DEBUG log is ~500MB.

The build file and dependencies seem to be all right:

name := "tweeather"

version := "1.0.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.spark" %% "spark-mllib" % "1.6.0",
  "org.apache.spark" %% "spark-streaming" % "1.6.0",
  "org.apache.hadoop" % "hadoop-client" % "2.7.1",
  "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
  "org.twitter4j" % "twitter4j-stream" % "4.0.4",
  "org.scalaj" %% "scalaj-http" % "2.0.0",
  "com.jsuereth" %% "scala-arm" % "1.4",
  "edu.ucar" % "grib" % "4.6.3"
)

dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4",
  "org.scala-lang" % "scala-compiler" % scalaVersion.value,
  "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
  "org.scala-lang.modules" %% "scala-xml" % "1.0.4",
  "jline" % "jline" % "2.12.1"
)

resolvers ++= Seq(
  "Unidata Releases" at "http://artifacts.unidata.ucar.edu/content/repositories/unidata-releases/"
)

Solution

  • I still don't know what the cause actually was, but I ran the script again with only a third of the input data and it worked. It didn't fail anymore. From my observations, it only crashed if I had more than 10,000 tasks.

    I ended up coalescing my data (in another script) into 99 partitions. After I ran the script again, it computed everything successfully.