Unlike @KafkaListener, it looks like @StreamListener does not support the autoStartup parameter. Is there a way to achieve this same behavior for @StreamListener? Here's my use case:
I have a generic Spring application that can listen to any Kafka topic and write to its corresponding table in my database. For some topics, the volume is low, so processing one message at a time with very low latency is fine. For other, high-volume topics, the code should receive a microbatch of messages and write to the database less frequently using a JDBC batch. Ideally the definition for the listeners would look something like this:
// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...
// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...
For a low-volume topic, I would set application.singleMessageListenerEnabled to true and application.multipleMessageListenerEnabled to false, and vice versa for a high-volume topic. Thus, only one of the listeners would be actively listening for messages, and the other would stay inactive.
Is there a way to achieve this with @StreamListener?
First, please consider upgrading to the functional programming model; the refactoring should take you only minutes. We've all but deprecated the annotation-based programming model. If you do upgrade, what you're trying to accomplish is very easy:
@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class);
    }

    @Bean
    public Consumer<GenericRecord> singleRecordConsumer() {...}

    @Bean
    public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}
Then, when starting the application, you can simply pass the --spring.cloud.function.definition=singleRecordConsumer property for the single-message case or --spring.cloud.function.definition=multipleRecordConsumer for the batch case, thus ensuring that only the listener you want is activated.
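To make the shape of the two consumers more concrete, here is a minimal sketch in plain Java, with no Spring on the classpath. The `RecordWriter` interface and the `ListenerSketch` class are hypothetical names invented for this illustration and stand in for whatever JDBC code you already have; they are not part of Spring Cloud Stream. In a real application the two factory methods would be the @Bean methods shown above, with GenericRecord as the type parameter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ListenerSketch {

    // Hypothetical stand-in for the database layer (assumption, not a Spring API).
    interface RecordWriter<T> {
        void writeOne(T record);           // e.g. a single INSERT
        void writeBatch(List<T> records);  // e.g. one JDBC batch for the whole list
    }

    // Low-volume path: every message is written as soon as it arrives.
    static <T> Consumer<T> singleRecordConsumer(RecordWriter<T> writer) {
        return writer::writeOne;
    }

    // High-volume path: the whole microbatch goes to the writer in one call.
    static <T> Consumer<List<T>> multipleRecordConsumer(RecordWriter<T> writer) {
        return records -> {
            if (!records.isEmpty()) {      // skip empty batches
                writer.writeBatch(records);
            }
        };
    }

    public static void main(String[] args) {
        // A writer that only prints, so the sketch runs without a database.
        RecordWriter<String> printing = new RecordWriter<String>() {
            @Override
            public void writeOne(String record) {
                System.out.println("single write: " + record);
            }

            @Override
            public void writeBatch(List<String> records) {
                System.out.println("batch write of " + records.size() + " records");
            }
        };

        singleRecordConsumer(printing).accept("a");
        multipleRecordConsumer(printing).accept(Arrays.asList("b", "c"));
    }
}
```

Because only the function named in spring.cloud.function.definition gets bound, the other consumer stays idle, which gives the same effect the question wanted from autoStartup. Note that receiving a List per invocation generally also requires batch mode to be enabled on the consumer binding; check the binder documentation for your version.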