My Node app uses Mongo change streams, and the app runs 3+ instances in production (more eventually, so this will become more of an issue as it grows). As a result, when a change comes in, the change stream handler runs once in every instance, i.e. as many times as there are processes.
How can I set things up so that each change is handled only once?
Here's what I've got:
// Ask the server to attach the current full document to each update event,
// rather than only the changed fields.
const options = { fullDocument: "updateLookup" };

// Aggregation pipeline for the change stream: only let through "update"
// events that modified the `sites` field.
// Named `sitesFilter` so it matches the identifier passed to watch() below
// (it was previously declared as `filter`, leaving `sitesFilter` undefined).
const sitesFilter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream.
// NOTE: every running instance of the app registers this listener, so each
// matching change is delivered once per instance.
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );
  // Do work...
  console.log("site change stream done.");
});
While the Kafka option sounded interesting, it was a lot of infrastructure work on a platform I'm not familiar with, so I decided to go with something a little closer to home for me: sending an MQTT message to a little standalone app, and letting the MQTT server monitor messages for uniqueness.
// On every change event, publish the event's resume-token ID over MQTT so a
// single downstream consumer can decide whether the change was already seen.
// Attached to `sitesStream`, the stream created by Client.watch() earlier.
sitesStream.on("change", change => {
  console.log("in site change stream");
  // A short-lived client per event: connect, publish once, then disconnect.
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  // JSON.stringify wraps the ID in double quotes; the consumer must account
  // for that when comparing IDs.
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});
I'm still working out the final version of the MQTT server, but the method to evaluate uniqueness of messages will probably store an array of change stream IDs in application memory, as there is no need to persist them, and evaluate whether to proceed any further based on whether that change stream ID has been seen before.
const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://localhost:1883");

// IDs of the change events that have already been handled. Held in memory
// only (nothing is persisted), and a Set keeps the membership check cheap as
// the collection grows.
// NOTE(review): entries are never evicted, so this grows for the lifetime of
// the process — consider capping or expiring old IDs.
const seen = new Set();

client.on("connect", function() {
  client.subscribe("myTopic");
});

client.on("message", function(topic, message) {
  // Strip any double quotes from the payload so the bare ID is compared.
  // Declared with `const` — it was previously an implicit global.
  const changeId = message.toString().replace(/"/g, "");
  if (seen.has(changeId)) {
    // Duplicate delivery of a change that was already processed; ignore it.
    return;
  }
  seen.add(changeId);
  // Do stuff
});
This doesn't include security, etc., but you get the idea.