For the life of me I can't understand why this is not serializable. I'm running below in spark-shell (paste mode). I'm running on Spark 1.3.1, Cassandra 2.1.6, Scala 2.10
import org.apache.spark._
import com.datastax.spark.connector._
val driverPort = 7077
val driverHost = "localhost"
val conf = new SparkConf(true)
.set("spark.driver.port", driverPort.toString)
.set("spark.driver.host", driverHost)
.set("spark.logConf", "true")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext("local[*]", "test", conf)
case class Test(id: String, creationdate: String) extends Serializable
sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
.saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))
sc.cassandraTable[Test]("testks", "test").toArray
sc.stop()
I started spark-shell with this:
./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true
Didn't see any difference in the inclusion of -Dsun.io.serialization.extendedDebugInfo=true property.
Full error (edited):
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:294)
at org.apache.spark.scheduler.local.LocalActor.reviveOffers(LocalBackend.scala:81)
at org.apache.spark.scheduler.local.LocalActor$$anonfun$receiveWithLogging$1.applyOrElse(LocalBackend.scala:63)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.local.LocalActor.aroundReceive(LocalBackend.scala:45)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
... 45 more
15/08/31 09:04:45 ERROR scheduler.TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable
Something different from the worker logs:
org.apache.spark.SparkContext.runJob(SparkContext.scala:1505)
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:38)
$line15.$read$$iwC$$iwC.<init>(<console>:30)
$line15.$read$$iwC.<init>(<console>:51)
$line15.$read.<init>(<console>:53)
$line15.$read$.<init>(<console>:57)
$line15.$read$.<clinit>(<console>)
$line15.$eval$.<init>(<console>:7)
$line15.$eval$.<clinit>(<console>)
$line15.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:824)
I'm pretty sure the reason for the error is that Scala REPL wraps all expressions into an object before compilation and eval phases (see Is it reasonable to use Scala's REPL for comparative performance benchmarks?). While wrapping an expression, it grabs all objects from the environment in which many could be non-serializable vals that can't safely be sent out to the driver (a remote process).
A solution is to define the case class outside Spark shell and use --jars
for both to include on CLASSPATH.