node.js node-red azure-iot-hub

Event emitter loop not working in node-red function


I am attempting to use a node-red function to read Azure IoT Hub messages using AMQP. I have imported the azure-iothub module.

The code below connects ok and getFeedbackReceiver returns an AmqpReceiver object (I can see the object output to debug tab) which acts as an event emitter. However, the emitter loop (msgReceiver.on) doesn't seem to run. I don't even get a null output to debug tab.

Any help appreciated.

var azureamqp = global.get('azureamqp');

var iothub = azureamqp.Client.fromConnectionString(cnct);

try 
   {
         iothub.open(function()
         {
            node.send({type: "connection", payload: util.inspect(iothub)});
            node.status({ fill: "green", shape: "ring", text: "listening" }); 

            iothub.getFeedbackReceiver(function(err,msgReceiver)
                {
                    if (!err) 
                    {
                        node.send({type: "receiver", payload: util.inspect(msgReceiver)});
                        msgReceiver.on('message',function(message)
                            {
                                node.send({payload:message}); 
                            });                        
                    }
                    else
                    {
                        node.send({payload: err});
                    }
                });

            node.send({type: "streamend"}); 
            node.status({ fill: "red", shape: "ring", text: "disconnected" }); 
         });
   }
catch (err)
   {}

Solution

  • OK, I think I see what's happening thanks to the additional comments. First off, some reference docs about messaging with IoT Hub: https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messaging

    There are 2 types of messages that are sent through IoT Hub, depending on the direction in which they go:

    • Cloud to Device (C2D, or commands): these messages are sent by a cloud app to one or more devices. They are stored in a device-specific queue, and delivered to the device as soon as the device connects and starts listening for messages. Once received by the device, the device can choose to send feedback about these to IoT Hub. This feedback is received using the azure-iothub.Client.getFeedbackReceiver() API. I've added more about this at the end of the answer.

    • Device-to-Cloud (D2C, or telemetry): these messages are sent by the devices to the cloud application. They are stored in Event Hubs partitions and are read from those partitions using the Event Hubs SDK.

    From the question and the comments it looks like you're trying to receive D2C messages (telemetry) using the feedback API - and it's not working, because it's not meant to work that way. It's good feedback for (bad?) API design and lack of docs though.

    How to receive messages sent by devices:

    This sample on GitHub shows how to set up an Event Hubs client, which can be used to listen to messages sent by devices to IoT Hub. This command in iothub-explorer is also an easy reference.

    Please find below a simple example:

    'use strict';
    var EventHubClient = require('azure-event-hubs').Client;
    var Promise = require('bluebird'); // used for Promise.map below
    
    var connectionString = '[IoT Hub Connection String]';
    var client = EventHubClient.fromConnectionString(connectionString);
    // Only receive events enqueued after this time (5 seconds before startup).
    var receiveAfterTime = Date.now() - 5000;
    
    var printError = function (err) {
      console.error(err.message);
    };
    
    var printEvent = function (ehEvent) {
      console.log('Event Received: ');
      console.log(JSON.stringify(ehEvent.body));
      console.log('');
    };
    
    // Open the connection, list the partitions, then open one receiver per partition.
    client.open()
      .then(client.getPartitionIds.bind(client))
      .then(function (partitionIds) {
        return Promise.map(partitionIds, function (partitionId) {
          return client.createReceiver('$Default', partitionId, { 'startAfterTime': receiveAfterTime })
            .then(function (receiver) {
              // Each receiver emits its own 'message' and 'errorReceived' events.
              receiver.on('errorReceived', printError);
              receiver.on('message', printEvent);
            });
        });
      })
      .catch(printError);
    

    A little more about that code sample:

    Basically, the Event Hubs client provides an AMQP connection that can be used to open receivers on each partition of the Event Hub. Partitions are used to store messages sent by devices. Each partition gets its own receiver, and each receiver has its own message event, hence the need to open one receiver per partition so that no message from any device is missed. Here's a little bit more about Event Hubs and the nature of partitions: https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-what-is-event-hubs
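
    The one-receiver-per-partition pattern can be tried out without the SDK. The following is only a sketch: listenOnAllPartitions and createFakeReceiver are hypothetical names, and plain Node.js EventEmitter objects stand in for the real partition receivers. With the real client, the injected factory would presumably wrap client.createReceiver as in the sample above.

```javascript
'use strict';
const { EventEmitter } = require('events');

// Hypothetical helper (not part of any SDK): open one receiver per partition
// and funnel every receiver's events into the same pair of handlers.
// `createReceiver` is injected so the sketch runs without azure-event-hubs.
function listenOnAllPartitions(partitionIds, createReceiver, onMessage, onError) {
  return Promise.all(partitionIds.map(function (partitionId) {
    // Promise.resolve accepts both a plain receiver and a promise for one.
    return Promise.resolve(createReceiver(partitionId)).then(function (receiver) {
      receiver.on('errorReceived', onError);
      receiver.on('message', function (event) {
        onMessage(partitionId, event);
      });
      return receiver;
    });
  }));
}

// Stand-in receivers: plain EventEmitters keyed by partition id (assumption).
const fakeReceivers = {};
function createFakeReceiver(partitionId) {
  fakeReceivers[partitionId] = new EventEmitter();
  return fakeReceivers[partitionId];
}

const received = [];
const ready = listenOnAllPartitions(
  ['0', '1'],
  createFakeReceiver,
  function (partitionId, event) { received.push(partitionId + ':' + event.body); },
  function (err) { console.error(err.message); }
);

// Simulate one device message landing in each partition.
ready.then(function () {
  fakeReceivers['0'].emit('message', { body: 'hello' });
  fakeReceivers['1'].emit('message', { body: 'world' });
  console.log(received);
});
```

    If a receiver were opened on partition '0' only, the message emitted on partition '1' would never reach the handler, which is why every partition needs its own receiver.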

    More about C2D Feedback:

    There are 3 types of feedback that a device can send about a C2D message:

    • accept (or complete) means the message is taken care of by the device and the IoT Hub will remove this message from the device queue
    • reject means that the device doesn't want this message (maybe because it's malformed or irrelevant, that's up to you to decide) and the IoT Hub will remove the message from the queue.
    • abandon means that the device cannot "take care" of this message right now and wants IoT Hub to resend it later. The message remains in the queue.

    accept and reject can both be observed using the getFeedbackReceiver API, as can timeouts that occur when the message is never received or is abandoned too many times.
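
    To make the three outcomes concrete, here is a toy in-memory model of a single device queue. It is not the IoT Hub SDK and does not reproduce the real feedback record format: createDeviceQueue, settle, and the MAX_DELIVERY_COUNT value of 3 are all assumptions made for illustration.

```javascript
'use strict';

// Assumed limit for illustration only; the real value is an IoT Hub setting.
const MAX_DELIVERY_COUNT = 3;

// Toy model of one device's C2D queue and of what a feedback receiver would see.
function createDeviceQueue() {
  const queue = [];     // messages still waiting for the device
  const feedback = [];  // outcomes that would surface as feedback

  return {
    queue: queue,
    feedback: feedback,
    enqueue: function (id) {
      queue.push({ id: id, deliveryCount: 0 });
    },
    // Deliver the message at the head of the queue and apply the device's decision.
    settle: function (decision) {
      if (['complete', 'reject', 'abandon'].indexOf(decision) === -1) {
        throw new Error('Unknown decision: ' + decision);
      }
      const message = queue[0];
      if (!message) { return; }
      message.deliveryCount += 1;
      if (decision === 'complete' || decision === 'reject') {
        // Both outcomes remove the message and produce feedback.
        queue.shift();
        feedback.push({ id: message.id, outcome: decision });
      } else if (message.deliveryCount >= MAX_DELIVERY_COUNT) {
        // Abandoned too many times: give up and report it.
        queue.shift();
        feedback.push({ id: message.id, outcome: 'abandoned too many times' });
      }
      // Otherwise the abandoned message stays queued for redelivery.
    }
  };
}

const device = createDeviceQueue();
device.enqueue('m1');
device.enqueue('m2');
device.enqueue('m3');
device.settle('complete'); // m1 removed, feedback recorded
device.settle('reject');   // m2 removed, feedback recorded
device.settle('abandon');  // m3 stays queued, no feedback yet
device.settle('abandon');
device.settle('abandon');  // third abandon reaches the assumed limit
console.log(device.feedback);
```

    In this model a single abandon produces no feedback at all, which matches the point above: only accept, reject, and the failure cases show up on the feedback receiver.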