Tags: apache-kafka, spring-kafka

How to set groupId to null in @KafkaListeners


In relation to: this question

I'm trying to read a compacted topic via a @KafkaListener. I'd like every consumer to read the whole topic each time.

I cannot generate a unique groupId for each consumer, so I wanted to use a null groupId.

I've tried to configure the container and the consumer to set the groupId to null, but neither worked.

Here is my container configuration:

ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// Set ackMode to manual and never commit, we are reading from the beginning each time
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
// Remove groupId, we are consuming all partitions here
factory.getContainerProperties().setGroupId(null);
// Enable idle event in order to detect when init phase is over
factory.getContainerProperties().setIdleEventInterval(1000L);

I also tried forcing the consumer configuration:

Map<String, Object> consumerProperties = springKafkaProperties.buildConsumerProperties();
// Override group id property to force "null"
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

When I set the container's groupId to null, a default group id based on the listener id is used instead.

When I force the consumer's group.id property to null, I get this error: "No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used."


Solution

  • You can't use a null group.id.

    From the Kafka documentation:

    group.id

    A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

    If you want to read from the beginning each time, you can either add a ConsumerAwareRebalanceListener to the container factory or make your listener implement ConsumerSeekAware.

    In either case, when onPartitionsAssigned is called, seek each topic/partition to the beginning.
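
    For example, here is a minimal sketch of the ConsumerSeekAware route (the class, topic, and group names are placeholders, and it assumes a Spring Kafka version where the other ConsumerSeekAware methods have default implementations):

    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.ConsumerSeekAware;
    import org.springframework.stereotype.Component;

    @Component
    public class ReplayingListener implements ConsumerSeekAware {

        @KafkaListener(topics = "so56114299", groupId = "replay")
        public void listen(String payload) {
            System.out.println(payload);
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            // Rewind every assigned partition so the whole compacted topic is re-read
            assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }

    }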

    I cannot generate a unique groupId for each consumer.

    You can use a SpEL expression to generate a UUID.
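
    For example (a sketch; the topic name is just a placeholder, and each application instance gets its own random group):

    @KafkaListener(topics = "so56114299",
            groupId = "#{T(java.util.UUID).randomUUID().toString()}")
    public void listen(String payload) {
        System.out.println(payload);
    }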

    EDIT

    You can manually assign topics/partitions; the group.id can then be null.

    @SpringBootApplication
    public class So56114299Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56114299Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so56114299", 10, (short) 1);
        }
    
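        // Partitions are assigned manually (resolved at startup via the finder bean),
        // so no group.id is needed and each instance reads the whole topic.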
        @KafkaListener(topicPartitions = @TopicPartition(topic = "so56114299",
                              partitions = "#{@finder.partitions('so56114299')}"))
        public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
            System.out.println(key + ":" + payload);
        }
    
        @Bean
        public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
            return new PartitionFinder(consumerFactory);
        }
    
        public static class PartitionFinder {
    
            private final ConsumerFactory<String, String> consumerFactory;
    
            public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
                this.consumerFactory = consumerFactory;
            }
    
            public String[] partitions(String topic) {
                try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
                    return consumer.partitionsFor(topic).stream()
                        .map(pi -> "" + pi.partition())
                        .toArray(String[]::new);
                }
            }
    
        }
    
    }
    
    
    These application properties go with the example:

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.listener.ack-mode=manual