Our configuration is: 1...n Message receivers with a shared database. Messages should only be processed once.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "message-queue", durable = "true"),
            exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
            key = MESSAGE_QUEUE1_RK)
    )
    public void receiveMessage(CustomMessage message) throws InterruptedException {
        System.out.println("I have been received = " + message);
    }
We want to guarantee that each message is processed only once, and we have a message store holding the IDs of messages already processed. Is it possible to hook this check in before receiveMessage is invoked? We looked at using a MessagePostProcessor with a RabbitTemplate, but that didn't seem to work.
Any advice on how to do this? We tried a MethodInterceptor, and it works, but it is pretty ugly. Thanks.
Solution found - thanks to Gary
I created a MessagePostProcessorInjector which implements SmartLifecycle. On startup, it inspects each container and, if it is an AbstractMessageListenerContainer, adds a custom MessagePostProcessor and a custom ErrorHandler, which looks for certain types of exceptions and drops them (others are forwarded to the defaultErrorHandler).
Since we are using a DLQ, I found that throwing exceptions or setting the message to null wouldn't really work.
I'll make a pull request to ignore null messages after an MPP.
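For illustration, here is a framework-free sketch of the two pieces of logic described above: a check against the store of processed ids, and an error handler that drops "duplicate" failures and forwards everything else. All class and method names are hypothetical, and the in-memory set is only a stand-in for the shared database; in a real application this logic would sit inside a Spring AMQP MessagePostProcessor and an ErrorHandler.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class DuplicateHandlingSketch {

    /** Hypothetical marker exception raised when a message id was already processed. */
    static class DuplicateMessageException extends RuntimeException {
        DuplicateMessageException(String messageId) {
            super("Already processed: " + messageId);
        }
    }

    /** In-memory stand-in for the shared message store (an assumption for this sketch). */
    static class ProcessedMessageStore {
        private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

        /** Records the id, or throws if it was recorded before. Set.add makes check-and-mark atomic. */
        void checkAndMark(String messageId) {
            if (!processedIds.add(messageId)) {
                throw new DuplicateMessageException(messageId);
            }
        }
    }

    /** Drops duplicate-message failures and forwards every other error to a delegate handler. */
    static class DuplicateDroppingErrorHandler {
        private final Consumer<Throwable> delegate;

        DuplicateDroppingErrorHandler(Consumer<Throwable> delegate) {
            this.delegate = delegate;
        }

        /** Returns true if the error was dropped, false if it was forwarded to the delegate. */
        boolean handle(Throwable error) {
            // A framework may wrap the original exception, so walk the cause chain.
            for (Throwable t = error; t != null; t = t.getCause()) {
                if (t instanceof DuplicateMessageException) {
                    return true;
                }
            }
            delegate.accept(error);
            return false;
        }
    }
}
```

With several receivers sharing one database, the equivalent of `Set.add` would need to be an atomic insert there (for example, relying on a unique key on the message id), otherwise two receivers could both pass the check.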
Interesting; the SimpleMessageListenerContainer does have a property afterReceivePostProcessors (not currently available via the listener container factory used by the annotation, but it could be injected later).
However, those post-processors won't help because we still invoke the listener.
Please feel free to open a JIRA Improvement Issue for two things:
Exposing afterReceivePostProcessors in the listener container factories (correction: the property is indeed exposed by the factory).
EDIT
How it works...
During context initialization, the RabbitListenerEndpointRegistry is start()ed and it starts all containers that are configured for autoStartup (the default). To do further configuration of the container before it's started (e.g. for properties not currently exposed by the container factories), set autoStartup to false.
You can then get the container(s) from the registry (either as a collection or by id). Simply @Autowired the registry in your app.
Cast the container to a SimpleMessageListenerContainer (or alternatively a DirectMessageListenerContainer if you are using Spring AMQP 2.0 or later and its factory instead).
Set the additional properties (such as the afterReceivePostProcessors); then start() the container.
Note: until we enhance the container to allow MPPs that return null, a possible alternative is to throw an AmqpRejectAndDontRequeueException from the MPP. However, this is probably not what you want if you have DLQs configured.
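As a rough sketch of the steps above combined with the interim workaround from the note, something along these lines might work. It assumes Spring AMQP is on the classpath, that the containers were created with autoStartup set to false (for example via the container factory), and that publishers set a message id on each message. The class names ListenerContainerCustomizer and DuplicateRejectingPostProcessor, the in-memory id set (a stand-in for the shared message store), and the choice of ContextRefreshedEvent as the hook are all assumptions for illustration, not the only way to do it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class ListenerContainerCustomizer {

    @Autowired
    private RabbitListenerEndpointRegistry registry;

    /** Hypothetical MPP that rejects messages whose id has been seen before. */
    static class DuplicateRejectingPostProcessor implements MessagePostProcessor {

        // In-memory stand-in for the shared message store (assumption).
        private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            String id = message.getMessageProperties().getMessageId();
            // The id is marked on receipt here; marking after successful processing is a design choice left open.
            if (id != null && !processedIds.add(id)) {
                // Rejected without requeue; with a DLQ configured the message is dead-lettered.
                throw new AmqpRejectAndDontRequeueException("Duplicate message: " + id);
            }
            return message;
        }
    }

    /** Runs after the context is refreshed; containers with autoStartup=false are not yet running. */
    @EventListener(ContextRefreshedEvent.class)
    public void configureAndStartContainers() {
        MessagePostProcessor duplicateCheck = new DuplicateRejectingPostProcessor();
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            if (container instanceof AbstractMessageListenerContainer) {
                ((AbstractMessageListenerContainer) container)
                        .setAfterReceivePostProcessors(duplicateCheck);
            }
            container.start();
        }
    }
}
```

As the note says, rejecting the message routes it to the DLQ when one is configured, so this variant only suits setups where dead-lettering duplicates is acceptable.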