Search code examples
javascriptnode.jsasync-awaitasync-iterator

Parallel asynchronous iteraor - is it possible?


Right now I have the following code:

import axios from 'axios'

const urls = ['https://google.com', 'https://yahoo.com']

async function* requests() {
  for (const url of urls) {
    yield axios.get(url)
  }
}

;(async () => {
  for await (const n of requests()) {
    console.log(n.config.url) // prints https://google.com and then https://yahoo.com
  }
})()

As is, the requests won't block the single thread of node, but they will happen in sequence. I'm wondering if it would be possible to change the code to force parallelism.


Solution

  • The "simpler" no-deps way would be to batch them and yield every batch with Promise.all

    import axios from 'axios'
    
    const urls = [
      'https://jsonplaceholder.typicode.com/todos/1', 
      'https://jsonplaceholder.typicode.com/posts/1',
      'https://jsonplaceholder.typicode.com/users/1',
      'https://jsonplaceholder.typicode.com/comments/1'
    ]
    
    async function* requests(batchSize = 1) {
      let batchedRequests = [];
      for (const url of urls) {
        batchedRequests.push(axios.get(url));
        if (batchedRequests.length === batchSize) {
          yield Promise.all(batchedRequests);
          batchedRequests = [];
        }
      }
      if (batchedRequests.length) { //if there are requests left in batch
        yield Promise.all(batchedRequests);
      }
    }
    
    ;(async () => {
      for await (const batch of requests(2)) {
        batch.forEach(n => console.log(n.config.url)) // prints https://google.com and then https://yahoo.com
      }
    })()
    

    You can use rxjs to achieve similar results, with the advantages that observables have in terms of flexibility, but it's another library and can be more complex if you're not familiar with reactive streams. Here is a detailed post I found on the topic: https://medium.com/@ravishivt/batch-processing-with-rxjs-6408b0761f39