I'm using Grails 3.2.3 and the RabbitMQ native plugin 3.3.2 (http://budjb.github.io/grails-rabbitmq-native/doc/manual/). I am trying to achieve the following scenario.
Description: I'm sending multiple messages with headers to a single queue. On the consumer side, I tried to use a binding so that each consumer receives only the messages matching its filter. However, each consumer receives all messages regardless of the filter, which means the binding is not working. I'm also new to RabbitMQ, so any help or direction is much appreciated. Below is my code.
Queue config in application.groovy:
rabbitmq {
    queues = [
        [
            name      : "mail.queue",
            connection: "defaultConnection",
            durable   : true
        ]
    ]
}
Sending to queue function:
protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName
        body = message
        autoConvert = true
        // Check the optional parameter, not the closure's own `headers` property
        if (binding != null) {
            headers = binding
        }
    }
}
In sendToQueue, I made the third parameter optional because in some cases I won't need multiple types of consumers.
Calling send to queue:
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])
Consumer 1:
static rabbitConfig = [
    queue   : QueueType.EMAIL_QUEUE.queueName,
    binding : ["emailType": EmailType.PASSWORD_RESET.name()],
    match   : "all",
    consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetEmailConsumer consumer")
    println(message)
    passwordResetEmailService.sendPasswordResetMail(message)
}
Consumer 2:
static rabbitConfig = [
    queue   : QueueType.EMAIL_QUEUE.queueName,
    binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
    match   : "all",
    consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetSuccessEmailConsumer consumer")
    println(message)
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}
After reading the RabbitMQ documentation, I realized that it is not possible to selectively pull messages from a single queue: every consumer attached to a queue can receive any message in it. A binding controls how an exchange routes messages into a queue, not which messages a consumer takes out of it.
The alternative is to publish to an exchange: the publisher sends each message to the exchange with a routing key (or headers), and the exchange delivers it to the queues whose bindings match. More: RabbitMQ Publish/Subscribe Model
The basic idea is also described here: Stackoverflow: RabbitMQ selectively retrieving messages from a queue
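To make the exchange-and-binding idea concrete, here is a minimal in-memory sketch in plain Groovy. It is only a model of how a headers exchange decides which queue gets a message; it does not use the RabbitMQ client or the Grails plugin, and the class names (HeaderBinding, HeadersExchange) and queue names ("mail.reset", "mail.reset.success") are made up for illustration.

```groovy
// Toy model of a headers exchange. Not the RabbitMQ API; all names are hypothetical.
class HeaderBinding {
    String queue                 // queue that receives matching messages
    Map<String, String> args     // header values the binding asks for
    String match                 // "all" or "any"

    boolean accepts(Map<String, String> headers) {
        // Collect the binding arguments that the message headers satisfy
        def hits = args.findAll { k, v -> headers[k] == v }
        match == 'all' ? hits.size() == args.size() : !hits.isEmpty()
    }
}

class HeadersExchange {
    List<HeaderBinding> bindings = []
    // Queue name -> messages delivered to that queue
    Map<String, List<Map>> queues = [:].withDefault { [] }

    void publish(Map<String, String> headers, Map body) {
        // Filtering happens here, at routing time, before any consumer is involved
        bindings.findAll { it.accepts(headers) }
                .each { queues[it.queue] << body }
    }
}

def exchange = new HeadersExchange()
exchange.bindings << new HeaderBinding(queue: 'mail.reset',
        args: [emailType: 'PASSWORD_RESET'], match: 'all')
exchange.bindings << new HeaderBinding(queue: 'mail.reset.success',
        args: [emailType: 'PASSWORD_RESET_SUCCESS'], match: 'all')

exchange.publish([emailType: 'PASSWORD_RESET'], [user: 'alice'])
exchange.publish([emailType: 'PASSWORD_RESET_SUCCESS'], [user: 'bob'])

println exchange.queues
```

Each message ends up in exactly one queue because the headers are compared against the bindings when the message is published. A consumer reading from one of those queues then needs no filter of its own, which is why this approach requires one queue per message type.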
Anyway, in my solution I didn't want multiple queues, so I created a single consumer and put the name of the actual handler class into each message; the consumer looks up that bean and dispatches the message to it. Sharing the implementation in the hope that it helps someone:
Queue config in application.groovy:
rabbitmq {
    queues = [
        [
            name      : "mail.queue",
            connection: "defaultConnection",
            durable   : true
        ]
    ]
}
Sending to queue function:
protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
    message.queueHandlerServiceClass = queueHandlerServiceClass.name
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName // queue name from enum: "mail.queue"
        body = message
        autoConvert = true
    }
}
Handler Interface:
interface BaseQueueHandler {
    void handleMessage(Map message, MessageContext context)
}
Sending to queue:
sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)
Queue Consumer:
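import com.budjb.rabbitmq.consumer.MessageContext
import grails.core.GrailsApplication

class EchoEmailQueueConsumer {
    static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName, // same queue the publisher sends to
        consumer: 10
    ]

    GrailsApplication grailsApplication

    def handleMessage(Map message, MessageContext context) {
        // Strip the dispatch metadata so the handler only sees the payload
        String handlerClass = message.remove("queueHandlerServiceClass")
        Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass)
        // Look up the service bean by type and delegate to it
        BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
        queueService.handleMessage(message, context)
    }
}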
Finally, a handler service that implements the handler interface:
class PasswordResetEmailService implements BaseQueueHandler {
    @Override
    void handleMessage(Map message, MessageContext context) {
        println("message received in PasswordResetEmailService")
    }
}
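One possible variation on this dispatch, sketched in plain Groovy with no Grails or RabbitMQ dependencies: instead of loading whatever class name arrives in the message, the consumer keeps a registry of known handlers keyed by a short name. Everything here (QueueHandler, PasswordResetHandler, Dispatcher, the "handler" key) is hypothetical and simplified; in a Grails app the registry would presumably be filled with the service beans, and the handler signature would also take the MessageContext.

```groovy
// Registry-based dispatch sketch. All names are illustrative, not plugin API.
interface QueueHandler {
    void handleMessage(Map message)
}

class PasswordResetHandler implements QueueHandler {
    List<Map> seen = []   // records payloads so the example can be inspected

    void handleMessage(Map message) {
        seen << message
    }
}

class Dispatcher {
    // Only handlers registered here can be invoked, whatever the message claims
    Map<String, QueueHandler> handlers = [:]

    void dispatch(Map message) {
        // Remove the dispatch key so the handler only sees the payload
        String key = message.remove('handler')
        QueueHandler handler = handlers[key]
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for '${key}'")
        }
        handler.handleMessage(message)
    }
}

def reset = new PasswordResetHandler()
def dispatcher = new Dispatcher(handlers: [PASSWORD_RESET: reset])

dispatcher.dispatch([handler: 'PASSWORD_RESET', user: 'alice'])
println reset.seen
```

The trade-off is that every new handler has to be registered explicitly, but a malformed or unexpected message can then only fail with a clear error rather than cause an arbitrary class to be loaded by name.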