Tags: java, spring, rabbitmq, spring-boot, spring-amqp

How to configure and receiveAndConvert a JSON payload into a domain object in Spring Boot and RabbitMQ


Recently I have taken a keen interest in microservice architecture using Spring Boot. My implementation has two Spring Boot applications:

Application One receives requests from a RESTful API, converts them, and sends a JSON payload to a RabbitMQ queue, queueA.

Application Two has subscribed to queueA, receives the JSON payload (the domain object User), and is supposed to trigger a service within Application Two, e.g. send an email to the user.

Using no XML in my Application Two configuration, how do I configure a converter that will convert the JSON payload received from RabbitMQ into a domain object User?

Below are snippets from the Spring Boot configuration of Application Two.

ApplicationInitializer.java

@SpringBootApplication
@EnableRabbit
public class ApplicationInitializer implements CommandLineRunner {

    final static String queueName = "user-registration";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AnnotationConfigApplicationContext context;

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("user-registrations");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    public static void main(String[] args) {
        SpringApplication.run(ApplicationInitializer.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Waiting for messages...");
    }

}

TestService.java

@Component
public class TestService {

    /**
     * This test verifies whether this consumer receives message off the user-registration queue
     */
    @RabbitListener(queues = "user-registration")
    public void testReceiveNewUserNotificationMessage(User user) {
        // do something like, convert payload to domain object user and send email to this user
    }

}

Solution

  • I had the same problem, and after some research and testing I learned that there is more than one way to configure your RabbitMQ receiver in Spring Boot. It is important to choose one and stick with it.

    If you decide to go with annotation-driven listener endpoints, which I infer from your use of @EnableRabbit and @RabbitListener, then the configuration you posted did not work for me. What worked is the following:

    Have your configuration class implement org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer and override the method configureRabbitListeners as follows:

    @Override
    public void configureRabbitListeners(
            RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
    

    and add a MessageHandlerMethodFactory bean that uses a JSON message converter:

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }
    

    Additionally, you need to define a SimpleRabbitListenerContainerFactory (the annotation-driven counterpart of the SimpleMessageListenerContainer you already defined) and autowire the corresponding ConnectionFactory:

    @Autowired
    public ConnectionFactory connectionFactory;
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
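
As a possible alternative to the handler method factory, the JSON conversion can also be done at the container factory level. The following is a minimal sketch, not a tested setup: it assumes Spring AMQP's Jackson2JsonMessageConverter and Jackson are on the classpath, that the sender marks the message with a JSON content type, and that this bean replaces the factory shown above. The class name JsonListenerConfig is made up for illustration.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is an assumption for this sketch.
@Configuration
public class JsonListenerConfig {

    // "rabbitListenerContainerFactory" is the bean name @RabbitListener looks up by default.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Convert incoming JSON message bodies before the listener method is invoked.
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
```

As far as I know, recent Spring AMQP versions infer the target type from the listener method parameter, while older versions rely on type headers set by the sender, so check this against the version you use.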
    

    To finish the configuration, define the bean that handles your messages and carries the @RabbitListener annotations. I named mine EventResultHandler (you named yours TestService); this explicit definition is only needed if the class is not already picked up by component scanning:

    @Bean
    public EventResultHandler eventResultHandler() {
        return new EventResultHandler();
    }
    

    Then, in your EventResultHandler (or TestService), you define the @RabbitListener methods with their corresponding queues and the payload (the POJO that your JSON message is deserialized into):

    @Component
    public class EventResultHandler {
    
        @RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
        public void handleMessage(@Payload Event event) {
            System.out.println("Event received");
            System.out.println("EventType: " + event.getType().getText());
        }
    }
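
For the conversion to work, the payload class has to be something Jackson can populate. The original User and Event classes are not shown, so the following is only a sketch of what such a POJO might look like; the properties name and email are assumptions made up for this example.

```java
// Hypothetical payload class; the properties name and email are assumptions for this sketch.
class User {

    private String name;
    private String email;

    // Jackson's default deserialization needs a no-argument constructor.
    public User() {
    }

    public String getName() {
        return name;
    }

    // Setters let Jackson map the JSON property "name" onto this field.
    public void setName(String name) {
        this.name = name;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }
}
```

With a class shaped like this, a message body such as {"name":"Alice","email":"alice@example.com"} would map onto the two properties, provided the JSON property names match.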
    

    I omitted the required definition and binding of queues and exchanges. You can do that in either microservice, or manually on the RabbitMQ server, but it has to be done somewhere.
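
One way to keep the queue, exchange and binding declarations next to the listener is the bindings attribute of @RabbitListener. This is a sketch under assumptions: it reuses the queue and exchange names from the question, expects a RabbitAdmin (AmqpAdmin) bean in the context, which Spring Boot normally auto-configures, and needs a Spring AMQP version that supports @QueueBinding. User is the domain class from the question, and the method name is made up.

```java
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class TestService {

    // Asks the RabbitAdmin in the context to declare the queue, the topic
    // exchange and the binding between them, then listens on that queue.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "user-registration", durable = "false"),
            exchange = @Exchange(value = "user-registrations", type = ExchangeTypes.TOPIC),
            key = "user-registration"))
    public void handleUserRegistration(@Payload User user) {
        // e.g. send the registration email to this user
    }
}
```

If the queue or exchange already exists on the broker with different settings (for example a different durable flag), the declaration will be rejected, so the attributes here have to match whatever is declared elsewhere.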