
RxJS: Observable never completes when using mergeMap with concurrent


I'm using RxJS to call an asynchronous function for each element of an array. I want to call the asynchronous function only if the element satisfies a certain condition, and I also want to limit the number of concurrent executions.

So I used the mergeMap operator with the concurrent parameter, but I ran into some strange behavior.

Here is a minimal reproducible example:

import { from, timer, mergeMap, EMPTY } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });

Suppose that Array(1000).keys() is the array, n < 100 is the condition, and timer(10) is the asynchronous function.

I expected the observable to complete, but it didn't.

Strangely, the observable does complete if the concurrent parameter is omitted, or if the asynchronous function is called for every element without the condition:

// without concurrent parameter
mergeMap(n => n < 100 ? timer(10) : EMPTY)
// or without condition
mergeMap(n => timer(10), 10)
// or
mergeMap(n => EMPTY, 10)

I also logged the emitted values with the following code. (Note that I used timer(10).pipe(map(() => n)) to emit the input values.)

import { from, timer, mergeMap, EMPTY, map } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10).pipe(map(() => n)) : EMPTY, 10),
).subscribe({ next: console.log, complete: () => console.log('Complete') });

The output values stopped at 90.

...
86
87
88
89
90

I want to know why this behavior occurs.


Solution

    How mergeMap works when concurrent is not specified

    When it is not specified, concurrent defaults to Infinity.

    When mergeMap receives a new value, it creates a new Observable (a.k.a. the inner Observable) by calling the project function (i.e. the function provided to mergeMap), and subscribes to it.

    Since concurrent is Infinity, no matter how many values arrive, mergeMap creates and subscribes to an inner Observable for each of them right away.

    How mergeMap works when concurrent is specified

    When this option is specified, mergeMap uses a buffer to hold the values that exceed the limit:

    const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
    

    In the code above, a new value is pushed into the buffer whenever the number of active inner Observables has reached the concurrent limit; otherwise it is passed straight to doInnerSub(). outerNext is called every time mergeMap receives a value from the source, such as each of the values 0 to 999 produced by Array(1000).keys().
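A minimal TypeScript sketch of just this decision shows what the buffer holds once the source has emitted all of its values synchronously. It assumes, for illustration only, that no inner Observable completes during the run; countSubscriptions is a hypothetical helper, not part of RxJS:

```typescript
// Hypothetical sketch of outerNext's decision (not RxJS source).
// Assumption: no inner Observable completes while the source is emitting,
// so every started inner subscription stays active.
function countSubscriptions(values: number[], concurrent: number) {
  let active = 0;
  let subscribed = 0;
  const buffer: number[] = [];
  for (const value of values) {
    // Same test as outerNext: start an inner subscription if there is room, else buffer.
    if (active < concurrent) {
      active++;
      subscribed++;
    } else {
      buffer.push(value);
    }
  }
  return { subscribed, buffered: buffer.length };
}

const sourceValues = Array.from(Array(1000).keys()); // 0..999, like from(Array(1000).keys())
console.log(countSubscriptions(sourceValues, Infinity)); // { subscribed: 1000, buffered: 0 }
console.log(countSubscriptions(sourceValues, 10));       // { subscribed: 10, buffered: 990 }
```

With concurrent = Infinity the test active < concurrent always passes, which is why the unbounded case never touches the buffer.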

    Why the Observable does not complete

    This code

    import { from, timer, mergeMap, EMPTY } from "rxjs";
    
    from(Array(1000).keys()).pipe(
      mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
    ).subscribe({ complete: () => console.log('Complete') });
    

    will not complete because of an implementation detail of the doInnerSub() function (the function called from outerNext()).

    The gist is that doInnerSub() is a recursive function. Here is a pseudo-code version of what it does (you can find the real implementation here):

    const doInnerSub = value => {
      // Increase number of active connections.
      active++;
    
      // Creating from the projection function provided to `mergeMap`.
      createInnerObservable({
        // During the teardown phase (also when `finalize` is called).
        afterComplete: () => {
          if (hasInnerObservableCompleted) {
            // Making space for a new connection.
            active--;
    
            while (buffer.length && active < concurrent) {
              const bufferedValue = buffer.shift()!;
              doInnerSub(bufferedValue);
            }
          }
        }
      });
    }
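The pseudo-code can be turned into a small runnable model. This is a simplified sketch under stated assumptions, not RxJS's implementation: an inner Observable is modelled as a function that calls a complete callback, either immediately (like EMPTY) or later from a simulated timer queue (like timer(10)). The names runMergeMap, asyncInner, syncInner and timerQueue are hypothetical. The model records how deeply doInnerSub() calls nest:

```typescript
// Hypothetical, simplified model of mergeMap's concurrency bookkeeping.
// It mirrors the shape of the pseudo-code above; it is NOT RxJS's implementation.

// An inner "Observable" is modelled as a function that eventually calls `complete`.
type Inner = (complete: () => void) => void;

// Simulated timer queue: async inners park their completion here until it is drained.
const timerQueue: Array<() => void> = [];
const asyncInner: Inner = (complete) => { timerQueue.push(complete); }; // like timer(10)
const syncInner: Inner = (complete) => complete();                      // like EMPTY

function runMergeMap(values: number[], project: (n: number) => Inner, concurrent: number) {
  let active = 0;
  let depth = 0;    // current nesting of doInnerSub calls
  let maxDepth = 0; // deepest nesting observed
  let sourceDone = false;
  let completed = false;
  const buffer: number[] = [];

  const maybeComplete = () => {
    if (sourceDone && active === 0 && buffer.length === 0) completed = true;
  };

  const doInnerSub = (value: number): void => {
    active++;
    depth++;
    maxDepth = Math.max(maxDepth, depth);
    project(value)(() => {
      // Inner completed: free a slot, then drain the buffer by re-entering doInnerSub.
      active--;
      while (buffer.length && active < concurrent) {
        doInnerSub(buffer.shift()!);
      }
      maybeComplete();
    });
    depth--;
  };

  // The source emits every value synchronously, like from(Array(1000).keys()).
  for (const v of values) {
    if (active < concurrent) doInnerSub(v);
    else buffer.push(v);
  }
  sourceDone = true;
  maybeComplete();

  // Fire the simulated timers in FIFO order, roughly as the event loop would.
  while (timerQueue.length) timerQueue.shift()!();

  return { completed, maxDepth };
}

const inputs = Array.from(Array(1000).keys());
console.log(runMergeMap(inputs, (n) => (n < 100 ? asyncInner : syncInner), 10));
// { completed: true, maxDepth: 900 }
```

When the timer-like inner for n = 90 completes, the 900 buffered EMPTY-like values (100 to 999) are subscribed one inside another, hence the nesting depth of 900. In this model each level costs only a few JavaScript frames, so it fits on the stack and the run completes; real RxJS goes through many more frames per level (subscription, completion and finalization calls), which is presumably why the same nesting overflows the stack there.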
    

    Once the last timer(10) Observables have been subscribed, every value left in the buffer maps to EMPTY. Such Observables complete as soon as they are subscribed; in other words, they complete synchronously. As a result, when the timer(10) Observable for n = 90 completes (90 = 100 - 10), these lines:

    while (buffer.length && active < concurrent) {
      const bufferedValue = buffer.shift()!;
      doInnerSub(bufferedValue);
    }
    

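    call doInnerSub() synchronously for each of the 900 remaining buffered values (100 to 999), each call nested inside the previous one, until the call stack overflows (in V8-based runtimes such as Node.js and Chrome, the error is "RangeError: Maximum call stack size exceeded"):

    [Screenshot: the call-stack-exceeded error]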

    Here's also a screenshot of the call stack, which shows doInnerSub() being called synchronously (and recursively) too many times:

    [Screenshot: call stack with repeated doInnerSub() frames]

    Why the Observable completes when concurrent is omitted

    Because doInnerSub() never has to call itself: with concurrent = Infinity the buffer is never used, so all 1000 inner Observables are created directly from outerNext(), and each EMPTY one simply completes without dequeuing anything.
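A compact variant of the same kind of model compares the deepest doInnerSub() nesting for the question's inputs with and without a concurrency limit. It is a sketch that mirrors the pseudo-code, not RxJS itself; maxNesting and Kind are illustrative names:

```typescript
// Hypothetical compact model of mergeMap's buffer-draining recursion (not RxJS source).
// kinds[i] says whether the inner source for value i completes synchronously
// ("sync", EMPTY-like) or later from a simulated timer queue ("async", timer(10)-like).
type Kind = "sync" | "async";

function maxNesting(kinds: Kind[], concurrent: number): number {
  const timers: Array<() => void> = []; // simulated timer callbacks, fired in FIFO order
  const buffer: number[] = [];
  let active = 0;
  let depth = 0;
  let maxDepth = 0;

  const doInnerSub = (i: number): void => {
    active++;
    depth++;
    maxDepth = Math.max(maxDepth, depth);
    const complete = () => {
      active--;
      // Draining re-enters doInnerSub; nesting only grows when the dequeued
      // inner source also completes synchronously.
      while (buffer.length && active < concurrent) doInnerSub(buffer.shift()!);
    };
    if (kinds[i] === "sync") complete();
    else timers.push(complete);
    depth--;
  };

  kinds.forEach((_, i) => {
    if (active < concurrent) doInnerSub(i);
    else buffer.push(i);
  });
  while (timers.length) timers.shift()!();
  return maxDepth;
}

// Values 0..99 are timer-like, 100..999 are EMPTY-like, as in the question.
const questionKinds: Kind[] = Array.from({ length: 1000 }, (_, i): Kind => (i < 100 ? "async" : "sync"));
console.log(maxNesting(questionKinds, Infinity)); // 1
console.log(maxNesting(questionKinds, 10));       // 900
```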

    Why the Observable completes when no condition is used

    For this case:

    mergeMap(n => timer(10), 10)
    

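    the call stack does not keep growing, because timer(10) completes asynchronously (it schedules its work with setInterval()). Each buffered value is therefore dequeued from a timer callback rather than from inside the previous doInnerSub() call.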

    For this case:

    mergeMap(n => EMPTY, 10)
    

    Since EMPTY completes as soon as it is subscribed, active never reaches 10 and the buffer is never populated, so the call-stack-exceeded error cannot occur.

    [Screenshot: a breakpoint in mergeMap's buffering code]

    In the above image, that breakpoint is never hit.
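Extending the same hypothetical model to also report the largest buffer size illustrates both no-condition cases with concurrent = 10. As before, this is a sketch that mirrors the pseudo-code, not RxJS itself; simulate and Kind are illustrative names:

```typescript
// Hypothetical compact model of mergeMap's concurrency bookkeeping (not RxJS source).
type Kind = "sync" | "async"; // "sync" ~ EMPTY, "async" ~ timer(10)

function simulate(kinds: Kind[], concurrent: number): { maxDepth: number; maxBuffered: number } {
  const timers: Array<() => void> = []; // simulated timer callbacks, fired in FIFO order
  const buffer: number[] = [];
  let active = 0;
  let depth = 0;
  let maxDepth = 0;
  let maxBuffered = 0;

  const doInnerSub = (i: number): void => {
    active++;
    depth++;
    maxDepth = Math.max(maxDepth, depth);
    const complete = () => {
      active--;
      while (buffer.length && active < concurrent) doInnerSub(buffer.shift()!);
    };
    if (kinds[i] === "sync") complete();
    else timers.push(complete);
    depth--;
  };

  kinds.forEach((_, i) => {
    if (active < concurrent) {
      doInnerSub(i);
    } else {
      buffer.push(i);
      maxBuffered = Math.max(maxBuffered, buffer.length);
    }
  });
  while (timers.length) timers.shift()!();
  return { maxDepth, maxBuffered };
}

const allTimers: Kind[] = Array.from({ length: 1000 }, (): Kind => "async"); // mergeMap(n => timer(10), 10)
const allEmpty: Kind[] = Array.from({ length: 1000 }, (): Kind => "sync");   // mergeMap(n => EMPTY, 10)
console.log(simulate(allTimers, 10)); // { maxDepth: 1, maxBuffered: 990 }
console.log(simulate(allEmpty, 10));  // { maxDepth: 1, maxBuffered: 0 }
```

In the all-timer case the buffer is used heavily but drained one value per timer callback, and in the all-EMPTY case it is never used; only the mixed case, with EMPTY-like values queued behind timer-like ones, makes the nesting grow.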

    Why the values stop at 90

    It stops at 90 because concurrent = 10: when the timer(10) Observable for n = 90 (100 from the mergeMap condition minus the concurrency of 10) emits its value and completes, the first EMPTY-producing value (100) is taken from the buffer, and from there the deep synchronous recursion starts, so the call stack keeps growing until it overflows.

    If you were to use, for instance, 11, the last value printed would be 89:

    mergeMap((n) => (n < 100 ? timer(10) : EMPTY), 11),
    

    [Screenshot: output ending at 89]
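The 100 - concurrent rule can be checked with a small hypothetical model (recursionTrigger is an illustrative name, not RxJS code). It reports which timer-like inner source, on completing, is the first to pull an EMPTY-like value out of the buffer; under the explanation above, that timer's value is the last one printed before the stack overflows:

```typescript
// Hypothetical model (not RxJS source): values below `limit` behave like timer(10),
// the rest like EMPTY. Returns the timer whose completion starts the synchronous drain.
function recursionTrigger(limit: number, total: number, concurrent: number): number | undefined {
  const timers: Array<() => void> = []; // simulated timer callbacks, fired in FIFO order
  const buffer: number[] = [];
  let active = 0;
  let trigger: number | undefined;

  const doInnerSub = (n: number): void => {
    active++;
    const complete = () => {
      active--;
      while (buffer.length && active < concurrent) doInnerSub(buffer.shift()!);
    };
    if (n < limit) {
      timers.push(() => {
        // First time a timer completion is about to dequeue an EMPTY-like value,
        // remember which timer it was: the deep recursion starts here.
        if (trigger === undefined && buffer.length > 0 && buffer[0] >= limit) trigger = n;
        complete();
      });
    } else {
      complete(); // EMPTY-like: completes synchronously
    }
  };

  for (let n = 0; n < total; n++) {
    if (active < concurrent) doInnerSub(n);
    else buffer.push(n);
  }
  while (timers.length) timers.shift()!();
  return trigger;
}

console.log(recursionTrigger(100, 1000, 10)); // 90
console.log(recursionTrigger(100, 1000, 11)); // 89
```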