Flink Collector Issue While ReadValue from FasterXML


The values in my Kafka topic are JSON strings, and I map them to a POJO, as shown below:

{"name":"John","timeStamp":"2020-08-11T13:31:31"}

class Person {

    private String name;

    private LocalDateTime timeStamp;
}

The timestamp arrives from Kafka as a String, and I convert it into a LocalDateTime.

When I run the program standalone and call objectMapper.readValue(value, Person.class) with the required FasterXML (Jackson) libraries, it works fine and the conversion succeeds.

When I read the same values in Flink with the code below,

stream.flatMap(new FlatMapFunction<String, Person>() {
    @Override
    public void flatMap(String value, Collector<Person> out) {
        try {
            out.collect(objectMapper.readValue(value, Person.class));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}).print();
env.execute();

I get the following exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.fasterxml.jackson.databind.json.JsonMapper@1b7cc17c is not serializable. The object probably contains or references non serializable fields.

I read the message as saying that the Person object is not serializable, so I made the Person class implement Serializable, but that did not help. I also tried the following, with no luck either:

@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime timeStamp; 

UPDATE:

This looks like an issue with the API; I read about it at the link below:

https://issues.apache.org/jira/browse/FLINK-12113


Solution

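  • The exception states that the JsonMapper instance, not Person, is not serializable. Flink serializes the function object you pass to flatMap, and a mapper held by that function is serialized along with it. If I'm not mistaken, ObjectMapper has implemented Serializable since jackson-databind 2.1; JsonMapper was only added in 2.10, so you are most likely already on a newer version than that. The Person class should be made serializable as well.

    So, in your case I would say you should either make sure you are on jackson-databind >= 2.1 or, probably the more reliable option, make the JsonMapper a static field, so that it is not serialized together with the function.

    For the Person class, simply implement the Serializable interface: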

    import java.io.Serializable;
    import java.time.LocalDateTime;

    class Person implements Serializable {

        private String name;

        private LocalDateTime timeStamp;
    }
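
To illustrate why the static field suggestion above helps, here is a minimal sketch that uses only the JDK. It is not the Flink or Jackson API: `FakeMapper` and `MapFn` are hypothetical stand-ins for a mapper that cannot be serialized and for Flink's serializable function interface. It shows that a mapper captured by the function object becomes part of that object's serialized state, while a static field does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticMapperSketch {

    /** Hypothetical stand-in for a mapper that Java serialization cannot handle. */
    public static class FakeMapper {
        public String read(String json) {
            return json.trim();
        }
    }

    /** Hypothetical stand-in for a Flink function interface, which is Serializable. */
    public interface MapFn extends Serializable {
        String apply(String value);
    }

    // Static fields are not part of an instance's serialized state.
    private static final FakeMapper STATIC_MAPPER = new FakeMapper();

    /** The local mapper is captured and becomes a field of the function object. */
    public static MapFn capturing() {
        FakeMapper local = new FakeMapper();
        return new MapFn() {
            @Override
            public String apply(String value) {
                return local.read(value);
            }
        };
    }

    /** The function only refers to the static mapper, so nothing is captured. */
    public static MapFn usingStatic() {
        return new MapFn() {
            @Override
            public String apply(String value) {
                return STATIC_MAPPER.read(value);
            }
        };
    }

    /** Tries plain Java serialization and reports whether it succeeded. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturing()));   // false
        System.out.println(isSerializable(usingStatic())); // true
    }
}
```

In the Flink job from the question, the equivalent change would presumably be to declare the mapper as a `private static final` field of the enclosing class instead of a local variable or instance field. Another common option, assuming you can switch to `RichFlatMapFunction`, is to keep the mapper in a `transient` field and create it in `open()`, so each task builds its own instance after the function has been deserialized.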