java spring-boot apache-kafka spring-kafka embedded-kafka

Embedded Kafka Spring test executes before embedded Kafka is ready


I have a Spring Boot project with a Kafka listener that I want to test using Embedded Kafka. The listener logs the message "record received", but that message only appears if I add a Thread.sleep(1000) at the start of the test method.

Test class:

@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {

    private static final String TOPIC = "my-topic";

    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I won't see the listener's log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

I don't want to rely on the fickle Thread.sleep(). The test is evidently running before some setup has completed. I clearly need to wait on something, but I am not sure what to wait on or how to do it.

Using:

  • Java 11
  • Spring Boot 2.5.6
  • JUnit 5
  • spring-kafka-test 2.7.8

Solution

  • Add an @EventListener bean to the test context and (for example) count down a CountDownLatch when a ConsumerStartedEvent is received; then, in the test, wait on the latch before sending:

    assertThat(eventListener.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
    

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

    and

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption

    Or add a ConsumerRebalanceListener and wait for partition assignment.
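
To make the latch idea concrete, here is a minimal plain-Java sketch of the waiting pattern. It is not Spring code: the class names ConsumerStartedLatchSketch and ConsumerStartedLatch are hypothetical, and the event is simulated from a plain thread. In the real test context, the holder would presumably be registered as a bean and onConsumerStarted would be annotated with @EventListener and take a ConsumerStartedEvent parameter, as described above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConsumerStartedLatchSketch {

    /**
     * Hypothetical latch holder. In a Spring test context this would be a bean,
     * and onConsumerStarted would be an @EventListener method receiving a
     * ConsumerStartedEvent (assumption: the wiring is not shown here).
     */
    public static class ConsumerStartedLatch {
        private final CountDownLatch latch = new CountDownLatch(1);

        // Invoked when the "consumer started" notification arrives.
        public void onConsumerStarted(Object event) {
            latch.countDown();
        }

        public CountDownLatch getLatch() {
            return latch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConsumerStartedLatch listener = new ConsumerStartedLatch();

        // Stand-in for the listener container's consumer thread publishing the event.
        Thread consumerThread = new Thread(() -> listener.onConsumerStarted("consumer started"));
        consumerThread.start();

        // The test blocks here for at most 10 seconds instead of sleeping a fixed time.
        boolean started = listener.getLatch().await(10, TimeUnit.SECONDS);
        System.out.println("consumer started: " + started);
    }
}
```

Unlike Thread.sleep(1000), await returns as soon as the latch is counted down and only fails the test if the timeout elapses. The same latch could instead be counted down from a rebalance listener's onPartitionsAssigned callback if you would rather wait for partition assignment.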