Search code examples
spring-bootapache-kafkaspring-kafkaspring-boot-testspring-kafka-test

How to test Kafka Consumer with @KafkaListener and Mockito?


I need to test Kafka Consumer. In the folder with the tests, I create application.property in which I describe the Producer.

Here are my Consumer settings in main folder:

spring.kafka.bootstrap-servers=localhost:29092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=info

My Consumer should read the message and save it to the database:

@Component("kafkaConsumer")
public class KafkaConsumerExchangeRate {

    private final ExchangeRepository repository;

    public KafkaConsumerExchangeRate(ExchangeRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(
            topics = {"exchange-rate"},
            properties = {"spring.json.value.default.type = ru.company.service.exchange_rate.controller.dto.ListExchangeRatesDto"}
    )
    public void listenMessages(ConsumerRecord<String, ListExchangeRatesDto> record) {
        List<ExchangeOutgoingRateDto> exchangesRates = record.value().getExchangesRates();
        for (ExchangeOutgoingRateDto dto : exchangesRates) {
            ExchangeRate rate = repository
                    .findExchangeRateByCurrency1AndCurrency2(dto.getCurrency1(), dto.getCurrency2())
                    .orElseThrow();

            rate.setBuyingRate(dto.getBuyingRate());
            rate.setSellingRate(dto.getSellingRate());
            rate.setUpdatedAt(LocalDateTime.parse(dto.getUpdateAt()));

            repository.save(rate);
        }
    }
}

My test class:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
class KafkaConsumerExchangeRateTest {

    @Autowired
    private KafkaTemplate<String, ListExchangeRatesDto> template;

    @Mock
    private ExchangeRepository repository;

    @InjectMocks
    @Qualifier("kafkaConsumer")
    private KafkaConsumerExchangeRate consumer;

    @Test
    void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived()
            throws InterruptedException {

        //given
        ExchangeRate exchangeRate = ExchangeRate.builder()
                .buyingRate(BigDecimal.ONE)
                .sellingRate(BigDecimal.TEN)
                .updatedAt(LocalDateTime.now())
                .currency1("EUR")
                .currency2("USD")
                .build();

        ExchangeOutgoingRateDto dto = ExchangeOutgoingRateDto.builder()
                .buyingRate(BigDecimal.ONE)
                .sellingRate(BigDecimal.TEN)
                .updateAt(LocalDateTime.now().toString())
                .currency1("EUR")
                .currency2("USD")
                .build();

        ListExchangeRatesDto listOfDtos = ListExchangeRatesDto.builder()
                .exchangesRates(List.of(dto))
                .build();

        //and
        doReturn(Optional.of(exchangeRate)).when(repository)
                .findExchangeRateByCurrency1AndCurrency2(anyString(), anyString());

        //when
        template.send("exchange-rate", listOfDtos);
        Thread.sleep(1000);

        //then
        verify(repository, times(1)).save(any());
    }
}

I was hoping that I could calculate that the save method would work once. But I come across an exception:

Wanted but not invoked:
repository.save(<any>);
-> at ru.company.service.exchange_rate.kafka.KafkaConsumerExchangeRateTest.givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived(KafkaConsumerExchangeRateTest.java:84)
Actually, there were zero interactions with this mock.

I assume that the Consumer that is being created in the test and the Consumer that is actually being used are two different instances. Because the repository inside is not a mock and returns null when trying to get data.

How can I make it work?


Solution

  • You are mixing concerns which does not work together as is.

    The @Mock has a scope of only this test class. Then you use a KafkaTemplate from Spring application context to produce the testing data. The Spring knows nothing about your mock and just does the stuff against its beans, including your @KafkaListener.

    The Mockito does not interfere with Spring so easy.

    Instead you need to do:

    @MockBean
    private ExchangeRepository repository;
    

    And you don't need @InjectMocks. Spring Boot Testing framework will perform pre-processing for the application context to create mocks for those properties and replace respective beans in the application context. Then this mock is injected into your KafkaConsumerExchangeRate regular Spring way. And you are good to proceed with your stubs and verifications!