Search code examples
javaspringspring-bootkafka-consumer-apispring-kafka

Spring Kafka with Dynamic @KafkaListener


I'm using Spring Boot 2.x with spring-kafka (not spring-integration-kafka)

I have multiple beans annotated with @KafkaListener ... each one consuming from one topic... so since I have 12 topics then I also need to have 12 KafkaConsumers beans ... and I would like to know if I can create those beans programmatically / dynamically ... maybe using KafkaListenerEndpointRegistry in order to create consumer containers dynamically.

Note: I need to consume messages in batch ... so maybe I can use BatchMessageListener?

Current code:

@KafkaListener(
        id = COUNTRY,
        containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
        topics = {TOPIC},
        groupId = GROUP_ID,
        clientIdPrefix = CLIENT_ID,
        errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
    )
    @Override
    public void consume(final List<MessageDTO> messages,
        @Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
            (...)
    }

Each topic consumer has its own implementation depending on the topic. Can you guys point me to a blog/pseudocode/git thread/answer, please?


Solution

  • https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/support/GenericApplicationContext.html#registerBean-java.lang.Class-java.util.function.Supplier-org.springframework.beans.factory.config.BeanDefinitionCustomizer...-

    Create your object and register it as a bean providing it via the Supplier in the above method. Spring will run the bean post processors necessary to set everything up.