I am trying to set a generic MessageListener on a ConcurrentKafkaListenerContainerFactory, as shown below:
@Component
@Slf4j
public class GossiperMessageListener extends AbstractConsumerSeekAware
    implements MessageListener<String, String> {

  @Override
  public void onPartitionsAssigned(
      @NotNull Map<TopicPartition, Long> assignments, @NotNull ConsumerSeekCallback callback) {
    val groupId = KafkaUtils.getConsumerGroupId();
    log.info("XXXXX groupId: {} assigned partitions: {}", groupId, assignments);
  }

  @Override
  public void onMessage(ConsumerRecord<String, String> data) {
    log.info("Received message with Key: {}", data.key());
  }
}
@Configuration
public class KafkaConsumerConfig {

  public static final String AT_LEAST_ONCE_KAFKA_CONTAINER_FACTORY =
      "atLeastOnceKafkaListenerContainerFactory";

  public Map<String, Object> consumerConfigBase() {
    val props = new HashMap<String, Object>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
    return props;
  }

  @Bean
  public AbstractKafkaListenerContainerFactory<
          ConcurrentMessageListenerContainer<String, String>, String, String>
      atLeastOnceKafkaListenerContainerFactory(GossiperMessageListener listener) {
    val factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigBase()));
    factory.getContainerProperties().setMessageListener(listener);
    return factory;
  }
}
I can see that partitions are being assigned and I can receive messages from Kafka, but my custom GossiperMessageListener is never invoked. Can someone tell me what is going wrong?
2024-06-17T21:54:54.439+05:30 INFO 38597 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : TOPIC_A.group_id: partitions assigned: [TOPIC_A-0]
...
The reproducible code is in the kafka_exp branch on GitHub. To send a message to Kafka running locally on port 19092, run:
curl --request GET 'http://localhost:8080/hello'
In my actual project, there are many service classes with @KafkaListener methods listening on different topics. So I am looking for a way to set the ConsumerSeekAware on the ConcurrentKafkaListenerContainerFactory, so that I do not have to modify every service class that uses @KafkaListener.
How do you use that atLeastOnceKafkaListenerContainerFactory? Apparently you also have a @KafkaListener somewhere, since you got a MessageListenerContainer with those assignment activities. See KafkaListenerContainerFactory to be sure that it is exactly what you need. You can definitely extend AbstractConsumerSeekAware on the service with @KafkaListener.
See docs for more info: https://docs.spring.io/spring-kafka/reference/kafka/seek.html
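To illustrate the last suggestion, here is a minimal sketch of a @KafkaListener service that extends AbstractConsumerSeekAware through a small shared base class, so the assignment logic is written only once. As far as I understand, a container created for a @KafkaListener method takes its listener from the annotated method, so a listener set on the factory's container properties is not the one that ends up being invoked. The class names SeekAwareListenerSupport and TopicAListener are hypothetical; the topic and group id are read off the log line in the question, the container factory name is the bean from the question, and plain SLF4J is used here instead of Lombok's @Slf4j.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

// Hypothetical shared base class: the seek/assignment logic lives here once.
abstract class SeekAwareListenerSupport extends AbstractConsumerSeekAware {

  protected final Logger log = LoggerFactory.getLogger(getClass());

  @Override
  public void onPartitionsAssigned(
      Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    // Let AbstractConsumerSeekAware keep track of the callback per partition.
    super.onPartitionsAssigned(assignments, callback);
    log.info("assigned partitions: {}", assignments);
  }
}

// Hypothetical service class; in a real project it would be public
// and live in its own file.
@Component
class TopicAListener extends SeekAwareListenerSupport {

  @KafkaListener(
      topics = "TOPIC_A", // assumption: taken from the log line in the question
      groupId = "TOPIC_A.group_id", // assumption: taken from the same log line
      containerFactory = "atLeastOnceKafkaListenerContainerFactory")
  public void listen(ConsumerRecord<String, String> data) {
    log.info("Received message with Key: {}", data.key());
  }
}
```

With this layout each existing service only needs an extra `extends SeekAwareListenerSupport`; the @KafkaListener methods themselves stay unchanged, and the setMessageListener call on the factory can be dropped.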