In relation with : this question
I'm trying to read a compacted topic via a @KafkaListener. I'd like every consumers to read the whole topic each time.
I can not generate an unique groupId each for each consumers. So I wanted to use a null groupid.
I've tried to configure the container and the consumer to set the groupId to null, but neither worked.
Here is my Container configuration :
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// Set ackMode to manual and never commit, we are reading from the beginning each time
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
// Remove groupId, we are consuming all partitions here
factory.getContainerProperties().setGroupId(null);
// Enable idle event in order to detect when init phase is over
factory.getContainerProperties().setIdleEventInterval(1000L);
Also tried to force the consumer configuration :
Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
// Override group id property to force "null"
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
When I'm setting the container groupId to null, a default with the listener id is used.
When I'm forcing the consumer to a null groupId property, I've got an error : No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
You can't use a null group.id
.
From the kafka documentation.
group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
If you want to read from the beginning each time, you can either add a ConsumerAwareRebalanceListener
to the container factory or make your listener implement ConsumerSeekAware
.
In either case, when onPartitionsAssigned
is called, seek each topic/partition to the beginning.
I can not generate an unique groupId each for each consumers.
You can use a SpEL expression to generate a UUID.
EDIT
You can manually assign topics/partitions, and the group.id can be null then.
@SpringBootApplication
public class So56114299Application {
public static void main(String[] args) {
SpringApplication.run(So56114299Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56114299", 10, (short) 0);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "so56114299",
partitions = "#{@finder.partitions('so56114299')}"))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
System.out.println(key + ":" + payload);
}
@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
return new PartitionFinder(consumerFactory);
}
public static class PartitionFinder {
public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
private final ConsumerFactory<String, String> consumerFactory;
public String[] partitions(String topic) {
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(pi -> "" + pi.partition())
.toArray(String[]::new);
}
}
}
}
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual