Replies are not always delivered to the desired reply listener container


We are applying the request/reply semantics using spring-kafka with the ReplyingKafkaTemplate. However, we noticed that sometimes the reply does not end up where it should.

Here is a rough description of our setup:

Service A:

2 instances consuming messages from topic-a, which has 2 partitions (each instance gets 1 partition assigned). Service A is the initiator.

Service B:

2 instances, consumes messages from topic-b, which also has 2 partitions. Reacts to incoming messages from A and returns a reply message using the @SendTo annotation.

Observed behaviour:

When an instance of service A, e.g. A1, sends a message to service B, the send fails with a reply timeout. The request is consumed by B successfully and a reply is returned, but the reply is consumed by the other instance, e.g. A2. From the logging I can see that A1 gets topic-a-0 assigned, whereas A2 gets topic-a-1 assigned.

Suggestions from the docs:

Our scenario is described in this section of the docs: https://docs.spring.io/spring-kafka/reference/html/#replying-template It gives a couple of suggestions:

  1. Give each instance a dedicated reply topic
  2. Use the reply partition header and dedicated partitions for each instance

Our setup is based on a single topic for the whole service, so all incoming events and reply events are sent to and consumed from this topic. That makes option #1 undesirable in our situation.

The downside of option #2 is that you cannot use the group management feature, because the reply partitions have to be assigned to each instance explicitly. That is a pity: our services run on Kubernetes, so we'd like to use group management for maximum flexibility.

A third option?

So I was wondering whether there is a third option: why not use group management, look up the partitions currently assigned to the reply container at the moment a message is sent, and set the reply partition header accordingly? The ReplyingKafkaTemplate#getAssignedReplyTopicPartitions method appears to provide exactly this information. This way the partitions are not fixed and we can still use group management. The only downside I can foresee is that the request could fail if the partitions are rebalanced after the request was sent but before the reply was received.

I have already tested this and it appears to work. The main reason for posting this question is to check whether the idea makes sense and whether there are any caveats to take into account. I'm also wondering why spring-kafka does not support this out of the box.
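The header-building part of this idea can be sketched with plain JDK code. The spring-kafka reference describes the reply partition header (KafkaHeaders.REPLY_PARTITION) as a four-byte, big-endian representation of the partition number. The class and method names below are hypothetical and not part of spring-kafka; the collection of partition numbers stands in for whatever getAssignedReplyTopicPartitions reports for the reply topic at send time.

```java
import java.nio.ByteBuffer;
import java.util.Collection;

// Hypothetical helper for illustration only; not part of spring-kafka.
final class ReplyPartitionHeader {

    private ReplyPartitionHeader() {
    }

    // Picks the lowest partition currently assigned to this instance's reply container.
    // Throws if nothing is assigned, which can happen in the middle of a rebalance.
    static int choosePartition(Collection<Integer> assignedPartitions) {
        return assignedPartitions.stream()
                .min(Integer::compare)
                .orElseThrow(() -> new IllegalStateException("no reply partitions assigned"));
    }

    // Encodes the partition as four big-endian bytes (ByteBuffer defaults to big-endian).
    static byte[] encode(int partition) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(partition).array();
    }

    // Decodes a header value back into a partition number.
    static int decode(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getInt();
    }
}
```

In a real sender, the encoded bytes would be added to the outgoing record as the reply partition header just before calling sendAndReceive. The lookup and the send are not atomic, so a rebalance between the two can still route the reply elsewhere.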

If my solution makes sense, I am willing to raise an enhancement issue and provide a PR on the spring-kafka project.


Solution

  • The issue, as you describe, is that there is no guarantee we'll get the same partition(s) after a rebalance.

    The "third option" is to use a different group.id for each instance and set sharedReplyTopic=true on the ReplyingKafkaTemplate. In this case all instances will get the reply and it will be discarded by the instance(s) that did not send the request.

    The best solution, however, is to use a unique reply topic for each instance.
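Why the different-group.id-per-instance option works can be illustrated with a simplified model: every instance sees every reply, but only the instance that has a pending request with a matching correlation id completes it. This is a conceptual sketch in plain JDK code, not spring-kafka's actual implementation; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of one service instance's pending requests (illustration only).
final class PendingReplies {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers an outgoing request and returns the correlation id sent along with it.
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Called for every reply seen on the shared reply topic.
    // Returns true if this instance sent the matching request, false if the reply is discarded.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // the request was sent by another instance
        }
        future.complete(payload);
        return true;
    }
}
```

With two instances modelled this way, a reply to a request from A1 is ignored by A2 and completes the future on A1. The cost is that every instance consumes every reply, which is one reason a unique reply topic per instance scales better.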