Tags: angular, rxjs, observable, batch-processing

RxJS: resolve an array of observables in batches


I have an array of observables, for example:

const responses$: Observable<Response>[] = [
    this.service.get(1), 
    this.service.get(2), 
    this.service.get(3), 
    this.service.get(4)
];

I want to resolve them in batches of a fixed size, say 2, so that this example ends up with 2 batches. The batches must run sequentially: requests 3 and 4 (second batch) must not be sent until requests 1 and 2 (first batch) have been resolved.

I also want to retrieve the results of all the batches together: no matter how many batches there are, my subscribe callback should receive one single result containing all of them.

I have tried everything and cannot find an RxJS-only way of doing it. My real problem is that I am dealing with more than 2000 concurrent requests to an API that returns files and does not allow that many requests at the same time, so I want to use sequential batches so that the API doesn't get overwhelmed.


Solution

  • You can first group the observables into batches:

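    import { Observable } from 'rxjs';

    const responses: Observable<string>[] = [
      // ...
    ];
    
    const batchSize = 2;
    const groupedResponses: Observable<string>[][] = [];
    // Note: splice() removes each batch from `responses`, so the array is empty after the loop.
    while (responses.length) {
      groupedResponses.push(responses.splice(0, batchSize));
    }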
    

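    Then merge the observables within each batch and concat the batches, so that a batch is only subscribed to once the previous one has completed. toArray() collects every emission into a single array: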

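    import { concat, merge } from 'rxjs';
    import { tap, toArray } from 'rxjs/operators';

    concat(
      // Requests inside a batch run concurrently; concat starts the next batch only when the previous one completes.
      ...groupedResponses.map((group) => merge(...group))
    ).pipe(
      toArray(), // emit one array with all responses once the last batch completes
      tap(console.log)
    ).subscribe();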
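
The grouping step can also be written as a small reusable helper. What follows is only a minimal sketch: `chunk` is a hypothetical function (not part of RxJS), and it uses slice() rather than splice() so the input array is left intact. Plain numbers stand in for the observables so that the snippet runs on its own.

```typescript
// Hypothetical helper (not an RxJS API): split a list into consecutive
// groups of at most `size` items without modifying the input array.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (size < 1 || Math.floor(size) !== size) {
    throw new RangeError("size must be a positive integer");
  }
  const groups: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    // slice() copies the range, so `items` keeps all of its elements.
    groups.push(items.slice(start, start + size));
  }
  return groups;
}

// Stand-in values; in the answer these would be the Observable<string> items.
const requestIds = [1, 2, 3, 4, 5];
const batches = chunk(requestIds, 2);
// batches is [[1, 2], [3, 4], [5]]; requestIds still has 5 elements.
console.log(batches);
```

With the arrays from the answer, `chunk(responses, batchSize)` should produce the same groups as the while loop, while `responses` keeps its elements and can be reused afterwards.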