Search code examples
spring-kafka

Kafka Spring: How to write unit tests for ConcurrentKafkaListenerContainerFactory and ConcurrentMessageListenerContainer?


I have 2 classes; 1 for the factories and the other for listener containers:

public class ConsumerFactories() {
@Bean
  public ConcurrentKafkaListenerContainerFactory<String, Byte[]> adeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Byte[]> factory = null;
    factory = new ConcurrentKafkaListenerContainerFactory<String, Byte[]>();
    factory.setConsumerFactory(consumerFactory1());
    factory.setConsumerFactory(consumerFactory2());
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }
}

And my listener class has multiple containers:

/**
 * Defines the listener container registered as "bean1": it consumes "topic1" through
 * {@code consumerFactory1} with a concurrency of 60, and its listener prints the name of the
 * thread that handles each record.
 *
 * <p>The container is deliberately not started here. The application context is not fully
 * initialised while bean definitions are being processed; it starts the container itself
 * when {@code autoStartup} is true.
 *
 * @return the configured, not yet started, listener container
 */
@Bean
  public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
    final ContainerProperties containerProperties =
        new ContainerProperties("topic1");
    containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
      @Override
      public void onMessage(ConsumerRecord<String, byte[]> record) {
        System.out.println("Thread is: " + Thread.currentThread().getName());
      }
    });

    ConcurrentMessageListenerContainer<String, byte[]> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory1, containerProperties);
    container.setBeanName("bean1");
    container.setConcurrency(60);
    // No container.start(): lifecycle is left to the application context.
    return container;
  }


/**
 * Defines the listener container registered as "bean2": it consumes "topic1" through
 * {@code consumerFactory2} with a concurrency of 60, and its listener prints the name of the
 * thread that handles each record.
 *
 * <p>The container is deliberately not started here. The application context is not fully
 * initialised while bean definitions are being processed; it starts the container itself
 * when {@code autoStartup} is true.
 *
 * <p>NOTE(review): if this method shares a class with another no-argument
 * {@code adeListenerContainer()}, the duplicate signature will not compile - give each
 * bean method a distinct name.
 *
 * @return the configured, not yet started, listener container
 */
@Bean
  public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
    final ContainerProperties containerProperties =
        new ContainerProperties("topic1");
    containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
      @Override
      public void onMessage(ConsumerRecord<String, byte[]> record) {
        System.out.println("Thread is: " + Thread.currentThread().getName());
      }
    });

    ConcurrentMessageListenerContainer<String, byte[]> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory2, containerProperties);
    container.setBeanName("bean2");
    container.setConcurrency(60);
    // No container.start(): lifecycle is left to the application context.
    return container;
  }

1) How can I write unit tests for these 2 classes and methods?

2) Since all my listener containers are doing the same processing work but for a different set of topics, can I pass the topics when I'm setting consumerFactory or any other way?


