We are trying to test a cloud-stream-kafka application. The test class has multiple test methods that send messages and a single @KafkaListener that waits for the response.
However, the first test tends to pass, and the second test tends to fail.
Any pointers would be appreciated.
    @SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
    @EmbeddedKafka(topics = "input", partitions = 1)
    @DirtiesContext
    class EmbeddedKafkaListenerTest {

        private CountDownLatch latch;
        private String message;

        @BeforeEach
        void setUp() {
            this.message = null;
            this.latch = new CountDownLatch(1);
        }

        @Test
        void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
                throws InterruptedException {
            template.send("input", "Hello World 1".getBytes());
            assertTrue(latch.await(10, TimeUnit.SECONDS));
            assertEquals("Hello World 1", message);
        }

        @Test
        void testSendSecondMessage(@Autowired KafkaTemplate<String, byte[]> template)
                throws InterruptedException {
            template.send("input", "Hello World 2".getBytes());
            assertTrue(latch.await(10, TimeUnit.SECONDS));
            assertEquals("Hello World 2", message);
        }

        @KafkaListener(topics = "input", id = "kafka-listener-consumer")
        void listener(Message<byte[]> message) {
            this.message = new String(message.getPayload(), StandardCharsets.UTF_8);
            this.latch.countDown();
        }
    }
It seems an instance of the @KafkaListener is being registered for each test, since we notice that using the id value results in a java.lang.IllegalStateException: Another endpoint is already registered with id 'kafka-listener-consumer'.
I had done similar tests using @RabbitListener when the messaging framework in use was RabbitMQ. I was hoping I could do something similar, since some of the test cases involve waiting for no message to be published, and we could do that with an assertFalse(latch.await(10, TimeUnit.SECONDS)).
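For reference, the latch pattern described above can be sketched in plain Java without any Kafka or Spring involvement. This is only an illustration: the class name LatchDemo and the helper awaitMessage are made up for this sketch, and a plain thread stands in for the listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Returns true if the simulated "message" arrives before the timeout,
    // false if the timeout elapses first (the "no message published" case).
    static boolean awaitMessage(boolean sendMessage, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        if (sendMessage) {
            // Stand-in for the listener thread receiving a message.
            new Thread(latch::countDown).start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessage(true, 500));  // prints "true"
        System.out.println(awaitMessage(false, 200)); // prints "false"
    }
}
```

In a test, the first case maps to assertTrue(latch.await(...)) and the second to assertFalse(latch.await(...)).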
I think your @KafkaListener method should go into a @Configuration class. Otherwise, EmbeddedKafkaListenerTest is instantiated once per test method, and therefore the @KafkaListener is parsed as many times as you have test methods.
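A minimal sketch of this first option, assuming a nested @TestConfiguration class is picked up by @SpringBootTest in addition to the application's own configuration. The names ListenerConfig and TestListener are hypothetical, and only one test method is shown; treat it as a starting point rather than a verified fix.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "input", partitions = 1)
@DirtiesContext
class EmbeddedKafkaListenerTest {

    // The listener is a bean in the context, so it is registered once per context,
    // no matter how many test instances JUnit creates.
    @Autowired
    private TestListener listener;

    @BeforeEach
    void setUp() {
        listener.reset();
    }

    @Test
    void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
            throws InterruptedException {
        template.send("input", "Hello World 1".getBytes(StandardCharsets.UTF_8));
        assertTrue(listener.latch.await(10, TimeUnit.SECONDS));
        assertEquals("Hello World 1", listener.message);
    }

    // Hypothetical configuration class holding the listener bean.
    @TestConfiguration
    static class ListenerConfig {
        @Bean
        TestListener testListener() {
            return new TestListener();
        }
    }

    // Hypothetical holder for the listener method and the state the tests inspect.
    static class TestListener {
        // volatile because the listener container thread writes and the test thread reads.
        volatile CountDownLatch latch = new CountDownLatch(1);
        volatile String message;

        void reset() {
            this.message = null;
            this.latch = new CountDownLatch(1);
        }

        @KafkaListener(topics = "input", id = "kafka-listener-consumer")
        void listen(Message<byte[]> in) {
            this.message = new String(in.getPayload(), StandardCharsets.UTF_8);
            this.latch.countDown();
        }
    }
}
```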
Another way is to use @DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD), so you will have not only a fresh EmbeddedKafkaListenerTest instance for each test method, but also a fresh Spring ApplicationContext.
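As a sketch, this second option only changes the class-level annotation and keeps the listener on the test class. Only one test method is shown, and the volatile modifiers are an addition of this sketch. Rebuilding the context for every test method will likely make the test class slower.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "input", partitions = 1)
// Closes the ApplicationContext after every test method, so the next test
// instance is processed against a fresh context.
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
class EmbeddedKafkaListenerTest {

    private volatile CountDownLatch latch;
    private volatile String message;

    @BeforeEach
    void setUp() {
        this.message = null;
        this.latch = new CountDownLatch(1);
    }

    @Test
    void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
            throws InterruptedException {
        template.send("input", "Hello World 1".getBytes(StandardCharsets.UTF_8));
        assertTrue(latch.await(10, TimeUnit.SECONDS));
        assertEquals("Hello World 1", message);
    }

    @KafkaListener(topics = "input", id = "kafka-listener-consumer")
    void listener(Message<byte[]> message) {
        this.message = new String(message.getPayload(), StandardCharsets.UTF_8);
        this.latch.countDown();
    }
}
```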
There is also a way to use @TestInstance(TestInstance.Lifecycle.PER_CLASS), so you will have a single EmbeddedKafkaListenerTest instance for the whole test class, and your @KafkaListener won't be parsed several times.
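A sketch of this third option, again keeping the listener on the test class. With a single test instance, @BeforeEach still runs before every test method and resets the shared latch. The volatile modifiers are an addition of this sketch, not part of the original code.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "input", partitions = 1)
@DirtiesContext
// One test instance is shared by all test methods of this class.
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EmbeddedKafkaListenerTest {

    private volatile CountDownLatch latch;
    private volatile String message;

    @BeforeEach
    void setUp() {
        this.message = null;
        this.latch = new CountDownLatch(1);
    }

    @Test
    void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
            throws InterruptedException {
        template.send("input", "Hello World 1".getBytes(StandardCharsets.UTF_8));
        assertTrue(latch.await(10, TimeUnit.SECONDS));
        assertEquals("Hello World 1", message);
    }

    @Test
    void testSendSecondMessage(@Autowired KafkaTemplate<String, byte[]> template)
            throws InterruptedException {
        template.send("input", "Hello World 2".getBytes(StandardCharsets.UTF_8));
        assertTrue(latch.await(10, TimeUnit.SECONDS));
        assertEquals("Hello World 2", message);
    }

    @KafkaListener(topics = "input", id = "kafka-listener-consumer")
    void listener(Message<byte[]> message) {
        this.message = new String(message.getPayload(), StandardCharsets.UTF_8);
        this.latch.countDown();
    }
}
```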