Search code examples
node.jsmultithreadingdesign-patternsproducer-consumertask-queue

Sequentially execute webhooks received in node application


I have a node application using koa. It receiving webhooks from external application on specific resources.

To illustrate let say the webhook send me with POST request an object of this type :

{
  'resource_id':'<SomeID>',
  'resource_origin':'<SomeResourceOrigin>',
  'value' : '<SomeValue>'
}

I would like to execute sequentially any resources coming from the same origin to avoid desynchronization of resources related to my execution.

I was thinking to use database as lock and use cron to sequentially executing my process for each resources of same origin.

But I'm not sure it's the most efficient method.

So my question is here :

Do you know some method/package/service allowing me to use global queues that I could implement for each origin insuring resources from same origin will be executed synchronously without making all webhooks processed sequentially ? If it do not use database it's better.


Solution

  • Solution I decide to use

    Small brief of the post discussion

    As Ivan Rubinson let me know my problem is just a producer-consumer problem.

    So I finally chose to use RabbitMQ because I have a huge amount of webhook to process. For peoples having a small amount of request to process and do not want use external tools O. Jones answer is a real good way to solve the problem.

    Solution design

    I finally install and configure a RabbitMQ server, then I created for each origin of my web-hooks one queue.

    Producer

    On the producer side when I receive the web-hook data I send a message to the queue corresponding to the origin of my web-hook with serialized information needed to process in fact id of the row in the Database to make messages as light as possible.

    Consumer

    On the consumer side I create a consumer function for each origin queue and set the fetch policy to one to process message one by one in each queue finally I set the channel policy to wait an acknowledgement message before to send the next message . Wit this configuration consumers proceed message by message and solve the initial problem.

    Implementation

    Producer

       async function create(){
            await   amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
                await conn.createChannel().then(async (ch)=>{
                    global.channel_publisher=ch;
                });
            });
        }
    
        async function sendtask(queue,task){
            if(!global.channel_publisher){
                await create();
            }
            global.channel_publisher.assertQueue(queue).then((ok)=>{
                global.channel_publisher.sendToQueue(queue, Buffer.from(task));
            });
        }
    

    I use the sendtask(queue,task) function at the place I received my web-hook

    Consumer

       async function create(){
          await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
             await conn.createChannel().then(async (ch)=>{
                ch.prefetch(1);
                global.channel_consumer=ch;
              });
           });
        }
    
       async function consumeTask(queue){
           if(!global.channel_consumer){
               await create();
           }
    
           global.channel_consumer.assertQueue(queue).then((ok)=>{
              global.channel_consumer.consume(queue,(message)=>{
                   const args=message.content.toString().split(';');
    
                        await processWebhooks(args);
                        global.channel_consumer.ack(message);
               });
           });
       }
    

    I use the consumeTask(queue) when I had to process a new origin of web-hooks. Also I use it for initialize my application with all known origins in the database.