Search code examples
node.jsrabbitmqnode-amqplib

How to manage publish connection per request on rabbitmq(rascal.js)


I am using Rascal.Js(it uses amqplib) for my messaging logic with rabbitMq on node.js app.

I am using something similar to their example on my project startup, which creates a permanent instance and "registers" all of my subscribers and redirects messages when they arrive to the queue (in the background).

My issue is with the publishers. There are http requests from outside which should trigger my publishers. A user clicks on create button of sorts which leads to certain flow of actions. At some point it reaches the point at which I need to use a publisher.

And here I am not sure about the right approach. Do I need to open a new connection every time I need to publish a message? and close it after it ends? Or maybe I should implement this in a way that it keeps the same connection open for all of the publishers? (I actually not so sure how to create it in a way that it can be accessed from other parts of my app).

At the moment I am using the following :

async publishMessage(publisherName, message) {
        const dynamicSettings = setupDynamicVariablesFromConfigFiles(minimalPublishSettings);
        const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(dynamicSettings.rascal));

        broker.on('error', async function(err) {
            loggerUtil.writeToLog('error', 'publishMessage() broker_error_event: ' + publisherName + err + err.stack);
            await broker.shutdown();
        })
   
        const publication = await broker.publish(publisherName, message);
        try {
            publication.on('error', async function(err) {
                loggerUtil.writeToLog('error', 'publishMessage() publish_error_event: ' + err + err.stack);
                await broker.shutdown();
            }).on("success", async (messageId) => {
                await broker.shutdown();
            }).on("return", async (message) => {
                loggerUtil.writeToLog('error', 'publishMessage() publish_return_event: ' + err + err.stack);
                await broker.shutdown();
            })
        }
        catch(err) {
            loggerUtil.writeToLog('error', 'Something went wrong ' + err + err.stack);
            await broker.shutdown();
        }

    }

I use this function from different parts of my app when I need to publish messages. I thought to just add the broker.shutdown() for all of the endpoints but at some point after an error, I got an exception about closing a connection which was already closed, and this got me worried about the shutdown approach (which probably not a good one). I think it is related to this - I tried doing that (the commented code) but I think it isnt working well in certain situations. If everything is ok it goes to "success" and then I can close it. But one time I had an error instead of success and when I tried to use broker.shutdown() it gave me another exception which crashed the app. I think it is related to this - https://github.com/squaremo/amqp.node/issues/111

I am not sure what might be the safest way to approach this?

Edit:

Actually now that I think about it, the exception might be related to me trying to shutdown the broker in the catch{} area as well. I will continue to investigate.


Solution

  • Rascal is designed to be initiated once at application startup, rather than created per HTTP request. Your application will be extremely slow if you use it in this way, and depending on how many concurrent requests you need to handle, could easily exceed max number of connections you can make to the broker. Furthermore you will get none of the benefits that Rascal provides, such as failed connection recovery.

    If you can pre-determine the queue or exchange you need to publish to, then configure Rascal at application start-up (prior to your http server), and share the publisher between requests. If you are unable to determine the queue or exchange until your receive the http request, then Rascal is not an appropriate choice. Instead you're better off using amqplib directly, but should still establish a shared connection and channel. You will have to handle connection and channel errors manually though, otherwise they will crash your application.