Search code examples
node.jsazure-functionsazure-stream-analytics

How to break up incoming http request in Node.js to process the json object individually


I am working with a data processing scenario where incoming data is received in the Azure IoT Hub, processed using Azure stream analytics and posted (http trigger) to an Azure Function:https://github.com/Azure/iotc-device-bridge written in Node.js. The goal here to break the incoming data and process objects asynchoronously. The incoming Data is an array of json objects.

I have tried to loop through req.body to break it up to different objects but that hasn't worked.

Thanks for all the help!

Incoming message to the function:

     [{
     "device": {
     "deviceId": "my-cloud-device"
     },
     "measurements": {
     "temp": 20.31,
     "pressure": 50,
     "humidity": 8.5,
     "ledColor": "blue"
     }
     },
     {
     "device": {
     "deviceId": "my-cloud-device"
     },
     "measurements": {
     "temp": 20.31,
     "pressure": 50,
     "humidity": 8.5,
     "ledColor": "blue"
     }
     }]

Code that processes the incoming data

const request = require('request-promise-native');
const handleMessage = require('./lib/engine');
const bodyParser = require('body-parser');

const msiEndpoint = process.env.MSI_ENDPOINT;
const msiSecret = process.env.MSI_SECRET;

const parameters = {
idScope: process.env.ID_SCOPE,
primaryKeyUrl: process.env.IOTC_KEY_URL
};

let kvToken;

module.exports = async function (context, req) {
try {

    await handleMessage({ ...parameters, log: context.log, getSecret:    getKeyVaultSecret }, req.body.device, req.body.measurements, req.body.timestamp);

} catch (e) {
    context.log('[ERROR]', e.message);

    context.res = {
        status: e.statusCode ? e.statusCode : 500,
        body: e.message
    };
}
console.log(b);

}

What I am expecting to do is to break apart the incoming message into different object that are processed async using the code block above.

 {
 "device": {
 "deviceId": "my-cloud-device"
 },
 "measurements": {
 "temp": 20.31,
 "pressure": 50,
 "humidity": 8.5,
 "ledColor": "blue"
 }
 }

 {
 "device": {
 "deviceId": "my-cloud-device"
 },
 "measurements": {
 "temp": 20.31,
 "pressure": 50,
 "humidity": 8.5,
 "ledColor": "blue"
 }
 }

Solution

  • I assume handleMessage is just for one object. In such a case, you would have to loop over the req.body to access each one. Could you share what you tried and what didn't work before?

    Your code should be something like this

    ///
    
    module.exports = async function (context, req) {
    let commonArgs = { ...parameters, log: context.log, getSecret:    getKeyVaultSecret };
    let promiseList = [];
    
    try {
      for (let dev of req.body) {
          promiseList.add(handleMessage(commonArgs, dev.device, dev.measurements, dev.timestamp));
      }
    
      await Promise.all(promiseList); // This way is much better but you could also await each call too
    
    } catch (e) {
    
    ///
    

    I don't see a timestamp property in your data. Maybe you removed it?

    Also, for much better scale, you might have to check out Durable Functions too, specifically the Fan Out / Fan In pattern.

    With this, you would move handleMessage into an activity function which would scale with your load.