Search code examples
javascriptasync-awaitpromisefetchgenerator

async generator yielding promise results as they are resolved


Say I want to fetch 10 urls concurrently, and process the responses as they are recieved (which may be in a different order from the order in which they appear in the original list). Ignoring the possibility of rejections, one way to do this is simply to attach a "then" callback to each promise, and then wait for them all to finish using Promise.all().

const fetch_promises = [
  fetch("https://cors-demo.glitch.me/allow-cors"),
  fetch("/"),
  fetch("."),
  fetch(""),
  fetch("https://enable-cors.org"),
  fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
  fetch("https://api.github.com"),
  fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
  processing_promises.push(fetch_promise.then(response => {
    // Process response.  In this example, that means just
    // print it.
    console.log("got a response: ",response);
  }));
}
await Promise.all(processing_promises);

Switching to an example with clearer and more deterministic output:

const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
    sleep(3000).then(()=>"slept 3000"),
    sleep(1000).then(()=>"slept 1000"),
    sleep(5000).then(()=>"slept 5000"),
    sleep(4000).then(()=>"slept 4000"),
    sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
  processing_promises.push(sleep_promise.then(result => {
     console.log("promise resolved: ",result);
  }));
}
await Promise.all(processing_promises);

The output is as expected:

15:54:16.331 promise resolved:  slept 1000
15:54:17.331 promise resolved:  slept 2000
15:54:18.331 promise resolved:  slept 3000
15:54:19.332 promise resolved:  slept 4000
15:54:20.331 promise resolved:  slept 5000

My question is this: suppose I want to, or need to, express the processing described above as an "async for..of" loop, instead of "then" callbacks; so the promises results need to come out in the form of an async iterable. How would I convert the array of promises to such an async iterable? What I'm asking for is an async generator function AwaitAsTheyCome(), taking as input a list of promises, which yields the results one by one as the promises resolve. I'd then call the function, and do the processing, as follows:

for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}

It should give the same output (with the same timing) as above.

The following attempted solution obviously doesn't work, but it may give an idea of about how simple and short I expect this to be:

async function* AwaitAsTheyCome(promises) {
  for (const promise of promises) {
    promise.then(response => {
      yield response;  // WRONG
      // I want to yield it from AwaitAsTheyCome,
      // not from the current arrow function!
    });
  }
}

The following solution does work, but it's more code than I expected to have to write for this.

async function* AwaitAsTheyCome(promises) {
  // Make a list of notifier promises and
  // functions that resolve those promises,
  // one for each of the original promises.
  const notifier_promises = [];
  const notifier_resolves = [];
  for (const promise of promises) {
    notifier_promises.push(
        new Promise(resolve=>notifier_resolves.push(resolve)));
  }

  const responses = [];
  for (const promise of promises) {
    promise.then(response => {
      responses.push(response);
      // send one notification (i.e. resolve the next notifier promise)
      notifier_resolves.shift()();
    });
  }

  for (const promise of promises) {
    // wait for one notification
    // (i.e. wait for the next notifier promise to be resolved).
    await notifier_promises.shift();
    // yield the corresponding response
    yield responses.shift();
  }
}

// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
  sleep(3000).then(()=>"slept 3000"),
  sleep(1000).then(()=>"slept 1000"),
  sleep(5000).then(()=>"slept 5000"),
  sleep(4000).then(()=>"slept 4000"),
  sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}

Is there a simpler way to implement the async generator function AwaitAsTheyCome?

(I tried making a stacksnippet out of the above code, but it didn't work-- I suspect this is because the snippets system doesn't understand the new async generator and/or for await..of syntax)


Solution

  • You can simplify the code a bit by

    • using only a single loop over the input array (although that may be confusing)
    • not using a responses array but simply fulfilling the promises
    • not using .shift() on the promises array but simply looping it
    async function* raceAll(input) {
      const promises = [];
      const resolvers = [];
      for (const p of input) {
        promises.push(new Promise(resolve=> {
          resolvers.push(resolve);
        }));
        p.then(result => {
          resolvers.shift()(result);
        });
      }
    
      for (const promise of promises) {
        yield promise;
      }
    }
    

    If you don't like the amount of code required, I would recommend to factor out the queue this implements in a separate module. With e.g. this implementation, the code can become as simple as

    function raceAll(promises) {
      const queue = new AsyncBlockingQueue();
      for (const p of promises) {
        p.then(result => {
          queue.enqueue(result); 
        });
      }
      return queue[Symbol.asyncIterator]();
    }
    

    However, both of these implementation miss a crucial issue: error handling. If any of these promises rejects, you'll get an unhandled rejection error which may crash your process. To actually get the async iterator to reject the next promise, so that a try/catch around a for await…of loop may handle it, you'd need to do something like

    async function* raceAll(input) {
      const promises = [];
      const resolvers = [];
      for (const p of input) {
        promises.push(new Promise(resolve => {
          resolvers.push(resolve);
        }));
        p.finally(() => {
          resolvers.shift()(p);
        });
        // works equivalent to:
        // p.then(result => {
        //   resolvers.shift()(result);
        // }, error => {
        //   resolvers.shift()(Promise.reject(error));
        // });
      }
    
      for (const promise of promises) {
        yield promise;
      }
    }
    

    Resolving the promise with a rejected promise does the trick so that we still only need one queue of resolver functions, not one containing both resolve and reject functions.