Java & RabbitMQ here. I need to implement sort of a poison pill pattern where, upon handling a particular message, the consumer needs to cancel itself and stop receiving/handling any further messages. Full stop and clean up. The message kills the consumer and releases the thread, memory, etc.
I see consumers have a handleCancel
method that they can implement to respond to cancellation commands from the outside, but how do I handle a poison pill message inside a consumer that tells the consumer to fall over dead?
I don't think RabbitMQ handles this scenario for some reason.
My solution which appears to be working:
Processing
(default) and Terminating
Processing
state it consumes and handles messages off the queue like normal. When it receives the magical poison pill (perhaps a value in the message header/properties, or maybe a specific value in the message itself) it sets its status to Terminating
and does not process the message. It also uses an async event bus to send a custom ShutdownConsumerEvent
to an external handler. This event is instantiated with both the channel
and consumerTag
sent to the consumer (e.g. ShutdownConsumerEvent event = new ShutdownConsumerEvent(channel, consumerTag);
)Terminating
state get republished to the queue, with ACKs enabled so we don't lose them and have pseudo-transactionalityShutdownConsumerSubscriber
(a registered handler to receive ShutdownConsumerEvents
) receives the command to shut down the consumer, it does this by issuing a channel.basicCancel(consumerTag)