I am using Kryo serialization in Spark (v1.6.1) from Java. When a class with a collection field goes through Kryo, it throws the following error:
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:102)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 27 more
I found out that this happens because Kryo's default CollectionSerializer cannot deserialize the collection: the collection is unmodifiable, and the serializer calls add() on it while reading. UnmodifiableCollectionsSerializer should be used instead.
How do I tell Spark to use UnmodifiableCollectionsSerializer for Kryo?
My current configuration is:
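To show the failure mode in isolation, here is a minimal JDK-only sketch. It does not use Kryo at all; it only reproduces the add() call that appears at the top of the stack trace. The class name UnmodifiableAddDemo and the helper rejectsAdd are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class UnmodifiableAddDemo {

    // Hypothetical helper: returns true if the collection refuses add(),
    // which is the call that fails in the stack trace above.
    static boolean rejectsAdd(Collection<String> collection) {
        try {
            collection.add("element");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Collection<String> plain = new ArrayList<String>();
        Collection<String> wrapped =
                Collections.unmodifiableCollection(new ArrayList<String>());

        // A plain ArrayList accepts add(); the unmodifiable wrapper throws.
        System.out.println("ArrayList rejects add: " + rejectsAdd(plain));
        System.out.println("Unmodifiable wrapper rejects add: " + rejectsAdd(wrapped));
    }
}
```

Any serializer that rebuilds a collection by adding elements one by one would presumably hit the same exception on such a wrapper, which is why a serializer dedicated to the unmodifiable wrappers is needed.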
SparkConf conf = new SparkConf().setAppName("ABC");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class<?>[] {*list of classes I want to register*});
In case anybody else faces this issue, here is the solution. I got it working by using the javakaffee kryo-serializers library.
Add the following maven dependency:
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.42</version>
</dependency>
Write a custom Kryo registrator that registers UnmodifiableCollectionsSerializer:
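import com.esotericsoftware.kryo.Kryo;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.spark.serializer.KryoRegistrator;

public class CustomKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Registers serializers for the wrappers returned by Collections.unmodifiable*()
        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
    }
}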
Set spark.kryo.registrator to the custom registrator's fully qualified class name:
conf.set("spark.kryo.registrator", "com.abc.CustomKryoRegistrator");