Search code examples
spring-kafkaspring-cloud-stream

autoStartup for @StreamListener


Unlike @KafkaListener, it looks like @StreamListener does not support the autoStartup parameter. Is there a way to achieve this same behavior for @StreamListener? Here's my use case:

I have a generic Spring application that can listen to any Kafka topic and write to its corresponding table in my database. For some topics, the volume is low and thus processing a single message with very low latency is fine. For other topics that are high volume, the code should receive a microbatch of messages and write to the database using Jdbc batch on a less frequent basis. Ideally the definition for the listeners would look something like this:

// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...

// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...

For a low-volume topic, I would set application.singleMessageListenerEnabled to true and application.multipleMessageListenerEnabled to false, and vice versa for a high-volume topic. Thus, only one of the listeners would be actively listening for messages and the other not actively listening.

Is there a way to achieve this with @StreamListener?


Solution

  • First, please consider upgrading to functional programming model which would take you minutes to refactor. We've all but deprecated the annotation-based programming model. If you do then what you're trying to accomplish is very easy:

    @SpringBootApplication
    public class SimpleStreamApplication {
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(SimpleStreamApplication.class);
        }
    
        @Bean
        public Consumer<GenericRecord> singleRecordConsumer() {...}
    
        @Bean
        public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
    }
    

    Then you can simply use --spring.cloud.function.definition=singleRecordConsumer property for a single case and --spring.cloud.function.definition=multipleRecordConsumer when starting the application, this ensuring which specific listener you want to activate.