Search code examples
integration-testingspring-kafka

Embedded kafka producer test


I'm writing integration tests to test kafka producer.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {kafkaProducerConfig.class, KafkaProducerIT.InnerConfig.class})
@EnableConfigurationProperties(KafkaProducerInfo.class)
@ComponentScan(basePackages = "...")
public class KafkaProducerIT {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "testtopic");

    @Autowired
    CustomKafkaProducer<String, String> KafkaProducer;

    @Autowired
    KafkaController kafkaController;

    @Test
    public void whenSendMessage_thenConsumeIt() throws InterruptedException {
        KafkaProducer.produceMessageToKafkaTopic("ahahahwow", "testtopic");
        kafkaController.countDownLatch.await();
    }

    @Configuration
    public static class InnerConfig {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Object> replyConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(replyConsumerFactory);
            factory.setBatchListener(true);
            return factory;
        }

        @Bean
        KafkaController kafkaController() {
            return new KafkaController();
        }

    }

    public static class KafkaController {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        @KafkaListener(topics = "testtopic")
        public void listen(final String payload) {
            countDownLatch.countDown();
        }
    }

}

Idea is that I want to send message to topic, read it using KafkaController and CountDownLatch. Issue that I have is that CountDownLatch is never triggered and test just hangs on await.

CustomKafkaProducer is just a wrapper which uses regular kafkaTemplate under the hood.

p.s.

During debug, there were several cases when flow entered listener and test passed. So issue is not related to wrong topic names etc.


Solution

  • You need to set auto.offset.reset=earliest for the consumer. The default is latest so there is a race condition if the consumer starts after the record is sent.