Tags: java, json, reactor-kafka

How to do nothing when consuming invalid JSON using Reactor Kafka?


What I am trying to achieve:

With a Reactor Kafka Consumer, consume messages, and be able to "do nothing" when there is an error.

Each message in Kafka is a JSON representation of a Person; the business flow is to convert it to a Person object.

Example of message input: {"name":"John", "age":30}

However, three cases can occur:

1 - The happy case: the JSON is well formed, and I can convert it to a Person with objectMapper.
2 - The JSON is wrong and objectMapper throws a JsonProcessingException: here I would like to just throw a runtime exception.
3 - The JSON is incomplete, like {"name":"John", "ag : for this case alone, I would like to ignore the message. However, I am not able to just ignore it.

What I tried:

Flux<Person> consume() {
        return kafkaReceiver.receive()
                .map(oneRecordWithJson -> {
                    try {
                        //regular flow, no issue, just transform an input to an output
                        return objectMapper.readValue(oneRecordWithJson.value(), Person.class);
                    } catch (JsonEOFException | JsonMappingException e) {
                        // the json is broken in the first place, I do not want to do anything in this case.
                        // I do not want to return a default bad object, I do not want to throw any exception
                        // But here, I am required to return
                        LOGGER.error("the input is broken, it is not even a valid json to begin with {}", oneRecordWithJson.value(), e);
                        return null; //<- no!
                    } catch (JsonProcessingException e) {
                        // this is bad, I do want to throw an exception here (which will stop the consumer, but that is what is expected)
                        throw new RuntimeException(e);
                    }
                })
                .map(oneCorrectPerson -> doSomething(oneCorrectPerson));
    }

Question:

How can I just ignore the case where the input is not well formed, without having to throw an exception or return null, a default object, etc.?


Solution

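  • Use Flux.mapNotNull instead of Flux.map. Flux.map does not allow the mapper to return null (it fails with a NullPointerException), whereas mapNotNull simply drops the element when the mapper returns null:

        return kafkaReceiver.receive()
                .mapNotNull(oneRecordWithJson -> {
                    // parse as before; for broken input, log and return null
                    return null; // the record is skipped, nothing is emitted downstream
                })
                .map(oneCorrectPerson -> doSomething(oneCorrectPerson));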
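
For a fuller picture, here is a minimal sketch of the mapNotNull approach applied to the three cases from the question. It makes several assumptions: the record values are plain Strings, reactor-core (a version that provides Flux.mapNotNull) and jackson-databind are on the classpath, and a Flux<String> stands in for kafkaReceiver.receive() so the example runs without a broker. The class name SkipBrokenJson, the helper parseOrNull, and the Person class with public fields are hypothetical names made up for this illustration.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.io.JsonEOFException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;

import java.util.List;

public class SkipBrokenJson {

    // Hypothetical stand-in for the Person type from the question.
    public static class Person {
        public String name;
        public int age;
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns a Person for well-formed input, null for input that should be
    // ignored, and rethrows every other JSON failure as a RuntimeException.
    public static Person parseOrNull(String json) {
        try {
            return MAPPER.readValue(json, Person.class);
        } catch (JsonEOFException | JsonMappingException e) {
            // Same exception types the question chose to ignore.
            System.err.println("Skipping broken input: " + json);
            return null;
        } catch (JsonProcessingException e) {
            // Any other JSON problem still terminates the stream.
            throw new RuntimeException(e);
        }
    }

    // Assumption: jsonValues plays the role of
    // kafkaReceiver.receive().map(record -> record.value()).
    public static Flux<Person> people(Flux<String> jsonValues) {
        // mapNotNull drops the element whenever the mapper returns null.
        return jsonValues.mapNotNull(SkipBrokenJson::parseOrNull);
    }

    public static void main(String[] args) {
        Flux<String> values = Flux.just(
                "{\"name\":\"John\", \"age\":30}",
                "{\"name\":\"John\", \"ag",          // truncated message
                "{\"name\":\"Jane\", \"age\":25}");

        List<Person> result = people(values).collectList().block();
        result.forEach(p -> System.out.println(p.name + " " + p.age));
    }
}
```

Keeping the try/catch in a small helper leaves the pipeline itself as a single mapNotNull call, and the second catch block still stops the consumer for the errors that are meant to be fatal.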