KafkaListener with long tasks in Spring Boot Service


I have a service that consumes Kafka messages and triggers a long process; processing a single message can take more than 10 minutes. Currently, no new message is consumed until the "doSomething()" method finishes. How can I make the processing concurrent, so that messages are handled in parallel?

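import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @KafkaListener(topics = "request-topic", groupId = "group_id",
        containerFactory = "requirementsKafkaListenerFactory")
    public void consumeKafkaRequirementsDataJson(KafkaRequirementsData kafkaRequirementsData) {
        System.out.println("Consumed JSON Message from kafka Topic: " + kafkaRequirementsData);
        // Blocks the listener thread until the long task finishes, so no new record is consumed meanwhile
        doSomething(kafkaRequirementsData);
    }

    // doSomething(...) and KafkaRequirementsData are defined elsewhere (not shown)
}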

Solution

  • You can set the concurrency property for this, but the number of consumer threads is effectively limited by the number of partitions of the topic: each partition is consumed by at most one consumer in the group, so any threads beyond the partition count sit idle. If you already have a decent number of partitions this could be the way to go, but I wouldn't increase the number of partitions just to parallelize long tasks.

    You could instead implement multithreading in your app yourself, but managing resources would then be up to you. In addition, once a message is consumed and handed off to a thread, you would be responsible for managing that task, including retries if something goes wrong.

    See https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/ and https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/ for more information.

    TL;DR: I'd base my decision on the following:

    1. If you have enough partitions to run your tasks in parallel without them backing up, set the concurrency property.
    2. If you don't have enough partitions, enqueue your tasks and run them on a thread pool.
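For the first option, here is a minimal sketch of the question's listener with a per-listener concurrency override. It assumes Spring for Apache Kafka 2.2.4 or later (where `@KafkaListener` has the `concurrency` and `properties` attributes) and a consumer factory that supports property overrides, as `DefaultKafkaConsumerFactory` does. The values `3`, `max.poll.records=1` and `max.poll.interval.ms=1200000` (20 minutes) are assumptions to tune for your topic. The poll settings matter with tasks this long: if processing takes longer than `max.poll.interval.ms` (300000 ms, i.e. 5 minutes, by default), the consumer is removed from the group and its partitions are reassigned.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    // concurrency = "3": the container starts three consumers (one thread each) in group "group_id".
    // Only useful if "request-topic" has at least three partitions; extra consumers stay idle.
    // properties: per-listener consumer overrides (assumed values, tune them):
    //   max.poll.records=1       -> one record per poll, so each poll waits on a single long task
    //   max.poll.interval.ms=... -> must exceed the longest task (default 300000 ms = 5 min)
    @KafkaListener(topics = "request-topic", groupId = "group_id",
            containerFactory = "requirementsKafkaListenerFactory",
            concurrency = "3",
            properties = {"max.poll.records=1", "max.poll.interval.ms=1200000"})
    public void consumeKafkaRequirementsDataJson(KafkaRequirementsData kafkaRequirementsData) {
        doSomething(kafkaRequirementsData); // long-running work from the question (defined elsewhere)
    }
}
```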
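For the second option, here is a sketch using only `java.util.concurrent`. `LongTaskDispatcher` is a hypothetical helper, not a Spring Kafka API: the listener calls `dispatch(...)`, which queues the work on a fixed-size pool with a bounded queue and returns immediately, so the listener thread can keep polling. The pool size, queue capacity and `CallerRunsPolicy` are assumptions. Because the listener returns before the task finishes, the container will normally commit the record's offset right away, so a task that fails later is not redelivered; that is the retry responsibility mentioned above in the answer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a Spring Kafka API): runs long tasks on a bounded
 * thread pool so the Kafka listener thread can return quickly and keep polling.
 */
final class LongTaskDispatcher<T> implements AutoCloseable {

    private final ThreadPoolExecutor pool;
    private final Consumer<T> task;

    LongTaskDispatcher(int workers, int queueCapacity, Consumer<T> task) {
        this.task = task;
        this.pool = new ThreadPoolExecutor(
                workers, workers,                            // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),     // bounded queue of pending tasks
                new ThreadPoolExecutor.CallerRunsPolicy());  // queue full: run on caller's thread (backpressure)
    }

    /** Call this from the @KafkaListener method; it returns once the task is queued. */
    void dispatch(T payload) {
        pool.execute(() -> {
            try {
                task.accept(payload);
            } catch (RuntimeException e) {
                // The offset has most likely been committed already: retrying is up to you.
                System.err.println("Task failed for " + payload + ": " + e);
            }
        });
    }

    /** Stops accepting tasks and waits for queued and running tasks to finish. */
    @Override
    public void close() {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
                pool.shutdownNow(); // give up on tasks still running after the timeout
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical wiring inside the question's MyService:
//   private final LongTaskDispatcher<KafkaRequirementsData> dispatcher =
//           new LongTaskDispatcher<>(4, 16, this::doSomething);
//   // listener body: dispatcher.dispatch(kafkaRequirementsData);
```

`CallerRunsPolicy` applies backpressure by running a task on the listener thread when the queue is full; with 10-minute tasks that can itself exceed `max.poll.interval.ms`, so size the queue generously or choose a different rejection policy. In a Spring bean you would also call `close()` on shutdown, for example from a `@PreDestroy` method.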