Skipping messages that throw runtime errors in a Kafka Streams topology


I have a Kafka Streams topology created in the usual way.

public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder.stream(inputTopic, Consumed.with(...))
        .filter(this::filterEvents)
        .mapValues(this::mapToOtherEvent)
        .to(outputTopic, Produced.with(...));

    return builder.build();
}

The method mapToOtherEvent occasionally throws a business-logic exception, and this stops the whole processing. I would like to know if there is a mechanism to skip records that fail with such business-logic errors and continue processing with the next message. So far I have explored:

  • Implementing StreamsUncaughtExceptionHandler, but it seems to just handle the exception and then replay the same message, ending up in a loop.
  • Reading about ProductionExceptionHandler and DeserializationExceptionHandler, but they don't seem to deal with business-logic exceptions.
  • This answer, which only suggests code-level solutions that would have other side effects in my case.

Solution

  • You can switch from mapValues to flatMapValues and change mapToOtherEvent to catch the exception and return zero result values (i.e., an empty Collection) in that case, which effectively swallows the error and drops the record.

    The next release (Apache Kafka 3.9) adds a new handler called ProcessingExceptionHandler, which allows you to skip such errors natively, similar to the existing DeserializationExceptionHandler and ProductionExceptionHandler.
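
As a rough illustration of both options, here is a minimal sketch in plain Java with no Kafka dependency, so it can be compiled on its own. The class name SkipFailingRecords, the BusinessException type, the String value type, and the body of mapToOtherEvent are all hypothetical stand-ins, since the question does not show the real ones. The config key and handler class name in the second method are what I understand Kafka 3.9 to ship with, so please verify them against the 3.9 documentation.

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;

class SkipFailingRecords {

    // Hypothetical stand-in for the business-logic exception from the question.
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the original mapper: rejects blank values.
    static String mapToOtherEvent(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new BusinessException("cannot map a blank event");
        }
        return value.toUpperCase();
    }

    // Option 1: a wrapper suitable for flatMapValues. It returns one element
    // on success and an empty list on failure, so a failing record yields
    // no output instead of crashing the stream thread.
    static List<String> mapToOtherEventOrSkip(String value) {
        try {
            return List.of(mapToOtherEvent(value));
        } catch (BusinessException e) {
            // Log or count the skipped record here if you need visibility.
            return Collections.emptyList();
        }
    }

    // Option 2 (assumes Kafka 3.9 or later): returns a copy of the Streams
    // configuration with the built-in log-and-continue processing handler set.
    // The key and class name are given as strings to avoid a Kafka import.
    static Properties withSkipOnProcessingError(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("processing.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler");
        return props;
    }
}
```

With the first option, the topology line .mapValues(this::mapToOtherEvent) would become something like .flatMapValues(SkipFailingRecords::mapToOtherEventOrSkip). Only the exception type you expect is caught, so unexpected failures still surface. With the second option the topology stays unchanged and the returned Properties are passed to the KafkaStreams constructor.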