Tags: java, hashmap, gson, apache-flink, flink-streaming

Apache Flink: How do I use a stream of Java Maps (or of DTOs containing a Map)?


I am using Flink with a stream of JSON strings whose fields and nested fields change dynamically. Because of that, I can't model the incoming JSON as a static POJO and have to rely on a Map instead.

My first transformation converts the JSON string stream into a stream of Map objects using Gson, and then wraps each map in a DTO called Data.

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
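Gson represents every JSON object in the parsed tree, nested ones included, as its own internal class com.google.gson.internal.LinkedTreeMap, and JSON arrays as ArrayList. One workaround worth trying, distinct from the fix reported in the solution below, is to deep-copy the parsed result into plain JDK collections before wrapping it, so that no Gson-internal class has to be handled by Flink's serializers. This is a minimal sketch under that assumption; PlainCollections and deepCopyObject are hypothetical names, not part of Gson or Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: replaces Gson-internal collection types with JDK ones.
final class PlainCollections {

    private PlainCollections() {}

    /**
     * Recursively copies a parsed JSON value: every Map becomes a
     * LinkedHashMap<String, Object> (key order preserved), every List
     * becomes an ArrayList<Object>, and scalars are returned unchanged.
     */
    static Object deepCopy(Object value) {
        if (value instanceof Map<?, ?>) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(e.getKey()), deepCopy(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List<?>) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<?>) value) {
                copy.add(deepCopy(item));
            }
            return copy;
        }
        return value; // String, Double, Boolean or null
    }

    /** Convenience entry point for the top-level JSON object. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> deepCopyObject(Map<?, ?> parsed) {
        return (Map<String, Object>) deepCopy(parsed);
    }
}
```

Inside the map function this would become something like new Data(PlainCollections.deepCopyObject(map)), assuming Data's constructor is changed to accept a Map<String, Object>.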

The problem arises right after this transformation, when I feed the resulting stream into my custom Flink sink: the sink's invoke function never gets called. The sink does work, however, if I replace this Map-containing DTO with a primitive or with a regular DTO that has no Map.
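When records never reach a sink, one thing worth checking is whether the Flink logs contain a ClassNotFoundException or NoClassDefFoundError; the solution below traces this exact symptom to a Kryo class that could not be found. A small probe like the following (a hypothetical helper, plain JDK only) can confirm whether a class is visible to a given class loader, for example when called from a rich sink's open() method.

```java
// Hypothetical classpath probe; uses only java.lang.
final class ClassProbe {

    private ClassProbe() {}

    /**
     * Returns true if the named class can be loaded by the given loader.
     * The class is not initialized, so no static initializers run.
     */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class present but broken/incompatible.
            return false;
        }
    }
}
```

For example, ClassProbe.isLoadable("com.esotericsoftware.kryo.factories.ReflectionSerializerFactory", getClass().getClassLoader()); that fully qualified name is an assumption based on the Kryo 2.x package layout and may differ in other Kryo versions.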

My DTO looks like this:

public class FakeDTO {
    private String id;
    private LinkedTreeMap map; // com.google.gson.internal.LinkedTreeMap

    // getters and setters
    // constructors, empty and with fields
}
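For comparison, this is a sketch of how the DTO might look when written to Flink's POJO rules (public class, public no-argument constructor, private fields reachable through public getters and setters), with the map field declared as the parameterized interface type Map<String, Object> instead of Gson's LinkedTreeMap, in line with the solution below. MapDTO is a hypothetical name. Because the map's value type is Object, Flink will most likely still treat the map itself as a generic type handled by Kryo, so this shape alone is not expected to work together with disableGenericTypes().

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical variant of FakeDTO intended to follow Flink's POJO rules.
 * The map field uses the JDK interface type Map<String, Object>.
 */
public class MapDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private Map<String, Object> map = new HashMap<>();

    // Flink instantiates POJOs through a public no-argument constructor.
    public MapDTO() {}

    public MapDTO(String id, Map<String, Object> map) {
        this.id = id;
        this.map = map;
    }

    // Getters and setters make the private fields accessible to Flink.
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Map<String, Object> getMap() { return map; }
    public void setMap(Map<String, Object> map) { this.map = map; }

    @Override
    public String toString() {
        return "MapDTO{id=" + id + ", map=" + map + "}";
    }
}
```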

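I have tried the following two solutions:

// 1. Register Kryo's MapSerializer (com.esotericsoftware.kryo.serializers) for LinkedTreeMap
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class, MapSerializer.class);
// 2. Disable Flink's generic (Kryo) type fallback
env.getConfig().disableGenericTypes();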

Any expert advice I could use in this situation?


Solution

  • I was able to resolve this issue. In my Flink logs I saw that a Kryo class, ReflectionSerializerFactory, could not be found. I updated the Kryo version in Maven and declared my map field with the Map type, which, according to the Flink documentation, Flink supports.

    Make sure to specify the generic type parameters in your code (for example Map<String, Object> rather than a raw Map) and to add getters and setters for the Map fields in your POJOs.

    I also use the .returns(xyz.class) type declaration to work around type erasure.
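The type erasure mentioned in the last point can be observed without Flink. In the minimal JDK-only sketch below (hypothetical class names), two differently parameterized HashMaps share one runtime class, which is largely why Flink cannot recover Map<String, Object> from a lambda's return type on its own. An anonymous subclass, by contrast, keeps its type arguments in the class file. Flink's TypeHint relies on the same idea, so for a generic result type the call would be .returns(new TypeHint<Map<String, Object>>() {}) rather than a plain class literal.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo of type erasure and the "super type token" workaround.
final class ErasureDemo {

    private ErasureDemo() {}

    /** Reads T back from the generic superclass of an anonymous subclass. */
    abstract static class TypeCapture<T> {
        Type captured() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Type arguments exist only at compile time: both are plain HashMaps at runtime. */
    static boolean sameRuntimeClass() {
        Map<String, Object> a = new HashMap<>();
        Map<Integer, Double> b = new HashMap<>();
        return a.getClass() == b.getClass();
    }

    /** The anonymous subclass records Map<String, Object> as its superclass type argument. */
    static Type capturedMapType() {
        return new TypeCapture<Map<String, Object>>() {}.captured();
    }
}
```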