Search code examples
apache-kafkaspring-kafka

Wait for Kafka consumer in request/reply semantics to initialise offsets


I am using a ReplyingKafkaTemplate with a shared reply topic (i.e. group.id is null and group management is disabled). The initial offset is set to 'latest'.

I am facing the following issue:

  1. I manually start the listener container which causes an async operation that gets the metadata for the consumer.
  2. I call the sendAndReceive() method.
  3. The reply message is created before the async operation in the first step returns.
  4. The async operation returns and sets the offset to 1 (due to 'latest'), thus the reply message will be ignored.

In order to fix the issue, I need to assert that the offsets are initialized before calling the sendAndReceive() method. I have looked for a suitable Application Event, but none of them seem to fit.

Is there any way on how to assert that a consumer has completely initialized?

The problem is similar to Wait for kafka consumer to poll before writing messages to kafka spring boot but the suggested solution doesn't solve it either (unless I misunderstand the proposal, it would just lower the risk but not eliminate it).


Solution

  • It's not currently possible.

    I opened an issue for this https://github.com/spring-projects/spring-kafka/issues/2318

    EDIT

    As a work around you could set idleEventInterval and wait for the first ListenerContainerIdleEvent.