java, spring-boot, apache-kafka, kafka-consumer-api, spring-kafka

How can I process messages from a @KafkaListener method in different threads?


I have a Kafka handler in Spring Boot:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

For example, the producer sends one message every second, but myService.processResponse takes 10 seconds. I need to handle each message and run myService.processResponse in a new thread. I could create my own executor and delegate each response to it, but I think Kafka has other configuration options for this. I found two:

1) Add concurrency = "5" to the @KafkaListener annotation. It seems to work, but I'm not sure it is correct, because there is a second way:

2) Create a ConcurrentKafkaListenerContainerFactory and set a ConsumerFactory and concurrency on it.
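
For reference, the two options might look roughly like the sketch below. The class names, the MyService interface, and the injected ConsumerFactory bean are assumptions made for illustration (the sketch assumes a ConsumerFactory bean is available, for example the one Spring Boot auto-configures); only the annotation attribute and the factory setters come from spring-kafka itself.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

// Hypothetical stand-in for the service used in the question.
interface MyService {
    void processResponse(String response);
}

// Option 1: set concurrency directly on the annotation
// (the attribute exists since spring-kafka 2.2).
@Component
class ResponseListener {
    private final MyService myService;

    ResponseListener(MyService myService) {
        this.myService = myService;
    }

    @KafkaListener(topics = "topic-one", groupId = "response", concurrency = "5")
    public void listen(String response) {
        myService.processResponse(response);
    }
}

// Option 2: declare the container factory yourself and set concurrency on it.
// Listeners that do not specify their own concurrency use this value.
@Configuration
class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed to be provided elsewhere
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);
        return factory;
    }
}
```

In both variants the number of consumer threads that actually receive records is limited by the number of partitions of the topic, so a concurrency of 5 only helps if topic-one has at least 5 partitions.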

I do not understand the difference between these methods. Is it enough to add concurrency = "5" to the @KafkaListener annotation, or do I need to create a ConcurrentKafkaListenerContainerFactory?

Or have I misunderstood something, and is there another way?


Solution

  • Using an executor complicates the management of committed offsets, because the listener method returns before processing has finished and records can complete out of order; it is not recommended.

    With @KafkaListener in a Spring Boot application, the framework creates a ConcurrentKafkaListenerContainerFactory for you.

    The concurrency attribute on the annotation is just a convenience; it overrides the factory setting.

    This allows you to use the same factory with multiple listeners, each with different concurrency.

    You can set the default container concurrency with a Boot property (spring.kafka.listener.concurrency); the annotation value overrides that default. See the Javadoc for the annotation attribute:

    /**
     * Override the container factory's {@code concurrency} setting for this listener. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
     * which case {@link Number#intValue()} is used to obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the concurrency.
     * @since 2.2
     */
    String concurrency() default "";
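
To illustrate the first point about executors and committed offsets, here is a minimal plain-Java sketch of the bookkeeping you would have to take on yourself. OffsetTracker is a hypothetical helper, not part of spring-kafka or the Kafka client; it only models one partition and assumes that worker threads report each offset when they finish. Because workers can finish out of order, the offset that is safe to commit is the end of the gap-free run of completed offsets, not the most recently completed one.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: tracks completed offsets of a single partition. */
public class OffsetTracker {
    private final Set<Long> done = new HashSet<>();
    private long nextToCommit; // first offset that is not yet safe to commit

    public OffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    /** Marks an offset as processed and returns the offset that is now safe to commit. */
    public synchronized long complete(long offset) {
        done.add(offset);
        // Advance only across a gap-free run of completed offsets.
        while (done.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker(0);
        // Simulated completion order from a thread pool: 2 and 1 finish before 0.
        System.out.println(tracker.complete(2)); // prints 0, offset 0 is still in flight
        System.out.println(tracker.complete(1)); // prints 0
        System.out.println(tracker.complete(0)); // prints 3, offsets 0..2 are all done
    }
}
```

With container concurrency, each consumer thread processes its partitions in order and the container commits offsets for you, so none of this tracking is needed.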