spring-boot apache-kafka spring-kafka project-reactor

Blocking call in @KafkaListener spring boot


When I am using Spring's @KafkaListener, is there any difference if I subscribe to the reactive flow:

@KafkaListener(topics = "topic-name", groupId = "group-id", containerFactory="kafkaListenerContainerFactory")
public void consume(org.springframework.messaging.Message<Message<?>> message){
    reactiveService.doSmth().subscribe();
}

or just block it:

@KafkaListener(topics = "topic-name", groupId = "group-id", containerFactory="kafkaListenerContainerFactory")
public void consume(org.springframework.messaging.Message<Message<?>> message){
    reactiveService.doSmth().block();
}

ReactiveService.class

public Mono<String> doSmth() {
  ...
}

As I understand it, @KafkaListener does not interact with Reactor Netty threads and uses its own task executor to create threads. Does that mean I can use either of these approaches without any performance loss?

PS: I know that there is reactive KafkaListener support, but currently I am bound to the regular @KafkaListener.


Solution

  • subscribe() may run your reactive stream on a different thread, so consume() returns without waiting for the result. block() does block the consume() method until your reactive stream completes. This can definitely affect the Kafka consumer thread, because the listener has to return within max.poll.interval.ms to keep the consumer alive. Otherwise the consumer is considered failed and a group rebalance may occur. See the docs for more info: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-interval-ms
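
The difference in when the listener method returns can be seen in a small, dependency-free sketch. This is not Reactor or Spring Kafka code: CompletableFuture stands in for the Mono returned by doSmth(), and the class name ListenerBlockingSketch and the helpers fireAndForget and waitForResult are hypothetical names made up for illustration. fireAndForget plays the role of subscribe(), waitForResult plays the role of block().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ListenerBlockingSketch {

    // Stand-in for reactiveService.doSmth() (assumption: the real method returns a Mono).
    // The work runs on another thread and flips 'done' once it has finished.
    static CompletableFuture<String> doSmth(long delayMillis, AtomicBoolean done) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis); // simulated slow work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.set(true);
            return "result";
        });
    }

    // Analogue of subscribe(): start the work and return immediately.
    // Returns whether the work had finished when the "listener" returned.
    static boolean fireAndForget(long delayMillis) {
        AtomicBoolean done = new AtomicBoolean(false);
        doSmth(delayMillis, done);
        return done.get();
    }

    // Analogue of block(): the calling thread waits until the work completes.
    static boolean waitForResult(long delayMillis) {
        AtomicBoolean done = new AtomicBoolean(false);
        doSmth(delayMillis, done).join();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("finished before return (fire-and-forget): " + fireAndForget(200));
        System.out.println("finished before return (blocking): " + waitForResult(200));
    }
}
```

Running main should print false for the fire-and-forget call and true for the blocking call. In the blocking case the calling thread is held for the whole duration of the work, and in a @KafkaListener that thread is the consumer thread, so the time spent waiting counts against max.poll.interval.ms. If you do block on a Mono, the block(Duration) overload lets you bound the wait to a value below that interval.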