Tags: java, spring-boot, apache-kafka, spring-kafka, spring-boot-test

Kafka event listener not working in SpringBootTest


I want to test the reading of events from a Kafka topic in a Spring Boot test:

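import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.test.context.ActiveProfiles;

// MyEvent is the application's own event type (not shown here).
@Slf4j
@SpringBootTest
@ActiveProfiles("kafka-test")
class EmbeddedKafkaIntegrationTest {

    private static final String TOPIC = "my-topic";

    // Records received by the listener, made available to test assertions.
    private final BlockingQueue<ConsumerRecord<String, MyEvent>> consumptionQueue = new LinkedBlockingDeque<>();

    // This is the handler that never gets called.
    @EventListener(condition = "event.listenerId.startsWith('test-listener')")
    public void idleEventHandler(ListenerContainerIdleEvent event) {
        log.info(event.toString());
    }

    @KafkaListener(id = "test-listener", idIsGroup = false, topics = "${beb.topic.prefix}" + TOPIC,
            autoStartup = "true", containerFactory = "kafkaListenerContainerFactory")
    private void listen(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException {
        log.info("Consume key={}, value={}", consumerRecord.key(), consumerRecord.value());
        consumptionQueue.put(consumerRecord);
    }
}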

For some reason the @EventListener is not working. The @KafkaListener works fine. If I put the @EventListener method in one of the application component classes, it also works fine. It just refuses to work within the EmbeddedKafkaIntegrationTest.

I want to use the event listener as part of my test conditions.


Solution

  • You probably haven't shown the whole picture of your configuration, but that @EventListener has to be moved to a @Configuration class. For example, you can add a @TestConfiguration alongside the rest of the Spring Boot configuration involved in this test suite.

    I also believe that the @KafkaListener you show is not actually part of EmbeddedKafkaIntegrationTest. As Martin pointed out, it is not supposed to work from a test class by itself, simply because the test class is not a bean that gets scanned.

    See more info in Spring Boot docs: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.testing
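
The suggestion above can be sketched roughly as follows. This is a minimal sketch, not the asker's actual setup: the nested class name IdleEventCollector, its awaitIdle helper, the test method name and the 30-second timeout are all hypothetical. It assumes JUnit 5 is in use and that the listener container has an idle event interval configured (for example through ContainerProperties.setIdleEventInterval), since otherwise no ListenerContainerIdleEvent is published at all.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.test.context.ActiveProfiles;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@SpringBootTest
@ActiveProfiles("kafka-test")
class EmbeddedKafkaIntegrationTest {

    // Hypothetical helper: a nested static @TestConfiguration is registered as a bean
    // in the test's application context, so its @EventListener methods are detected.
    @TestConfiguration
    static class IdleEventCollector {

        private final BlockingQueue<ListenerContainerIdleEvent> idleEvents = new LinkedBlockingQueue<>();

        // Same condition as in the question: only containers whose id starts with "test-listener".
        @EventListener(condition = "event.listenerId.startsWith('test-listener')")
        public void onIdle(ListenerContainerIdleEvent event) {
            idleEvents.add(event);
        }

        // Blocks until an idle event arrives or the timeout elapses (returns null on timeout).
        ListenerContainerIdleEvent awaitIdle(long timeout, TimeUnit unit) throws InterruptedException {
            return idleEvents.poll(timeout, unit);
        }
    }

    // The configuration class is itself a bean, so it can be injected into the test.
    @Autowired
    private IdleEventCollector idleEventCollector;

    @Test
    void listenerContainerBecomesIdle() throws InterruptedException {
        // Assumed timeout; it should be longer than the configured idle event interval.
        ListenerContainerIdleEvent event = idleEventCollector.awaitIdle(30, TimeUnit.SECONDS);
        assertNotNull(event, "no idle event received for test-listener");
    }
}
```

A nested @TestConfiguration is applied in addition to the application's primary configuration rather than replacing it, so the rest of the context, including the listener container factory, should load as before. Collecting events in a queue lets the test thread wait for the idle event instead of only logging it.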