List<T> member of POJO Flink type


I have a type in a Flink job with the following POJO definition:

public class Allocation {
    private long count;
    private String name;
    private long core;
    private List<Long> ids;

    // constructor, getters and setters omitted for brevity
}

Collecting this type in an operator fails with java.lang.AssertionError: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type. For performance reasons, I want to force POJO serialisation and not fall back to Kryo.

I understand that I need to specify some kind of TypeInformation to hint at what the serializer should do. However, it's unclear to me whether the TypeInformation can simply wrap the class itself, or whether I must describe every member and its type in it. It's also unclear where to register this TypeInformation with the system, given that several operators consume and emit this type (i.e. via Collector<Allocation>::collect).


Solution

  • From Flink 1.14 onward, you can annotate just the affected field with a TypeInfoFactory:

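    import java.lang.reflect.Type;
    import java.util.List;
    import java.util.Map;
    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;

    public class Allocation {
      ...
      @TypeInfo(ListInfoFactory.class)
      private List<Long> ids;
      ...
    }
    
    // Declare as a static nested class, or drop `static` if it is top-level.
    public static class ListInfoFactory<E> extends TypeInfoFactory<List<E>> {
      @Override
      @SuppressWarnings("unchecked")
      public TypeInformation<List<E>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        // "E" is the type variable declared by java.util.List<E>.
        TypeInformation<?> elementType = genericParameters.get("E");
        if (elementType == null) {
          throw new InvalidTypesException("Type extraction is not possible on List (element type unknown).");
        }
    
        return Types.LIST((TypeInformation<E>) elementType);
      }
    }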
    

    Prior to 1.14, you would have to provide a TypeInfoFactory for the entire Allocation class, rather than only for the field that causes the problem.
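
The factory looks up the element type under the key "E". As far as I can tell, that key is the name of the type variable declared by java.util.List<E>, and the value is derived from the type argument at the field declaration (Long for ids). The sketch below uses only JDK reflection, with no Flink dependency, to show where those two pieces of information come from. ListTypeVariableDemo and its nested Allocation are hypothetical stand-ins, and this is not Flink's own extraction code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical demo class: plain JDK reflection, not Flink's type extractor.
final class ListTypeVariableDemo {

    // Stand-in for the POJO from the question, reduced to the problematic field.
    static class Allocation {
        private List<Long> ids;
    }

    // Name of the single type variable declared by java.util.List<E>.
    static String listTypeVariableName() {
        return List.class.getTypeParameters()[0].getName();
    }

    // Type argument that the field declaration binds to that type variable.
    static Type idsElementType() {
        try {
            Field ids = Allocation.class.getDeclaredField("ids");
            ParameterizedType listType = (ParameterizedType) ids.getGenericType();
            return listType.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            // Cannot happen for the class above; rethrow unchecked for brevity.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints: E -> java.lang.Long
        System.out.println(listTypeVariableName() + " -> " + idsElementType().getTypeName());
    }
}
```

If the annotated field were a raw List with no type argument, there would be nothing to bind to E, which is presumably the case the null check in ListInfoFactory guards against.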