I have an array of payload elements: [1,2,3,4]
.
I would like to stream the payload over multiple HTTP calls, 2 HTTP calls simultaneously.
Fire the first 2 HTTP calls, the first one with paylaodElement[0] and the second with paylaodElement[1]. Wait for the result, then make 2 new HTTP calls...
here's a code that fires all payloads on multiple calls simultaneously:
const streams = [].concat.apply(
[],
payload.map((el) => this.http.get(API_ENDPOINT, { el }))
);
Then:
forkJoin(streams).subscribe(result => {
//
})
This fire N HTTP call, where, N = payload.length
.
How can we chunk this by only 2 calls simultaneously?
If I understand the question right I would proceed like this
// first use the from rxjs function to turn an array into a stream of its values
from(payload).pipe(
// then with bufferCount create a stream of tuples
bufferCount(2),
// now we have a stream of tuples - we turn the element of each tuple into
// a tuple of http calls using the map operator
// consider that the inner map is the map function of the javascript array
// while the outer map is the rxjs operator
map(tuple => tuple.map(t => this.http.get(API_ENDPOINT, { el })),
// now we fire the calls in parallel using forkJoin
// we use concatMap to say "wait until forkJoin notifies its result
// before firing the next tuple of calls
concatMap(tupleOfHttpCalls => forkJoin(tupleOfHttpCalls))
)
.subscribe(tupleOfResults => {
// here you do whatever you want with the tuple of results
})
While this code should do what you are looking for, I would consider also a solution where you limit the number of parallel calls to 2 but do not create tuple, rather you have a continuous stream, i.e. when 2 calls are on flight and the first one completes, you immediately start a new one.
Such solution would look like this
// first use the from rxjs function to turn an array into a stream of its values
from(payload).pipe(
// then we use mergeMap with the concurrent value (the second parameter) to limit
// concurrency to 2
mergeMap(value => this.http.get(API_ENDPOINT, { el }), 2),
)
.subscribe(result => {
// here you do whatever you want with each single result
})
This blog can provide you some more ideas about rxjs patterns to be used with asynchronous calls.