Search code examples
spring-kafka

Multiple consumers using spring kafka with reply topic being written outside spring


I am using Spring-Kafka and trying to achieve request reply pattern. My use case is, client calls Rest endpoint with payload, I am sending this message to a kafka topic(request topic). I have spark job which consumes this message, processes it and sends response in another kafka topic(reply-topic). Once the message is written back to reply topic, my web application should consume this message and return as http response to the client.

What I have achieved so far.

I am using Spring-Kafka for solving this use case. I am able to send a request body as kafka message to request topic. Spring-Kafka is generating a kafka_correlationId as kafka header before sending the kafka message. I have registered the producerInterceptor, got hold of correlationId generated and passing this in the message body.

In spark job, I am able to consume this kafka message, process it and while sending back in reply-topic, I am adding message header kafka_correlationId with same value as what is generated.

When I have single consumer, use case is working perfectly fine.

What is not working.

Now, I have deployed 2 instances of my web application, reply-topic has 2 partitions with same consumer group Id.

App-instance-1 : consuming from partition-0
App-instance-2 : consuming from partition-1

If my request goes to App-instance-1 and If my spark job is able to write to partion-0 of reply-topic, I am able to get the response. However, If spark job writes to partion-1 in reply-topic, since App-instance-1 is listening for only partion-0, I am not able to get the response and app fails with timeout exception. Similar case for another app instance.

Please let me know what I am supposed to configure to achieve this.


Solution

  • There are two solutions -

    • also set the REPLY_PARTITION header (and have the spark app send the reply to that partition) - and statically assign the partitions.
    • use a different group.id in each instance so that both clients get the reply - in this case, set sharedReplyTopic to avoid log noise when an instance gets a reply for a request he didn't send.

    See the documentation.

    When you configure with a single reply TopicPartitionOffset, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition. When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID. This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply. When you use this setting, we recommend that you set the template’s sharedReplyTopic to true, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.