java apache-kafka kafka-consumer-api spring-kafka

Read 1 message concurrently from multiple Kafka topics


I set the concurrency to 1 for my Kafka listener:

    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(conncurrency);
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());

I am listening to 3 different topics:

    @KafkaListener(topics = "#{'${kafka.consumer.topic.name}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(@Payload Map<String, Object> conciseMap,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) int offset,
            Acknowledgment ack) {
        processMessage(conciseMap, partition, offset, ack, false);
    }

In this case, will the listener read one message from the first topic and, once it is processed, read one message from the next topic, and so on? Or will it process one message from each topic concurrently?

If it is the former, is there a way to read one message concurrently from all the topics without creating multiple listeners?


Solution

  • There is no guarantee how Kafka will allocate the partitions across the container threads. If each topic has only one partition, they will probably all be allocated to the same container thread. That is what happened when I ran a test with container concurrency=3:

    2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]

    2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]

    2017-10-31 16:40:26.079 INFO 35202 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[bar-0, foo-0, baz-0]

    With 10 partitions per topic, I got this distribution...

    2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[foo10-5, foo10-6, foo10-4, baz10-5, baz10-4, baz10-6, bar10-5, bar10-4, bar10-6]

    2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[bar10-1, bar10-0, bar10-3, bar10-2, baz10-1, baz10-0, baz10-3, baz10-2, foo10-3, foo10-1, foo10-2, foo10-0]

    2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[baz10-9, baz10-8, baz10-7, bar10-9, bar10-8, foo10-9, bar10-7, foo10-7, foo10-8]

    As you can see, each thread was allocated some partitions from each topic, but the split was uneven: two of the threads got 9 partitions in total while one got 12.
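Both distributions in the logs are consistent with Kafka's range assignment strategy, assuming that strategy is in use (it has long been the usual default for the Java consumer). Under range assignment, each topic's partitions are divided independently into contiguous ranges, and any remainder goes to the first consumers in the sorted member list. The following is a simplified sketch of that arithmetic, not Kafka's actual implementation; the class name, the `assign` method, and the `container-N` thread names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of range-style assignment (hypothetical class, not Kafka code).
class RangeAssignmentSketch {

    // For every topic, split partitions 0..n-1 into contiguous ranges, one per consumer.
    // If n is not divisible by the consumer count, the first (n % consumers) consumers
    // each receive one extra partition.
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / consumers.size();
            int extra = partitions % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(consumers.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three container threads, as in the test with concurrency=3.
        List<String> threads = Arrays.asList("container-0", "container-1", "container-2");

        // One partition per topic: everything lands on the first thread.
        Map<String, Integer> single = new TreeMap<>();
        single.put("foo", 1);
        single.put("bar", 1);
        single.put("baz", 1);
        assign(threads, single).forEach((t, p) -> System.out.println(t + " -> " + p));

        // Ten partitions per topic: the first thread gets 4 per topic, the others 3.
        Map<String, Integer> ten = new TreeMap<>();
        ten.put("foo10", 10);
        ten.put("bar10", 10);
        ten.put("baz10", 10);
        assign(threads, ten).forEach((t, p) -> System.out.println(t + " -> " + p));
    }
}
```

With 10 partitions and 3 consumers, each topic is split 4/3/3, so across three topics the first thread ends up with 12 partitions and the other two with 9 each. With one partition per topic, the single partition of every topic goes to the first thread and the other two stay idle.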

    If you want complete control, I would suggest a listener per topic.
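
A listener per topic might look like the sketch below. It assumes the three topics are named foo, bar, and baz as in the test above, reuses the kafkaListenerContainerFactory bean name from the question, and assumes that factory is configured for manual acknowledgment. The class name and the handle helper are placeholders. Each @KafkaListener method gets its own listener container, and therefore its own consumer thread, so the three topics are consumed independently of one another.

```java
import java.util.Map;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

// Hypothetical component: one listener method (and so one container) per topic.
@Component
public class PerTopicListeners {

    @KafkaListener(topics = "foo", containerFactory = "kafkaListenerContainerFactory")
    public void listenFoo(@Payload Map<String, Object> conciseMap, Acknowledgment ack) {
        handle("foo", conciseMap, ack);
    }

    @KafkaListener(topics = "bar", containerFactory = "kafkaListenerContainerFactory")
    public void listenBar(@Payload Map<String, Object> conciseMap, Acknowledgment ack) {
        handle("bar", conciseMap, ack);
    }

    @KafkaListener(topics = "baz", containerFactory = "kafkaListenerContainerFactory")
    public void listenBaz(@Payload Map<String, Object> conciseMap, Acknowledgment ack) {
        handle("baz", conciseMap, ack);
    }

    // Placeholder for the real processing logic; acknowledges once the record is handled.
    private void handle(String topic, Map<String, Object> conciseMap, Acknowledgment ack) {
        // ... process the payload for the given topic ...
        ack.acknowledge();
    }
}
```

With the factory concurrency left at 1, this gives one consumer thread per topic, so a slow record on one topic does not hold up the other two.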