Tags: scala, serialization, apache-spark, spark-streaming, kryo

Kryo: deserialize old version of class


I need to modify a class by adding two new parameters. This class is serialized with Kryo. Every time I stop my stream, I persist the information related to this class, among other things, as an RDD. When I restart the stream, I load the previously persisted information and use it to stay consistent between the stop and the restart.

Since the class I persist needs these new parameters, I changed both the class and its serializer, adding a kryo.writeObject(output, object, ObjectSerializer) call and a kryo.readObject(input, classOf[Object], ObjectSerializer) call for each new parameter.

Now, whenever I restart my stream, I get an exception: "Encountered unregistered class ...".

That seems expected, because I am trying to deserialize an object that is not contained in the data I persisted when I stopped the stream. If I delete that data and start the stream as if it had no previous run, the exception does not occur.
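This is what happens with a purely positional format when the reader expects more fields than the writer produced. A minimal sketch, using java.io.DataOutputStream/DataInputStream as a stand-in for Kryo's Output/Input (the object and method names are hypothetical): the old writer emits field1 and field2, and the new reader additionally asks for an Int that was never written. In a single buffer this surfaces as an EOFException; in a stream of many records the new reader would more likely consume bytes belonging to the next record, which is one plausible way to end up reading a garbage class ID.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.util.Try

// Hypothetical positional layouts: no field names, no version, just values in order.
object PositionalFormats {
  // What the old serializer produced: field1, then field2.
  def writeV1(field1: Long, field2: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(field1)
    out.writeUTF(field2)
    out.flush()
    bytes.toByteArray
  }

  // What the new serializer expects: field1, field2, then one extra Int.
  def readV2(data: Array[Byte]): (Long, String, Int) = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    (in.readLong(), in.readUTF(), in.readInt())
  }

  // Reading old bytes with the new reader fails: the extra field was never written.
  def newReaderOnOldData(): Try[(Long, String, Int)] = Try(readV2(writeV1(42L, "old")))
}
```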

Is there a way to avoid this exception? Maybe by specifying some default values in case those parameters are missing?

Thank you

EDIT:

I found something useful I didn't see before: Kryo issue 194.

This person implemented versioning by simply writing a long that defines which version of the deserializer should be used. It's a simple solution but, since the company that wrote the code I'm working on didn't plan for forward compatibility, I guess I'll have to throw away all the data persisted before the new serializer.
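The version-header idea can be sketched as follows. This is a hypothetical illustration, not the code from Kryo issue 194: it uses java.io streams instead of Kryo's Output/Input, and Record, VersionedCodec and writeV1 are made-up names. Every record starts with a Long version, and the reader dispatches on it, filling in a default for fields that older versions did not have.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical record: version 1 had (field1, field2); version 2 adds field3.
final case class Record(field1: Long, field2: String, field3: Int = 0)

object VersionedCodec {
  val CurrentVersion: Long = 2L

  // Every record starts with its format version.
  def write(r: Record): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(CurrentVersion)
    out.writeLong(r.field1)
    out.writeUTF(r.field2)
    out.writeInt(r.field3)
    out.flush()
    bytes.toByteArray
  }

  // Dispatch on the stored version; version 1 data gets the default field3.
  def read(data: Array[Byte]): Record = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readLong() match {
      case 1L => Record(in.readLong(), in.readUTF())
      case 2L => Record(in.readLong(), in.readUTF(), in.readInt())
      case v  => throw new IllegalStateException(s"Unknown format version $v")
    }
  }

  // Produces version 1 bytes, as an older writer that already had a header would.
  def writeV1(field1: Long, field2: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(1L)
    out.writeLong(field1)
    out.writeUTF(field2)
    out.flush()
    bytes.toByteArray
  }
}
```

This only helps for data that carried a version header from the start; records written without one cannot be told apart this way, which matches the conclusion that the old data would have to be discarded or migrated.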

Please let me know if anyone can suggest a better solution.

EDIT 2:

I am still having issues with this situation. I tried to use the CompatibleFieldSerializer as described in this CompatibleFieldSerializer Example, registering that serializer instead of the custom one that was used previously. The result is that reloading the persisted data now throws a java.lang.NullPointerException. There are still no issues if no previous data was persisted: I can start my stream, serialize the new data, stop the stream, deserialize the data and restart my stream. I still have no clue how to solve this.
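CompatibleFieldSerializer tolerates added or removed fields because its output records field names alongside the values, so a reader can skip fields it does not know and leave fields absent from the data unset. The flip side is that it can only read data it wrote itself: bytes produced by the earlier custom serializer do not follow that layout, which is a likely source of the NullPointerException. The sketch below illustrates the name-tagged idea with java.io streams; TaggedFields is a hypothetical, simplified format, not Kryo's actual wire format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Each field is written as (name, byte length, bytes), so a reader can
// skip unknown fields and detect missing ones by name.
object TaggedFields {
  def encode(fields: Seq[(String, Array[Byte])]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(fields.size)
    fields.foreach { case (name, value) =>
      out.writeUTF(name)
      out.writeInt(value.length)
      out.write(value)
    }
    out.flush()
    bytes.toByteArray
  }

  // Only valid for bytes produced by encode; any other layout is misread.
  def decode(data: Array[Byte]): Map[String, Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val count = in.readInt()
    (1 to count).map { _ =>
      val name = in.readUTF()
      val value = new Array[Byte](in.readInt())
      in.readFully(value)
      name -> value
    }.toMap
  }
}
```

A reader that knows about field3 can then use `decoded.get("field3")` and substitute a default when it returns None.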


Solution

  • I found the solution to this issue a couple of months ago, so I am posting an answer now. The problem was that, due to a mistake in the code, the class was serialized with a standard Kryo FieldSerializer, which is not forward compatible. We had to take the following steps to deserialize the old class and convert it into the new serialized class.

    The situation was:

    case class ClassA(field1 : Long, field2 : String)
    

    It was serialized like this:

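    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}

    object ClassASerializer extends Serializer[ClassA] with Serializable {
      override def write(kryo: Kryo, output: Output, t: ClassA): Unit = {
        output.writeLong(t.field1)
        output.writeString(t.field2)
      }

      // Fields must be read in exactly the order they were written.
      override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]): ClassA =
        ClassA(
          field1 = input.readLong(),
          field2 = input.readString() // field2 is a String, so readString, not readLong
        )
    }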
    

    A Seq pairing each class with its serializer was then looped over to register all the serializers with Kryo:

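        import com.esotericsoftware.kryo.{Kryo, Serializer}

        // Registrable is an assumed holder type pairing a class with its serializer.
        case class Registrable[A](aClass: Class[A], serializer: Serializer[A])

        protected def registry: Seq[Registrable[_]] = ...
        final def register(kryo: Kryo): Unit =
          registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }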
    

    The class needed to be modified by adding a new field, which is an instance of another case class (ClassB).

    To make this change we used the Kryo library's Optional annotation (FieldSerializer.Optional):

    ...
    import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional
    import scala.annotation.meta.field
    ...
    
    case class ClassA(field1 : Long, field2 : String,  @(Optional @field)("field3") field3 : ClassB)
    

    The serializer was modified so that, when reading the old serialized class, it instantiates field3 with a default value and, when writing, it writes field3 (or the default value if field3 is missing):

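    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}

    object ClassASerializer extends Serializer[ClassA] with Serializable {
      override def write(kryo: Kryo, output: Output, t: ClassA): Unit = {
        output.writeLong(t.field1)
        output.writeString(t.field2)
        // Write field3, falling back to the default when it is null
        kryo.writeObject(output, Option(t.field3).getOrElse(ClassB.default), ClassBSerializer)
      }

      // Transitional read: the old data has no field3, so it is filled with the default.
      // Data written by write() above must not be read back with this method.
      override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]): ClassA =
        ClassA(
          field1 = input.readLong(),
          field2 = input.readString(),
          field3 = ClassB.default
        )
    }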
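    The same transition can be sketched without Kryo, using java.io streams in place of Output/Input and a hypothetical ClassB(value: Int) whose default is ClassB(0) (the answer does not show ClassB's real fields). TransitionalCodec is a made-up name: readOld accepts the old two-field layout and fills in the default, while write always emits the new three-field layout, so loading old data and persisting it again converts it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical ClassB; the answer only tells us it has a default value.
final case class ClassB(value: Int)
object ClassB { val default: ClassB = ClassB(0) }

final case class ClassA(field1: Long, field2: String, field3: ClassB)

// Transitional codec, mirroring the serializer above: old layout in, new layout out.
object TransitionalCodec {
  // New layout: field1, field2, field3 (null-safe, like Option { t.field3 }).
  def write(a: ClassA): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(a.field1)
    out.writeUTF(a.field2)
    out.writeInt(Option(a.field3).getOrElse(ClassB.default).value)
    out.flush()
    bytes.toByteArray
  }

  // Old layout: only field1 and field2 exist, so field3 gets the default.
  def readOld(data: Array[Byte]): ClassA = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    ClassA(field1 = in.readLong(), field2 = in.readUTF(), field3 = ClassB.default)
  }

  // Produces old-layout bytes, standing in for the previously persisted data.
  def writeOld(field1: Long, field2: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(field1)
    out.writeUTF(field2)
    out.flush()
    bytes.toByteArray
  }
}
```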
    

    The Kryo serializer registration was also modified to register the optional field as well:

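        import com.esotericsoftware.kryo.Kryo
        import com.esotericsoftware.kryo.util.ObjectMap

        // Registrable: an assumed holder pairing a class with its serializer
        protected def registry: Seq[Registrable[_]] = ...
        // Names used in @Optional annotations; FieldSerializer only handles an
        // @Optional field when its name is present in the Kryo context.
        def optionals: Seq[String] = Seq("field3")

        final def register(kryo: Kryo): Unit = {
          optionals.foreach { optional =>
            kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true)
          }
          registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
        }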
    

    As a result, we were able to write the new version of the serialized class. After that, we removed the Optional annotation, modified the serializer to read the real field3 from the newly serialized data, removed the optional-field registration from the Kryo context, and moved that registration into the registry Seq.
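    After the migration, the final reader consumes field3 directly. Continuing with a java.io stand-in rather than the real Kryo serializer (FinalCodec and ClassB(value: Int) are hypothetical), the steady-state codec reads exactly what it writes:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical ClassB; the real one's fields are not shown in the answer.
final case class ClassB(value: Int)
final case class ClassA(field1: Long, field2: String, field3: ClassB)

// Final codec: writes and reads the three-field layout symmetrically.
object FinalCodec {
  def write(a: ClassA): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(a.field1)
    out.writeUTF(a.field2)
    out.writeInt(a.field3.value)
    out.flush()
    bytes.toByteArray
  }

  // field3 is now read from the data instead of being defaulted.
  def read(data: Array[Byte]): ClassA = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    ClassA(field1 = in.readLong(), field2 = in.readUTF(), field3 = ClassB(in.readInt()))
  }
}
```

    Data rewritten by the transitional serializer has this same three-field layout, which is why the final reader can load it once every old record has been persisted again.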

    In the meantime we corrected the mistake in the code that forced the serialization via FieldSerializer, but this is not in the scope of the question.