typescript rxjs operators delay

Outputting after a delay on subscribe using rxjs operators


I need to get an output like the following:

computing 1
5
6
7
8

wait 2 seconds

computing 2
5
6
7
8

wait 2 seconds
...

But with the following code:

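import { from, of } from 'rxjs';
import { concatMap, delay, tap } from 'rxjs/operators';

from([1,2,3,4]).pipe(
    concatMap(n => of(n).pipe(
        tap(n => {console.log(`computing ${n}`)}),
        concatMap(n => from([5,6,7,8])),
        // delay shifts every emitted value by 2 seconds; the tap above is not delayed
        delay(2000)
    ))
).subscribe((val) => {console.log(val)}, () => {}, () => {console.log(`end`)})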

the output is:

computing 1

wait 2 seconds

5
6
7
8
computing 2

wait 2 seconds

5
6
7
8
computing 3

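This happens because delay is applied after the innermost flattening: it holds back the values 5 to 8 for 2 seconds, so the next "computing x" line is printed immediately after they are emitted, with no pause in between. Instead, I need the output from the first example, where the values appear right away and the 2-second wait comes after them, with no initial delay. Is this possible?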


Solution

  • First, we set up a function that creates an observable that emits nothing, stays open for a given length of time, and then completes.

    const nothingFor = (ms) => timer(ms).pipe(concatMapTo(EMPTY));
    

    Then we use it to make a new operator that behaves like delay*, but applies the delay after the source completes instead of before its emissions.

    const appendDelay = (delay) => (source$) =>
      of(source$, nothingFor(delay)).pipe(concatAll());
    

    Then we apply it to the inner observable, in place of the delay you were originally using.

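    from([1, 2, 3, 4])
      .pipe(
        concatMap((n) =>
          of(n).pipe(
            tap((n) => {
              console.log(`computing ${n}`);
            }),
            // Emit 5..8 immediately, then keep the inner observable open for 2 seconds
            concatMap((n) => from([5, 6, 7, 8]).pipe(appendDelay(2000)))
          )
        )
      )
      .subscribe({
        next: (val) => console.log(val),
        complete: () => console.log(`end`),
      });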
    

    * Well, sort of. delay shifts every emission by the same amount, so an operator that was really like delay would add a pause after each emission. appendDelay adds a single pause after the source completes.
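
    The ordering can be checked without actually waiting, by running the same idea on a virtual clock. The following is only a sketch under a few assumptions: it uses RxJS's VirtualTimeScheduler, shortens the data to two outer values and two inner values, swaps concatMapTo(EMPTY) for ignoreElements() (concatMapTo is deprecated as of RxJS 7), and writes appendDelay with concat, which should behave the same as of(...).pipe(concatAll()). The log array and the record helper are hypothetical names introduced only for this illustration.

```typescript
import { Observable, VirtualTimeScheduler, concat, from, timer } from 'rxjs';
import { concatMap, ignoreElements } from 'rxjs/operators';

// Virtual clock: timers scheduled on it fire when flush() is called,
// so the 2-second gaps can be observed without real waiting.
const scheduler = new VirtualTimeScheduler();

// Emits nothing, stays open for `ms`, then completes.
// (ignoreElements drops the single value that timer emits.)
const nothingFor = (ms: number): Observable<never> =>
  timer(ms, scheduler).pipe(ignoreElements());

// Passes the source through unchanged, then holds the stream open
// for `ms` after the source completes.
const appendDelay =
  (ms: number) =>
  <T>(source$: Observable<T>): Observable<T> =>
    concat(source$, nothingFor(ms));

// Hypothetical helper: records each message with the current virtual time.
const log: string[] = [];
const record = (msg: string): void => {
  log.push(`${scheduler.now()}ms ${msg}`);
};

from([1, 2])
  .pipe(
    concatMap((n) => {
      record(`computing ${n}`);
      return from([5, 6]).pipe(appendDelay(2000));
    })
  )
  .subscribe({
    next: (val) => record(String(val)),
    complete: () => record('end'),
  });

// Run all scheduled virtual timers to completion.
scheduler.flush();

console.log(log.join('\n'));
```

    In this sketch, the values for each outer item are recorded at the same virtual time as its "computing" line, and the next "computing" line appears 2000 virtual milliseconds later, which is the ordering asked for in the question. To use it against the real clock, drop the scheduler argument from timer and remove the flush() call.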