What I am trying to achieve:
With a Reactor Kafka consumer, I want to consume messages and be able to "do nothing" when a particular error occurs.
The message inside Kafka is a JSON representation of Person, and the business flow is to convert it to a Person object.
Example of message input: {"name":"John", "age":30}
However, the following can happen:
1 - The happy case: the JSON is well formed, and I can convert it to Person with objectMapper.
2 - The JSON is wrong (JsonProcessingException): I would like to just throw a runtime exception.
3 - The JSON is incomplete, like {"name":"John", "ag
For this third case only, I would like to ignore the message. However, I am not able to just ignore it.
What I tried:
Flux<Person> consume() {
    return kafkaReceiver.receive()
            .map(oneRecordWithJson -> {
                try {
                    // Regular flow, no issue: just transform an input to an output.
                    return objectMapper.readValue(oneRecordWithJson.value(), Person.class);
                } catch (JsonEOFException | JsonMappingException e) {
                    // The JSON is broken in the first place; I do not want to do anything in this case.
                    // I do not want to return a default bad object, and I do not want to throw any exception.
                    // But here, I am required to return something.
                    LOGGER.error("the input is broken, it is not even a valid json to begin with {}", oneRecordWithJson.value(), e);
                    return null; //<- no!
                } catch (JsonProcessingException e) {
                    // This is bad; I do want to throw an exception here (it will stop the consumer, but that is what is expected).
                    throw new RuntimeException(e);
                }
            })
            .map(oneCorrectPerson -> doSomething(oneCorrectPerson));
}
Question:
How can I just ignore the case where the input is not well formed, without having to throw an exception, return null, return a default object, etc.?
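Just use Flux.mapNotNull. When the mapper returns null, no value is emitted for that element, so the record is simply skipped: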
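return kafkaReceiver.receive()
        .mapNotNull(oneRecordWithJson -> {
            try {
                return objectMapper.readValue(oneRecordWithJson.value(), Person.class);
            } catch (JsonEOFException | JsonMappingException e) {
                return null; // mapNotNull drops null results, so this record is skipped
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e); // any other JSON failure still stops the consumer
            }
        });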
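To see the "skip one kind of failure, fail on the rest" shape in isolation, here is a minimal sketch that runs with the JDK alone. It deliberately uses java.util.stream and Optional instead of Reactor and Jackson, so it is an analogue of the mapNotNull approach, not the Reactor code itself: Optional.empty() plays the role of the null returned to mapNotNull. Everything named here (SkipBrokenRecords, TruncatedInputException, parse, parseOrSkip) is hypothetical, and the regex is a toy stand-in for objectMapper.readValue, not a real JSON parser.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class SkipBrokenRecords {

    // Minimal Person holder for the example.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Hypothetical stand-in for Jackson's JsonEOFException (input cut off early).
    static final class TruncatedInputException extends RuntimeException {
        TruncatedInputException(String message) {
            super(message);
        }
    }

    // Toy pattern for payloads shaped like {"name":"John", "age":30}.
    private static final Pattern PERSON = Pattern.compile(
            "\\{\\s*\"name\"\\s*:\\s*\"([^\"]*)\"\\s*,\\s*\"age\"\\s*:\\s*(\\d+)\\s*\\}");

    // Stand-in for objectMapper.readValue(json, Person.class).
    static Person parse(String json) {
        String trimmed = json.trim();
        if (!trimmed.endsWith("}")) {
            // Case 3 from the question: the payload is incomplete.
            throw new TruncatedInputException("input ends before the closing brace: " + json);
        }
        Matcher matcher = PERSON.matcher(trimmed);
        if (!matcher.matches()) {
            // Case 2 from the question: complete, but not what we expect.
            throw new IllegalStateException("unexpected payload: " + json);
        }
        return new Person(matcher.group(1), Integer.parseInt(matcher.group(2)));
    }

    // Truncated input becomes "no value"; every other failure propagates.
    static Optional<Person> parseOrSkip(String json) {
        try {
            return Optional.of(parse(json));
        } catch (TruncatedInputException e) {
            return Optional.empty();
        }
    }

    // Keeps only the payloads that produced a Person.
    static List<Person> consume(List<String> payloads) {
        return payloads.stream()
                .map(SkipBrokenRecords::parseOrSkip)
                .flatMap(Optional::stream) // drops the empty results
                .collect(Collectors.toList());
    }
}
```

Calling consume with one complete and one truncated payload yields a single Person, while a complete but unexpected payload still throws. In the Reactor pipeline, the same split is expressed by returning null from mapNotNull for the case to skip and throwing for the case that should stop the consumer.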