I'm using NestJS (v8) with the RabbitMQ transport (Transport.RMQ) to listen for messages.
My NestJS code looks something like this:
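// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module'; // path assumed (Nest CLI default layout)

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'my-queue',
      replyQueue: 'my-reply-queue',
    },
  });
  await app.listen(); // start consuming from 'my-queue'
}
bootstrap();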
// my.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class MyController {
  @MessagePattern('something')
  do(data: { source: string }): { source: string } {
    console.log(data);
    data.source += ' | MyController';
    return data;
  }
}
In a separate Node.js application, I use amqplib to send a message to the NestJS application and receive the response.
This is the code of the Node.js application:
const queueName = 'my-queue';
const replyQueueName = 'my-reply-queue';
const amqplib = require('amqplib');

async function run() {
  const conn = await amqplib.connect('amqp://localhost:5672');
  const channel = await conn.createChannel();
  await channel.assertQueue(queueName);
  await channel.assertQueue(replyQueueName);

  // Consumer: Listen to messages from the reply queue
  await channel.consume(replyQueueName, (msg) => console.log(msg.content.toString()));

  // Publisher: Send message to the queue
  channel.sendToQueue(
    queueName,
    Buffer.from(
      JSON.stringify({
        pattern: 'something',
        data: { source: 'node-application' },
      })
    ),
    { replyTo: replyQueueName }
  );
}

run()
When I run both applications, the NestJS application receives the message from the Node.js publisher, but the Node.js consumer is never called with the reply.
The fix was to add an id key to the payload that the Node.js application sends:
// ...
// Publisher: Send message to the queue
channel.sendToQueue(
  queueName,
  Buffer.from(
    JSON.stringify({
      // Add the `id` key here so the Node.js consumer will get the message in the reply queue
      id: '',
      pattern: 'something',
      data: { source: 'node-application' },
    })
  ),
  { replyTo: replyQueueName }
);
// ...
This works because the handleMessage function in server-rmq.ts checks whether the id property of the incoming message is undefined:
// https://github.com/nestjs/nest/blob/026c1bd61c561a3ad24da425d6bca27d47567bfd/packages/microservices/server/server-rmq.ts#L139-L141
public async handleMessage(
  message: Record<string, any>,
  channel: any,
): Promise<void> {
  // ...
  if (isUndefined((packet as IncomingRequest).id)) {
    return this.handleEvent(pattern, packet, rmqContext);
  }
  // ...
}
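To make the effect of that check concrete, here is a small stand-alone model of the branch. It is only a sketch of the condition quoted above, not NestJS's actual code: `classifyPacket` is a hypothetical name, and the comparison against `undefined` is my reading of what `isUndefined` does.

```javascript
// Simplified model of the branch in handleMessage (hypothetical helper,
// not part of NestJS): a packet whose `id` is undefined is treated as an
// event, anything else is treated as a request.
function classifyPacket(packet) {
  return packet.id === undefined ? 'event' : 'request';
}

// Packet from the original publisher: no `id` key at all.
console.log(classifyPacket({ pattern: 'something', data: { source: 'node-application' } }));
// prints "event"

// Packet from the fixed publisher: `id` is an empty string, which is still defined.
console.log(classifyPacket({ id: '', pattern: 'something', data: { source: 'node-application' } }));
// prints "request"
```

This is why even `id: ''` is enough: the check is about the key being defined, not about the value being meaningful.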
The handleEvent function contains no logic for sending anything to the reply queue; it only handles the event.
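An empty string works, but if several requests are in flight a unique id per message is probably more useful. Below is a minimal sketch of a helper that builds such a request. `buildRequest` is a hypothetical name of mine (it is not part of amqplib or NestJS), and reusing the id as the AMQP `correlationId` is an assumption about how you might want to match replies, not something the check above requires.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper: builds the message body and publish options for a
// request that NestJS will treat as a request (because `id` is defined).
function buildRequest(pattern, data, replyTo) {
  const id = randomUUID(); // unique per request
  return {
    // Body in the { id, pattern, data } shape used earlier in this post
    content: Buffer.from(JSON.stringify({ id, pattern, data })),
    // `replyTo` and `correlationId` are standard amqplib publish options;
    // using the same value for `id` and `correlationId` is my assumption.
    options: { replyTo, correlationId: id },
  };
}

const example = buildRequest('something', { source: 'node-application' }, 'my-reply-queue');
console.log(JSON.parse(example.content.toString()).pattern); // prints "something"
```

With the channel from the Node.js application above, this would be sent as `channel.sendToQueue(queueName, example.content, example.options)`. As far as I can tell the server passes the `correlationId` back on the reply, so the consumer could compare `msg.properties.correlationId` against the ids it sent, but I have not verified that here.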