Tags: java, apache-kafka, apache-kafka-streams

Kafka Streams: publish/send messages even when a few record transformations throw exceptions?


A typical Kafka Streams application flow looks like the below (not including all steps like props/Serdes, etc.) -

    final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, String> textLines = builder.stream(inputTopic);

    final KStream<String, String> textTransformation_1 = textLines.mapValues(value -> value + "firstTransformation");

    final KStream<String, String> textTransformation_2 = textTransformation_1.mapValues(value -> value + "secondTransformation");

    //my concern is at this stage -
    final KStream<String, String> textTransformation_3 = textTransformation_2.mapValues(this::processValueAndDoRelatedStuff);
    
    ....
    ....
    textTransformation_x.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
      

    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  

Now if the processValueAndDoRelatedStuff(String input) method throws an exception, I don't want the program to crash. Instead, I want Kafka Streams to simply NOT send that one transformation output to the outputTopic (i.e., ignore the transformation of that one record) and continue processing the rest of the incoming messages normally.

Is the above possible?

Generally, I know there is a way to skip sending transformation output to the outputTopic based on a predicate: if I catch the exception inside processValueAndDoRelatedStuff(String input) and return some sentinel value, I can add a filter in the next stage based on that value.

    final KStream<String, String> textTransformation_4 = textTransformation_3.filter((k, v) -> !v.equals("badrecord"));

But I am more interested in the case where the exception is not handled but thrown from the mapper functions. Is it possible for Kafka Streams to ignore the one record that caused the exception and still proceed with the rest of the processing?


Solution

  • The default behavior is to stop the topology on any uncaught exception.

    If you want to catch exceptions yourself, simply don't use a method reference; wrap the call in a try-catch inside the lambda:

    final KStream<String, String> textTransformation_3 = textTransformation_2.mapValues(value -> {
      try {
        return processValueAndDoRelatedStuff(value);
      } catch (Exception e) {
        // log, if you want
        return null;
      }
    }).filter((k, v) -> Objects.nonNull(v)); // remove events that caused exceptions
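
    The same catch-then-filter pattern can be illustrated with plain java.util.stream, so it runs without a Kafka cluster. Here `transform` is a hypothetical stand-in for processValueAndDoRelatedStuff that fails on some inputs:

    ```java
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class SkipFailedTransforms {

        // Hypothetical transformation that throws on some inputs.
        static String transform(String value) {
            if (value.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            return value + "-transformed";
        }

        // Map each record, turning exceptions into null, then drop the nulls --
        // the same shape as the mapValues(...).filter(...) chain above.
        static List<String> processAll(List<String> records) {
            return records.stream()
                    .map(v -> {
                        try {
                            return transform(v);
                        } catch (Exception e) {
                            return null; // skip this record, keep processing the rest
                        }
                    })
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // The failing middle record is dropped; the others go through.
            System.out.println(processAll(List.of("a", "", "b")));
            // prints [a-transformed, b-transformed]
        }
    }
    ```

    The key design point is that `null` acts as a sentinel for "this record failed", which the downstream filter removes before anything is written to the output topic.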
    

    Otherwise, you can configure exception handlers as well: https://developer.confluent.io/learn-kafka/kafka-streams/error-handling/
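
    For reference, the built-in handlers are set via StreamsConfig properties. A minimal sketch of one such setting; note that LogAndContinueExceptionHandler covers records that fail to *deserialize*, not exceptions thrown from your own processing code, which still need the try-catch approach above:

    ```
    # Skip records that fail to deserialize instead of stopping the application
    default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    ```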