Tags: spring-boot, spring-kafka, spring-boot-test, testcontainers, spring-boot-testcontainers

@KafkaListener with containerFactory is not triggered in @SpringBootTest


I am trying to write an integration test for my Kafka consumer using @SpringBootTest, with Testcontainers providing the underlying infrastructure. My setup looks like this:

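import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty("outdoor.kafka.enableKafkaReading")
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // Not final: Spring injects these fields after the bean is constructed.
    @Value("${topic.name}")
    public String topic;
    @Value("${consumer.group}")
    public String consumerGroup;

    // __listener refers to this bean, so the SpEL expressions read the fields above.
    @KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.consumerGroup}",
            containerFactory = "containerFactory")
    public void consume(String message) {
        LOGGER.info("Received message: {}", message);
    }
}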
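
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;

@Configuration
public class KafkaConsumerConfig {
    // I actually use custom deserializers for my own entities, using String here for example's sake
    // (objectMapper is only needed by those custom deserializers).
    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties,
                                                           ObjectMapper objectMapper) {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(null), new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
            CommonErrorHandler errorHandler, ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}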

In my test, I want to write something to the topic with a KafkaTemplate and then make sure the listener has been called. Like this:

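import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(initializers = TestContainersInitializer.class)
class ConsumerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SpyBean
    private Consumer consumer;

    @Test
    void consume() {
        kafkaTemplate.send("topic-name", "1", "message");
        // Poll once per second, for at most 5 seconds, until the listener has been invoked.
        await()
            .pollInterval(Duration.ofSeconds(1))
            .atMost(5, SECONDS)
            .untilAsserted(() -> {
                verify(consumer).consume(any());
            });
    }
}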

My message is written to the topic correctly: when I read the topic with a plain KafkaConsumer, the record is there. However, the listener does not seem to start up and never consumes the message. Do you have any idea what is going wrong?

My original setup is more complex, but the simplified variant described above fails in the same way.

Some references I consulted:

  1. https://testcontainers.com/guides/testing-spring-boot-kafka-listener-using-testcontainers/
  2. https://www.atomicjar.com/2023/06/testing-kafka-applications-with-testcontainers/

I also raised an issue on the Spring Boot project: https://github.com/spring-projects/spring-boot/issues/41009


Solution

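  • The problem was that my test class was missing @TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest"). Without it the consumer falls back to Kafka's default auto.offset.reset of latest, so a consumer group with no committed offsets starts at the end of the partition. The test sent its message before the listener had been assigned its partition, so the consumer skipped it.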
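
To make the effect of auto-offset-reset concrete, here is a minimal sketch in plain Java. It is a toy model of a single partition, not the Kafka client API: the class OffsetResetDemo and its methods startOffset and readFrom are hypothetical names made up for this illustration, and it assumes the record is written before the listener is assigned the partition, which is what appears to happen in the test above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one Kafka partition; illustrative only, NOT the Kafka client API. */
public class OffsetResetDemo {

    /**
     * Offset at which a consumer group with no committed offset starts reading.
     * logEndOffsetAtAssignment is the number of records already in the partition
     * at the moment the consumer is assigned to it.
     */
    public static int startOffset(String autoOffsetReset, int logEndOffsetAtAssignment) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0; // start from the beginning of the partition
            case "latest":
                return logEndOffsetAtAssignment; // only see records produced later
            default:
                throw new IllegalArgumentException("Unsupported value: " + autoOffsetReset);
        }
    }

    /** Records a consumer would receive when it starts reading at the given offset. */
    public static List<String> readFrom(List<String> partition, int offset) {
        return new ArrayList<>(partition.subList(offset, partition.size()));
    }

    public static void main(String[] args) {
        List<String> partition = new ArrayList<>();

        // Assumption: the test sends its record before the listener is assigned the partition.
        partition.add("message");
        int endAtAssignment = partition.size();

        System.out.println("latest   -> " + readFrom(partition, startOffset("latest", endAtAssignment)));
        System.out.println("earliest -> " + readFrom(partition, startOffset("earliest", endAtAssignment)));
        // prints:
        // latest   -> []
        // earliest -> [message]
    }
}
```

With latest, the record that was already in the partition is never delivered, which matches the symptom of the listener not being called. Setting spring.kafka.consumer.auto-offset-reset=earliest on ConsumerTest, as described above, corresponds to the second case.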