Search code examples
node.jstypescriptmicroservicesmqttnestjs

What's a valid @MessagePattern for NestJS MQTT microservice?


I'm trying to setup a MQTT Microservice using NestJS according to the docs.

I've started a working Mosquitto Broker using Docker and verified it's operability using various MQTT clients. Now, when I start the NestJS service it seems to be connecting correctly (mqqt.fx shows new client), yet I am unable to receive any messages in my controllers. This is my bootstrapping, just like in the docs:

main.ts

async function bootstrap() {
    const app = await NestFactory.createMicroservice(AppModule, {
        transport: Transport.MQTT,
        options: {
            host: 'localhost',
            port: 1883,
            protocol: 'tcp'
        }
    });
    app.listen(() => console.log('Microservice is listening'));
}
bootstrap();

app.controller.ts

@Controller()
export class AppController {

    @MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
    root(msg: Buffer) {
        console.log('received: ', msg)
    }
}

Am I using the message-pattern decorator wrongly or is my concept wrong of what a NestJS MQTT microservice even is supposed to do? I thought it might subscribe to the topic I pass to the decorator. My only other source of information being the corresponding unit tests


Solution

  • nest.js Pattern Handler

    On nest.js side we have the following pattern handler:

    @MessagePattern('sum')
    sum(data: number[]): number {
      return data.reduce((a, b) => a + b, 0);
    }
    

    As @Alexandre explained, this will actually listen to sum_ack.


    Non-nest.js Client

    A non-nest.js client could look like this (just save as client.js, run npm install mqtt and run the program with node client.js):

    var mqtt = require('mqtt')
    var client  = mqtt.connect('mqtt://localhost:1883')
    
    client.on('connect', function () {
      client.subscribe('sum_res', function (err) {
        if (!err) {
          client.publish('sum_ack', '{"data": [2, 3]}');
        }
      })
    })
    
    client.on('message', function (topic, message) {
      console.log(message.toString())
      client.end()
    })
    

    It sends a message on the topic sum_ack and listens to messages on sum_res. When it receives a message on sum_res, it logs the message and ends the program. nest.js expects the message format to be {data: myData} and then call the param handler sum(myData).

    // Log:
    {"err":null,"response":5} // This is the response from sum()
    {"isDisposed":true} // Internal "complete event" (according to unit test)
    

    Of course, this is not very convenient...


    nest.js Client

    That is because this is meant to be used with another nest.js client rather than a normal mqtt client. The nest.js client abstracts all the internal logic away. See this answer, which describes the client for redis (only two lines need to be changed for mqtt).

    async onModuleInit() {
      await this.client.connect();
      // no 'sum_ack' or {data: [0, 2, 3]} needed
      this.client.send('sum', [0, 2, 3]).toPromise();
    }