Search code examples
javascriptrxjsobservable

Complete rxjs observable afer not emitting values for x seconds


I need to write observable that will complete after there hasn't been new values for x amount of time.

What i have written myself is:

.pipe(takeUntil(timer(1000))).subscribe();

The problem with this is that it will simply complete observable after 1 second. I need to code behaviour where this timer refreshes when there is new value emitted. So lets say: 50 values are emitted -> nothing happens for 1 second -> observable completes.

Any ideas?


Solution

  • Here's how to have a 1s timeout that resets on each emission:

    stream$
      .pipe(
        switchMap((value) => concat(of(value), NEVER.pipe(timeout(1000)))),
        catchError((e: Error) =>
          e instanceof TimeoutError ? EMPTY : throwError(() => e)
        )
      )
    

    For completeness I've created a small demo:

    const stream$ = concat(
      of(1).pipe(delay(500)),
      of(2).pipe(delay(10)),
      of(3).pipe(delay(950)),
      of(4).pipe(delay(1050)), // our final stream should complete at this point because it's taking up more than 1s
      of(5)
    );
    
    stream$
      .pipe(
        switchMap((value) => concat(of(value), NEVER.pipe(timeout(1000)))),
        catchError((e: Error) =>
          e instanceof TimeoutError ? EMPTY : throwError(() => e)
        )
      )
      .subscribe({
        next: (x) => console.log('Received', x),
        error: (x) => console.log('Error', x),
        complete: () => console.log('Complete'),
      });
    

    Which outputs:

    Received 1
    Received 2
    Received 3
    Complete
    

    As expected.

    Here's a live demo.