I have a Spring Boot project with a Kafka listener that I want to test using Embedded Kafka. The listener logs the message "record received", but that message only appears if I add a Thread.sleep(1000) to the start of the test method.
Test class:
@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {
    private static final String TOPIC = "my-topic";
    @Autowired
    EmbeddedKafkaBroker kafkaBroker;
    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I won't see the listener log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }
    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}
I don't want to use the fickle Thread.sleep().
The test is apparently executing before some setup process has completed. I clearly need to wait on something, but I am not sure what to wait on or how to do it.
Using:
Add an @EventListener bean to the test context and (for example) count down a CountDownLatch when a ConsumerStartedEvent is received; then, in the test, wait on that latch before sending:
assertThat(eventListener.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#events and https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption
Or add a ConsumerRebalanceListener and wait for partition assignment.
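Both suggestions come down to the same pattern: a callback counts down a latch, and the test blocks on that latch with a timeout instead of sleeping for a fixed time. Here is a minimal sketch of that pattern using only the JDK, so it runs without Spring or Kafka on the classpath. The names LatchWaitSketch, StartupSignal, onConsumerStarted and waitForConsumer are hypothetical, and the background thread is only a stand-in for the listener container starting up. In a real test, onConsumerStarted would presumably be an @EventListener method taking a ConsumerStartedEvent (or the latch would be counted down from the rebalance listener's assignment callback).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    /** Hypothetical stand-in for the bean added to the test context. */
    static class StartupSignal {
        private final CountDownLatch latch = new CountDownLatch(1);

        // Assumption: in the real test this method would be annotated with
        // @EventListener and receive a ConsumerStartedEvent. Here it is
        // simply called directly by the simulated consumer thread.
        void onConsumerStarted() {
            latch.countDown();
        }

        CountDownLatch getLatch() {
            return latch;
        }
    }

    /**
     * Starts a simulated consumer that signals readiness after startupMillis,
     * then waits up to timeoutMillis for that signal.
     * Returns true if the signal arrived before the timeout.
     */
    public static boolean waitForConsumer(long startupMillis, long timeoutMillis) {
        StartupSignal signal = new StartupSignal();
        Thread consumer = new Thread(() -> {
            try {
                // This sleep only simulates startup work in the stand-in
                // consumer; the waiting side below never sleeps blindly.
                Thread.sleep(startupMillis);
                signal.onConsumerStarted();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true); // do not keep the JVM alive on timeout
        consumer.start();
        try {
            // Returns as soon as the latch reaches zero, or false on timeout.
            return signal.getLatch().await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ready = waitForConsumer(200, 10_000);
        System.out.println("consumer ready: " + ready);
        // Only at this point would the test go on to send the record.
    }
}
```

Unlike a fixed sleep, await returns as soon as the signal fires, and the timeout turns a consumer that never starts into a clear assertion failure rather than a silently missing log line.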