I need to write an observable that completes once no new values have been emitted for a given amount of time.
What I have written myself is:
The problem is that this simply completes the observable after 1 second. I need the timer to reset whenever a new value is emitted. For example: 50 values are emitted -> nothing happens for 1 second -> the observable completes.
Any ideas?
Here's how to have a 1s timeout that resets on each emission:
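stream$.pipe(
  // Each value starts a fresh 1s timer; switchMap cancels the previous one.
  switchMap((value) => concat(of(value), NEVER.pipe(timeout(1000)))),
  // A timeout means the stream went idle, so complete. Rethrow any other error.
  catchError((e: Error) => (e instanceof TimeoutError ? EMPTY : throwError(() => e)))
)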
For completeness I've created a small demo:
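import { concat, of, delay, switchMap, timeout, catchError, throwError, NEVER, EMPTY, TimeoutError } from 'rxjs'; // RxJS 7.2+
const stream$ = concat(
  of(1, 2, 3), // assumed: the original timings for these values are not shown; any gap under 1s behaves the same
  of(4).pipe(delay(1050)), // never delivered: the result completes first because this value takes more than 1s
);
stream$.pipe(
  switchMap((value) => concat(of(value), NEVER.pipe(timeout(1000)))),
  catchError((e: Error) =>
    e instanceof TimeoutError ? EMPTY : throwError(() => e)
  ),
).subscribe({
  next: (x) => console.log('Received', x),
  error: (x) => console.log('Error', x),
  complete: () => console.log('Complete'),
});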
Which outputs:
Received 1
Received 2
Received 3
Complete
As expected.
Here's a live demo.
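To make the reset rule concrete without running RxJS, here is a minimal sketch that replays the same logic over a list of timestamped emissions. `takeUntilIdle` and the `Emission` type are hypothetical names made up for illustration, not RxJS APIs, and the timestamps for values 1 to 3 are assumed (the demo only fixes the 1050 ms delay before 4). Treating a value that arrives exactly at the deadline as too late is also an arbitrary choice for this model.

```typescript
// Hypothetical model of the idle-timeout rule; not part of RxJS.
interface Emission<T> {
  at: number; // milliseconds since subscription
  value: T;
}

interface IdleResult<T> {
  received: T[];
  completedAt: number | null; // null: no value ever arrived, so no timer was started
}

function takeUntilIdle<T>(emissions: Emission<T>[], idleMs: number): IdleResult<T> {
  const received: T[] = [];
  // As with the switchMap approach, the timer only exists once a value has arrived.
  let deadline: number | null = null;

  for (const { at, value } of emissions) {
    // The previous timer fired before this value showed up: the stream is done.
    if (deadline !== null && at >= deadline) {
      return { received, completedAt: deadline };
    }
    received.push(value);
    // Every delivered value pushes the deadline forward by the full idle window.
    deadline = at + idleMs;
  }

  // No more values: the last timer is still pending and fires at its deadline.
  return { received, completedAt: deadline };
}

// Assumed timings: 1, 2 and 3 arrive immediately, 4 arrives 1050 ms later.
const result = takeUntilIdle(
  [
    { at: 0, value: 1 },
    { at: 0, value: 2 },
    { at: 0, value: 3 },
    { at: 1050, value: 4 },
  ],
  1000,
);
console.log(result.received, result.completedAt); // [ 1, 2, 3 ] 1000
```

Under these assumed timings the model agrees with the demo: 4 arrives 50 ms after the 1 s window has elapsed, so it is never delivered.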