Apache Flink Kryo serializer - ClassNotFoundException


I have a project on Apache Flink 1.8.1, with Scala 2.11 and Java 8. I used to use Maven for compilation and dependency management, but switched to Gradle... which led to the problem below:

j.l.ClassNotFoundException: om.tinker.my.project.ProjectPayload
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    ... 3 frames excluded
    at c.e.k.u.DefaultClassResolver.readName(DefaultClassResolver.java:172)
    ... 15 common frames omitted
    Wrapped by: c.e.kryo.KryoException: Unable to find class: om.tinker.my.project.ProjectPayload
    Serialization trace:
        eventOutputTag (com.my.project.contexts.ProjectContext)
        at c.e.k.u.DefaultClassResolver.readName(DefaultClassResolver.java:178)
        at c.e.k.u.DefaultClassResolver.readClass(DefaultClassResolver.java:147)
        at c.e.kryo.Kryo.readClass(Kryo.java:674)
        at c.e.k.s.ReflectField.read(ReflectField.java:107)
        at c.e.k.s.FieldSerializer.read(FieldSerializer.java:122)
        at c.e.kryo.Kryo.readClassAndObject(Kryo.java:793)
        at o.a.f.a.j.t.r.k.KryoSerializer.deserialize(KryoSerializer.java:346)
        at o.a.f.s.r.s.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
        at o.a.f.s.r.s.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
        at o.a.f.r.p.NonReusingDeserializationDelegate.read(NonReusin...

First, the class name in the error message is missing its leading 'c'. It should be 'com.tinker.my.project.ProjectPayload'... I checked the files that use this class, and none of my import statements is missing the 'c'...

I also edited the Flink conf file to use a parent-first class-loading strategy...

Further background info: I have another file called ProjectContext which has an ArrayList<ProjectPayload>. It also has the eventOutputTag (as mentioned in the serialization trace)... When I comment out the ArrayList<ProjectPayload> and its getters/setters, EVERYTHING WORKS!

When I put the instance variable and its getters/setters back in ProjectContext, the ClassNotFoundException occurs again...

Furthermore, I sprinkled in tons of print statements, and I was able to create an instance of ProjectPayload and log it out fine.

### Edit (June 30, 2020) ###

In light of this serialization issue, I added this code:

    env.getConfig.registerTypeWithKryoSerializer(classOf[ProjectPayload], classOf[JavaSerializer[ProjectPayload]])

and now I have this awkward (but similar) error:

"j.l.ClassNotFoundException: \u0005sr\u00008com.tinker.my.project.ProjectPayload+\"v
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    ... 3 frames excluded
    at c.e.k.u.DefaultClassResolver.readName(DefaultClassResolver.java:172)
    ... 15 common frames omitted
    Wrapped by: c.e.kryo.KryoException: Unable to find class: \u0005sr\u00008com.tinker.my.project.ProjectPayload+\"v
    Serialization trace:
    allMyPayloads (com.tinker.my.project.ProjectContext)
    at c.e.k.u.DefaultClassResolver.readName(DefaultClassResolver.java:178)
    at c.e.k.u.DefaultClassResolver.readClass(DefaultClassResolver.java:147)
    at c.e.kryo.Kryo.readClass(Kryo.java:674)
    at c.e.k.s.ReflectField.read(ReflectField.java:107)
    at c.e.k.s.FieldSerializer.read(FieldSerializer.java:122)
    at c.e.kryo.Kryo.readClassAndObject(Kryo.java:793)
    at o.a.f.a.j.t.r.k.KryoSerializer.deserialize(KryoSerializer.java:346)
    at o.a.f.s.r.s.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at o.a.f.s.r.s.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at o.a.f.r.p.NonReusingDeserializationDelegate....
    

Turns out \u0005 is the Unicode character 'ENQUIRY', and searching for \u00008 on Google returns only gibberish... will report back later.
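A note on those odd characters: the prefix looks like the start of a plain Java serialization stream rather than a single Unicode symbol. Such a stream starts with the magic bytes AC ED, the version 00 05, then 0x73 ('s', a new object) and 0x72 ('r', a class descriptor), followed by a two-byte length and the class name. Read that way, "\u00008" would be the byte 0x00 followed by 0x38 (ASCII '8'), i.e. a name length of 56. That suggests the reader is interpreting Java-serialized bytes as a Kryo class name, though this is an inference from the trace and not something it confirms. The sketch below uses only the JDK; `JavaStreamHeader` and `Payload` are hypothetical stand-ins, not classes from the project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class JavaStreamHeader {
    // Hypothetical stand-in for ProjectPayload; any Serializable class behaves the same.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "demo";
    }

    // Serialize a value with plain Java serialization and return the raw bytes.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Bytes 6-7 hold the big-endian length of the class name, which starts at byte 8.
    static String classNameOf(byte[] stream) {
        int length = ((stream[6] & 0xFF) << 8) | (stream[7] & 0xFF);
        return new String(stream, 8, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Payload());
        // Print the header: magic (AC ED), version (00 05), 's' (73), 'r' (72), name length.
        for (int i = 0; i < 8; i++) {
            System.out.printf("%02X ", bytes[i] & 0xFF);
        }
        System.out.println();
        System.out.println(classNameOf(bytes));
    }
}
```

If the first bytes printed here match the prefix in the exception message, the payload was most likely written by a Java-serialization-based serializer and read back by a different one, which points at a serializer mismatch between writer and reader, not at the classpath.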

### Edit (July 1, 2020) ###

Some progress: I was initializing the ArrayList<ProjectPayload> inside the ProjectContext. When I removed that initialization, moved it outside, and then set the ArrayList value, my code got much further along. Then it complained about a HashMap<String, String> instance variable as well. I ended up deleting it since it wasn't used.

Which now brings me to an IndexOutOfBoundsException:

j.l.IndexOutOfBoundsException: Index: 93, Size: 9
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at c.e.k.u.MapReferenceResolver.getReadObject(MapReferenceResolver.java:62)
    at c.e.kryo.Kryo.readReferenceOrNull(Kryo.java:838)
    at c.e.kryo.Kryo.readObjectOrNull(Kryo.java:761)
    at c.e.k.s.ReflectField.read(ReflectField.java:120)
    ... 12 common frames omitted
    Wrapped by: c.e.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 93, Size: 9
    Serialization trace:
        fooBarStr (com.tinker.my.project.contexts.ProjectContext)
        at c.e.k.s.ReflectField.read(ReflectField.java:133)
        at c.e.k.s.FieldSerializer.read(FieldSerializer.java:122)
        at c.e.kryo.Kryo.readClassAndObject(Kryo.java:793)
        at o.a.f.a.j.t.r.k.KryoSerializer.deserialize(KryoSerializer.java:346)
        at o.a.f.s.r.s.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
        at o.a.f.s.r.s.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
        at o.a.f.r.p.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at o.a.f.r.i.n.a.s.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRec...

and to this GitHub issue on Kryo: https://github.com/EsotericSoftware/kryo/issues/456


Solution


  • Try this:

    env.getConfig.registerTypeWithKryoSerializer(classOf[ProjectPayload], classOf[JavaSerializer[ProjectPayload]])
    env.getConfig.registerTypeWithKryoSerializer(classOf[ProjectContext], classOf[JavaSerializer[ProjectContext]])
    

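    and make sure you are importing org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer, not Kryo's own com.esotericsoftware.kryo.serializers.JavaSerializer. According to the Flink documentation, Kryo's JavaSerializer may use the wrong classloader and throw a ClassNotFoundException even though the class is in the submitted user-code jar; Flink's reimplementation uses the user-code classloader:

    https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html#issue-with-using-kryos-javaserializer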