I am working with a data processing scenario where incoming data is received in the Azure IoT Hub, processed using Azure stream analytics and posted (http trigger) to an Azure Function:https://github.com/Azure/iotc-device-bridge written in Node.js. The goal here to break the incoming data and process objects asynchoronously. The incoming Data is an array of json objects.
I have tried to loop through req.body to break it up to different objects but that hasn't worked.
Thanks for all the help!
Incoming message to the function:
[{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
},
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}]
Code that processes the incoming data
const request = require('request-promise-native');
const handleMessage = require('./lib/engine');
const bodyParser = require('body-parser');
const msiEndpoint = process.env.MSI_ENDPOINT;
const msiSecret = process.env.MSI_SECRET;
const parameters = {
idScope: process.env.ID_SCOPE,
primaryKeyUrl: process.env.IOTC_KEY_URL
};
let kvToken;
module.exports = async function (context, req) {
try {
await handleMessage({ ...parameters, log: context.log, getSecret: getKeyVaultSecret }, req.body.device, req.body.measurements, req.body.timestamp);
} catch (e) {
context.log('[ERROR]', e.message);
context.res = {
status: e.statusCode ? e.statusCode : 500,
body: e.message
};
}
console.log(b);
}
What I am expecting to do is to break apart the incoming message into different object that are processed async using the code block above.
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}
I assume handleMessage
is just for one object. In such a case, you would have to loop over the req.body
to access each one. Could you share what you tried and what didn't work before?
Your code should be something like this
///
module.exports = async function (context, req) {
let commonArgs = { ...parameters, log: context.log, getSecret: getKeyVaultSecret };
let promiseList = [];
try {
for (let dev of req.body) {
promiseList.add(handleMessage(commonArgs, dev.device, dev.measurements, dev.timestamp));
}
await Promise.all(promiseList); // This way is much better but you could also await each call too
} catch (e) {
///
I don't see a timestamp
property in your data. Maybe you removed it?
Also, for much better scale, you might have to check out Durable Functions too, specifically the Fan Out / Fan In pattern.
With this, you would move handleMessage
into an activity function which would scale with your load.