Search code examples
angularrxjs

Angular - Stream multiple HTTP calls 2 per 2


I have an array of payload elements: [1,2,3,4]. I would like to stream the payload over multiple HTTP calls, 2 HTTP calls simultaneously.

Fire the first 2 HTTP calls, the first one with paylaodElement[0] and the second with paylaodElement[1]. Wait for the result, then make 2 new HTTP calls...

here's a code that fires all payloads on multiple calls simultaneously:

const streams = [].concat.apply(
  [],
  payload.map((el) => this.http.get(API_ENDPOINT, { el }))
);

Then:

forkJoin(streams).subscribe(result => {
 // 
})

This fire N HTTP call, where, N = payload.length. How can we chunk this by only 2 calls simultaneously?


Solution

  • If I understand the question right I would proceed like this

    // first use the from rxjs function to turn an array into a stream of its values
    from(payload).pipe(
       // then with bufferCount create a stream of tuples
       bufferCount(2),
       // now we have a stream of tuples - we turn the element of each tuple into 
       // a tuple of http calls using the map operator
       // consider that the inner map is the map function of the javascript array 
       // while the outer map is the rxjs operator
       map(tuple => tuple.map(t => this.http.get(API_ENDPOINT, { el })),
       // now we fire the calls in parallel using forkJoin
       // we use concatMap to say "wait until forkJoin notifies its result
       // before firing the next tuple of calls
       concatMap(tupleOfHttpCalls => forkJoin(tupleOfHttpCalls))
    )
    .subscribe(tupleOfResults => {
       // here you do whatever you want with the tuple of results
    })
    

    While this code should do what you are looking for, I would consider also a solution where you limit the number of parallel calls to 2 but do not create tuple, rather you have a continuous stream, i.e. when 2 calls are on flight and the first one completes, you immediately start a new one.

    Such solution would look like this

    // first use the from rxjs function to turn an array into a stream of its values
    from(payload).pipe(
       // then we use mergeMap with the concurrent value (the second parameter) to limit
       // concurrency to 2
       mergeMap(value => this.http.get(API_ENDPOINT, { el }), 2),
    )
    .subscribe(result => {
       // here you do whatever you want with each single result
    })
    

    This blog can provide you some more ideas about rxjs patterns to be used with asynchronous calls.