I need to write observable that will complete after there hasn't been new values for x amount of time.
What i have written myself is:
.pipe(takeUntil(timer(1000))).subscribe();
The problem with this is that it will simply complete observable after 1 second. I need to code behaviour where this timer refreshes when there is new value emitted. So lets say: 50 values are emitted -> nothing happens for 1 second -> observable completes.
Any ideas?
Here's how to have a 1s timeout that resets on each emission:
stream$
.pipe(
switchMap((value) => concat(of(value), NEVER.pipe(timeout(1000)))),
catchError((e: Error) =>
e instanceof TimeoutError ? EMPTY : throwError(() => e)
)
)
For completeness I've created a small demo:
const stream$ = concat(
of(1).pipe(delay(500)),
of(2).pipe(delay(10)),
of(3).pipe(delay(950)),
of(4).pipe(delay(1050)), // our final stream should complete at this point because it's taking up more than 1s
of(5)
);
stream$
.pipe(
switchMap((value) => concat(of(value), NEVER.pipe(timeout(1000)))),
catchError((e: Error) =>
e instanceof TimeoutError ? EMPTY : throwError(() => e)
)
)
.subscribe({
next: (x) => console.log('Received', x),
error: (x) => console.log('Error', x),
complete: () => console.log('Complete'),
});
Which outputs:
Received 1
Received 2
Received 3
Complete
As expected.
Here's a live demo.