angular, typescript, rxjs, ngrx

Accessing the stream value in the parameters of a custom RxJS operator


I'm trying to find a way to access the stream variable in the parameters of a custom operator. My effect looks like this:

@Effect()
v$ = this.actions$.pipe(
  ofType(ActionTypes.Action),
  map((action: ActionType) => action.payload),
  customOperator(
    new ActionCall(<I need to use the stream variable here>),
    ... // some other things not important in this context
  ),
);

And the customOperator looks like this:

export function customOperator<T>(<function parameters>): OperatorFunction<T, T> {
  return function (source$: Observable<T>): Observable<T> {
    ... // the stream variable is available here but it's no use to me here
  };
}

I've tried assigning the stream variable to a class property inside a tap operator, but it ends up undefined that way. This is really driving me crazy.


Solution

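  • What you're asking for isn't possible. The arguments to customOperator are evaluated once, when the pipeline is built, and at that point the stream has not emitted anything yet. Values only start flowing through after you subscribe, which is also why a class property set in tap is still undefined when new ActionCall(...) runs.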

    You can, however, get customOperator to hand you the value as it receives it. Just like map, filter, etc., customOperator needs to take a function as input (it can still take whatever other parameters it needs as well). That function takes the stream value as its argument and returns whatever your customOperator needs. That might look like this:

    @Effect()
    v$ = this.actions$.pipe(
      ofType(ActionTypes.Action),
      map((action: ActionType) => action.payload),
      customOperator(
        val => new ActionCall(val),
        ... // some other things not important in this context
      ),
    );
    
    export function customOperator<T>(func: (value: T) => ActionCall, <other function parameters>): MonoTypeOperatorFunction<T> {
      return function (source$: Observable<T>): Observable<T> {
        streamVariable = ... // the stream variable is available here, and now it is of use
        const actionCall = func(streamVariable);
        // Use actionCall however you need.
      };
    }
    

    Now you've used your stream variable to create a new ActionCall. That's about as close as you'll get with RxJS.
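
As a minimal, self-contained sketch of the idea above: the operator below takes a factory function and calls it once per emitted value. The ActionCall class here is a hypothetical stand-in for the one in the question, and emitting the factory's result through map is an assumption about how the operator uses it; the original leaves that part open.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical stand-in for the ActionCall class from the question.
class ActionCall<P> {
  constructor(public readonly payload: P) {}
}

// Sketch of a custom operator that receives a factory instead of a
// ready-made object. The factory runs for every value the source emits,
// so it always sees the current stream value.
function customOperator<T, R>(factory: (value: T) => R): OperatorFunction<T, R> {
  return (source$: Observable<T>): Observable<R> =>
    source$.pipe(
      // Assumption: the operator simply emits whatever the factory returns.
      map((value: T) => factory(value)),
    );
}

// Usage: of() emits synchronously, so results is filled once subscribe returns.
const results: ActionCall<number>[] = [];
of(1, 2, 3)
  .pipe(customOperator((val: number) => new ActionCall(val)))
  .subscribe((call) => results.push(call));
```

The key difference from the code in the question is that `new ActionCall(val)` now sits inside an arrow function, so it runs when a value arrives rather than when the pipeline is assembled.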