Search code examples
javascriptfunctional-programmingrxjsreactivex

RxJS Custom Operator Internal Variables


Are there drawbacks to using/mutating a variable from a custom operator closure in RxJS? I realize it violates the "pure" function principle and that you can use scan for this simple example, but I'm asking specifically for tangible technical issues with underlying pattern below:

const custom = () => {

  let state = 0; 

  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Usage
const obs = interval(1000).pipe(custom())

obs.subscribe()

Solution

  • There are at least two problems with the way you've stored state within your custom operator.

    The first problem is that your doing so means the operator is no longer referentially transparent. That is, if the calling of the operator is replaced with the operator's return value, the behaviour is different:

    const { pipe, range } = rxjs;
    const { map, share, tap } = rxjs.operators;
    
    const custom = () => {
      let state = 0; 
      return pipe(
        map(next => state * next),
        tap(_ => state += 1),
        share()
      );
    };
    
    const op = custom();
    console.log("first use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    console.log("second use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

    The second problem - as mentioned in the other answer - is that different subscriptions will receive different values in their next notifications, as the state within the operator is shared.

    For example, if the source observable is synchronous, consecutive subscriptions will see different values:

    const { pipe, range } = rxjs;
    const { map, share, tap } = rxjs.operators;
    
    const custom = () => {
      let state = 0; 
      return pipe(
        map(next => state * next),
        tap(_ => state += 1),
        share()
      );
    };
    
    const source = range(1, 2).pipe(custom());
    console.log("first subscription:");
    source.subscribe(n  => console.log(n));
    console.log("second subscription:");
    source.subscribe(n => console.log(n));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

    However, it is possible to write an operator very similar to your custom operator and have it behave correctly in all circumstances. To do so, it's necessary to ensure that any state within the operator is per-subscription.

    A pipeable operator is just a function that takes an observable and returns an observable, so you could use defer to ensure that your state is per-subscription, like this:

    const { defer, pipe, range } = rxjs;
    const { map, share, tap } = rxjs.operators;
    
    const custom = () => {
      return source => defer(() => {
        let state = 0; 
        return source.pipe(
          map(next => state * next),
          tap(_ => state += 1)
        );
      }).pipe(share());
    };
    
    const op = custom();
    console.log("first use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    console.log("second use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    
    const source = range(1, 2).pipe(op);
    console.log("first subscription:");
    source.subscribe(n => console.log(n));
    console.log("second subscription:");
    source.subscribe(n => console.log(n));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>