
RxJS waiting for array elements to complete not working as intended


I expect the following RxJS behavior: for every element in the source array, run an action (the commented-out part) that must complete before the next element is fetched, then wait again, and so on. Instead, all elements of the source array are emitted at the same time, and after the delay they are all emitted together again on each repeat.

import { from, defer, delay, repeat, tap } from 'rxjs';

const source$ = from([1, 2, 3])

const actions$ = source$.pipe(
  tap((t) => console.log(t))
  // ... action that takes long and needs to be waited for, before going to the next element in source$
)

const timedExecution$ = defer(() => actions$).pipe(
  delay(3000),
  repeat(3)
)

timedExecution$.subscribe();


I also tried another way, with timer:

import { from, tap, timer } from 'rxjs';

const source$ = from([1, 2, 3])

const actions$ = source$.pipe(
  () => timer(0, 3000),
  tap((t) => console.log(t))
  // actionThatTakesLong() action that takes long and needs to be waited for, before going to the next element in source$
)

actions$.subscribe();

Here the values are emitted one at a time, but sometimes actionThatTakesLong() takes longer than the arbitrary 3000 ms timer interval. I need the stream to wait until the action is done instead of waiting for a hardcoded time. Thanks in advance for any hints.


Solution

  • Your source Observable is created by from(), which emits the array items synchronously, one after another, immediately on subscription. It doesn't (and can't) care what happens to the values further down the chain.

    delay() shifts each value by the given time, but it doesn't (and can't) care whether the previous values have reached your observer. Every value is delayed by 3 s independently, without waiting for the previous delay to finish. Since all values arrive at the same moment, they also all come out at the same moment.

    What you want instead is the concatMap() operator, which subscribes to the next inner Observable only after the previous one completes:

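    import { from, of, concatMap, delay } from 'rxjs';

    from([1, 2, 3])
      .pipe(
        // each inner Observable emits its value after 3 s, then completes
        concatMap(value => of(value).pipe(delay(3000))),
      )
      .subscribe(...);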
    

    FYI, the second option you mention does something very different from what you think:

    const actions$ = source$.pipe(
      () => timer(0, 3000),
      tap(() => ...),
    );
    

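    This actually replaces the source Observable from() with a different Observable, timer(0, 3000), because pipe() simply calls each function with the Observable built so far and uses whatever it returns. You're basically using the approach for creating custom operators (https://rxjs.dev/guide/operators#creating-new-operators-from-scratch), except that your function ignores its input.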