Flink Serialization: POJO type vs. GenericType


In my Flink application, I use java.time.Instant to represent UTC timestamps. The application is running fine, but I recently noticed this message in the Flink logs:

"Class class java.time.Instant cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on \"Data Types & Serialization\" for details of the effect on performance."

When I read the documentation, I found little discussion of the performance hit of using something like Instant. My general understanding is that Kryo must be used instead of Flink's built-in serializers. I am currently using Flink 1.6 and see that Flink 1.7 and above appear to have an InstantSerializer class. Does this mean that if I upgrade Flink, my POJOs that use Instant will no longer need to be processed as a GenericType?

In general, what is the best Java class to use to represent time? Is there a way to use Instant and mitigate or eliminate any effect on performance?


Solution

  • The log message is a bit misleading, but your understanding is correct. Instant is serialized using Kryo in Flink 1.6.

    In Flink 1.7+ Instant will be serialized with the InstantSerializer, not with the KryoSerializer.

    Whether your own class is treated as a POJO does not depend on how its Instant field is serialized. The message only says that Flink checked whether Instant itself is a POJO and found that it is not.

    Example:

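        import java.time.Instant;

        public class SpecialMomentWithName {
            // Private field, reachable through the getter/setter below.
            private String name;
            // Public field; Instant itself is not a POJO.
            public Instant specialMoment;

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }
        }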
    

    SpecialMomentWithName will always be handled as a POJO in Flink.

    In a microbenchmark, you will probably find a small performance hit when serializing Instant with Kryo compared with the new InstantSerializer. Whether the performance of your Flink job benefits from such a change is difficult to predict: if the serialization cost of Instant burns most of your CPU time (and your job is CPU bound), then I would expect a performance improvement. If your network or hard disk (when using RocksDB) is the limiting factor, I would not expect a performance improvement.

    I would not optimize the performance of the Instant serialization without first analyzing where you are actually losing performance. If you find that your performance is suffering from serializing time like this, you can try to represent the Instant as a long. This will reduce the readability of your code, and you'll potentially spend additional CPU cycles converting between types.
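
As a rough illustration of the last suggestion, here is a minimal sketch of a POJO that stores the timestamp as a long and converts to Instant only at the edges. The class name SpecialMomentAsLong, the field specialMomentEpochMillis, and the helpers of and toInstant are all hypothetical and not part of Flink. The sketch assumes millisecond precision is enough, since Instant.toEpochMilli() drops anything finer than a millisecond.

```java
import java.time.Instant;

// Hypothetical variant of SpecialMomentWithName: the timestamp is stored as
// epoch milliseconds (a primitive long) instead of an Instant field.
public class SpecialMomentAsLong {
    public String name;
    // Assumption: millisecond precision is sufficient for this job.
    public long specialMomentEpochMillis;

    // Public no-argument constructor, as Flink's POJO rules require.
    public SpecialMomentAsLong() {
    }

    // Convenience factory: converts the Instant once, when the record is created.
    public static SpecialMomentAsLong of(String name, Instant moment) {
        SpecialMomentAsLong result = new SpecialMomentAsLong();
        result.name = name;
        result.specialMomentEpochMillis = moment.toEpochMilli();
        return result;
    }

    // Converts back to an Instant for code that wants the richer type.
    public Instant toInstant() {
        return Instant.ofEpochMilli(specialMomentEpochMillis);
    }
}
```

The conversion helpers are the readability and CPU cost mentioned above: every read that needs an Instant allocates a new object, so this only pays off if profiling shows serialization to be the bottleneck.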