I'm using KStream to consume Kafka messages and persist them to my DB. The messages map to different POJOs, and currently I'm using ObjectMapper to create each object and then persist it. I've read that a JSON deserializer Serde can be used, but I'm not sure how to use it to map to different POJOs. Having a custom Serde for each POJO doesn't make sense. Please help. Thanks in advance.
Here is my code:
    public Consumer<KStream<String, String>> process() {
        return input ->
            input.foreach((key, value) -> {
                ObjectMapper mapper = new ObjectMapper();
                try {
                    // Pick the target POJO from a marker found in the raw JSON string.
                    if (value.contains("Teacher")) {
                        Teacher teacher = mapper.readValue(value, Teacher.class);
                        teacherRepository.save(teacher);
                    } else if (value.contains("Student")) {
                        Student student = mapper.readValue(value, Student.class);
                        studentRepository.save(student);
                    } else if (value.contains("Principal")) {
                        Principal principal = mapper.readValue(value, Principal.class);
                        principalRepository.save(principal);
                    }
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            });
    }
}
The Kafka Streams binder in Spring Cloud Stream does not directly provide a mechanism for what you are asking. Your consumer's type signature says that you are consuming the value as a `String`, so the binder can infer that information without you explicitly providing a `Serde`. However, if you want to further transform the `String` into other types using Jackson, you need to do that yourself in your business logic, as you already do. If you only have a few types, I don't see a problem with doing so.
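If the number of types grows a little, the if / else-if chain from the question could be kept in the business logic but expressed as a lookup table. The following is only a minimal sketch using the plain JDK: `PayloadDispatcher`, `register`, and `dispatch` are hypothetical names (nothing here is provided by the binder), and the handlers just collect strings where the real code would call `mapper.readValue(...)` and the matching repository.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper, not part of Spring Cloud Stream or Kafka Streams.
final class PayloadDispatcher {

    // Insertion order is kept so markers are checked in registration order,
    // just like the original if / else-if chain.
    private final Map<String, Consumer<String>> handlers = new LinkedHashMap<>();

    // Registers the handler to run for payloads that contain the given marker.
    public PayloadDispatcher register(String marker, Consumer<String> handler) {
        handlers.put(marker, handler);
        return this;
    }

    // Runs the first handler whose marker occurs in the payload.
    // Returns false when no marker matches, so the caller can log or skip.
    public boolean dispatch(String value) {
        for (Map.Entry<String, Consumer<String>> entry : handlers.entrySet()) {
            if (value.contains(entry.getKey())) {
                entry.getValue().accept(value);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> teachers = new ArrayList<>();
        List<String> students = new ArrayList<>();

        // Assumption: in the real application each handler would look like
        // value -> teacherRepository.save(mapper.readValue(value, Teacher.class)),
        // with the checked JsonProcessingException handled inside the lambda.
        PayloadDispatcher dispatcher = new PayloadDispatcher()
                .register("Teacher", teachers::add)
                .register("Student", students::add);

        dispatcher.dispatch("{\"type\":\"Teacher\",\"name\":\"Ann\"}");
        dispatcher.dispatch("{\"type\":\"Student\",\"name\":\"Bob\"}");
        boolean handled = dispatcher.dispatch("{\"type\":\"Janitor\"}");

        System.out.println(teachers.size() + " " + students.size() + " " + handled);
    }
}
```

With something like this in place, the body of the `foreach` in the question would shrink to a single `dispatcher.dispatch(value)` call, and adding a new type would mean one more `register` line. Matching on `contains` is as fragile as in the original code; a dedicated type field in the JSON would be a more robust marker if the producers can supply one.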