Search code examples
rxjsobservable

Cancel all items in a queue with RxJS


I have a function called playBellWebAudio which returns an Observable for the completed ringing of a bell sound. I want to be able to trigger bell rings by calling .next() on a Subject, and I want the rings to queue up, so I'd use concatMap.

But then how should I a) cancel the current bell singly and b) cancel all pending bell rings?

I think I can get single cancelation to work like this, but I wish it could read cleaner:

const ringer = new Subject<void>();
const cancels = new Subject<void>();
ringer
  .pipe(
    concatMap(() => {
      return defer(playBellWebAudio).pipe(
        // Allow single-cancelation
        takeUntil(cancels)
      );
    })
  )
  .subscribe();

ringer.next() // Add ring to the queue
cancels.next() // Cancel the current ring playing

But, I can't get queue-cancelation to work in raw RxJS like I can with @rxfx/effect.

The RxFx code, for comparison is:

import { createEffect } from '@rxfx/effect'
const bellRinger = createQueueingEffect(playBellWebAudio);

bellRinger.cancelCurrent(); // cancels one
bellRinger.cancelCurrentAndQueued(); // also empties the queue

Can anybody show idiomatic RxJS for both single and multiple cancelation?

BTW This is similar to the unanswered Canceling the inner Observable in concatMap.


Solution

  • I think your current code is pretty good and that you just need to handle the entire queue cancelling, which can be done in several ways, here are some examples:

    const ringer = new Subject<void>();
    const cancels = new Subject<void>();
    const restartEntireQueue = new Subject<void>();
    
    restartEntireQueue
      .pipe(
        switchMap(() =>
          ringer.pipe(
            concatMap(() =>
              defer(playBellWebAudio).pipe(
                // Allow single-cancelation
                takeUntil(cancels)
              )
            )
          )
        )
      )
      .subscribe();
    
    ringer.next(); // Add ring to the queue
    cancels.next(); // Cancel the current ring playing
    

    or also

    ringer
      .pipe(
        concatMap(() =>
          defer(playBellWebAudio).pipe(
            // Allow single-cancelation
            takeUntil(cancels)
          )
        ),
        takeUntil(restartEntireQueue),
        repeat()
      )
      .subscribe();