Tags: java, scala, apache-spark, serialization, google-cloud-dataproc

ClassCastException while deserializing with Java's native readObject from Spark driver


I have two Spark jobs, A and B, such that A must run before B. The output of A must be readable from:

  • The Spark job B
  • A standalone Scala program outside of the Spark environment (no Spark dependencies at all)

I am currently using Java's native serialization with Scala case classes.

From the Spark job A:

val model = ALSFactorizerModel(...)

context.writeSerializable(resultOutputPath, model)

with the serialization method:

def writeSerializable[T <: Serializable](path: String, obj: T): Unit = {
  val writer: OutputStream = ... // Google Cloud Storage dependent
  val oos: ObjectOutputStream = new ObjectOutputStream(writer)
  oos.writeObject(obj)
  oos.close()
  writer.close()
}

From the Spark job B or any standalone non-Spark Scala code:

val lastFactorizerModel: ALSFactorizerModel = context
                     .readSerializable[ALSFactorizerModel](ALSFactorizer.resultOutputPath)

with the deserialization method:

def readSerializable[T <: Serializable](path: String): T = {
  val is: InputStream = ... // Google Cloud Storage dependent
  val ois = new ObjectInputStream(is)
  val model: T = ois
    .readObject()
    .asInstanceOf[T]
  ois.close()
  is.close()

  model
}

The (nested) case classes:

ALSFactorizerModel:

package mycompany.algo.als.common.io.model.factorizer

import mycompany.data.item.ItemStore

@SerialVersionUID(1L)
final case class ALSFactorizerModel(
  knownItems:       Array[ALSFeaturedKnownItem],
  unknownItems:     Array[ALSFeaturedUnknownItem],
  rank:             Int,
  modelTS:          Long,
  itemRepositoryTS: Long,
  stores:           Seq[ItemStore]
)

ItemStore:

package mycompany.data.item

@SerialVersionUID(1L)
final case class ItemStore(
  id:     String,
  tenant: String,
  name:   String,
  index:  Int
)
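
For reference, the same write/read pattern round-trips cleanly when a single class loader is involved. A minimal standalone sketch, using in-memory streams instead of the GCS-backed ones above (the object name is made up for illustration):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import mycompany.data.item.ItemStore

object RoundTripCheck {
  def main(args: Array[String]): Unit = {
    val stores: Seq[ItemStore] = Seq(ItemStore("id-1", "tenant-a", "store-1", 0))

    // Write, as in writeSerializable
    val buffer = new ByteArrayOutputStream()
    val oos    = new ObjectOutputStream(buffer)
    oos.writeObject(stores)
    oos.close()

    // Read, as in readSerializable; with a single application class loader the
    // List serialization proxy resolves back into a Seq without issue
    val ois      = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = ois.readObject().asInstanceOf[Seq[ItemStore]]
    ois.close()

    assert(restored == stores)
  }
}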

Output:

  • From a standalone non-Spark Scala program => OK
  • From the B Spark job running locally on my dev machine (Spark standalone local node) => OK
  • From the B Spark job running on a (Dataproc) Spark cluster => fails with the exception below:

The exception:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel.stores of type scala.collection.Seq in instance of mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at mycompany.fs.gcs.SimpleGCSFileSystem.readSerializable(SimpleGCSFileSystem.scala:71)
  at mycompany.algo.als.batch.strategy.ALSClusterer$.run(ALSClusterer.scala:38)
  at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
  at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Am I missing something? Should I configure Dataproc/Spark to support the use of Java serialization for this code?

I submit the job with --jars <path to my fat JAR> and have never had other issues before. The Spark dependencies are not included in this JAR; their scope is Provided.

Scala version: 2.11.8
Spark version: 2.0.2
SBT version: 0.13.13

Thanks for your help.


Solution

  • Replacing stores: Seq[ItemStore] with stores: Array[ItemStore] solved the problem for us.

    Alternatively, we could have used another class loader for the serialization/deserialization operations; a sketch of that approach is shown below.

    Hope this helps.
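
A minimal sketch of the class-loader alternative, assuming a custom ObjectInputStream that resolves classes through an explicit loader (the class name and the choice of loader are illustrative, not the exact code we used):

import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

// ObjectInputStream that resolves classes through the supplied class loader,
// falling back to the default resolution if the class is not found there
class ClassLoaderObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName, false, loader)
    catch { case _: ClassNotFoundException => super.resolveClass(desc) }
}

// In readSerializable, instead of `new ObjectInputStream(is)`:
//   val ois = new ClassLoaderObjectInputStream(is, getClass.getClassLoader)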