Tags: node.js, rabbitmq, nestjs

Not receiving messages in the reply queue when sending to RabbitMQ with amqplib and processing with NestJS


I'm using NestJS (v8) with the RabbitMQ transport (Transport.RMQ) to listen for messages.

My NestJS code looks something like this:

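// main.ts

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module'; // assumed default location of the root module

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'my-queue',
      replyQueue: 'my-reply-queue',
    },
  });
  await app.listen(); // start consuming from 'my-queue'
}
bootstrap();

// my.controller.ts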

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class MyController {
  @MessagePattern('something')
  do(data: {source: string}): {source: string} {
    console.log(data);

    data.source += ' | MyController';

    return data;
  }
}

In a separate Node.js application, I use amqplib to send a message to the NestJS application and receive the response.

This is the code of the Node.js application:

const queueName = 'my-queue';
const replyQueueName = 'my-reply-queue';

const amqplib = require('amqplib');

async function run() {
  const conn = await amqplib.connect('amqp://localhost:5672');
  const channel = await conn.createChannel();

  await channel.assertQueue(queueName);
  await channel.assertQueue(replyQueueName);

  // Consumer: Listen to messages from the reply queue
  await channel.consume(replyQueueName, (msg) => console.log(msg.content.toString()));

  // Publisher: Send message to the queue
  channel.sendToQueue(
    queueName,
    Buffer.from(
      JSON.stringify({
        pattern: 'something',
        data: { source: 'node-application' },
      })
    ),
    { replyTo: replyQueueName }
  ); 
}

run().catch(console.error);

When I run both applications, the NestJS application receives the message from the Node.js publisher, but the Node.js consumer is never called with the reply.


Solution

  • The fix was to add an id key to the message that the Node.js application sends:

    // ...
    
    // Publisher: Send message to the queue
    channel.sendToQueue(
      queueName,
      Buffer.from(
        JSON.stringify({
          // Add the `id` key here so the Node.js consumer will get the message in the reply queue
          id: '',
          
          pattern: 'something',
          data: { source: 'node-application' },
        })
      ),
      { replyTo: replyQueueName }
    ); 
    
    // ...
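The empty-string id above is enough to get a reply. If several requests are in flight, a unique id per request makes replies easier to match. A minimal sketch, assuming Node.js 14.17+ for `crypto.randomUUID`; `buildRequest` is a hypothetical helper, not part of amqplib or NestJS, and whether Nest echoes the AMQP `correlationId` on the reply is an assumption worth checking against your Nest version:

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper: builds the arguments for channel.sendToQueue()
// so that the packet always carries a defined `id`.
function buildRequest(pattern, data, replyTo) {
  const id = randomUUID();
  // The packet body is what NestJS deserializes: { id, pattern, data }
  const content = Buffer.from(JSON.stringify({ id, pattern, data }));
  // correlationId mirrors the id (assumption: the reply carries it back)
  const options = { replyTo, correlationId: id };
  return { id, content, options };
}

const request = buildRequest(
  'something',
  { source: 'node-application' },
  'my-reply-queue'
);
console.log(JSON.parse(request.content.toString()));
```

Inside `run()`, this would be used as `channel.sendToQueue(queueName, request.content, request.options);`.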
    

    Detailed explanation (from the NestJS source code)

    The handleMessage function in server-rmq.ts checks whether the id property of the incoming packet is undefined. If it is, the message is treated as an event rather than a request:

    // https://github.com/nestjs/nest/blob/026c1bd61c561a3ad24da425d6bca27d47567bfd/packages/microservices/server/server-rmq.ts#L139-L141
    
    public async handleMessage(
      message: Record<string, any>,
      channel: any,
    ): Promise<void> {
      // ...

      if (isUndefined((packet as IncomingRequest).id)) {
        return this.handleEvent(pattern, packet, rmqContext);
      }

      // ...
    }
    

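    The handleEvent function only runs the handler. It contains no logic for publishing to the reply queue, so a message without an id never produces a reply. Any defined id, even an empty string, makes Nest take the request/response path instead.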
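The routing decision described above can be modelled in a few lines. This is a simplified sketch for illustration, not the NestJS implementation: `classifyPacket`, `dispatch`, the `publish` callback, and the `{ response }` reply shape are all hypothetical.

```javascript
// Simplified model of the decision: a packet without `id` is an event.
function classifyPacket(packet) {
  return packet.id === undefined ? 'event' : 'request';
}

// Runs the handler, and publishes a reply only for requests.
function dispatch(packet, replyTo, handler, publish) {
  const result = handler(packet.data);
  if (classifyPacket(packet) === 'event') {
    return; // event path: the handler's return value is dropped
  }
  publish(replyTo, { response: result }); // hypothetical reply shape
}

const sent = [];
const publish = (queue, body) => sent.push({ queue, body });
const handler = (data) => ({ source: data.source + ' | MyController' });

// Without `id`: handled as an event, nothing is published.
dispatch(
  { pattern: 'something', data: { source: 'node-application' } },
  'my-reply-queue',
  handler,
  publish
);
console.log(sent.length); // 0

// With `id: ''`: an empty string is still defined, so a reply is published.
dispatch(
  { id: '', pattern: 'something', data: { source: 'node-application' } },
  'my-reply-queue',
  handler,
  publish
);
console.log(sent.length); // 1
```

The point of the sketch is that the check is for undefined, not for a falsy value, which is why `id: ''` works.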