scala cassandra apache-spark spark-cassandra-connector

Why does using custom case class in Spark shell lead to serialization error?


For the life of me I can't understand why this is not serializable. I'm running the code below in spark-shell (paste mode), on Spark 1.3.1, Cassandra 2.1.6, and Scala 2.10.

import org.apache.spark._
import com.datastax.spark.connector._

val driverPort = 7077
val driverHost = "localhost"
val conf = new SparkConf(true)
  .set("spark.driver.port", driverPort.toString)
  .set("spark.driver.host", driverHost)
  .set("spark.logConf", "true")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext("local[*]", "test", conf)
case class Test(id: String, creationdate: String) extends Serializable

sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
  .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))

sc.cassandraTable[Test]("testks", "test").toArray
sc.stop()

I started spark-shell with this:

./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true

Including the -Dsun.io.serialization.extendedDebugInfo=true property made no visible difference.

Full error (edited):

java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:294)
        at org.apache.spark.scheduler.local.LocalActor.reviveOffers(LocalBackend.scala:81)
        at org.apache.spark.scheduler.local.LocalActor$$anonfun$receiveWithLogging$1.applyOrElse(LocalBackend.scala:63)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.local.LocalActor.aroundReceive(LocalBackend.scala:45)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
        at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
        ... 45 more
15/08/31 09:04:45 ERROR scheduler.TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable

The worker logs show something different:

org.apache.spark.SparkContext.runJob(SparkContext.scala:1505)
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:38)
$line15.$read$$iwC$$iwC.<init>(<console>:30)
$line15.$read$$iwC.<init>(<console>:51)
$line15.$read.<init>(<console>:53)
$line15.$read$.<init>(<console>:57)
$line15.$read$.<clinit>(<console>)
$line15.$eval$.<init>(<console>:7)
$line15.$eval$.<clinit>(<console>)
$line15.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:824)

Solution

  • I'm pretty sure the error occurs because the Scala REPL wraps every expression in an object before the compilation and evaluation phases (see "Is it reasonable to use Scala's REPL for comparative performance benchmarks?"). While wrapping an expression, it captures the objects in the enclosing environment, many of which may be non-serializable vals that can't safely be shipped from the driver to the executors (which may be remote processes).

    A solution is to define the case class outside the Spark shell, package it in a jar, and pass that jar to spark-shell with --jars so that the class is on the CLASSPATH of both the driver and the executors.
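
The difference between a class defined inside the shell and one defined in its own file can be sketched roughly as follows. This is only an illustration under assumptions: the file name Test.scala and the names ReplLikeWrapper, Nested and SerializationCheck are hypothetical, and ReplLikeWrapper is a simplified stand-in for the wrapper the REPL generates, not the code the shell actually emits. Only the top-level Test class would need to go into the jar.

```scala
// Test.scala (hypothetical file name), compiled outside the Spark shell.
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Top-level case class: it has no hidden reference to an enclosing instance,
// so Java serialization only has to write its two String fields.
case class Test(id: String, creationdate: String)

// Simplified stand-in for a REPL-generated wrapper (an assumption for
// illustration). It is deliberately NOT Serializable.
class ReplLikeWrapper {
  val sessionState = new Object

  case class Nested(id: String, creationdate: String) {
    // Using a member of the wrapper forces each Nested instance to keep
    // a reference to the enclosing ReplLikeWrapper.
    def describe: String = s"$id seen by $sessionState"
  }
}

object SerializationCheck {
  // Returns true if plain Java serialization accepts the value.
  def javaSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val id = "98429740-2933-11e5-8e68-f7cca436f8bf"
    val created = "2015-07-13T07:48:47.924Z"

    // Top-level class: serializes on its own.
    println(javaSerializable(Test(id, created)))

    // Nested class: serialization also tries to write the wrapper and fails.
    val wrapper = new ReplLikeWrapper
    println(javaSerializable(wrapper.Nested(id, created)))
  }
}
```

To apply this to the question, one would compile Test with the same Scala version as the Spark build (2.10 here), package the class into a jar (the name test-model.jar is an assumption), and start the shell with something like ./spark-shell --jars test-model.jar, adding any other required jars to the same comma-separated list. The case class line would then be dropped from the pasted code.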