apache-kafka, kafka-consumer-api, spring-kafka

Spring @KafkaListener with topicPattern: handle runtime topic creation


I'm using Spring's @KafkaListener with a topicPattern. If I create a new topic matching the pattern while the application is running and start publishing to it, the listener simply ignores those messages. In other words, it seems to resolve the topics matching the pattern only at startup and listens to just those.

What's the easiest way to "refresh" that? Thanks!


Solution

  • New topics will be picked up within 5 minutes by default, as governed by the consumer's metadata.max.age.ms setting: https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms

    The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

    You can reduce it to speed things up at the expense of increased traffic.

    EDIT

    This shows it working as expected, with metadata.max.age.ms reduced to 60 seconds and a new matching topic created every 30 seconds...

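    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.IntStream;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaAdmin;
    
    @SpringBootApplication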
    public class So71386069Application {
    
        private static final Logger log = LoggerFactory.getLogger(So71386069Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So71386069Application.class, args);
        }
    
        // Per-listener consumer override: refresh metadata every 60 s instead of the 5-minute default.
        @KafkaListener(id = "so71386069", topicPattern = "so71386069.*",
                properties = "metadata.max.age.ms:60000")
        void listen(String in) {
            System.out.println(in);
        }
    
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so71386069").partitions(1).replicas(1).build();
        }
    
        @Bean
        ApplicationRunner runner(KafkaAdmin admin) {
            return args -> {
                try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                    // Create ten more topics matching the pattern, one every 30 seconds.
                    IntStream.range(0, 10).forEach(i -> {
                        try {
                            Thread.sleep(30_000);
                            String topic = "so71386069-" + i;
                            log.info("Creating {}", topic);
                            client.createTopics(Collections.singleton(
                                    TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                    });
                }
            };
        }
    
    }
    
    2022-03-07 15:41:07.131  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0]
    2022-03-07 15:41:34.007  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-0
    2022-03-07 15:42:04.193  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-1
    ...
    2022-03-07 15:42:07.590  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions revoked: [so71386069-0]
    ...
    2022-03-07 15:42:07.599  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0, so71386069-1-0, so71386069-0-0]
    2022-03-07 15:42:34.378  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-2
    2022-03-07 15:43:04.554  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-3
    ...
    2022-03-07 15:43:08.403  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions revoked: [so71386069-0, so71386069-1-0, so71386069-0-0]
    ...
    2022-03-07 15:43:08.411  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0, so71386069-3-0, so71386069-2-0, so71386069-1-0, so71386069-0-0]
    ...
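
To make the timing in the log easier to follow, here is a rough sketch of the behaviour it shows. It is not Kafka's actual implementation. It assumes a topic is eligible when its whole name matches the pattern regex, and that metadata refreshes happen at a fixed interval counted from the first assignment. The class name TopicDiscoverySketch and both helper methods are made up for illustration; real refreshes drift a little and the rebalance adds some delay.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.regex.Pattern;

public class TopicDiscoverySketch {

    // Same regex as the topicPattern used by the listener in the example.
    static final Pattern TOPIC_PATTERN = Pattern.compile("so71386069.*");

    // A topic is a candidate only if its whole name matches the pattern.
    static boolean isCandidate(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    // Simplified model (an assumption): refreshes occur every maxAge, counted
    // from firstRefresh. Returns the first refresh at or after the creation time.
    static LocalDateTime firstRefreshAtOrAfter(LocalDateTime firstRefresh,
                                               Duration maxAge,
                                               LocalDateTime created) {
        long elapsedMs = Duration.between(firstRefresh, created).toMillis();
        if (elapsedMs <= 0) {
            return firstRefresh;
        }
        long maxAgeMs = maxAge.toMillis();
        long periods = (elapsedMs + maxAgeMs - 1) / maxAgeMs; // ceiling division
        return firstRefresh.plus(maxAge.multipliedBy(periods));
    }

    public static void main(String[] args) {
        // Timestamps copied from the log: first assignment, then two topic creations.
        LocalDateTime assigned = LocalDateTime.of(2022, 3, 7, 15, 41, 7, 131_000_000);
        LocalDateTime created0 = LocalDateTime.of(2022, 3, 7, 15, 41, 34, 7_000_000);
        LocalDateTime created2 = LocalDateTime.of(2022, 3, 7, 15, 42, 34, 378_000_000);
        Duration maxAge = Duration.ofMillis(60_000); // metadata.max.age.ms:60000

        System.out.println(isCandidate("so71386069-1")); // true
        System.out.println(isCandidate("other-topic"));  // false
        System.out.println(firstRefreshAtOrAfter(assigned, maxAge, created0)); // 2022-03-07T15:42:07.131
        System.out.println(firstRefreshAtOrAfter(assigned, maxAge, created2)); // 2022-03-07T15:43:07.131
    }
}
```

The predicted times (15:42:07 and 15:43:07) are close to the reassignments in the log (15:42:07.590 and 15:43:08.403). With the default of 5 minutes the same model gives a worst-case wait of about 5 minutes, which is probably why the listener looked like it was ignoring the new topics.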