Search code examples
node-red

Node-RED: Combine / Join messages with the same ID into one message


Example Flow

We have this example flow in Node-RED. Incoming data is simulated via Trigger and "DoSomethin". We hava a username and a company name. For both we want to fecht the ID from a database. Here simulated by the Function "FetchUser" and "FetchCompany". Both returning an incrementing counter. Because in real world there may be multiple requests/triggers per second at a rate so fast that the database might not be fast enough to fetch the data before the next requests arrive. To simulate this I have a 2 seconds delay. Now when clicking the Trigger fast 5 times in a row, only one debug output is generated with user "5" and company "1", so 4 requests are lost. Since each trigger generates a unique message id it should be possible to join all corresponding messages and none should be lost. But how can that be achived?

Exported Node-RED code for importing

[{"id":"f265141.b1fe4e8","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"7cae48f.189e438","type":"inject","z":"f265141.b1fe4e8","name":"","topic":"","payload":"Trigger","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":340,"wires":[["8a2175a4.921"]]},{"id":"8a2175a4.921","type":"function","z":"f265141.b1fe4e8","name":"DataInputSimulation","func":"user=\"user@example.com\";\ncompany=\"ExampleCorp\";\nmsg.payload = {};\nmsg.payload.user = user;\nmsg.payload.company=company;\nreturn msg;","outputs":1,"noerr":0,"x":420,"y":340,"wires":[["474672db.36bd14","6c146c08.d0e884"]]},{"id":"474672db.36bd14","type":"change","z":"f265141.b1fe4e8","name":"FetchUser","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.user","tot":"msg"},{"t":"set","p":"topic","pt":"msg","to":"user","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":610,"y":280,"wires":[["d7c10a91.8cf49"]]},{"id":"6c146c08.d0e884","type":"change","z":"f265141.b1fe4e8","name":"FetchCompany","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.company","tot":"msg"},{"t":"set","p":"topic","pt":"msg","to":"company","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":600,"y":380,"wires":[["6cd41950.b1f0b"]]},{"id":"d7c10a91.8cf49","type":"function","z":"f265141.b1fe4e8","name":"FetchUser","func":"if (typeof context.zaehler === 'undefined')\n{\n    context.zaehler=0;\n}\ncontext.zaehler++;\nmsg.payload = context.zaehler;\nmsg.parts = {};\nmsg.parts.id = msg._msgid;\nmsg.parts.index = 0;\nmsg.parts.count = 2;\nreturn msg;","outputs":1,"noerr":0,"x":830,"y":280,"wires":[["50504a67.e8a5c4","409aeeb4.586b28"]]},{"id":"6cd41950.b1f0b","type":"function","z":"f265141.b1fe4e8","name":"FetchCompany","func":"if (typeof context.zaehler === 'undefined')\n{\n    context.zaehler=0;\n}\ncontext.zaehler++;\nmsg.payload = context.zaehler;\nmsg.parts = {};\nmsg.parts.id = msg._msgid;\nmsg.parts.index = 1;\nmsg.parts.count = 2;\nreturn msg;","outputs":1,"noerr":0,"x":840,"y":380,"wires":[["3f37bb6c.7c7484","955cad64.587d28"]]},{"id":"50504a67.e8a5c4","type":"debug","z":"f265141.b1fe4e8","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":1090,"y":280,"wires":[]},{"id":"3f37bb6c.7c7484","type":"debug","z":"f265141.b1fe4e8","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":1090,"y":380,"wires":[]},{"id":"409aeeb4.586b28","type":"join","z":"f265141.b1fe4e8","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"num","reduceFixup":"","x":640,"y":580,"wires":[["c93fce1a.d3aa6"]]},{"id":"c93fce1a.d3aa6","type":"debug","z":"f265141.b1fe4e8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":900,"y":600,"wires":[]},{"id":"955cad64.587d28","type":"delay","z":"f265141.b1fe4e8","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":840,"y":480,"wires":[["409aeeb4.586b28"]]}]

If manally set msg.parts.id and stuff like that, but it didn't help.

Here as example the code of node FetchUser

if (typeof context.zaehler === 'undefined')
{
    context.zaehler=0;
}
context.zaehler++;
msg.payload = context.zaehler;
msg.parts = {};
msg.parts.id = msg._msgid;
msg.parts.index = 0;
msg.parts.count = 2;
return msg;

Solution

  • If you set the join node to automatic this just works. You can change the output array back into a keyed message with matching keys afterwards because you know which path has which part number.

    [{"id":"7ded445b.56a8b4","type":"inject","z":"7a67e2ef.b4884c","name":"","topic":"","payload":"Trigger","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":110,"y":140,"wires":[["895ad6c4.4153c"]]},{"id":"895ad6c4.4153c","type":"function","z":"7a67e2ef.b4884c","name":"DataInputSimulation","func":"user=\"user@example.com\";\ncompany=\"ExampleCorp\";\nmsg.payload = {};\nmsg.payload.user = user;\nmsg.payload.company=company;\nreturn msg;","outputs":1,"noerr":0,"x":300,"y":140,"wires":[["d7c67e43.9e52c","ec3b0a63.574b88"]]},{"id":"d7c67e43.9e52c","type":"change","z":"7a67e2ef.b4884c","name":"FetchUser","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.user","tot":"msg"},{"t":"set","p":"topic","pt":"msg","to":"user","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":510,"y":120,"wires":[["31a50616.60f60a"]]},{"id":"ec3b0a63.574b88","type":"change","z":"7a67e2ef.b4884c","name":"FetchCompany","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.company","tot":"msg"},{"t":"set","p":"topic","pt":"msg","to":"company","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":160,"wires":[["96a0ae8e.936d88"]]},{"id":"31a50616.60f60a","type":"function","z":"7a67e2ef.b4884c","name":"FetchUser","func":"if (typeof context.zaehler === 'undefined')\n{\n    context.zaehler=0;\n}\ncontext.zaehler++;\nmsg.payload = context.zaehler;\nmsg.parts = {};\nmsg.parts.id = msg._msgid;\nmsg.parts.index = 0;\nmsg.parts.count = 2;\nreturn msg;","outputs":1,"noerr":0,"x":710,"y":120,"wires":[["8d65dfdc.0c0038","777bde.99691c24"]]},{"id":"96a0ae8e.936d88","type":"function","z":"7a67e2ef.b4884c","name":"FetchCompany","func":"if (typeof context.zaehler === 'undefined')\n{\n    context.zaehler=0;\n}\ncontext.zaehler++;\nmsg.payload = context.zaehler;\nmsg.parts = {};\nmsg.parts.id = msg._msgid;\nmsg.parts.index = 1;\nmsg.parts.count = 2;\nreturn msg;","outputs":1,"noerr":0,"x":720,"y":160,"wires":[["7a299b8d.a4fd1c","4f5fd876.a42238"]]},{"id":"8d65dfdc.0c0038","type":"debug","z":"7a67e2ef.b4884c","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":910,"y":60,"wires":[]},{"id":"7a299b8d.a4fd1c","type":"debug","z":"7a67e2ef.b4884c","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":930,"y":140,"wires":[]},{"id":"777bde.99691c24","type":"join","z":"7a67e2ef.b4884c","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"num","reduceFixup":"","x":870,"y":260,"wires":[["bdaf6b65.8dbfc","852b8706.4d67"]]},{"id":"bdaf6b65.8dbfc","type":"debug","z":"7a67e2ef.b4884c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":950,"y":360,"wires":[]},{"id":"4f5fd876.a42238","type":"delay","z":"7a67e2ef.b4884c","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":660,"y":300,"wires":[["777bde.99691c24"]]},{"id":"852b8706.4d67","type":"change","z":"7a67e2ef.b4884c","name":"","rules":[{"t":"move","p":"payload","pt":"msg","to":"temp","tot":"msg"},{"t":"move","p":"temp[0]","pt":"msg","to":"payload.user","tot":"msg"},{"t":"move","p":"temp[0]","pt":"msg","to":"payload.company","tot":"msg"},{"t":"delete","p":"temp","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":700,"y":420,"wires":[["8f67784d.5de82"]]},{"id":"8f67784d.5de82","type":"debug","z":"7a67e2ef.b4884c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":920,"y":440,"wires":[]}]
    

    The trick is that the move from the array is always accessing element 0 because it pops the elements out of the array.