Tags: java, junit, mockito, spring-kafka, spring-kafka-test

Unit testing MessageListener class


How do you unit test a class that implements the spring-kafka MessageListener interface? I have a listener class that I use to listen to topics manually through an onMessage method. The method is very simple and just receives the messages.

My setup is Spring 5.8, Spring-Kafka 2.2.7, Spring-Kafka-Test, and JUnit, without Spring Boot.

I have tried a number of examples from the Spring reference docs and other posts, but none of them shows a simple way to test a listener class that implements MessageListener.

I am not sure whether I need to set up an EmbeddedKafkaBroker or an EmbeddedKafkaRule, or whether there is a different way to test. When I tried using EmbeddedKafkaRule, I got a NoClassDefFoundError.

I also don't understand how the test case below would ever reach my onMessage method.

@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class listenerTest {

    // Must be static so the static @ClassRule initializer can reference it.
    private static final String topic = "someTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic);

    private KafkaTemplate<String, String> kafkaTemplate;

    private CountDownLatch countDownLatch;

    @Before
    public void setUpTests() {
        Map<String, Object> sProps = KafkaTestUtils.senderProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString());

        ProducerFactory<String, String> producer = new DefaultKafkaProducerFactory<>(sProps);

        kafkaTemplate = new KafkaTemplate<>(producer);

        kafkaTemplate.setDefaultTopic(topic);

        countDownLatch = new CountDownLatch(1);
    }

    @Test
    public void testReceiver() {
        kafkaTemplate.sendDefault("message");
        // Nothing in this test counts the latch down, so this assertion cannot pass as written.
        assertEquals(0, countDownLatch.getCount());
    }
}

Class I want to unit test:

public class listener implements BatchAcknowledgingMessageListener<String, String> {

    private final CallbackInterface callback;

    public listener(CallbackInterface callback) {
        this.callback = callback;
    }

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        // Hand off to the callback, then acknowledge the batch.
        this.callback.handleMessage();
        ack.acknowledge();
    }
}

Running the test throws a NoClassDefFoundError.


Solution

  • For a pure unit test, you don't need an embedded broker; just call the listener directly.

    Inject a mock callback and verify it was called properly.

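    "When I tried to call the onMessage function directly to test it, I got an error that says Container should not be calling function onMessage."

    You are calling the wrong onMessage overload. The variant you called is never used by the container for this listener type, so it is implemented as a default method that throws. The pattern looks like this: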

    public interface BatchMessageListener extends MessageListener {
    
        @Override
        default void onMessage(Message message) {
            throw new UnsupportedOperationException("Should never be called by the container");
        }
    
        @Override
        void onMessageBatch(List<Message> messages);
    
    }
    

    EDIT

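    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
    import org.springframework.kafka.support.Acknowledgment;

    public class MyListener implements BatchAcknowledgingMessageListener<String, String> {
    
        private final MyService service;
    
        public MyListener(MyService service) {
            this.service = service;
        }
    
        @Override
        public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment) {
            // Pass each record value to the service, then acknowledge the whole batch.
            data.forEach(dat -> this.service.call(dat.value()));
            acknowledgment.acknowledge();
        }
    
        public interface MyService {
    
            void call(String toCall);
    
        }
    
    }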
    

    and a test that calls the listener directly, with mocks for the service and the acknowledgment:

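    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.verifyNoMoreInteractions;

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.jupiter.api.Test;
    import org.springframework.kafka.support.Acknowledgment;

    class So57192362ApplicationTests {
    
        @Test
        void test() {
            MyListener.MyService service = mock(MyListener.MyService.class);
            MyListener listener = new MyListener(service);
            Acknowledgment acknowledgment = mock(Acknowledgment.class);
            // Call the two-argument overload, as the container would for a batch.
            listener.onMessage(Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, null, "bar")), acknowledgment);
            verify(service).call("bar");
            verify(acknowledgment).acknowledge();
            verifyNoMoreInteractions(service, acknowledgment);
        }
    
    }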
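
To make the "wrong onMessage" trap concrete without Spring or Kafka on the classpath, here is a minimal, self-contained sketch. Ack, BatchAckListener, RecordingAck, and CollectingListener are hypothetical stand-ins written for this example, not spring-kafka types. The assumption is only that the real batch listener interface has a similar shape: one overload that is a throwing default method, and one abstract overload that also takes the acknowledgment. The fakes are written by hand in place of Mockito mocks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverloadTrapSketch {

    /** Hypothetical stand-in for an acknowledgment handle (not the spring-kafka type). */
    interface Ack {
        void acknowledge();
    }

    /** Hypothetical batch listener with two overloads of the same method name. */
    interface BatchAckListener<T> {

        // Overload that does not apply to this listener type: it always throws.
        default void onMessage(List<T> data) {
            throw new UnsupportedOperationException("wrong overload for a batch listener");
        }

        // Overload that implementations provide and that a test should call.
        void onMessage(List<T> data, Ack ack);
    }

    /** Hand-written fake that counts acknowledge() calls. */
    static final class RecordingAck implements Ack {
        int calls;

        @Override
        public void acknowledge() {
            calls++;
        }
    }

    /** Listener under test: remembers every value it receives, then acknowledges. */
    static final class CollectingListener implements BatchAckListener<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void onMessage(List<String> data, Ack ack) {
            seen.addAll(data);
            ack.acknowledge();
        }
    }

    public static void main(String[] args) {
        CollectingListener listener = new CollectingListener();
        RecordingAck ack = new RecordingAck();

        // Wrong overload: the inherited default method throws.
        try {
            listener.onMessage(Arrays.asList("bar"));
            throw new AssertionError("expected UnsupportedOperationException");
        } catch (UnsupportedOperationException expected) {
            System.out.println("one-argument overload threw: " + expected.getMessage());
        }

        // Right overload: the listener's own logic runs and the batch is acknowledged.
        listener.onMessage(Arrays.asList("bar"), ack);
        System.out.println("seen=" + listener.seen + ", acknowledgments=" + ack.calls);
    }
}
```

The same idea carries over to the real listener: a direct unit test should pass both the record list and an Acknowledgment (mocked or faked), because the shorter overload exists only to fail fast when it is called by mistake.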