javascript, rxjs, ngrx, redux-observable, switchmap

switchMap distinct by property in RxJS


Let's say I have a stream of actions, and each action is assigned an id. Like this:

const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });

Now, for each action, I want to perform some logic in switchMap:

actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);

The problem is that each emitted action cancels the previous 'some cancellable logic', regardless of its id.

Is it possible to cancel 'some cancellable logic' based on the action id, preferably with an operator? Something like:

actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)

Essentially, current behaviour with switchMap:
1. actions$ emits id #1. switchMap subscribes to nested observable.
2. actions$ emits id #2. switchMap unsubscribes from previous nested observable. Subscribes to new one.
3. actions$ emits id #1. switchMap again unsubscribes from previous nested observable. Subscribes to new one.

Expected behaviour:
1. actions$ emits id #1. switchMap subscribes to nested observable.
2. actions$ emits id #2. switchMap subscribes to a new nested observable, this time for #2. The difference is that it doesn't cancel the one for #1.
3. actions$ emits id #1. switchMap unsubscribes from nested observable for #1. Subscribes again, for #1.
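
To pin down the expected behaviour independently of RxJS, here is a minimal sketch of the bookkeeping it implies: one cancel function per id, and only the one for the incoming id is invoked. `KeyedSwitcher` and `Cancel` are hypothetical names used for illustration only; they are not part of RxJS.

```typescript
// Hypothetical helper (not an RxJS API): models "cancel only the
// previous task that was started with the same key".
type Cancel = () => void;

class KeyedSwitcher<K> {
  private readonly running = new Map<K, Cancel>();

  // Starts a task for `key`; a task previously started for the same key
  // is cancelled first. Tasks for other keys are left alone.
  run(key: K, start: () => Cancel): void {
    const cancelPrevious = this.running.get(key);
    if (cancelPrevious) {
      cancelPrevious();
    }
    this.running.set(key, start());
  }
}

const log: string[] = [];
const switcher = new KeyedSwitcher<number>();

// Same sequence of ids as in the question: 1, 2, 1.
for (const action of [{ id: 1 }, { id: 2 }, { id: 1 }]) {
  switcher.run(action.id, () => {
    log.push(`subscribe #${action.id}`);
    return () => {
      log.push(`unsubscribe #${action.id}`);
    };
  });
}

console.log(log);
// [ 'subscribe #1', 'subscribe #2', 'unsubscribe #1', 'subscribe #1' ]
```

The task for #2 is never cancelled, which is exactly the difference from plain switchMap described in the list above.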


Solution

  • This looks like a use case for mergeMap rather than switchMap. switchMap maintains a single inner subscription and cancels the previous one on every emission, which is not what you're after here. You want several concurrent inner subscriptions, each cancelled only when a new value with the same id comes through, so you need some custom logic on top of mergeMap.

    something along the lines of:

    action$.pipe(
      mergeMap(val => {
        return (/* your transform logic here */).pipe(
          // cancel it when the same id comes back through;
          // put this operator at the correct point in the chain
          takeUntil(action$.pipe(filter(a => a.id === val.id)))
        );
      })
    )
    

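    Because takeUntil re-subscribes to the source, this assumes a hot source, such as an NgRx or redux-observable action stream; a cold source like of(...) would replay its values into the notifier and cancel the inner observable immediately. You can turn this into something reusable by writing a custom operator: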

    import { OperatorFunction, Observable, from } from 'rxjs';
    import { takeUntil, filter, mergeMap } from 'rxjs/operators';
    
    export function switchMapBy<T, R>(
      key: keyof T,
      mapFn: (val: T) => Observable<R> | Promise<R>
    ): OperatorFunction<T, R> {
      return input$ => input$.pipe(
        mergeMap(val => 
          from(mapFn(val)).pipe(
            takeUntil(input$.pipe(filter(i => i[key] === val[key])))
          )
        )
      );
    }
    

    and use it like this:

    action$.pipe(
      switchMapBy('id', (val) => /* your transform logic here */)
    );
    

    Here's a StackBlitz of it in action: https://stackblitz.com/edit/rxjs-x1g4vc?file=index.ts
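
A possible alternative, not taken from the answer above, is to split the stream with groupBy and apply switchMap inside each group, which avoids subscribing to the source a second time. The sketch below assumes RxJS 6-style import paths as used in the answer; `switchMapByGroup` and `work` are hypothetical names chosen for this example.

```typescript
import { Observable, ObservableInput, OperatorFunction, Subject } from 'rxjs';
import { groupBy, mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical operator name; an illustrative alternative, not the answer's code.
export function switchMapByGroup<T, R>(
  key: keyof T,
  mapFn: (val: T) => ObservableInput<R>
): OperatorFunction<T, R> {
  return (input$: Observable<T>) =>
    input$.pipe(
      // one inner stream (group) per distinct value of val[key]
      groupBy(val => val[key]),
      // switchMap only cancels within its own group
      mergeMap(group$ => group$.pipe(switchMap(mapFn)))
    );
}

// Demo: a never-completing task that records subscribe/unsubscribe.
const log: string[] = [];
const work = (a: { id: number }) =>
  new Observable<string>(() => {
    log.push(`subscribe #${a.id}`);
    return () => {
      log.push(`unsubscribe #${a.id}`);
    };
  });

const source$ = new Subject<{ id: number }>();
const subscription = source$.pipe(switchMapByGroup('id', work)).subscribe();

source$.next({ id: 1 });
source$.next({ id: 2 });
source$.next({ id: 1 });

console.log(log);
// [ 'subscribe #1', 'subscribe #2', 'unsubscribe #1', 'subscribe #1' ]
subscription.unsubscribe();
```

One trade-off to keep in mind: without a duration selector, groupBy keeps every group open until the source completes, so this variant suits a bounded set of ids better than an unbounded one.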