
RxJS parallel queue with concurrent workers?


Let's say I want to download 10,000 files. I can easily build a queue of those 10,000 files (happy to take advice if any of this can be done better):

import request from 'request-promise-native';
import {from} from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
  reqs.push(
    from(request(`http://bleh.com/${i}`))
  )
}

Now I have an array of RxJS observables, created from promises, that represents my queue. As for the behavior I want, I would like to issue:

  • Three concurrent requests to the server
  • Upon completion of a request, I would like a new request to fire.

I can create a solution to this problem myself, but in light of things like the RxJS queue, which I've never used, I'm wondering what the most idiomatic RxJS way to do this is.


Solution

  • It sounds like you want an equivalent of forkJoin that supports a caller-specified maximum number of concurrent subscriptions.

    It's possible to re-implement forkJoin using mergeMap and to expose the concurrent parameter, like this:

    import { from, Observable } from "rxjs";
    import { last, map, mergeMap, toArray } from "rxjs/operators";
    
    export function forkJoinConcurrent<T>(
      observables: Observable<T>[],
      concurrent: number
    ): Observable<T[]> {
      // Convert the array of observables to a higher-order observable:
      return from(observables).pipe(
        // Merge each of the observables in the higher-order observable
        // into a single stream:
        mergeMap((observable, observableIndex) => observable.pipe(
          // Like forkJoin, we're interested only in the last value:
          last(),
          // Combine the value with the index so that the stream of merged
          // values - which could be in any order - can be sorted to match
          // the order of the source observables:
          map(value => ({ index: observableIndex, value }))
        ), concurrent),
        // Convert the stream of last values to an array:
        toArray(),
        // Sort the array of value/index pairs by index - so the value
        // indices correspond to the source observable indices and then
        // map the pair to the value:
        map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
      );
    }
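
One caveat when applying this to the question's code: a promise starts its work as soon as it is created, so building the array with from(request(...)) in a loop starts every request up front, and the concurrent limit then only controls how many results are being awaited. Wrapping each call in defer postpones it until mergeMap subscribes. The following is a minimal sketch of that idea, not a definitive implementation. fakeDownload and runDemo are hypothetical names, and fakeDownload stands in for request so the number of overlapping calls can be observed; forkJoinConcurrent is a condensed copy of the function above, repeated only so the sketch runs on its own.

```typescript
import { defer, from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

// Condensed copy of forkJoinConcurrent from above, repeated only so that
// this sketch is self-contained.
function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  return from(observables).pipe(
    mergeMap((observable, index) => observable.pipe(
      last(),
      map(value => ({ index, value }))
    ), concurrent),
    toArray(),
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}

// Hypothetical demo: runs `total` fake downloads with at most `concurrent`
// in flight, and reports the results plus the peak number of overlapping calls.
function runDemo(
  total: number,
  concurrent: number
): Promise<{ results: number[]; maxInFlight: number }> {
  let inFlight = 0;    // fake downloads currently running
  let maxInFlight = 0; // highest value inFlight has reached

  // Hypothetical stand-in for request(`http://bleh.com/${i}`): resolves
  // with i after a short delay and tracks how many calls overlap.
  const fakeDownload = (i: number): Promise<number> => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return new Promise<number>(resolve =>
      setTimeout(() => {
        inFlight--;
        resolve(i);
      }, 5 + (i % 3) * 5)
    );
  };

  // defer() delays the call to fakeDownload until the observable is
  // subscribed to, so building the array starts no work.
  const reqs: Observable<number>[] = [];
  for (let i = 0; i < total; i++) {
    reqs.push(defer(() => fakeDownload(i)));
  }

  // Bridge the single emitted array to a promise without relying on
  // version-specific helpers.
  return new Promise((resolve, reject) => {
    forkJoinConcurrent(reqs, concurrent).subscribe({
      next: results => resolve({ results, maxInFlight }),
      error: reject
    });
  });
}

runDemo(10, 3).then(({ results, maxInFlight }) => {
  console.log(results, maxInFlight);
});
```

With defer in place, a new fake download starts only when one of the running ones completes, which is the behavior the question asks for. Swapping fakeDownload for the real request call should keep the same shape.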