Tags: node.js, async-await, web-worker

Handle async messages in worker threads


We are using Node.js's experimental worker threads to run some CPU-intensive tasks. These tasks are kicked off by messages sent through parentPort. While the threads run, they need to persist data to a database, which is an asynchronous, promise-based operation.

What we are seeing is that new parentPort messages keep being delivered to the handler function while we are still doing the asynchronous operations.

Here is an example of what we are doing:

const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {
  parentPort.on('message', async (value) => {
    await testAsync(value);
  });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if (value.idx === 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}

In the above example, every "Starting wait for ..." line is printed before any of the "Complete resolve ..." lines. Because of async/await, we expected the event handler to wait for the promise to resolve before processing the next event. In the real code the DB connection may fail and throw an exception, so we want to make sure the current message has been fully processed before accepting a new one.
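To illustrate why this happens, here is a minimal sketch using only Node's built-in events module, with no worker involved. It assumes parentPort's 'message' listeners are invoked the same way a plain EventEmitter invokes them: synchronously, with the promise returned by an async listener discarded. The 10 ms delay is a hypothetical stand-in for the database write:

```javascript
const { EventEmitter } = require('events');

const emitter = new EventEmitter();
const log = [];

emitter.on('message', async (idx) => {
  // Runs synchronously up to the first await.
  log.push(`start ${idx}`);
  // Hypothetical stand-in for an async DB write.
  await new Promise((resolve) => setTimeout(resolve, 10));
  log.push(`end ${idx}`);
});

// emit() calls the listener and returns immediately; it never awaits
// the promise the async listener returns, so the handlers overlap.
emitter.emit('message', 1);
emitter.emit('message', 2);
emitter.emit('message', 3);

// Logs: start 1, start 2, start 3, end 1, end 2, end 3 (one per line)
setTimeout(() => console.log(log.join('\n')), 100);
```

Marking the listener `async` only changes what the listener returns; the emitter ignores that return value, so nothing serializes the calls.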

Are we doing anything wrong here?

If not, is there any way to achieve our goal of processing the events in order?


Solution

  • It seems that you want to queue the messages and process only one at a time.

    parentPort.on('message', listener) registers an event listener. When the event fires, the listener is called and the port moves on: it does not wait for the promise returned by an async listener, so it won't wait for the asynchronous work started by the previous message to finish.

    So if 'message' fires a thousand times, testAsync is started a thousand times, and all of those calls run concurrently.

    You need to implement a queue in the worker and limit its concurrency to 1. There are multiple promise-queue packages on npm.

    I will use p-queue in this example.

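    // p-queue 6.x is CommonJS and exposes the class as `default`;
    // p-queue 7+ is ESM-only: use `import PQueue from 'p-queue'` instead.
    const { default: PQueue } = require('p-queue');
    
    const { parentPort, Worker, isMainThread } = require('worker_threads');
    
    if (isMainThread) {
      const worker = new Worker(__filename);
    
      const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
      for (const x of i) {
        worker.postMessage({ idx: x });
      }
    } else {
      // concurrency: 1 runs one task at a time, in the order added
      const queue = new PQueue({ concurrency: 1 });
    
      parentPort.on('message', (value) => {
        // Enqueue instead of awaiting: the listener returns immediately and
        // the queue starts testAsync only after the previous task settles.
        queue.add(() => testAsync(value)).catch((err) => {
          // e.g. a failed DB write; later messages are still processed
          console.error(`Failed to process ${value.idx}:`, err);
        });
      });
    }
    
    async function testAsync(value) {
      return new Promise((resolve) => {
        console.log(`Starting wait for ${value.idx}`);
        setTimeout(() => {
          console.log(`Complete resolve for ${value.idx}`);
          resolve();
    
          if (value.idx === 9) {
            setTimeout(() => process.exit(0), 2000);
          }
        }, 500);
      });
    }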
    

    Now the output will be:

    Starting wait for 1
    Complete resolve for 1
    Starting wait for 2
    Complete resolve for 2
    ...
    Starting wait for 9
    Complete resolve for 9
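If you'd rather not add a dependency, a similar one-at-a-time queue can be sketched by chaining each task onto the previous task's promise. This is a minimal sketch, not p-queue's implementation; `createSerialQueue` and `persist` are hypothetical names, and `persist` simulates a database write that fails for idx 2 to show that one failed message does not stop the ones after it:

```javascript
// Minimal sketch of a serial (concurrency 1) promise queue without p-queue.
function createSerialQueue() {
  // Settles when the most recently queued task has settled.
  let tail = Promise.resolve();

  return function enqueue(task) {
    // Start `task` only after every previously queued task has settled.
    const run = tail.then(() => task());
    // Swallow errors on the internal chain so a failed task does not
    // block later tasks; the caller still sees the rejection via `run`.
    tail = run.catch(() => {});
    return run;
  };
}

// --- Demo (hypothetical names) ---
const log = [];
const enqueue = createSerialQueue();

// Hypothetical stand-in for a database write; fails for idx 2.
function persist(idx) {
  return new Promise((resolve, reject) => {
    log.push(`start ${idx}`);
    setTimeout(() => {
      if (idx === 2) {
        reject(new Error(`db write failed for ${idx}`));
        return;
      }
      log.push(`end ${idx}`);
      resolve();
    }, 10);
  });
}

for (const idx of [1, 2, 3]) {
  enqueue(() => persist(idx)).catch((err) => {
    log.push(`error ${err.message}`);
  });
}

setTimeout(() => console.log(log.join('\n')), 100);
```

In the worker you would call `enqueue(() => testAsync(value))` from the parentPort 'message' listener and attach a `.catch` to report failures. Each message starts only after the previous one has settled, so a failed DB write is reported for that message alone and the next message is still processed.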