spring-boot, spring-kafka

How to set AbstractConsumerSeekAware on ConcurrentKafkaListenerContainerFactory?


I am trying to set a generic MessageListener on ConcurrentKafkaListenerContainerFactory, as shown below:

@Component
@Slf4j
public class GossiperMessageListener extends AbstractConsumerSeekAware implements
    MessageListener<String, String> {


  @Override
  public void onPartitionsAssigned(
      @NotNull Map<TopicPartition, Long> assignments, @NotNull ConsumerSeekCallback callback) {
    val groupId = KafkaUtils.getConsumerGroupId();
    log.info("XXXXX groupId: {} assigned partitions: {}", groupId, assignments);
  }

  @Override
  public void onMessage(ConsumerRecord<String, String> data) {
    log.info("Received message with Key: {}", data.key());
  }
}

@Configuration
public class KafkaConsumerConfig {

  public static final String AT_LEAST_ONCE_KAFKA_CONTAINER_FACTORY =
      "atLeastOnceKafkaListenerContainerFactory";

  public Map<String, Object> consumerConfigBase() {
    val props = new HashMap<String, Object>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
    return props;
  }

  @Bean
  public AbstractKafkaListenerContainerFactory<
      ConcurrentMessageListenerContainer<String, String>, String, String>
  atLeastOnceKafkaListenerContainerFactory(GossiperMessageListener listener) {
    val factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigBase()));
    factory.getContainerProperties().setMessageListener(listener);
    return factory;
  }
}

I can see that partitions are assigned and that messages are received from Kafka, but my custom GossiperMessageListener is never invoked. Can someone tell me what is going wrong?

2024-06-17T21:54:54.439+05:30  INFO 38597 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : TOPIC_A.group_id: partitions assigned: [TOPIC_A-0]
...

The reproducible code is in the kafka_exp branch on GitHub. To send a message to the Kafka broker running locally on port 19092, run:

curl --request GET 'http://localhost:8080/hello'

In my actual project, many service classes use @KafkaListener to listen on different topics. I am therefore looking for a way to set the ConsumerSeekAware on the ConcurrentKafkaListenerContainerFactory, so that I do not have to modify every service class that has a @KafkaListener.

  • spring-kafka - 3.0.12

Solution

  • How do you use atLeastOnceKafkaListenerContainerFactory? Since a MessageListenerContainer is logging those partition assignments, you apparently also have a @KafkaListener somewhere. A KafkaListenerContainerFactory creates containers for @KafkaListener endpoints, and each of those containers gets the listener built from the annotated method, so check whether the factory is really what you need here. You can definitely extend AbstractConsumerSeekAware (or implement ConsumerSeekAware) on the service that has the @KafkaListener.

    See docs for more info: https://docs.spring.io/spring-kafka/reference/kafka/seek.html
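
The suggestion above could look roughly like the following. This is only a minimal sketch, not the asker's actual code: LoggingSeekAware and TopicAService are hypothetical names, and the topic and group id are inferred from the log line in the question. Putting the seek-aware logic in a shared base class means each service only gains an extends clause.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Service;

// Hypothetical shared base class: holds the seek-aware logic once so that
// each @KafkaListener service only needs an "extends" clause.
// In a real project this would be a public class in its own file.
abstract class LoggingSeekAware extends AbstractConsumerSeekAware {

  private final Logger log = LoggerFactory.getLogger(getClass());

  @Override
  public void onPartitionsAssigned(
      Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    // Keep the callback bookkeeping done by AbstractConsumerSeekAware.
    super.onPartitionsAssigned(assignments, callback);
    log.info("Assigned partitions: {}", assignments);
  }
}

// Hypothetical service. The topic and group id are assumptions taken from
// the "TOPIC_A.group_id: partitions assigned: [TOPIC_A-0]" log line.
@Service
class TopicAService extends LoggingSeekAware {

  private static final Logger log = LoggerFactory.getLogger(TopicAService.class);

  // The container factory bean name matches the one defined in KafkaConsumerConfig.
  @KafkaListener(
      topics = "TOPIC_A",
      groupId = "TOPIC_A.group_id",
      containerFactory = "atLeastOnceKafkaListenerContainerFactory")
  public void listen(ConsumerRecord<String, String> record) {
    log.info("Received message with Key: {}", record.key());
  }
}
```

With this arrangement the factory bean no longer needs the setMessageListener(listener) call, because the listener container detects that the @KafkaListener bean itself is a ConsumerSeekAware.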