Search code examples
spring-boot, mockito, spring-kafka

Testing Kafka consumer @KafkaListener with EmbeddedKafka in Spring Boot


I want to test my Kafka consumer, but there is an issue with @EmbeddedKafka.

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaEventConsumer {

    private final CustomInterface customInterface;

    @KafkaListener(topics = "test-topic")
    public void consumeEvents(Event event) {
           customInterface.apply(event);
        }
    }
}

My test class is as follows:

/**
 * Embedded-Kafka test that publishes one JSON-serialized event to
 * {@code test-topic} and expects the {@link CustomInterface} mock to be
 * invoked once.
 */
@EmbeddedKafka
@ExtendWith(MockitoExtension.class)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConsumerTest {

    /** JSON fixture that is deserialized into the {@code Event} under test. */
    private static final File EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                     "event-file.json").toFile();

    /** Plain String/String producer used to publish the test record. */
    private Producer<String, String> producer;

    private KafkaEventConsumer kafkaEventConsumer;

    @Mock
    private CustomInterface customInterface;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds the consumer around the mock and creates a producer that targets
     * the embedded broker.
     */
    @BeforeAll
    void setUp() {
        // NOTE(review): this consumer is instantiated by hand around the Mockito mock;
        // nothing visible here registers it (or the mock) with the Spring context, so the
        // listener started by @SpringBootTest presumably uses a different CustomInterface
        // instance - confirm.
        kafkaEventConsumer = new KafkaEventConsumer(customInterface);
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(),
                                                     new StringSerializer()).createProducer();
    }

    /**
     * Sends the fixture event to partition 0 of {@code test-topic} with key
     * {@code "1"} and waits up to ten seconds for the mock to be called.
     */
    @Test
    void consumeEvents() throws IOException, BadCurrencyException {
        var event = objectMapper.readValue(EVENT_JSON, Event.class);
        String message = objectMapper.writeValueAsString(event);
        producer.send(new ProducerRecord<>("test-topic", 0, "1", message));
        producer.flush();

        // Read the message and assert its properties
        verify(customInterface, timeout(10000).times(1)).apply(any());
    }

    /** Releases the producer once all tests in the class have run. */
    @AfterAll
    void shutdown() {
        producer.close();
    }

}

The test doesn't pass; the consumer never receives the message:

Wanted but not invoked:
customInterface.apply(
     <any> );
Actually, there were zero interactions with this mock.

PS: I followed this interesting article


Solution

  • I used KafkaTemplate

    @SpringBootTest
    @EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
                   partitions = 1,
                   controlledShutdown = true)
    class KafkaConsumerTest {
    
        private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                                    "event-file.json").toFile();
        @Autowired
        KafkaTemplate<String, Event> kafkaTemplate;
    
        @Autowired
        private ObjectMapper objectMapper;
    
    
        @SpyBean
        private KafkaEvenConsumer kafkaEvenConsumer;
    
        @SpyBean
        private MyInterface myInterface;
    
        @Captor
        ArgumentCaptor<Event> eventCaptor;
    
        @Test
        @SneakyThrows
        @DirtiesContext
        void consumeEvents() {
    
            Event event = objectMapper.readValue(EVENT_JSON,                                                Event.class);
            kafkaTemplate.send("test-topic, "1", event);
    
            verify(kafkaEventConsumer,timeout(10000).times(1)).consumeEvents(eventCaptor.capture());
            Event argument = eventCaptor.getValue();
            // .. assert the message properties
            verify(myInterface, timeout(10000).times(1)).apply(any());
           
        }
    
    }