Tags: java, spring-boot, apache-kafka, spring-kafka, embedded-kafka

Spring EmbeddedKafka producer not waiting for consumer acknowledgement


I want the producer in my test to wait until the consumer in the class under test has acknowledged the record by calling Acknowledgment.acknowledge(). My consumer is configured with the following properties:

spring:
  kafka:
    consumer:
      group-id: test-group
      bootstrap-servers: ${spring.embedded.kafka.brokers}
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 1

My kafkaListenerContainerFactory is initialized as follows:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageType> kafkaListenerContainerFactory(
        ConsumerFactory<String, MessageType> consumerFactory)
{
    ConcurrentKafkaListenerContainerFactory<String, MessageType> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Commit the offset as soon as the listener calls Acknowledgment.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new LoggingErrorHandler());
    factory.setConcurrency(numListeners);
    return factory;
}

I have also set acks to all in my producer config. However, the producer does not seem to block until the consumer acknowledges, even though I am waiting on the returned future:

producer.send(new ProducerRecord<Integer, String>(TOPIC, key, value)).get();

How can I get the producer in my test to block until the consumer acknowledges without adding some sort of sleep?


Solution

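  • You cannot. Producers and consumers are independent. The producer's acks setting only controls when the broker confirms that it has received and secured the record, so the future returned by send() completes once the broker has acknowledged the write. It has nothing to do with the consumer side.

    You need your own logic for the consumer to tell the producer (here, the test) that it has received and acknowledged the record.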
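One way such logic could look in a test is a shared latch that the listener counts down after it acknowledges. The sketch below is only an illustration under assumptions: AckSignal, recordAcknowledged and awaitAcks are hypothetical names, not part of Spring Kafka, and the listener and test wiring shown in the comments assumes the helper is available to both sides (for example as a bean in the test context).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test helper (not a Spring Kafka class): lets the test thread
 * wait until the listener has acknowledged the expected number of records.
 */
class AckSignal {
    private final CountDownLatch latch;

    AckSignal(int expectedAcks) {
        this.latch = new CountDownLatch(expectedAcks);
    }

    /** To be called by the listener right after Acknowledgment.acknowledge(). */
    void recordAcknowledged() {
        latch.countDown();
    }

    /**
     * Blocks until all expected acknowledgments were recorded or the timeout
     * elapsed. Returns true only if every expected acknowledgment arrived.
     */
    boolean awaitAcks(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}

// Assumed usage in the listener of the class under test:
//
//   public void onMessage(MessageType message, Acknowledgment ack) {
//       // ... process the message ...
//       ack.acknowledge();
//       ackSignal.recordAcknowledged();
//   }
//
// Assumed usage in the test, replacing any sleep:
//
//   producer.send(new ProducerRecord<Integer, String>(TOPIC, key, value)).get();
//   assertTrue(ackSignal.awaitAcks(10, TimeUnit.SECONDS));
```

The timeout keeps a failing test from hanging forever. If the class under test should not know about the helper, the same latch could instead be counted down from a test-only wrapper around the listener.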