spring-kafka, spring-cloud-stream, spring-kafka-test

Can't use a @KafkaListener more than once in a Test


We are trying to test a Spring Cloud Stream Kafka application. The test class has several test methods that each send a message, and a single @KafkaListener that waits for the response.

However, the first test tends to pass, and the second test tends to fail.

Any pointers would be appreciated.

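import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")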
@EmbeddedKafka(topics = "input", partitions = 1)
@DirtiesContext
class EmbeddedKafkaListenerTest {

  private CountDownLatch latch;

  private String message;

  @BeforeEach
  void setUp() {
    this.message = null;
    this.latch = new CountDownLatch(1);
  }

  @Test
  void testSendFirstMessage(@Autowired KafkaTemplate<String, byte[]> template)
      throws InterruptedException {
    template.send("input", "Hello World 1".getBytes());
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    assertEquals("Hello World 1", message);
  }

  @Test
  void testSendSecondMessage(@Autowired KafkaTemplate<String, byte[]> template)
      throws InterruptedException {
    template.send("input", "Hello World 2".getBytes());
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    assertEquals("Hello World 2", message);
  }

  @KafkaListener(topics = "input", id = "kafka-listener-consumer")
  void listener(Message<byte[]> message) {
    this.message = new String(message.getPayload(), StandardCharsets.UTF_8);
    this.latch.countDown();
  }
}

It seems a separate @KafkaListener is being registered for each test method: when we set the id attribute, we get a java.lang.IllegalStateException: Another endpoint is already registered with id 'kafka-listener-consumer'

I have written similar tests with @RabbitListener when the messaging framework was RabbitMQ. I was hoping to do the same here, because some of the test cases need to verify that no message is published, which we could do with assertFalse(latch.await(10, TimeUnit.SECONDS)).


Solution

  • I think your @KafkaListener method should go into a @Configuration class. Otherwise EmbeddedKafkaListenerTest is instantiated once per test method, and therefore the @KafkaListener is parsed as many times as you have test methods.

    Another way is to use @DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD), so that each test method gets not only a fresh EmbeddedKafkaListenerTest instance but also a fresh Spring ApplicationContext.

    There is also @TestInstance(TestInstance.Lifecycle.PER_CLASS), which gives you a single EmbeddedKafkaListenerTest instance for all test methods in the class, so your @KafkaListener won't be parsed several times.
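
If the listener moves out of the test class (the first option above), the test still needs a way to see what the listener received. Below is a minimal sketch of one way to do that, using only the JDK. `MessageCapture` and its method names are hypothetical and not part of Spring Kafka. The assumption is that the object is exposed as a bean, the @KafkaListener method calls `accept(...)`, and the test has the same bean injected.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: hands payloads from the listener thread to the test thread. */
final class MessageCapture {

  // Thread-safe queue, so no extra synchronization is needed between
  // the listener thread (producer) and the test thread (consumer).
  private final BlockingQueue<String> received = new LinkedBlockingQueue<>();

  /** Intended to be called from the listener method with the raw payload. */
  void accept(byte[] payload) {
    received.add(new String(payload, StandardCharsets.UTF_8));
  }

  /** Waits up to the timeout for the next message; returns null if none arrived. */
  String poll(long timeout, TimeUnit unit) {
    try {
      return received.poll(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return null;
    }
  }

  /** Drops leftover messages; intended to be called before each test method. */
  void reset() {
    received.clear();
  }
}
```

With such a helper, assertEquals("Hello World 1", capture.poll(10, TimeUnit.SECONDS)) would replace the latch and message fields, and assertNull(capture.poll(10, TimeUnit.SECONDS)) would cover the case where no message is expected.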