Tags: javascript, angular, typescript, rxjs, rxjs-pipeable-operators

How to prevent multiple http requests from firing all at once


I have an array of objects. For each object I need to trigger an asynchronous request (an HTTP call), but I want at most a fixed number of requests running at the same time. It would also be nice (but not necessary) to have a single synchronization point after all requests have finished, where I can execute some code.

I've tried suggestions from:

Limit number of requests at a time with RxJS

How to limit the concurrency of flatMap?

Fire async request in parallel but get result in order using rxjs

and many more... I even tried making my own operators.

Either the answers on those pages are too old to work with my code or I can't figure out how to put everything together so all types fit nicely.

This is what I have so far:

for (const obj of objects) {
  this.myService.updateObject(obj).subscribe(value => {
    this.anotherService.set(obj);
  });
}

EDIT 1: Ok, I think we're getting there! With the answers of Julius and pschild (both seem to work equally) I managed to limit the number of requests. But now it will only fire the first batch of 4 and never fire the rest. So now I have:

const concurrentRequests = 4;
from(objects)
  .pipe(
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(result => this.anotherService.set(result))
  ).subscribe();

Am I doing something wrong with the subscribe()?

Btw: mergeMap with the resultSelector parameter is deprecated, so I used mergeMap without it. Also, the obj from mergeMap is not visible inside tap, so I had to use tap's own parameter.
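If the original obj is still needed downstream, one option is to pair it with the result inside the inner pipe instead of relying on the deprecated resultSelector. The following is only a sketch: `Item`, `updateObject` and `updateWithSource` are hypothetical stand-ins for the real service and model, and the fake request simply returns a status string.

```typescript
import { from, Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface Item { id: number; name: string; }

// Hypothetical stand-in for this.myService.updateObject(obj).
// Assumption: the real call returns something other than the object itself.
function updateObject(obj: Item): Observable<string> {
  return of(`updated ${obj.id}`);
}

// Emits { obj, result } pairs so later operators can see both values.
function updateWithSource(
  objects: Item[],
  limit: number
): Observable<{ obj: Item; result: string }> {
  return from(objects).pipe(
    mergeMap(
      // map runs inside the inner pipe, where obj is still in scope
      obj => updateObject(obj).pipe(map(result => ({ obj, result }))),
      limit
    )
  );
}

const pairs: { obj: Item; result: string }[] = [];
updateWithSource([{ id: 1, name: 'a' }, { id: 2, name: 'b' }], 4)
  .subscribe(pair => pairs.push(pair));
```

A tap placed after this mergeMap receives the whole pair, so it could call something like `this.anotherService.set(pair.obj)`.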

EDIT 2:

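Make sure your observables complete! (It cost me a whole day.) With a concurrency limit, mergeMap only subscribes to the next inner observable once a running one has completed, so inner observables that never complete block the rest of the queue.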
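To illustrate why completion matters, here is a small sketch that runs synchronously. `neverCompleting` is a hypothetical stand-in for a request observable that emits a value but never calls complete; adding take(1) is one way (an assumption about what fits your service) to force completion after the first emission.

```typescript
import { from, Observable } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

// Hypothetical request: emits once, but never completes.
function neverCompleting(id: number): Observable<number> {
  return new Observable<number>(subscriber => {
    subscriber.next(id); // complete() is never called
  });
}

// With a limit of 2, both slots stay occupied forever,
// so the remaining ids are buffered and never started.
const stuck: number[] = [];
from([1, 2, 3, 4, 5, 6])
  .pipe(mergeMap(id => neverCompleting(id), 2))
  .subscribe(value => stuck.push(value));

// take(1) completes each inner observable after its first value,
// which frees the slot for the next buffered id.
const done: number[] = [];
from([1, 2, 3, 4, 5, 6])
  .pipe(mergeMap(id => neverCompleting(id).pipe(take(1)), 2))
  .subscribe(value => done.push(value));

console.log(stuck); // [ 1, 2 ]
console.log(done);  // [ 1, 2, 3, 4, 5, 6 ]
```

This matches the symptom from EDIT 1: only the first batch fires when the inner observables never complete.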


Solution

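  • You can use the concurrent parameter of mergeMap (the second argument when no resultSelector is passed) to limit the number of concurrent inner subscriptions. Use finalize to execute something once the whole sequence has terminated, i.e. after all requests have finished:

    import { from } from 'rxjs';
    import { finalize, mergeMap, tap } from 'rxjs/operators';

    const concurrentRequests = 5;
    from(objects)
        .pipe(
            // Run at most `concurrentRequests` inner subscriptions at once
            mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
            tap(res => this.anotherService.set(res)),
            // Runs once when the sequence completes, errors or is unsubscribed
            finalize(() => console.log('Sequence complete'))
        )
        .subscribe(); // nothing is requested until you subscribe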
    

    See the example on Stackblitz.
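
For experimenting without a backend, the same pattern can be sketched as a standalone script. Everything named here (`Item`, `updateAll`, `fakeUpdate`, the 20 ms timer) is a hypothetical stand-in rather than part of the original answer; the fake request counts how many calls are in flight so the limit can be observed.

```typescript
import { defer, from, Observable, timer } from 'rxjs';
import { finalize, map, mergeMap, tap } from 'rxjs/operators';

interface Item { id: number; }

// Runs `update` for every item with at most `limit` requests in flight
// and calls `onDone` once when the whole sequence has terminated.
function updateAll<T, R>(
  items: T[],
  update: (item: T) => Observable<R>,
  limit: number,
  onDone: () => void
): Observable<R> {
  return from(items).pipe(
    mergeMap(item => update(item), limit),
    finalize(onDone)
  );
}

let inFlight = 0;    // fake requests currently running
let maxInFlight = 0; // highest value inFlight has reached

// Hypothetical stand-in for this.myService.updateObject(obj):
// emits the item after 20 ms and then completes.
function fakeUpdate(item: Item): Observable<Item> {
  return defer(() => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return timer(20).pipe(
      map(() => item),
      tap({ complete: () => inFlight-- })
    );
  });
}

const items: Item[] = Array.from({ length: 10 }, (_, i) => ({ id: i }));

updateAll(items, fakeUpdate, 3, () =>
  console.log('all done, peak concurrency:', maxInFlight)
).subscribe(item => console.log('updated', item.id));
```

The logged peak should not exceed the limit of 3. Note that finalize also runs on error or unsubscribe; if the follow-up code should only run after successful completion, the complete callback of subscribe is an alternative.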