apache-flink

Flink type system: registerType for the subtype


I am reading the Flink 1.13 documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#most-frequent-issues

It says:

Registering subtypes: If the function signatures describe only the supertypes, but they actually use subtypes of those during execution, it may increase performance a lot to make Flink aware of these subtypes. For that, call .registerType(clazz) on the StreamExecutionEnvironment or ExecutionEnvironment for each subtype.

Is there a test case or code example that demonstrates this scenario?

Thanks.


Solution

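If a function is declared to return the supertype Msg but actually emits instances of its subclasses, for example:

    DataStream<Msg> test = stream.map(x -> (x < 10) ? new Msg1() : new Msg2());

then you should register these subtypes:

    env.registerType(Msg1.class);
    env.registerType(Msg2.class);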

The PojoSerializer marks each serialized instance in one of three ways: with the subclass's fully qualified class name, with a compact subclass tag if that subclass was registered (as above), or with no tag at all when the instance is exactly the declared type. Type registration is therefore unnecessary if you only ever serialize instances of the declared type itself and never any subclass instances. When subclass instances are serialized, however, registering the subclasses produces tags that are more compact (and more efficient) than the full subclass names.
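To make the size difference concrete, here is a small stdlib-only model of the three cases. It is a hypothetical sketch, not Flink's actual PojoSerializer: the class SubtypeTagDemo, the flag constants and their values, and the exact byte layout are assumptions chosen for illustration. It writes only the per-record type header (no fields) and compares an exact-type record, a registered subclass (one-byte tag), and an unregistered subclass (class name written with DataOutputStream.writeUTF).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of subtype tagging; NOT Flink's PojoSerializer.
// Flag values and byte layout are assumptions chosen for illustration.
final class SubtypeTagDemo {

    static final byte NO_SUBCLASS = 2;        // record is exactly the declared type
    static final byte IS_SUBCLASS = 4;        // unregistered subclass: full name follows
    static final byte IS_TAGGED_SUBCLASS = 8; // registered subclass: 1-byte tag follows

    private final Class<?> declaredType;                      // e.g. Msg
    private final Map<Class<?>, Byte> tags = new HashMap<>(); // registered subclass -> tag

    SubtypeTagDemo(Class<?> declaredType) {
        this.declaredType = declaredType;
    }

    // Plays the role of env.registerType(clazz) in this model: assigns the next tag.
    void register(Class<?> subclass) {
        tags.putIfAbsent(subclass, (byte) tags.size());
    }

    // Writes only the per-record type header (fields omitted) and returns its size in bytes.
    int headerSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Class<?> actual = value.getClass();
            if (actual == declaredType) {
                out.writeByte(NO_SUBCLASS);          // exact type: no type info needed
            } else if (tags.containsKey(actual)) {
                out.writeByte(IS_TAGGED_SUBCLASS);
                out.writeByte(tags.get(actual));     // registered: compact 1-byte tag
            } else {
                out.writeByte(IS_SUBCLASS);
                out.writeUTF(actual.getName());      // unregistered: 2-byte length + name bytes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);       // not expected for an in-memory stream
        }
        return bytes.size();
    }

    // Hypothetical hierarchy mirroring Msg/Msg1/Msg2 from the answer.
    static class Msg {}
    static class Msg1 extends Msg {}
    static class Msg2 extends Msg {}

    public static void main(String[] args) {
        SubtypeTagDemo unregistered = new SubtypeTagDemo(Msg.class);
        SubtypeTagDemo registered = new SubtypeTagDemo(Msg.class);
        registered.register(Msg1.class);
        registered.register(Msg2.class);

        System.out.println("Msg  (exact type):   " + registered.headerSize(new Msg()) + " bytes");
        System.out.println("Msg1 (registered):   " + registered.headerSize(new Msg1()) + " bytes");
        System.out.println("Msg1 (unregistered): " + unregistered.headerSize(new Msg1()) + " bytes");
    }
}
```

In this model the exact-type header is 1 byte and a registered subclass costs 2 bytes per record, while an unregistered subclass costs 3 bytes plus the length of its class name, which grows with the package name. The real serializer also has to resolve the class from that name when deserializing, which is part of why the documentation says registration can help performance.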