javascript node.js promise priority-queue

Promise callback queue with priority for some callbacks


My code needs to perform a multitude of async actions simultaneously and handle their promises in a specific sequential way.

By "specific sequential way" I mean this: suppose promise1, promise2 and promise3 are started in that order, but promise3 resolves first and promise2 second. I then want to process promise3, promise2 and promise1 sequentially, in that order (the order in which they resolved).

Let's consider an async function timeout that resolves after X seconds, and fetchMyItem1, fetchMyItem2, which return promises that, when fulfilled, should run different code depending on whether timeout has resolved or not.

For a concrete scenario, imagine a customer waiting at the counter for items to be delivered. Either the customer stays, and we serve them directly at the counter by bringing one item at a time, or the customer walks away (timeout), and we have to call a waiter who will bring the items to the customer as they arrive. Note that while one item is being delivered, the other items should still be on their way (their promises are pending), and might even arrive (be fulfilled) while the customer is being served an item or while the waiter is on the way.

Here is some code to start with:

const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...]
const customerLeavesCounterPromise = timeout()

let waiter = undefined // reassigned once the customer has left

const allPromisesToBeFulfilled = [...allItemsToBeDeliveredPromises, customerLeavesCounterPromise]

// LOOP
// Promise.race settles as soon as the first of the pending promises settles
const itemDeliveredOrCustomerLeft = await Promise.race(allPromisesToBeFulfilled)

if (hasCustomerLeft(itemDeliveredOrCustomerLeft)) {
  // hasCustomerLeft allows us to detect if the promise that resolved first is `customerLeavesCounterPromise` or not
  waiter = await callWaiter()
} else {
  // An item has arrived
  if (waiter) {
    deliverItemViaWaiter(itemDeliveredOrCustomerLeft)
  } else {
    deliverItemAtCounter(itemDeliveredOrCustomerLeft)
  }
}
// remove itemDeliveredOrCustomerLeft from allPromisesToBeFulfilled

// END loop

I am not sure how to implement a loop for this scenario. Promises must be accumulated into a queue as they resolve, but one specific promise has priority in that queue: the timeout promise should be handled as soon as it resolves, or, if the fulfilment of another promise is already being processed, right after that processing finishes.


Solution

  • My code needs to perform a multitude of async actions simultaneously and handle their promises in a specific sequential way.

    You could use streams to consume the promises, as streams are essentially queues that process one message at a time.

    The idea (untested):

    import { Readable, Writable } from 'stream';
    let customerHasLeft = false;
    /*const customerLeavesCounterPromise = */timeout() // your code...
    .then(() => { customerHasLeft = true; }); // ... made boolean
    // let's push all our promises in a readable stream
    // (they are supposedly sorted in the array)
    const input = new Readable({
      objectMode: true,
      read: function () { // don't use arrow function: we need `this`
        const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...]; // your code
        // put everything, in the same order, in the output queue
        allItemsToBeDeliveredPromises.forEach(p => this.push(p));
        this.push(null); // terminate the stream after all these
      }
    });
    // let's declare the logic to process each promise
    // (depending on `timeout()` being done)
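    const consumer = new Writable({
      objectMode: true, // required: the chunks are promises, not strings or Buffers
      write: async function (promise, uselessEncoding, callback) {
        let order; // declared outside `try` so it is still in scope below
        try {
          order = await promise; // wait for the current promise to be completed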
        } catch (e) {
          /* delivery error, do something cool like a $5 coupon */
          return callback(e); // or return callback() without e if you don't want to crash the pipe
        }
        if (customerHasLeft) { /* find a waiter (you can still `await`) and deliver `order` */ }
        else { /* deliver `order` at the counter */ }
        callback(); // tell the underlying queue we can process the next promise now
      }
    });
    // launch the whole pipe
    input.pipe(consumer);
    // you can add listeners on all events you'd like:
    // 'error', 'close', 'data', whatever...
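
To see what the stream version does in practice, here is a small self-contained sketch. The `delay` helper and the `consumeInStartOrder` function are hypothetical stand-ins for `fetchMyItem1()` and friends, not part of the original code. It suggests that the `Writable` handles one promise at a time, but in the order the promises were pushed, not in the order they resolved.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));

// Pipes already-started promises through an object-mode Writable and
// resolves with the order in which their values were handled.
async function consumeInStartOrder(promises) {
  // Dynamic import so the snippet runs as either CommonJS or an ES module.
  const { Readable, Writable } = await import('stream');
  const handled = [];

  const input = new Readable({
    objectMode: true,
    read() {
      promises.forEach(p => this.push(p)); // queue the promises themselves
      this.push(null);                     // then end the stream
    }
  });

  const consumer = new Writable({
    objectMode: true,
    write(promise, _encoding, callback) {
      // Only call back (and so accept the next chunk) once this promise settles.
      promise.then(value => { handled.push(value); callback(); }, callback);
    }
  });

  input.pipe(consumer);
  await new Promise((resolve, reject) => {
    consumer.on('finish', resolve);
    consumer.on('error', reject);
  });
  return handled;
}

// item3 resolves first, yet it is handled last because it was pushed last.
consumeInStartOrder([delay(30, 'item1'), delay(20, 'item2'), delay(5, 'item3')])
  .then(handled => console.log(handled)); // logs [ 'item1', 'item2', 'item3' ]
```

So the stream gives sequential handling, but an item that arrives early still waits behind the items pushed before it.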
    

    EDIT: actually, we want to process the promises in the order they resolve, but still sequentially (i.e. a single post-processing chain shared by all promises):

    let customerHasLeft = false;
    timeout() // your code...
    .then(() => { customerHasLeft = true; }); // ... made boolean
    const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...];
    let postProcessChain = Promise.resolve(); // start with a promise ready to be thened
    // add a next step to each promise so that as soon as one resolves, it registers
    //  as a next step to the post-process chain
    allItemsToBeDeliveredPromises.forEach(p => p.then(order => {
      // reassign the chain so that each post-process waits for the previous one to finish
      postProcessChain = postProcessChain.then(async () => {
        // do something potentially async with the resulting order, like this:
        if (customerHasLeft) { /* find a waiter (you can still `await`) and deliver `order` */ }
        else { /* deliver `order` at the counter */ }
      });
    }));
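
A runnable sketch of this chained approach, for experimenting with timings. The names `delay` and `deliverInResolutionOrder`, and the millisecond values, are assumptions made up for the illustration; error handling is left out.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));

// Handles each item as soon as it resolves, but never two at the same time.
// `handle` may be async; it receives the item and whether the customer has left.
function deliverInResolutionOrder(itemPromises, customerLeavesPromise, handle) {
  let customerHasLeft = false;
  customerLeavesPromise.then(() => { customerHasLeft = true; });

  let chain = Promise.resolve(); // tail of the post-processing queue
  const enqueued = itemPromises.map(p => p.then(item => {
    // Appending to the tail and storing the new tail is what serializes
    // the handlers; the flag is read only when the handler actually starts.
    chain = chain.then(() => handle(item, customerHasLeft));
    return chain;
  }));
  // Settles once every item has been fetched and handled.
  return Promise.all(enqueued);
}

// Items are started in order 1, 2, 3 but resolve in order 3, 2, 1;
// the customer leaves after 30 ms, between item3 and item2.
const log = [];
const done = deliverInResolutionOrder(
  [delay(70, 'item1'), delay(50, 'item2'), delay(10, 'item3')],
  delay(30),
  async (item, customerHasLeft) => {
    await delay(5); // simulate a slow delivery
    log.push(`${item} via ${customerHasLeft ? 'waiter' : 'counter'}`);
  }
);
done.then(() => console.log(log));
```

With these timings, item3 should be delivered at the counter, then item2 and item1 via the waiter. Because the flag is checked when a handler starts, the timeout takes effect right after whatever delivery is currently in progress.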