Search code examples
javarabbitmqamqpspring-amqp

How to create @RabbitListener to be idempotent


Our configuration is: 1...n Message receivers with a shared database. Messages should only be processed once.

   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "message-queue", durable = "true"),
            exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
            key = MESSAGE_QUEUE1_RK)
    )
    public void receiveMessage(CustomMessage message) throws InterruptedException {
        System.out.println("I have been received = " + message);
    }

We want to to guarantee messages will be processed once, we have a message store with id's of messages already processed. Is it possible to hook in this check before receiveMessage? We tried to look at a MessagePostProcessor with a rabbitTemplate but didn't seem to work.

any advice on how to do this? We tried with a MethodInterceptor and this works, but is pretty ugly. Thanks


Solution found - thanks to Gary I created a MessagePostProcessorInjector which implements SmartLifecycle and on startup, I inspect each container and if it is a AbstractMessageListenerContainer add a customer MessagePostProccesser and a custom ErrorHandler which looks for certain type of Exceptions and drops them (other forward to defaultErrorHandler) Since we are using DLQ I found throwing exceptions or setting to null wouldn't really work.

I'll make a pull request to ignore null Messages after a MPP.


Solution

  • Interesting; the SimpleMessageListenerContainer does have a property afterReceivePostProcessors (not currently available via the listener container factory used by the annotation, but it could be injected later).

    However, those postprocessors won't help because we still invoke the listener.

    Please feel free to open a JIRA Improvement Issue for two things:

    1. expose the afterReceivePostProcessors in the listener container factories
    2. if a post processor returns null, skip calling the listener method.

    (correction, the property is indeed exposed by the factory).

    EDIT

    How it works...

    During context initialization...

    1. For each annotation detected by the bean post processor the container is created and registered in the RabbitListenerEndpointRegistry
    2. Near the end of context initialization, the registry is start()ed and it starts all containers that are configured for autoStartup (default).

    To do further configuration of the container before it's started (e.g. for properties not currently exposed by the container factories), set autoStartup to false.

    You can then get the container(s) from the registry (either as a collection or by id). Simply @Autowire the registry in your app.

    Cast the container to a SimpleMessageListenerContainer (or alternatively a DirectMessageListenerContainer if using Spring AMQP 2.0 or later and you are using its factory instead).

    Set the additional properties (such as the afterReceiveMessagePostProcessors); then start() the container.

    Note: until we enhance the container to allow MPPs that return null, a possible alternative is to throw an AmqpRejectAndDontRequeueException from the MPP. However, this is probably not what you want if you have DLQs configured.