rxjs, rxjs-pipeable-operators

Encapsulate rxjs pipe into function


I've been searching, but cannot find an answer to this.

Is there a way to encapsulate an rxjs pipe method into a custom method?

<observable>.pipe(
  filter((num: number) => num % 2 === 0),
  take(10),
  map((num: number) => num * 10),
).subscribe(...);

I'd like to reduce it to

<observable>.doSomeThingsThatArePiped().subscribe(...);

I know that custom pipeable operators can simplify what goes inside the pipe, but I use the same set of operators in several places and would like to reduce the call as much as possible, including the pipe call itself.
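For reference, the custom pipeable operator approach mentioned above might look like the sketch below. The name `firstTenEvensTimesTen` and the `range(1, 100)` source are made up for illustration. The operators are bundled into one function, but the caller still has to write `.pipe(...)`, which is the part the question wants to get rid of.

```typescript
import { Observable, OperatorFunction, range } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';

// Hypothetical operator: bundles the three operators from the snippet above
// into a single reusable function that can be passed to pipe().
function firstTenEvensTimesTen(): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    source.pipe(
      filter((num: number) => num % 2 === 0),
      take(10),
      map((num: number) => num * 10),
    );
}

// Usage: range() emits synchronously, so `results` is filled once subscribe() returns.
const results: number[] = [];
range(1, 100)
  .pipe(firstTenEvensTimesTen())
  .subscribe((n) => results.push(n));

console.log(results); // [20, 40, 60, 80, 100, 120, 140, 160, 180, 200]
```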


Solution

  • The code in my question was simplified to keep it generic. Here's what I've come up with for my real project; it extends the Observable prototype.

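    import { Observable } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';

    // Module augmentation: tells TypeScript about the extra method on Observable.
    declare module 'rxjs' {
      interface Observable<T> {
        subscribeUntilDestroyed: (component: any) => Observable<T>;
      }
    }

    // Shape the component is expected to have at runtime.
    interface IDestroyNotifier {
      readonly destroyed$: Observable<boolean>;
    }

    // Ordinary pipeable operator: completes the stream once destroyed$ emits.
    function untilDestroyed(notifier: IDestroyNotifier) {
      return <T>(source: Observable<T>) => source.pipe(takeUntil(notifier.destroyed$));
    }

    // Prototype patch that hides the pipe() call. Despite its name, it returns
    // an Observable, so the caller still has to call subscribe() on the result.
    Observable.prototype.subscribeUntilDestroyed = function (component) {
      return this.pipe(untilDestroyed(component));
    };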
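A usage sketch follows, assuming a component that exposes a `destroyed$` subject. `DemoComponent`, its `received` array and its `destroy()` method are hypothetical names for illustration and are not part of the original project. The extension is repeated in compact form (with `takeUntil` inlined) so the snippet runs on its own.

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Same extension as in the answer, repeated here in compact form.
declare module 'rxjs' {
  interface Observable<T> {
    subscribeUntilDestroyed: (component: any) => Observable<T>;
  }
}

Observable.prototype.subscribeUntilDestroyed = function (component) {
  return this.pipe(takeUntil(component.destroyed$));
};

// Hypothetical component that collects values until it is destroyed.
class DemoComponent {
  readonly destroyed$ = new Subject<boolean>();
  readonly received: number[] = [];

  constructor(source$: Observable<number>) {
    // No explicit pipe() here: the prototype method wraps it.
    source$.subscribeUntilDestroyed(this).subscribe((n) => this.received.push(n));
  }

  destroy(): void {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

const source$ = new Subject<number>();
const component = new DemoComponent(source$);

source$.next(1);
source$.next(2);
component.destroy();
source$.next(3); // ignored: takeUntil has already completed the stream

console.log(component.received); // [1, 2]
```

Patching the prototype affects every Observable in the application, and the file containing the patch has to be imported once before the method is used. A plain operator passed to `pipe()` avoids both of these at the cost of the extra call.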