Tags: spring, avro, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream content-based routing on payload


We are using Spring Cloud Stream v2.2 with Kafka and Avro (native encoder/decoder), and we are trying to use content-based routing with a condition on the payload. According to the Spring Cloud Stream docs, content-based routing is only achievable on headers, because the payload has not yet gone through type conversion when the condition is evaluated. Therefore, unless the condition is based on the byte format, it won't work as expected. However, I understand that when Avro is used in native mode, the message headers are skipped and no type negotiation takes place. So I am not sure whether content-based routing on the payload works as expected or not.

@StreamListener(target = Channels.INPUT, condition =
    "payload.context['type']=='one' or"
        + " payload.context['type']=='two'")
public void doStuff(TypeOneAndTwoData inputData) {
    ...
    channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}

@StreamListener(target = Channels.INPUT, condition =
    "payload.context['type']=='three' or"
        + " payload.context['type']=='four'")
public void doOtherStuff(TypeThreeAndFourData inputData) {
    ...
    channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}

From the logging I have in place, I can see that doStuff is triggered occasionally and doOtherStuff is triggered sometimes. Most of the time, however, neither of them is triggered and the message is skipped. Based on the input data, I am sure that context.type can only take four values ("one", "two", "three" and "four"), so no other value should occur, yet I frequently see the following entry in the logs:

Cannot find a @StreamListener matching for message with id: null

I have a few questions:

  • Does a condition on the payload work on the message after it has been deserialized to the corresponding POJO class in native Avro mode?
  • Why does the condition sometimes work and sometimes not? Does id: null mean something?
  • How does content-based routing work from a threading perspective? Do multiple threads run when we have two StreamListener methods with separate conditions, or is processing single-threaded? How is the at-least-once delivery guarantee managed in this scenario? Should the conditions be mutually exclusive?

Solution

    • Yes; it's (currently) supported.

    • The id: null is unfortunate; with Kafka, message.headers['id'] is null by default. It has no meaning anyway, so this part of the log message is not much use.

    • The issue is that none of the conditions match the converted payload.

    You can set a breakpoint in DispatchingStreamListenerMessageHandler.handleRequestMessage() to figure out what's wrong.
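Besides the debugger, it can help to check the conditions against sample values outside the framework. The following is a minimal sketch only: it assumes payload.context behaves like a Map<String, String>, rewrites the two SpEL conditions from the question as plain Java predicates (an approximation, not how SpEL evaluates them), and uses a hypothetical class name, ConditionCoverageCheck, that is not part of Spring Cloud Stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-alone check; not part of Spring Cloud Stream.
class ConditionCoverageCheck {

    // Plain-Java approximations of the two SpEL conditions from the question,
    // keyed by the listener method they belong to.
    static final Map<String, Predicate<Map<String, String>>> CONDITIONS = new LinkedHashMap<>();

    static {
        CONDITIONS.put("doStuff",
                ctx -> "one".equals(ctx.get("type")) || "two".equals(ctx.get("type")));
        CONDITIONS.put("doOtherStuff",
                ctx -> "three".equals(ctx.get("type")) || "four".equals(ctx.get("type")));
    }

    // Returns the names of all listeners whose condition accepts the given context.
    static List<String> matchingListeners(Map<String, String> context) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, String>>> entry : CONDITIONS.entrySet()) {
            if (entry.getValue().test(context)) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Sample values, including some that differ from the expected ones
        // only in case or whitespace, plus an unexpected value.
        String[] samples = {"one", "three", "One", "two ", "five"};
        for (String type : samples) {
            Map<String, String> context = new HashMap<>();
            context.put("type", type);
            System.out.println("'" + type + "' -> " + matchingListeners(context));
        }
    }
}
```

An empty list for a given sample means that value would fall through both conditions, which is the situation that produces the "Cannot find a @StreamListener matching" log entry.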

    EDIT

    I didn't answer your third question.

    The calls are single-threaded. No, the conditions do not have to be mutually exclusive: a message can match multiple conditions, and you only get that log entry when no conditions match. See the method I referenced above.

    If multiple conditions match, any exception thrown by a listener will stop processing (and invoke retry/DLQ processing, etc.).
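The dispatch behaviour described above can be illustrated with a simplified model. This is a sketch under stated assumptions, not the Spring source: DispatchModel, register and dispatch are hypothetical names, conditions are plain predicates rather than SpEL expressions, and the only behaviours modelled are the ones stated in the answer (single-threaded invocation, multiple matches allowed, a warning when nothing matches, and an exception stopping further processing).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Simplified, hypothetical model of conditional dispatch; not the Spring source.
class DispatchModel<T> {

    // Pairs a routing condition with the handler it guards.
    private static final class Route<T> {
        final Predicate<T> condition;
        final Consumer<T> handler;

        Route(Predicate<T> condition, Consumer<T> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Route<T>> routes = new ArrayList<>();

    void register(Predicate<T> condition, Consumer<T> handler) {
        routes.add(new Route<>(condition, handler));
    }

    // Collects every route whose condition matches, then invokes the handlers
    // in registration order on the caller's thread. An exception thrown by a
    // handler propagates immediately, so later matching handlers are skipped.
    // Returns the number of matching routes.
    int dispatch(T payload) {
        List<Route<T>> matching = new ArrayList<>();
        for (Route<T> route : routes) {
            if (route.condition.test(payload)) {
                matching.add(route);
            }
        }
        if (matching.isEmpty()) {
            System.out.println("No handler matched payload: " + payload);
            return 0;
        }
        for (Route<T> route : matching) {
            route.handler.accept(payload);
        }
        return matching.size();
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        DispatchModel<String> model = new DispatchModel<>();
        model.register(t -> t.equals("one") || t.equals("two"), t -> calls.add("doStuff:" + t));
        model.register(t -> t.startsWith("t"), t -> {
            throw new IllegalStateException("listener failed");
        });
        model.register(t -> t.equals("two"), t -> calls.add("audit:" + t));

        model.dispatch("one");   // only the first condition matches
        model.dispatch("five");  // nothing matches; the model prints a warning
        try {
            model.dispatch("two"); // all three match; the second handler throws
        } catch (IllegalStateException e) {
            System.out.println("Dispatch aborted: " + e.getMessage());
        }
        System.out.println(calls);
    }
}
```

In this model the third handler never sees "two" because the second one throws first. In the real binder, that is the point at which retry/DLQ processing would take over.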