I have a type in a Flink job with the following POJO definition:
public class Allocation {
    private long count;
    private String name;
    private long core;
    private List<Long> ids;
    // constructor, getters and setters omitted for brevity
}
Collecting this type in an operator fails with java.lang.AssertionError: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
I want to force POJO serialisation rather than fall back to Kryo, for performance reasons.
I understand I need to specify some kind of TypeInformation to hint what the serializer should do. However, it's unclear to me whether the contained type of the TypeInformation can just be the class itself, or whether I must add all members with their types to this object. And where do I register this TypeInformation object with the system, given that I have several operators that this type flows into and out of (i.e. via Collector<Allocation>::collect)?
From Flink 1.14 onward, you can handle this by annotating the problematic field with a TypeInfoFactory:
public class Allocation {
    ...
    @TypeInfo(ListInfoFactory.class)
    private List<Long> ids;
    ...
}
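// "static" is only valid on a nested class; drop it if this is a top-level class.
public static class ListInfoFactory<E> extends TypeInfoFactory<List<E>> {
    @Override
    @SuppressWarnings("unchecked") // the cast below cannot be checked at runtime
    public TypeInformation<List<E>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        // "E" is the type variable that java.util.List<E> declares for its elements.
        TypeInformation<?> elementType = genericParameters.get("E");
        if (elementType == null) {
            throw new InvalidTypesException("Type extraction is not possible on List (element type unknown).");
        }
        return Types.LIST((TypeInformation<E>) elementType);
    }
}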
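A note on the "E" key looked up in createTypeInfo: as far as I can tell, Flink keys genericParameters by the type-variable name declared on the annotated type, and java.util.List declares its element type as E. The plain-Java sketch below needs no Flink. The names ListTypeVariableDemo, Holder, listTypeVariableName and idsElementType are made up for illustration. It only shows where that name and the Long element type come from when read through reflection:

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

class ListTypeVariableDemo {

    // Hypothetical stand-in for the "ids" field of Allocation.
    static class Holder {
        List<Long> ids;
    }

    // Name of the single type variable declared by java.util.List.
    public static String listTypeVariableName() {
        return List.class.getTypeParameters()[0].getName();
    }

    // Actual type argument of Holder.ids, i.e. the element type of the list.
    public static Type idsElementType() {
        try {
            Field ids = Holder.class.getDeclaredField("ids");
            ParameterizedType listOfLong = (ParameterizedType) ids.getGenericType();
            return listOfLong.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(listTypeVariableName()); // prints "E"
        System.out.println(idsElementType());       // prints "class java.lang.Long"
    }
}
```

If you write a similar factory for another generic container, look up that container's own type-variable names (for example K and V for java.util.Map) instead of assuming "E".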
Prior to 1.14, you would have to provide a TypeInfoFactory for the entire Allocation class, rather than for the specific field that has the issue.
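A minimal sketch of that class-level approach, assuming flink-core is on the classpath and that every field should be serialised as a POJO field. The factory name AllocationTypeInfoFactory is my own. The no-arg constructor, getters and setters are filled in here only so the class meets Flink's POJO rules:

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

@TypeInfo(Allocation.AllocationTypeInfoFactory.class)
public class Allocation {
    private long count;
    private String name;
    private long core;
    private List<Long> ids;

    // Flink's POJO rules require a public no-arg constructor.
    public Allocation() {}

    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getCore() { return core; }
    public void setCore(long core) { this.core = core; }
    public List<Long> getIds() { return ids; }
    public void setIds(List<Long> ids) { this.ids = ids; }

    // Hypothetical factory name; it describes every field explicitly.
    public static class AllocationTypeInfoFactory extends TypeInfoFactory<Allocation> {
        @Override
        public TypeInformation<Allocation> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("count", Types.LONG);
            fields.put("name", Types.STRING);
            fields.put("core", Types.LONG);
            // The list gets explicit element type information, so it is
            // no longer treated as a generic type.
            fields.put("ids", Types.LIST(Types.LONG));
            return Types.POJO(Allocation.class, fields);
        }
    }
}
```

Because the annotation sits on the class itself, the type extractor should pick it up wherever Allocation flows through the job, so there should be no need to register anything per operator. The same field map has to be kept in sync by hand whenever a field is added or renamed, which is the main drawback compared with the field-level annotation.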