Tags: batch-processing, spring-kafka, spring-cloud-stream

Spring Cloud Stream deserialization error handling for Batch processing


I have a question about handling deserialization exceptions in Spring Cloud Stream while processing batches (i.e. batch-mode: true).

According to the documentation at https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#error-handling-deserializer (specifically the FailedFooProvider example), the value function should return a subclass of the original message type.
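For reference, this is a simplified, framework-free sketch of the pattern the documentation describes. The failure function returns a BadFoo, which extends Foo, so it can travel in the same List<Foo> as successfully deserialized records. FailureInfo is a hypothetical stand-in for spring-kafka's FailedDeserializationInfo, used only so the example compiles without Spring on the classpath. Its fields are assumptions.

```java
import java.util.function.Function;

// Hypothetical stand-in for spring-kafka's FailedDeserializationInfo
// (the real class also exposes headers, an isForKey flag and the exception)
class FailureInfo {
    final String topic;
    final byte[] data;

    FailureInfo(String topic, byte[] data) {
        this.topic = topic;
        this.data = data;
    }
}

// The type a successful deserialization would produce
class Foo {
    final String name;

    Foo(String name) {
        this.name = name;
    }
}

// Placeholder for a failed record: it is a Foo, so it fits in a List<Foo>,
// but it also keeps the information about why deserialization failed
class BadFoo extends Foo {
    final FailureInfo failure;

    BadFoo(FailureInfo failure) {
        super(null);
        this.failure = failure;
    }
}

// Mirrors the documented provider: always maps a failure to a BadFoo
class FailedFooProvider implements Function<FailureInfo, Foo> {
    @Override
    public Foo apply(FailureInfo info) {
        return new BadFoo(info);
    }
}
```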

Is the intent that a list containing both Foos and BadFoos ends up at the original @StreamListener method, and that it's then up to my code to sort them out and handle them separately? I suspect so, because I've read that automatic DLQ publishing isn't desirable for batch error handling, since it would resubmit the whole batch.
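If that is the case, the "sort them out yourself" step could look like this minimal sketch. It assumes failed records arrive as BadFoo instances that extend Foo; the reason field on BadFoo and the FooBatchHandler class are hypothetical. Collectors.partitioningBy splits the batch in one pass and always returns both the true and false keys, so neither get() call returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class Foo {
    final String name;

    Foo(String name) {
        this.name = name;
    }
}

// Hypothetical placeholder produced for a record that failed deserialization
class BadFoo extends Foo {
    final String reason;

    BadFoo(String reason) {
        super(null);
        this.reason = reason;
    }
}

class FooBatchHandler {
    final List<Foo> processed = new ArrayList<>();
    final List<BadFoo> rejected = new ArrayList<>();

    // Receives the whole batch: good records and failure placeholders are mixed
    void readFoos(List<Foo> foos) {
        // true -> failed placeholders, false -> successfully deserialized records
        Map<Boolean, List<Foo>> split = foos.stream()
            .collect(Collectors.partitioningBy(f -> f instanceof BadFoo));
        for (Foo bad : split.get(true)) {
            // e.g. log it or publish it to a dead-letter topic yourself
            rejected.add((BadFoo) bad);
        }
        processed.addAll(split.get(false));
    }
}
```

Only the failed records are set aside, so the good records in the batch are still processed instead of the whole batch being retried.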

If so, what happens when the app receives more than one message type through different @StreamListeners, say Foos and Bars? What type should the value function return in that case? The pseudocode below illustrates this second question:

@StreamListener
public void readFoos(List<Foo> foos) {
    // Pick out the placeholders produced by the failure function
    List<BadFoo> badFoos = foos.stream()
        .filter(f -> f instanceof BadFoo)
        .map(f -> (BadFoo) f)
        .collect(Collectors.toList());
    // logic
}

@StreamListener
public void readBars(List<Bar> bars) {
    // logic
}


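import java.util.function.Function;

import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

// Updated to return Object and let apply() determine the subclass
public class FailedFooProvider implements Function<FailedDeserializationInfo, Object> {

  @Override
  public Object apply(FailedDeserializationInfo info) {
    // getTopic() returns the topic the failed record was read from
    if ("foo-topic".equals(info.getTopic())) {
        return new BadFoo(info);
    }
    else if ("bar-topic".equals(info.getTopic())) {
        return new BadBar(info);
    }
    // Every path must return a value; null here means an unexpected topic
    return null;
  }

}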

Solution

  • Yes, the list will contain the function result for failed deserializations; the application needs to handle them.

    The function needs to return the same type that would have been returned by a successful deserialization.

    You can't use conditions (the condition attribute of @StreamListener) with batch listeners. If a batch contains a mixture of Foos and Bars, they all go to the same listener.
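Following the answer, a mixed stream ends up in a single batch listener, and the failure function returns the same type that successful deserialization produces. In this minimal sketch, Payload is a hypothetical common supertype of Foo and Bar, BadPayload is a hypothetical failure placeholder, and FailureInfo stands in for FailedDeserializationInfo. None of these names come from Spring. The listener then dispatches each element on its runtime type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical common supertype for everything arriving on the shared binding
interface Payload {}

class Foo implements Payload {}

class Bar implements Payload {}

// Hypothetical placeholder for a failed record, remembering its source topic
class BadPayload implements Payload {
    final String topic;

    BadPayload(String topic) {
        this.topic = topic;
    }
}

// Hypothetical stand-in for FailedDeserializationInfo (which exposes getTopic())
class FailureInfo {
    final String topic;

    FailureInfo(String topic) {
        this.topic = topic;
    }
}

// Returns the same (common) type a successful deserialization would
class FailedPayloadProvider implements Function<FailureInfo, Payload> {
    @Override
    public Payload apply(FailureInfo info) {
        return new BadPayload(info.topic);
    }
}

class MixedBatchListener {
    final List<Foo> foos = new ArrayList<>();
    final List<Bar> bars = new ArrayList<>();
    final List<BadPayload> failures = new ArrayList<>();

    // One batch listener receives Foos, Bars and failure placeholders together
    void readAll(List<Payload> batch) {
        for (Payload p : batch) {
            if (p instanceof BadPayload) {
                failures.add((BadPayload) p);
            } else if (p instanceof Foo) {
                foos.add((Foo) p);
            } else if (p instanceof Bar) {
                bars.add((Bar) p);
            }
        }
    }
}
```

In a real application, readAll would be the batch listener method itself. The topic recorded in each placeholder lets you route failures per message type without a listener condition.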