Search code examples
Tags: angular, rxjs, angular-services, rxjs-observables, rxjs-pipeable-operators

How to use an Angular service inside an RXJS Custom Operator?


How do I use an Angular service in my custom rxjs operator?

Is it possible to do this?

/**
 * Builds a pass-through RxJS operator: every notification from the source
 * (value, error, completion) is relayed unchanged to the subscriber.
 * The `next` branch is the hook where per-value work can be added.
 */
function myOperator() {
    return <T>(source: Observable<T>): Observable<T> =>
        new Observable<T>(downstream => {
            const upstream = source.subscribe({
                next: value => {
                    //access an Angular service HERE
                    downstream.next(value);
                },
                error: err => downstream.error(err),
                complete: () => downstream.complete(),
            });

            // Teardown: releasing the outer subscription also releases the source.
            return () => upstream.unsubscribe();
        });
}

I'd like to use it in an observable pipe:

// Intended usage: apply the custom operator inside a regular pipe() chain.
observable
.pipe(
    myOperator()
)
.subscribe(result => {
    // Values relayed by myOperator() arrive here as `result`.

});

Solution

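  • Storing a reference to the application's root injector in a static holder when the app is bootstrapped, and then reading services from that holder inside the custom operator, seems to work well; the operator itself does not have to live inside an injectable service.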

    /**
     * Static holder for an Angular `Injector`, so that code living outside
     * Angular's dependency injection (plain functions, custom RxJS operators)
     * can still look up services.
     *
     * `setInjector` has to be called before `get` can resolve anything.
     */
    export class RootInjector {
      private static rootInjector: Injector;
      private static readonly $injectorReady = new BehaviorSubject(false);
      // NOTE(review): this is an instance member on a class whose other members
      // are all static, so it is only reachable through `new RootInjector()`.
      // Left as-is to keep the public shape unchanged.
      readonly injectorReady$ = RootInjector.$injectorReady.asObservable();

      /**
       * Stores the injector and flips the ready flag to `true`.
       * Only the first call has an effect; later calls are ignored.
       *
       * @param injector The injector that `get` will delegate to.
       */
      static setInjector(injector: Injector) {
        if (this.rootInjector) {
          return;
        }

        this.rootInjector = injector;
        this.$injectorReady.next(true);
      }

      /**
       * Resolves `token` from the stored injector.
       *
       * This method never throws: any failure is reported with `console.error`
       * and `null` is returned instead (despite the declared return type), so
       * callers should be prepared for a missing value.
       *
       * @param token The class or `InjectionToken` to look up.
       * @param notFoundValue Passed through to `Injector.get`.
       * @param flags Passed through to `Injector.get`.
       * @returns The resolved instance, or `null` when the lookup failed.
       */
      static get<T>(
        token: Type<T> | InjectionToken<T>,
        notFoundValue?: T,
        flags?: InjectFlags
      ): T {
        // Interpolating a class directly would print its entire source text,
        // so use the class name; other tokens rely on their own toString().
        const tokenName = typeof token === 'function' ? token.name : String(token);

        // Report the "not initialised yet" case on its own so it is not
        // confused with a genuine lookup failure.
        if (!this.rootInjector) {
          console.error(
            `Error getting ${tokenName} from RootInjector: the injector has not been set. Call RootInjector.setInjector(...) before RootInjector.get(...).`
          );
          return null;
        }

        try {
          return this.rootInjector.get(token, notFoundValue, flags);
        } catch (e) {
          // Keep the underlying error in the log so the real cause stays visible.
          console.error(`Error getting ${tokenName} from RootInjector.`, e);
          return null;
        }
      }
    }
    

    This can be registered during the bootstrapping:

    // platformBrowserDynamic is a factory: it must be called to obtain the
    // platform object that exposes bootstrapModule().
    platformBrowserDynamic()
      .bootstrapModule(AppModule)
      .then((ngModuleRef) => {
        // Hand the module's injector to RootInjector as soon as it exists.
        RootInjector.setInjector(ngModuleRef.injector);
      })
      // Surface bootstrap failures instead of leaving an unhandled rejection.
      .catch((err) => console.error(err));
    

    And then be used in the custom operator:

    /**
     * Custom RxJS operator that calls `MyAngularService.doMyThing()` for each
     * value emitted by the source and then passes the value through unchanged.
     * Errors and completion from the source are relayed as-is.
     */
    function myOperator() {
        return function <T>(source: Observable<T>): Observable<T> {
            return new Observable(subscriber => {
                // Look the service up per subscription rather than when the
                // operator is created, so a pipeline assembled before the
                // injector was registered still finds the service.
                const myAngularService = RootInjector.get(MyAngularService);
                const subscription = source.subscribe({
                    next(value) {
                        // A synchronous source may keep emitting after the
                        // subscriber has closed; skip the side effect then.
                        if (subscriber.closed) {
                            return;
                        }
                        try {
                            myAngularService.doMyThing();
                        } catch (err) {
                            // Route the failure into the error channel; thrown
                            // from here it would never reach the subscriber.
                            subscriber.error(err);
                            return;
                        }
                        subscriber.next(value);
                    },
                    error(error) {
                        subscriber.error(error);
                    },
                    complete() {
                        subscriber.complete();
                    }
                });
                // Teardown: unsubscribing from the result unsubscribes the source.
                return () => subscription.unsubscribe();
            });
        };
    }
    

    In unit tests the application is never bootstrapped, so RootInjector is never set up and tests that use the operator crash. To fix this, place the following in a root-injector.mock.ts file:

    import { TestBed } from '@angular/core/testing';
    import { RootInjector } from 'src/app/core/injectors/root-injector';
    
    // Test stand-in for the injector: each lookup is delegated to TestBed at
    // the moment it is requested. The fallback value and flags are forwarded
    // so lookups take the same arguments as they do against a real injector.
    RootInjector.setInjector({
        get: (token, notFoundValue, flags) => {
            return TestBed.inject(token, notFoundValue, flags);
        }
    });
    

    …and then import that file in the Jasmine test file:

    import 'src/mocks/root-injector.mock';
    
    describe('MyComponent', () => {
        // ... the component's specs go here
    });
    

    Note that this only works for services declared with `providedIn: 'root'`.

    Thanks to this post!: https://nartc.netlify.app/blogs/root-injector/