I have Kafka values as Strings and a POJO, as shown below:
{"name":"John","timeStamp":"2020-08-11T13:31:31"}
class Person {
    private String name;
    private LocalDateTime timeStamp;
}
The timestamp arrives from Kafka as a String and needs to be converted into a LocalDateTime.
When I run the program standalone and call objectMapper.readValue(value, Person.class)
using the required library from FasterXML, it works fine and the conversion succeeds.
When I read from the Flink framework with the code below,
stream.flatMap(new FlatMapFunction<String, Person>() {
    @Override
    public void flatMap(String value, Collector<Person> out) {
        try {
            out.collect(objectMapper.readValue(value, Person.class));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}).print();
env.execute();
I get the following error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.fasterxml.jackson.databind.json.JsonMapper@1b7cc17c is not serializable. The object probably contains or references non serializable fields.
I read the message as saying that the Person object is not serializable, so I implemented Serializable for the Person class, but no luck. I also tried the following, with no luck either.
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime timeStamp;
UPDATE:
It looks like an issue with the API, based on what I read in the link below,
The exception states that the JsonMapper instance is not Serializable. If I'm not mistaken, it has been made serializable as of version 2.1. Also, the Person class should be made serializable as well.
So, in your case, I would say you should either switch to jackson-databind version >=2.1 or possibly make the JsonMapper a static field, since static fields are not serialized along with the function object.
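To illustrate why the static-field suggestion can help, here is a minimal sketch that uses only the JDK. It assumes that the function object is checked with plain Java serialization, which is what the error message suggests. FakeMapper is a hypothetical stand-in for a mapper that does not implement Serializable; it is not a Jackson class, and the other class names are made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFieldDemo {

    // Hypothetical stand-in for a mapper that is NOT Serializable.
    static class FakeMapper {
        String read(String raw) {
            return raw.trim();
        }
    }

    // Mapper held as an instance field: it is part of the object's
    // serialized state, so serializing the function fails.
    static class InstanceFieldFunction implements Serializable {
        private final FakeMapper mapper = new FakeMapper();

        String apply(String value) {
            return mapper.read(value);
        }
    }

    // Mapper held as a static field: static fields are skipped by
    // Java serialization, so the function itself serializes fine.
    static class StaticFieldFunction implements Serializable {
        private static final FakeMapper MAPPER = new FakeMapper();

        String apply(String value) {
            return MAPPER.read(value);
        }
    }

    // Returns true if the object can be written with Java serialization.
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new InstanceFieldFunction())); // false
        System.out.println(isJavaSerializable(new StaticFieldFunction()));   // true
    }
}
```

The same reasoning would apply to the anonymous FlatMapFunction in the question: if it captures objectMapper from the enclosing scope, the mapper becomes part of the state that has to be serialized, whereas a static field is left out.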
For the Person class, simply implement the Serializable interface:
class Person implements Serializable {
    private String name;
    private LocalDateTime timeStamp;
}