Tags: clojure, kryo, flambo

Failed to register classes with Kryo


I am creating a Spark context using:

    (ns something
      (:require [flambo.conf :as conf]
                [flambo.api :as f]))

    (def c (-> (conf/spark-conf)
               (conf/master "spark://formcept008.lan:7077")
               (conf/app-name "clustering")))

    (def sc (f/spark-context c))
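
For context, flambo appears to preset Kryo serialization on this conf (this is my reading of flambo's defaults, not something I configured explicitly). Since c is a plain org.apache.spark.SparkConf, the relevant keys can be inspected directly:

    ;; SparkConf#get is the standard Spark API; these keys are assumed to be
    ;; preset by flambo's spark-conf rather than set by my own code.
    (.get c "spark.serializer")        ;; "org.apache.spark.serializer.KryoSerializer"
    (.get c "spark.kryo.registrator")  ;; "flambo.kryo.BaseFlamboRegistrator"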

Then I create an RDD:

    (def rdd (f/parallelize sc DATA))

Now, when I perform an action on this RDD, such as (f/take rdd 3), I get the following error:

    17/11/28 14:35:00 ERROR Utils: Exception encountered
    org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:129)
        at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:259)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:175)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:79)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1273)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: flambo.kryo.BaseFlamboRegistrator
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:124)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:124)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:124)
        ... 27 more
    17/11/28 14:35:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.IllegalStateException: unread block data
        at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2449)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1385)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
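
The key part is the Caused by line: the executors cannot load flambo.kryo.BaseFlamboRegistrator. A quick reflection check (plain Java interop, nothing flambo-specific) shows whether the class at least resolves on the driver:

    ;; If this succeeds in the driver REPL, the registrator is on the driver's
    ;; classpath and the ClassNotFoundException is isolated to the executors.
    (Class/forName "flambo.kryo.BaseFlamboRegistrator")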

Any thoughts on this, please?


Solution

  • Solved. Add all the JARs of your project to the Spark configuration (inside the -> chain that builds the conf):

    (conf/jars (map #(.getPath %)
                    (.getURLs (java.lang.ClassLoader/getSystemClassLoader))))
    

    This ships all of the project's classes (including flambo's Kryo registrator) to the executors. Since this issue is resolved, I am closing it.
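
    For completeness, a minimal end-to-end sketch of the fixed configuration (assuming a Java 8 runtime, where the system class loader is a java.net.URLClassLoader; on newer JDKs, pass the path of your uberjar explicitly instead):

    (def c (-> (conf/spark-conf)
               (conf/master "spark://formcept008.lan:7077")
               (conf/app-name "clustering")
               ;; Ship every JAR on the driver's classpath to the executors,
               ;; including the flambo JAR that contains BaseFlamboRegistrator.
               (conf/jars (map #(.getPath ^java.net.URL %)
                               (.getURLs ^java.net.URLClassLoader
                                         (ClassLoader/getSystemClassLoader))))))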