I have two Spark jobs, A and B, such that A must run before B. The output of A must be readable from both the B Spark job and any standalone non-Spark Scala code.
I am currently using Java's native serialization with Scala case classes.
From the A Spark Job:
val model = ALSFactorizerModel(...)
context.writeSerializable(resultOutputPath, model)
with the serialization method:
def writeSerializable[T <: Serializable](path: String, obj: T): Unit = {
val writer: OutputStream = ... // Google Cloud Storage dependant
val oos: ObjectOutputStream = new ObjectOutputStream(writer)
oos.writeObject(obj)
oos.close()
writer.close()
}
From the B Spark Job or any standalone Non-Spark Scala code:
val lastFactorizerModel: ALSFactorizerModel = context
.readSerializable[ALSFactorizerModel](ALSFactorizer.resultOutputPath)
with the deserialization method:
def readSerializable[T <: Serializable](path: String): T = {
val is : InputStream = ... // Google Cloud Storage dependant
val ois = new ObjectInputStream(is)
val model: T = ois
.readObject()
.asInstanceOf[T]
ois.close()
is.close()
model
}
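Since the Google Cloud Storage streams are elided above, here is a minimal local sketch of the same write/read round trip, with the storage streams swapped for in-memory ones. The `RoundTrip` object and its local `ItemStore` stand-in are hypothetical names for illustration, not part of the original code:

```scala
import java.io._

// Local stand-in mirroring the ItemStore case class from the post.
@SerialVersionUID(1L)
final case class ItemStore(id: String, tenant: String, name: String, index: Int)

object RoundTrip {
  // Same logic as writeSerializable, but parameterized on the stream
  // instead of a GCS path.
  def writeSerializable[T <: Serializable](out: OutputStream, obj: T): Unit = {
    val oos = new ObjectOutputStream(out)
    try oos.writeObject(obj) finally oos.close()
  }

  // Same logic as readSerializable, parameterized on the stream.
  def readSerializable[T <: Serializable](in: InputStream): T = {
    val ois = new ObjectInputStream(in)
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val stores = List(ItemStore("s1", "t1", "Store 1", 0))
    val buf = new ByteArrayOutputStream()
    writeSerializable(buf, stores)
    val back =
      readSerializable[List[ItemStore]](new ByteArrayInputStream(buf.toByteArray))
    // In a single JVM with one class loader this round trip succeeds;
    // the ClassCastException below only appears when the classes are
    // loaded by different class loaders on the reading side.
    assert(back == stores)
  }
}
```

Note that this sketch cannot reproduce the exception by itself, precisely because everything is loaded by one class loader.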
The (nested) case classes:
ALSFactorizerModel:
package mycompany.algo.als.common.io.model.factorizer
import mycompany.data.item.ItemStore
@SerialVersionUID(1L)
final case class ALSFactorizerModel(
knownItems: Array[ALSFeaturedKnownItem],
unknownItems: Array[ALSFeaturedUnknownItem],
rank: Int,
modelTS: Long,
itemRepositoryTS: Long,
stores: Seq[ItemStore]
)
ItemStore:
package mycompany.data.item
@SerialVersionUID(1L)
final case class ItemStore(
id: String,
tenant: String,
name: String,
index: Int
)
Output:
The exception:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel.stores of type scala.collection.Seq in instance of mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at mycompany.fs.gcs.SimpleGCSFileSystem.readSerializable(SimpleGCSFileSystem.scala:71)
at mycompany.algo.als.batch.strategy.ALSClusterer$.run(ALSClusterer.scala:38)
at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Am I missing something? Should I configure Dataproc/Spark to support Java serialization for this code?
I submit the job with --jars <path to my fatjar> and have never had any other issues before. The Spark dependencies are not included in this jar; their scope is Provided.
Scala version: 2.11.8, Spark version: 2.0.2, SBT version: 0.13.13
Thanks for your help.
Replacing
stores: Seq[ItemStore]
with
stores: Array[ItemStore]
solved the problem for us.
Alternatively, we could have used another class loader for the serialization/deserialization operations.
Hope this helps.
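The class-loader alternative can be sketched as an `ObjectInputStream` subclass that resolves classes through an explicitly chosen loader (for example, the thread context class loader) rather than the loader that happened to load `ObjectInputStream` itself. The class name `ClassLoaderObjectInputStream` is hypothetical; the override point, `resolveClass`, is standard `java.io` API:

```scala
import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

// An ObjectInputStream that looks classes up via the supplied loader
// first, falling back to the default resolution on failure.
class ClassLoaderObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName, false, loader)
    catch { case _: ClassNotFoundException => super.resolveClass(desc) }
}
```

In a Spark job, the loader passed in would typically be `Thread.currentThread().getContextClassLoader`, which Spark points at the classes from the submitted fat jar.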