Search code examples

Testing Kafka consumer @KafkaListener with EmbeddedKafka in Spring Boot

I want to test my kafka consumer, but there is in issue with @EmbddedKafka.

public class KafkaEventConsumer {

    private final CustomInterface customInterface;

    @KafkaListener(topics = "test-topic")
    public void consumeEvents(Event event) {

My test class is as the following

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
class KafkaConsumerTest {

    private Producer<String, String> producer;

    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",

    private KafkaEventConsumer kafkaEventConsumer;

    private CustomInterface CustomInterface;

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private ObjectMapper objectMapper;


    void setUp() {
        kafkaEventConsumer = new KafkaEventConsumer(customInterface);
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(),
                                                     new StringSerializer()).createProducer();

    void consumeEvents() throws IOException, BadCurrencyException {
        var event = objectMapper.readValue(EVENT_JSON,Event.class);
        String message = objectMapper.writeValueAsString(event);
        producer.send(new ProducerRecord<>("test-topic", 0, "1", message));

        // Read the message and assert its properties
        verify(customeInterface, timeout(10000).times(1)).apply(any());

    void shutdown() {


The test doesn't pass, the consumer didn't intercept the message

Wanted but not invoked:
     <any> );
Actually, there were zero interactions with this mock.

PS: I followed this interesting article


  • I used KafkaTemplate

    @EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
                   partitions = 1,
                   controlledShutdown = true)
    class KafkaConsumerTest {
        private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
        KafkaTemplate<String, Event> kafkaTemplate;
        private ObjectMapper objectMapper;
        private KafkaEvenConsumer kafkaEvenConsumer;
        private MyInterface myInterface;
        ArgumentCaptor<Event> eventCaptor;
        void consumeEvents() {
            Event event = objectMapper.readValue(EVENT_JSON,                                                Event.class);
            kafkaTemplate.send("test-topic, "1", event);
            Event argument = eventCaptor.getValue();
            // .. assert the message properties
            verify(myInterface, timeout(10000).times(1)).apply(any());