I have kafka handler in spring boot:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
For example producer send one message every second. But myService.processResponse
work 10 seconds. I need handle each message and start myService.processResponse
in new thread. I can create my executor and delegate each response to it. But I think there are another configs in kafka for them. I found 2:
1) add concurrency = "5"
to @KafkaListener
annotation - It seems to be working. But I'm not sure how correct, because I have second way:
2) I can create ConcurrentKafkaListenerContainerFactory
and set to it ConsumerFactory
and concurrency
I do not understand the difference between these methods? is it enough just to add concurrency = "5"
to @KafkaListener
annotation or I need create ConcurrentKafkaListenerContainerFactory
?
Or I do not understand anything at all and is there another way?
Using an executor makes things complicated, with regard to managing committed offsets; it is not recommended.
With @KafkaListener
, the framework creates a ConcurrentKafkaListenerContainerFactory
for you.
concurrency
on the annotation is just a convenience; it overrides the factory setting.
This allows you to use the same factory with multiple listeners, each with different concurrency.
You can set the container concurrency (default) using a boot property; that value is overridden by the annotation value; see the javadocs...
/**
* Override the container factory's {@code concurrency} setting for this listener. May
* be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
* which case {@link Number#intValue()} is used to obtain the value.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the concurrency.
* @since 2.2
*/
String concurrency() default "";