Search code examples
spring-amqpspring-rabbit

How do we hook into before/After message processing using @RabbitListener


Problem: I am migrating from MessageListener interface impl to @RabbitListener. I had logic like this where I was doing "pre" and "post" message processing on a MessageListener that was inherited by several classes

example:

public AbstractMessageListener implements MessageListener {

     @Override
     public void onMessage(Message message) {

          //do some pre message processing

          process(Message message);

          // do some post message processing
     }

     protected abstract void process(Message message);

}

Question: Is there a way I can achieve something similar using @RabbitListener annotation Where I can inherit pre/post message processing logic without having to re-implement or call the pre/post message processing inside each child @RabbitListener annotation and all the while maintaining a customizable method signatures for the child @RabbitListener? Or is this being too greedy?

Example desired result:

public class SomeRabbitListenerClass {

    @RabbitListener( id = "listener.mypojo",queues = "${rabbitmq.some.queue}")
   public void listen(@Valid MyPojo myPojo) {
      //...
   }
}

public class SomeOtherRabbitListenerClass {

    @RabbitListener(id = "listener.orders",queues ="${rabbitmq.some.other.queue}")
   public void listen(Order order, @Header("order_type") String orderType) {
      //...
   }
}

with both these @RabbitListener(s) utilizing the same inherited pre/post message processing

I see there is a 'containerFactory' argument in the @RabbitListener annotation but i'm already declaring one in the config... and i'm really sure how to achieve the inheritance I desire with a custom containerFactory.


Updated Answer: This is what I ended up doing.

Advice defintion:

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;

/**
 * AOP Around advice wrapper. Every time a message comes in we can do 
 * pre/post processing by using this advice by implementing the before/after methods.
 * @author sjacobs
 *
 */
public class RabbitListenerAroundAdvice implements MethodInterceptor {

    /**
     * place the "AroundAdvice" around each new message being processed.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {

        Message message = (Message) invocation.getArguments()[1];

        before(message)
        Object result = invocation.proceed();
        after(message);

        return  result;
    }

declare beans: In your rabbitmq config declare the advice as a Spring bean and pass it to the rabbitListenerContainerFactory#setAdviceChain(...)

//...

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory( cachingConnectionFactory() );
        factory.setTaskExecutor(threadPoolTaskExecutor());
        factory.setMessageConverter(jackson2JsonMessageConverter());   

        factory.setAdviceChain(rabbitListenerAroundAdvice());

        return factory;
    }

    @Bean
    public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() {
        return new RabbitListenerAroundAdvice();
    }

// ...

Solution

  • Correction

    You can use the advice chain in the SimpleRabbitListenerContainerFactory to apply an around advice to listeners created for @RabbitListener; the two arguments are the Channel and Message.

    If you only need to take action before calling the listener, you can add MessagePostProcessor(s) to the container afterReceivePostProcessors.