I have a function called playBellWebAudio, which returns an Observable that completes when a bell sound has finished ringing. I want to be able to trigger bell rings by calling .next() on a Subject, and I want the rings to queue up, so I'd use concatMap.
But then how should I a) cancel only the bell that is currently ringing and b) cancel all pending bell rings?
I think I can get single cancelation to work like this, but I wish it read more cleanly:
import { Subject, defer } from 'rxjs';
import { concatMap, takeUntil } from 'rxjs/operators';

const ringer = new Subject<void>();
const cancels = new Subject<void>();
ringer
  .pipe(
    concatMap(() => {
      return defer(playBellWebAudio).pipe(
        // Allow single-cancelation
        takeUntil(cancels)
      );
    })
  )
  .subscribe();
ringer.next(); // Add ring to the queue
cancels.next(); // Cancel the current ring playing
But I can't get queue-cancelation to work in raw RxJS like I can with @rxfx/effect.
The RxFx code, for comparison, is:
import { createQueueingEffect } from '@rxfx/effect';
const bellRinger = createQueueingEffect(playBellWebAudio);
bellRinger.cancelCurrent(); // cancels one
bellRinger.cancelCurrentAndQueued(); // also empties the queue
Can anybody show idiomatic RxJS for both single and multiple cancelation?
BTW, this is similar to the unanswered question "Canceling the inner Observable in concatMap".
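I think your current code is pretty good; you just need to handle canceling the entire queue. That can be done in several ways. One option is to wrap the queue in a switchMap, so that each emission of a restart Subject tears down the current ring along with everything concatMap has buffered: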
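const ringer = new Subject<void>();
const cancels = new Subject<void>();
const restartEntireQueue = new Subject<void>();
restartEntireQueue
  .pipe(
    switchMap(() =>
      ringer.pipe(
        concatMap(() =>
          defer(playBellWebAudio).pipe(
            // Allow single-cancelation
            takeUntil(cancels)
          )
        )
      )
    )
  )
  .subscribe();
// switchMap only subscribes to ringer once restartEntireQueue has emitted,
// so emit once up front or earlier rings are lost
restartEntireQueue.next(); // Start the queue
ringer.next(); // Add ring to the queue
cancels.next(); // Cancel the current ring playing
restartEntireQueue.next(); // Cancel the current ring and empty the queue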
Alternatively, end the whole queue with takeUntil and resubscribe with repeat:
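// Uses the same ringer, cancels and restartEntireQueue Subjects as above
ringer
  .pipe(
    concatMap(() =>
      defer(playBellWebAudio).pipe(
        // Allow single-cancelation
        takeUntil(cancels)
      )
    ),
    // Completing here drops the current ring and concatMap's buffer
    takeUntil(restartEntireQueue),
    // Resubscribe so that later rings are queued again
    repeat()
  )
  .subscribe();
restartEntireQueue.next(); // Cancel the current ring and empty the queue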
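Since you wished the raw RxJS version read cleaner, here is a minimal sketch that hides the three Subjects behind a small wrapper built on the takeUntil/repeat variant. createBellQueue and its method names are hypothetical (the names are borrowed from your RxFx snippet; this is not an RxJS or RxFx API), and playBellStub is only a stand-in for playBellWebAudio so that the behavior can be checked without real audio.

```typescript
import { Observable, Subject, defer } from 'rxjs';
import { concatMap, repeat, takeUntil } from 'rxjs/operators';

// Hypothetical wrapper (not an RxJS or RxFx API): hides the Subjects
// behind method names borrowed from the RxFx example in the question.
function createBellQueue(play: () => Observable<unknown>) {
  const rings = new Subject<void>();
  const cancelOne = new Subject<void>();
  const cancelAll = new Subject<void>();

  const subscription = rings
    .pipe(
      // Queue rings; the inner takeUntil ends only the ring now playing
      concatMap(() => defer(play).pipe(takeUntil(cancelOne))),
      // Ending the whole pipeline discards concatMap's buffered rings
      takeUntil(cancelAll),
      // Resubscribe so that rings requested afterwards are accepted again
      repeat()
    )
    .subscribe();

  return {
    ring: () => rings.next(),
    cancelCurrent: () => cancelOne.next(),
    cancelCurrentAndQueued: () => cancelAll.next(),
    dispose: () => subscription.unsubscribe(),
  };
}

// Hypothetical stand-in for playBellWebAudio: a bell "rings" until
// finishCurrentBell() is called, and records whether it was canceled.
const started: number[] = [];
const canceled: number[] = [];
let bellCount = 0;
let finishCurrentBell: () => void = () => {};

function playBellStub(): Observable<void> {
  return new Observable<void>((subscriber) => {
    const id = ++bellCount;
    let finished = false;
    started.push(id);
    finishCurrentBell = () => {
      finished = true;
      subscriber.complete();
    };
    // Teardown runs on completion and on unsubscribe; only the latter is a cancel
    return () => {
      if (!finished) canceled.push(id);
    };
  });
}

const bell = createBellQueue(playBellStub);
bell.ring(); // bell 1 starts playing
bell.ring(); // queued
bell.ring(); // queued
bell.cancelCurrent(); // bell 1 is canceled, bell 2 starts
bell.cancelCurrentAndQueued(); // bell 2 is canceled, the third ring is dropped
bell.ring(); // a fresh ring (bell 3) starts
finishCurrentBell(); // bell 3 finishes normally
bell.dispose(); // stop listening for rings altogether
```

Note that both kinds of cancelation work by unsubscribing from the inner Observable, so the sound only stops early if the Observable returned by playBellWebAudio actually stops the audio in its teardown function.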