I'm subscribing to a queue of events, and every time I get an event I have to make an async HTTP request and publish the response to another queue, in the same order in which I received the events. So basically: subscribe to a pub/sub application, do some async computation, and publish the computed result to another pub/sub application. Since I don't have a fixed number of callbacks to execute, I can't use async.series.
What I thought of doing was to create a queue that would let me insert a message and an id, and would emit an event every time the inserted id was equal to the last id emitted + 1. I would then subscribe to this queue and publish to my pub/sub application every time I got an event out of my queue since that would guarantee a sequential order.
What I need to do seems to be a very common task but I haven't been able to find a module for that. Is there something on NPM that already does that or is there a better way to accomplish what I need?
I ended up creating my own module to do what I needed, which is to be able to subscribe to a pub/sub application and publish to another application, doing some async work in between but still keeping the order in which the messages are received by the subscriber.
With this module I can call orderedPubSub.publish(id, message) in the callback of the async work I perform when receiving an event from the first application, and call otherApplication.publish(message) in the on("message") handler of my OrderedPubSub module.
I wish there was another way of doing this or a module already in NPM though.
const EventEmitter = require('events');

class OrderedPubSub extends EventEmitter {
  constructor(initialId = 0) {
    super()
    // Id of the last message emitted; the next one emitted must be lastPublishedId + 1.
    this.lastPublishedId = initialId
    // Messages that arrived early, keyed by id, waiting for their turn.
    this.messages = {}
  }

  publish(id, message) {
    this.messages[id] = message
    this.publishAllAvailable()
  }

  // Emit every buffered message that directly follows the last emitted id.
  publishAllAvailable() {
    let messageId;
    while ((messageId = this.lastPublishedId + 1) in this.messages) {
      const message = this.messages[messageId]
      delete this.messages[messageId]
      this.lastPublishedId++
      this.emit("message", message)
    }
  }
}

const orderedPubSub = new OrderedPubSub();

orderedPubSub.on('message', message => {
  console.log(`Received message: "${message}"`)
});

// Publish out of order; nothing is emitted until id 1 arrives.
orderedPubSub.publish(3, "third message")
orderedPubSub.publish(2, "second message")
orderedPubSub.publish(4, "fourth message")
orderedPubSub.publish(10, "tenth message") // stays buffered until ids 5 to 9 are published
orderedPubSub.publish(1, "first message")

// Output:
// Received message: "first message"
// Received message: "second message"
// Received message: "third message"
// Received message: "fourth message"
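
To show how the pieces described above might fit together, here is a minimal sketch of the full flow: receive an event, start the async work, and forward the results in arrival order. The names source, otherApplication and fakeHttpRequest are hypothetical stand-ins (not real pub/sub clients), the delays are made up so that responses come back in reverse order, and the class is a compact copy of the one above so the snippet runs on its own.

```javascript
const EventEmitter = require('events');

// Compact copy of the OrderedPubSub class from this answer.
class OrderedPubSub extends EventEmitter {
  constructor(initialId = 0) {
    super();
    this.lastPublishedId = initialId;
    this.messages = {};
  }

  publish(id, message) {
    this.messages[id] = message;
    let messageId;
    while ((messageId = this.lastPublishedId + 1) in this.messages) {
      const next = this.messages[messageId];
      delete this.messages[messageId];
      this.lastPublishedId = messageId;
      this.emit('message', next);
    }
  }
}

// Hypothetical stand-ins for the two pub/sub applications.
const source = new EventEmitter(); // the application we subscribe to
const published = [];              // what the other application receives
const otherApplication = { publish: message => published.push(message) };

// Hypothetical async work: a timer instead of a real HTTP request.
function fakeHttpRequest(event, callback) {
  setTimeout(() => callback(null, `response for ${event.name}`), event.delayMs);
}

const orderedPubSub = new OrderedPubSub();
orderedPubSub.on('message', message => otherApplication.publish(message));

let receivedCount = 0;
source.on('event', event => {
  // Assign the id synchronously on arrival; this is what records the order.
  const id = ++receivedCount;
  fakeHttpRequest(event, (err, response) => {
    // Publish something for every id, even on failure; a missing id
    // would keep every later message buffered.
    orderedPubSub.publish(id, err ? `error for ${event.name}` : response);
  });
});

// Earlier events take longer, so their callbacks fire last.
source.emit('event', { name: 'a', delayMs: 30 });
source.emit('event', { name: 'b', delayMs: 20 });
source.emit('event', { name: 'c', delayMs: 10 });

// Resolves once all three responses have been forwarded.
const done = new Promise(resolve => {
  orderedPubSub.on('message', () => {
    if (published.length === 3) resolve(published);
  });
});

done.then(result => console.log(result));
```

The important detail is that the id is taken when the event arrives, not when the request finishes. Because ids must be consecutive, a request that fails still needs to publish a placeholder for its id, or the messages behind it are never released.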