I am using Spring-Kafka and trying to implement the request-reply pattern. My use case: a client calls a REST endpoint with a payload, and I send this payload as a message to a Kafka topic (the request topic). A Spark job consumes the message, processes it, and sends the response to another Kafka topic (reply-topic). Once the message is written to the reply topic, my web application should consume it and return it to the client as the HTTP response.
What I have achieved so far.
I am using Spring-Kafka for this use case. I can send the request body as a Kafka message to the request topic. Before sending, Spring-Kafka generates a kafka_correlationId header. I have registered a ProducerInterceptor that gets hold of the generated correlation ID and passes it in the message body.
In the Spark job, I can consume this message and process it. When sending the result to reply-topic, I add a kafka_correlationId header with the same value that was generated.
With a single consumer, the use case works perfectly.
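The header round trip described above boils down to copying the correlation header from the request record onto the reply record. The sketch below models this in plain Java; the Msg class is a hypothetical stand-in for a Kafka record, not the kafka-clients API, and upper-casing the value stands in for the Spark processing. Copying the raw header bytes, rather than decoding and re-encoding them, avoids depending on how Spring-Kafka encodes the correlation ID.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Plain-Java model of a Kafka record (value + headers). Hypothetical, NOT kafka-clients.
class ReplyHeaderEcho {

    static final String CORRELATION_HEADER = "kafka_correlationId";

    static final class Msg {
        final String value;
        final Map<String, byte[]> headers = new HashMap<>();

        Msg(String value) {
            this.value = value;
        }
    }

    // Replier step: process the request and copy the correlation header
    // byte-for-byte onto the reply so the requester can match it.
    static Msg buildReply(Msg request) {
        Msg reply = new Msg(request.value.toUpperCase(Locale.ROOT)); // stand-in for real processing
        byte[] correlation = request.headers.get(CORRELATION_HEADER);
        if (correlation == null) {
            throw new IllegalArgumentException("request has no " + CORRELATION_HEADER + " header");
        }
        reply.headers.put(CORRELATION_HEADER, correlation.clone());
        return reply;
    }

    public static void main(String[] args) {
        Msg request = new Msg("hello");
        request.headers.put(CORRELATION_HEADER, "abc-123".getBytes(StandardCharsets.UTF_8));
        Msg reply = buildReply(request);
        String id = new String(reply.headers.get(CORRELATION_HEADER), StandardCharsets.UTF_8);
        System.out.println(reply.value + " " + id);
    }
}
```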
What is not working.
Now I have deployed 2 instances of my web application. reply-topic has 2 partitions, and both instances use the same consumer group ID:
App-instance-1 : consuming from partition-0
App-instance-2 : consuming from partition-1
If my request goes to App-instance-1 and the Spark job writes the reply to partition-0 of reply-topic, I get the response. However, if the Spark job writes to partition-1, App-instance-1 never sees the reply because it listens only on partition-0, and the request fails with a timeout exception. The same happens for the other instance.
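The failure can be reproduced with a small model: with one shared consumer group, each partition is consumed by exactly one instance, so any reply the replier writes to the other instance's partition is lost. This is a hypothetical plain-Java sketch, not Spring-Kafka code; it assumes instance 0 (App-instance-1) owns partition 0 and instance 1 (App-instance-2) owns partition 1, and that the replier picks the reply partition without regard to who sent the request.

```java
// Hypothetical model of the failing setup: both instances share one consumer
// group, so partition p of reply-topic is consumed only by instance p.
class SameGroupReplyLoss {

    // requesterOf[n]      = instance (0 or 1) that sent request n
    // replyPartitionOf[n] = partition the replier happened to write reply n to
    // Returns how many requests never see their reply.
    static int lostReplies(int[] requesterOf, int[] replyPartitionOf) {
        if (requesterOf.length != replyPartitionOf.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        int lost = 0;
        for (int n = 0; n < requesterOf.length; n++) {
            int consumingInstance = replyPartitionOf[n]; // one partition per instance
            if (consumingInstance != requesterOf[n]) {
                lost++; // the requester never sees this reply and eventually times out
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // Two requests from each instance; the replier spreads replies round-robin.
        int[] requesterOf = {0, 0, 1, 1};
        int[] replyPartitionOf = {0, 1, 0, 1};
        System.out.println("lost replies: " + lostReplies(requesterOf, replyPartitionOf));
    }
}
```

With round-robin placement, half of the replies land on the wrong instance, which matches the intermittent timeouts described above.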
Please let me know what I need to configure to make this work.
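There are two solutions. One is to use a different group.id in each instance so that every instance gets every reply; in this case, set sharedReplyTopic to true to avoid log noise when an instance gets a reply for a request it didn't send. The other is to have each instance listen on its own partition of the reply topic. See the documentation: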
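The first solution can be modeled as follows: because each instance has its own group.id, every reply is delivered to every instance; each instance looks up the correlation ID in its own map of in-flight requests and completes the matching one, discarding the rest. This is a hypothetical plain-Java sketch of that matching logic, not the ReplyingKafkaTemplate implementation; the Instance class and broadcast method are illustrative names. On the Spring side, this would roughly mean giving each instance's reply container a unique group ID (for example via ContainerProperties.setGroupId) and calling setSharedReplyTopic(true) on the template; check the exact wiring against your Spring-Kafka version.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of "one reply topic, a different group.id per instance".
class SharedReplyTopicModel {

    // A requester instance that tracks its own in-flight requests by correlation ID.
    static final class Instance {
        final Map<String, CompletableFuture<String>> pending = new HashMap<>();
        int discarded = 0;

        CompletableFuture<String> send(String correlationId) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(correlationId, future);
            return future;
        }

        // Called for EVERY reply, because each instance is in its own consumer group.
        void onReply(String correlationId, String value) {
            CompletableFuture<String> future = pending.remove(correlationId);
            if (future == null) {
                discarded++; // not ours; with sharedReplyTopic=true this is logged at DEBUG
            } else {
                future.complete(value);
            }
        }
    }

    // Delivers one reply to all instances, as separate consumer groups would.
    static void broadcast(List<Instance> instances, String correlationId, String value) {
        for (Instance instance : instances) {
            instance.onReply(correlationId, value);
        }
    }

    public static void main(String[] args) {
        Instance a = new Instance();
        Instance b = new Instance();
        List<Instance> all = List.of(a, b);
        CompletableFuture<String> fa = a.send("id-a");
        CompletableFuture<String> fb = b.send("id-b");
        broadcast(all, "id-b", "reply-b");
        broadcast(all, "id-a", "reply-a");
        System.out.println(fa.join() + " " + fb.join() + " discarded=" + (a.discarded + b.discarded));
    }
}
```

Every request completes regardless of which partition the reply lands on; the cost is that each instance reads and discards the replies meant for the others.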
When you configure with a single reply TopicPartitionOffset, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition. When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID. This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply. When you use this setting, we recommend that you set the template’s sharedReplyTopic to true, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.
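The partition-per-instance option only works if the replier writes each reply to the partition the requesting instance listens on. Because the Spark job is not a Spring listener, it has to do this routing itself. The sketch below assumes that the requester sends its reply partition in a kafka_replyPartition header encoded as a 4-byte big-endian int; to our understanding, ReplyingKafkaTemplate adds such a header when its reply container is assigned a single partition, but verify the header name and encoding against your Spring-Kafka version. The encode/decode helpers are hypothetical; with kafka-clients, the Spark job would then pass the chosen partition to the ProducerRecord constructor that takes an explicit partition.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical routing helpers for "each instance listens on its own reply partition".
class ReplyPartitionRouting {

    // Assumed header name; check it against the Spring-Kafka version in use.
    static final String REPLY_PARTITION_HEADER = "kafka_replyPartition";

    // Requester side: encode this instance's reply partition as a 4-byte big-endian int.
    static byte[] encodePartition(int partition) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(partition).array();
    }

    // Replier side (e.g. the Spark job): pick the partition to write the reply to.
    // Falls back to 'fallback' when the header is missing or malformed, in which
    // case the reply may reach the wrong instance.
    static int replyPartition(Map<String, byte[]> requestHeaders, int fallback) {
        byte[] raw = requestHeaders.get(REPLY_PARTITION_HEADER);
        if (raw == null || raw.length != Integer.BYTES) {
            return fallback;
        }
        return ByteBuffer.wrap(raw).getInt();
    }

    public static void main(String[] args) {
        // App-instance-2 listens on partition 1 of reply-topic.
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(REPLY_PARTITION_HEADER, encodePartition(1));
        System.out.println("write reply to partition " + replyPartition(headers, 0));
    }
}
```

In this setup each instance is assigned its partition explicitly rather than through consumer-group rebalancing, so adding an instance also means adding a partition and assigning it.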