I'm using RxJS to call an asynchronous function for each element of the array. I want to call the asynchronous function only if the element satisfies certain conditions, and I also want to limit the number of concurrent executions.
So I used the mergeMap operator with the concurrent parameter, but I have experienced strange behavior.
A minimal reproducible example is:
import { from, timer, mergeMap, EMPTY } from "rxjs";
from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });
Suppose that Array(1000).keys() is the array, n < 100 is the condition, and timer(10) is the asynchronous function.
I expected the observable to complete, but it didn't.
Strangely, the observable completes if the concurrent parameter is not specified, or if the condition is removed so that the asynchronous function is called for all of the elements (or for none of them).
// without concurrent parameter
mergeMap(n => n < 100 ? timer(10) : EMPTY)
// or without condition
mergeMap(n => timer(10), 10)
// or
mergeMap(n => EMPTY, 10)
I also observed the next values with the following code. (Note that I used timer(10).pipe(map(() => n)) to get the input values.)
import { from, timer, mergeMap, EMPTY, map } from "rxjs";
from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10).pipe(map(() => n)) : EMPTY, 10),
).subscribe({ next: console.log, complete: () => console.log('Complete') });
The output values stopped at 90.
...
86
87
88
89
90
I want to know why this behavior occurs.
How mergeMap works when concurrent is not specified
When not specified, concurrent defaults to Infinity.
When mergeMap receives a new value, it creates a new Observable (a.k.a. the inner Observable) from the project function (i.e. the function provided to mergeMap) and subscribes to it.
Since concurrent is Infinity, it does not matter how many values arrive: mergeMap will create an inner Observable for each of them without any issues.
How mergeMap works when concurrent is specified
When this option is specified, mergeMap employs a buffer that holds the excess values:
const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
In the code above, we see that a new value is pushed to the buffer whenever the number of active inner Observables has already reached the concurrent limit. The outerNext function is called each time mergeMap receives a value, here the numbers 0 through 999 from Array(1000).keys().
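The admission rule above can be modelled without RxJS. The following is only a sketch: countAdmission is a hypothetical helper, and it assumes that the source emits all of its values synchronously and that no inner Observable completes while the source is still emitting (which holds for timer(10)).

```typescript
// Hypothetical model of the admission rule; this is not RxJS source code.
function countAdmission(total: number, concurrent: number) {
  let active = 0;               // inner subscriptions currently running
  const buffer: number[] = [];  // values waiting for a free slot

  // Stand-in for subscribing to an inner Observable that has not completed yet.
  const doInnerSub = (_value: number): void => {
    active++;
  };

  // Same shape as the outerNext line quoted above.
  const outerNext = (value: number): void => {
    if (active < concurrent) {
      doInnerSub(value);
    } else {
      buffer.push(value);
    }
  };

  // The source emits every value synchronously, like from(Array(total).keys()).
  for (let n = 0; n < total; n++) {
    outerNext(n);
  }

  return { subscribed: active, buffered: buffer.length };
}

const limitedAdmission = countAdmission(1000, 10);
const unlimitedAdmission = countAdmission(1000, Infinity);
console.log(limitedAdmission);   // { subscribed: 10, buffered: 990 }
console.log(unlimitedAdmission); // { subscribed: 1000, buffered: 0 }
```

With a limit of 10, the first 10 values are subscribed to right away and the remaining 990 wait in the buffer. With Infinity, the buffer is never used.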
This code
import { from, timer, mergeMap, EMPTY } from "rxjs";
from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });
will not complete due to an implementation detail of the doInnerSub() function (the function called from outerNext()).
The gist is that it is a recursive function. This is a pseudo-code implementation of what it does (you can find the real implementation here):
const doInnerSub = value => {
  // Increase the number of active connections.
  active++;
  // Create the inner Observable from the project function provided to `mergeMap`.
  createInnerObservable({
    // Runs during the teardown phase (also when `finalize` is called).
    afterComplete: () => {
      if (hasInnerObservableCompleted) {
        // Make space for a new connection.
        active--;
        // Pull buffered values while there is a free slot. Note the recursive call.
        while (buffer.length && active < concurrent) {
          const bufferedValue = buffer.shift()!;
          doInnerSub(bufferedValue);
        }
      }
    }
  });
}
Once all the timer(10) Observables have been created, only values that map to EMPTY remain in the buffer. Such Observables complete immediately when they are subscribed to. In other words, they complete synchronously. As a result, once the timer(10) Observable for n = 90 completes (90 = 100 - 10), these lines:
while (buffer.length && active < concurrent) {
  const bufferedValue = buffer.shift()!;
  doInnerSub(bufferedValue);
}
will cause a "maximum call stack size exceeded" error to be thrown: each EMPTY completes inside doInnerSub(), which frees its slot and immediately calls doInnerSub() again for the next buffered value, before the previous call has returned.
The call stack at that point also seems to confirm that doInnerSub() is being called synchronously (and recursively) too many times.
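To make the recursion visible, here is a minimal model of the pseudo-code in plain TypeScript. It is a sketch under stated assumptions, not the RxJS implementation: simulateMergeDepth and pendingTimers are hypothetical names, timers are modelled as a simple FIFO queue of callbacks, and values below asyncCount stand for timer(10) while the rest stand for EMPTY.

```typescript
// Hypothetical model; not RxJS source code.
function simulateMergeDepth(total: number, asyncCount: number, concurrent: number): number {
  let active = 0;
  let depth = 0;     // current nesting of doInnerSub calls
  let maxDepth = 0;  // deepest nesting observed
  const buffer: number[] = [];
  const pendingTimers: Array<() => void> = []; // stand-in for the timer queue

  const doInnerSub = (n: number): void => {
    depth++;
    maxDepth = Math.max(maxDepth, depth);
    active++;
    const afterComplete = (): void => {
      active--;
      while (buffer.length && active < concurrent) {
        doInnerSub(buffer.shift()!);
      }
    };
    if (n < asyncCount) {
      pendingTimers.push(afterComplete); // completes later, like timer(10)
    } else {
      afterComplete();                   // completes synchronously, like EMPTY
    }
    depth--;
  };

  // The source emits everything synchronously.
  for (let n = 0; n < total; n++) {
    if (active < concurrent) {
      doInnerSub(n);
    } else {
      buffer.push(n);
    }
  }

  // Fire the timers one at a time, in subscription order.
  while (pendingTimers.length) {
    pendingTimers.shift()!();
  }
  return maxDepth;
}

const mixedDepth = simulateMergeDepth(1000, 100, 10);
console.log(mixedDepth); // 900
```

In this model the 900 buffered EMPTY values are all processed inside one chain of nested calls. The model itself survives because each level costs only two stack frames. In RxJS every level presumably adds many more frames (subscribe, subscriber and finalizer calls), which is why the real stack overflows.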
Why it works when concurrent is omitted
The answer is that doInnerSub() does not have to call itself anymore (because the buffer is no longer used). Basically, there will be 1000 inner Observables created.
For this case:
mergeMap(n => timer(10), 10)
the call stack will not keep growing because timer(10) involves an asynchronous action (i.e. setInterval()).
For this case:
mergeMap(n => EMPTY, 10)
Since EMPTY completes immediately when it is subscribed to, the buffer won't be populated at all, so the call stack exceeded error can't occur.
In this case, the doInnerSub() call inside the while loop is never reached at all.
As to why it stops at 90: concurrent = 10, so once the timer(10) Observable for n = 100 (from the mergeMap condition) - 10 (the concurrent value) = 90 completes, the first EMPTY Observable is taken from the buffer, and from there the synchronous recursion starts and the call stack keeps growing.
If you were to use, for instance, 11, the last value printed would be 89:
mergeMap((n) => (n < 100 ? timer(10) : EMPTY), 11),
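The arithmetic can be checked with a small sketch. lastValueBeforeRecursion is a hypothetical helper. It assumes that the timers complete in subscription order, that concurrent is not larger than asyncCount, and that the source has more than asyncCount values, as in the question.

```typescript
// Hypothetical helper; not part of RxJS.
// asyncCount: how many leading values map to timer(10) (100 in the question).
function lastValueBeforeRecursion(asyncCount: number, concurrent: number): number {
  // Values 0..concurrent-1 are subscribed to immediately,
  // so the first buffered value is `concurrent`.
  let nextBuffered = concurrent;
  let completed = 0;
  // Each completing timer hands its slot to the next buffered value.
  // As long as that value is still below asyncCount, it is another timer.
  while (nextBuffered < asyncCount) {
    nextBuffered++;
    completed++;
  }
  // The timer for `completed` is the one whose completion pulls the first EMPTY.
  return completed;
}

console.log(lastValueBeforeRecursion(100, 10)); // 90
console.log(lastValueBeforeRecursion(100, 11)); // 89
```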