Solution

  • 1.

    container.start();

    Never start() components in bean definitions - the application context is not ready yet; the application context will automatically start the containers at the right time (as long as autoStartup is true, which is the default).

    2. Why do you need a container factory if you are creating the containers yourself?

    It's not clear what you want to test.

    EDIT

    Here's an example of programmatically registering containers, using Spring Boot's auto-configured container factory (2.2 and above)...

    /**
     * Boot application that registers one listener container per topic at runtime, creating
     * each container from the injected container factory.
     */
    @SpringBootApplication
    public class So53752783Application {

        public static void main(String[] args) {
            SpringApplication.run(So53752783Application.class, args);
        }

        /**
         * Returns a callback that creates, registers and starts a container for each of the
         * topics "foo", "bar" and "baz".
         *
         * @param beanFactory bean factory used to initialise and register each container
         * @param factory container factory used to create each container
         * @return the callback performing the registration
         */
        @Bean
        public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
                ConcurrentKafkaListenerContainerFactory<String, String> factory) {
            return () -> Stream.of("foo", "bar", "baz")
                    .forEach(topic -> registerContainer(beanFactory, factory, topic));
        }

        /**
         * Creates the container for one topic, gives it a printing listener and a
         * "&lt;topic&gt;.group" group id, registers it as "&lt;topic&gt;.container" and starts it.
         */
        @SuppressWarnings("unchecked")
        private static void registerContainer(ConfigurableListableBeanFactory beanFactory,
                ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic) {
            MessageListener<String, String> printer = record -> {
                System.out.println("Received " + record);
            };
            ConcurrentMessageListenerContainer<String, String> created = factory.createContainer(topic);
            created.getContainerProperties().setMessageListener(printer);
            created.getContainerProperties().setGroupId(topic + ".group");

            // initializeBean returns Object, hence the unchecked cast back to the container type.
            ConcurrentMessageListenerContainer<String, String> initialized =
                    (ConcurrentMessageListenerContainer<String, String>)
                            beanFactory.initializeBean(created, topic + ".container");
            beanFactory.registerSingleton(topic + ".container", initialized);
            initialized.start();
        }

    }
    

    To unit test your listener,

    container.getContainerProperties().getMessageListener()
    

    cast it and invoke onMessage() and verify it did what you expected.

    EDIT2 Unit Testing the listener

    /**
     * Boot application that registers one listener container per topic at runtime; every
     * container shares the same {@link MyListener} bean, which delegates to a {@link Service}.
     */
    @SpringBootApplication
    public class So53752783Application {

        public static void main(String[] args) {
            SpringApplication.run(So53752783Application.class, args);
        }

        /**
         * Returns a callback that creates, registers and starts a container for each of the
         * topics "foo", "bar" and "baz", all using the supplied listener.
         *
         * @param beanFactory bean factory used to initialise and register each container
         * @param factory container factory used to create each container
         * @param listener listener installed on every container
         * @return the callback performing the registration
         */
        @SuppressWarnings("unchecked")
        @Bean
        public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
                ConcurrentKafkaListenerContainerFactory<String, String> factory,
                MyListener listener) {

            return () -> {
                for (String topic : new String[] {"foo", "bar", "baz"}) {
                    ConcurrentMessageListenerContainer<String, String> created =
                            factory.createContainer(topic);
                    created.getContainerProperties().setMessageListener(listener);
                    created.getContainerProperties().setGroupId(topic + ".group");

                    // initializeBean returns Object, hence the unchecked cast.
                    ConcurrentMessageListenerContainer<String, String> initialized =
                            (ConcurrentMessageListenerContainer<String, String>)
                                    beanFactory.initializeBean(created, topic + ".container");
                    beanFactory.registerSingleton(topic + ".container", initialized);
                    initialized.start();
                }
            };
        }

        @Bean
        public MyListener listener() {
            return new MyListener();
        }

        /** Listener that upper-cases each record value and hands it to the {@link Service}. */
        public static class MyListener implements MessageListener<String, String> {

            @Autowired
            private Service service;

            /** Replaces the service this listener delegates to. */
            public void setService(Service service) {
                this.service = service;
            }

            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                String upperCased = data.value().toUpperCase();
                this.service.callSomeService(upperCased);
            }

        }

        /** Collaborator invoked by {@link MyListener} for every record. */
        public interface Service {

            void callSomeService(String in);

        }

        /** {@link Service} implementation registered as a component. */
        @Component
        public static class DefaultService implements Service {

            @Override
            public void callSomeService(String in) {
                // ...
            }

        }

    }
    

    and

    /**
     * Test that fetches the listener from the registered "foo.container" bean and checks
     * that it passes the upper-cased record value to its service.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class So53752783ApplicationTests {

        @Autowired
        private ApplicationContext context;

        @Test
        public void test() {
            // Arrange: swap the listener's service for a mock.
            Service mockService = mock(Service.class);
            ConcurrentMessageListenerContainer<?, ?> fooContainer =
                    context.getBean("foo.container", ConcurrentMessageListenerContainer.class);
            Object registeredListener = fooContainer.getContainerProperties().getMessageListener();
            MyListener listenerUnderTest = (MyListener) registeredListener;
            listenerUnderTest.setService(mockService);

            // Act: deliver a single record directly to the listener.
            listenerUnderTest.onMessage(new ConsumerRecord<>("foo", 0, 0L, "key", "foo"));

            // Assert: the value reached the service upper-cased.
            verify(mockService).callSomeService("FOO");
        }

    }