Tags: rxjs, method-chaining, pipelining, transducer

Whatever happened to `Observable.transduce` in RxJS v5+?


RxJS v4 used to have an `Observable.transduce` method, which took a transducer. This allowed the use of library-independent transducer operators, which had major performance benefits in the past.

RxJS v5.5 and v6 have pipeable operators and v6 removed method chaining. Because of this, I assumed RxJS operators were standard transducers. Looking through the source code, that doesn't seem to be the case.

RxJS v6 operators behave like transducers in that each value passes through the entire chain before the next value enters it. However, they don't implement the standard transducer protocol I've seen in other libraries, so I don't think they're portable.
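
To make that shape concrete, here is a minimal sketch that does not use RxJS at all. `createObservable`, `fromArray`, and the `created` counter are hypothetical stand-ins written for this illustration, not library code. The only thing taken from RxJS is the general shape of a pipeable operator: a function from a source observable to a new observable.

```javascript
// A deliberately tiny stand-in for an observable: an object with subscribe and pipe.
// This is NOT the RxJS implementation, only an illustration of the shape.
const createObservable = subscribe => ({
  subscribe,
  // pipe threads the source through each operator, left to right
  pipe(...operators) {
    return operators.reduce((source, op) => op(source), this);
  }
});

let created = 0; // counts the intermediate observables built by operators

// A pipeable operator has the shape: Observable => Observable.
const map = f => source => {
  created += 1;
  return createObservable(observer =>
    source.subscribe({
      next: x => observer.next(f(x)),
      complete: () => observer.complete()
    })
  );
};

const filter = pred => source => {
  created += 1;
  return createObservable(observer =>
    source.subscribe({
      next: x => { if (pred(x)) observer.next(x); },
      complete: () => observer.complete()
    })
  );
};

// Emits every array element synchronously, then completes.
const fromArray = xs => createObservable(observer => {
  xs.forEach(x => observer.next(x));
  observer.complete();
});

const log = [];
fromArray([1, 2, 3, 4])
  .pipe(
    filter(x => { log.push(`filter ${x}`); return x % 2 === 0; }),
    map(x => { log.push(`map ${x}`); return x * 10; })
  )
  .subscribe({
    next: x => log.push(`next ${x}`),
    complete: () => log.push("complete")
  });
```

In this sketch `log` shows each value travelling the whole chain before the next one starts (`filter 2`, `map 2`, `next 20`, then `filter 3`), while `created` ends at 2, one wrapper observable per operator. The operators are tied to the observable shape, though, so they cannot be reused on an array or any other collection.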

The whole point of transducers is that they know nothing about the collection itself. Instead of writing 100 operators specifically for observables, you could write 100 operators that apply to any collection or stream type.
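
As a rough illustration of that idea, the sketch below writes `map` and `filter` once against the `@@transducer/*` transformer protocol and reuses them with two unrelated sinks. The helpers `compose`, `arraySink`, `callbackSink`, and `transduceInto` are hypothetical names invented for this example, and early termination (reduced values) is left out to keep it short.

```javascript
// Transformer protocol keys used by transducers.js-style libraries.
const INIT = "@@transducer/init";
const STEP = "@@transducer/step";
const RESULT = "@@transducer/result";

// map and filter know nothing about where values come from or where they go.
const map = f => xform => ({
  [INIT]: () => xform[INIT](),
  [STEP]: (acc, x) => xform[STEP](acc, f(x)),
  [RESULT]: acc => xform[RESULT](acc)
});

const filter = pred => xform => ({
  [INIT]: () => xform[INIT](),
  [STEP]: (acc, x) => (pred(x) ? xform[STEP](acc, x) : acc),
  [RESULT]: acc => xform[RESULT](acc)
});

// Composes transducers so that values flow through them left to right.
const compose = (...fns) => x => fns.reduceRight((acc, fn) => fn(acc), x);

// Sink 1: collect values into an array.
const arraySink = {
  [INIT]: () => [],
  [STEP]: (acc, x) => { acc.push(x); return acc; },
  [RESULT]: acc => acc
};

// Sink 2: forward each value to a callback, as a push-based stream would.
const callbackSink = onValue => ({
  [INIT]: () => undefined,
  [STEP]: (acc, x) => { onValue(x); return acc; },
  [RESULT]: acc => acc
});

// Drives any iterable through a transducer into a sink.
// Assumption: no early termination, so reduced values are not handled.
function transduceInto(xf, sink, iterable) {
  const xform = xf(sink);
  let acc = xform[INIT]();
  for (const x of iterable) {
    acc = xform[STEP](acc, x);
  }
  return xform[RESULT](acc);
}

const evenSquares = compose(filter(x => x % 2 === 0), map(x => x * x));

// Same transducer, two different sources and destinations.
const asArray = transduceInto(evenSquares, arraySink, [1, 2, 3, 4]);
const pushed = [];
transduceInto(evenSquares, callbackSink(x => pushed.push(x)), new Set([1, 2, 3, 4]));
```

Both `asArray` and `pushed` end up as `[4, 16]`. Only the sink changes between the two calls, and an observer-backed sink would be a third option.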

Is `.pipe` synonymous with `.transduce`, or was this method completely removed in RxJS v5?


Solution

  • I had the exact same question and could not find the answer anywhere. Yes, you can use pipe, but I believe that creates an intermediate observable for each operator. I'm not sure, though; confirming it would mean reading the source code.

    So I came up with my own transduce operator:

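    import { Observable } from "rxjs";

    // Adapts an observer to the transformer protocol so a transducer can feed it.
    function transformForObserver(o) {
      return {
        "@@transducer/init": function() {
          return o;
        },
        "@@transducer/step": function(obs, input) {
          obs.next(input);
          return obs; // the observer itself plays the role of the accumulator
        },
        "@@transducer/result": function(obs) {
          obs.complete();
          return obs;
        }
      };
    }

    const transduce = (source, transducer) => {
      return new Observable(o => {
        // A transducer takes a transformer object, so wrap the observer first.
        // Building it per subscription keeps stateful transducers (e.g. take) isolated.
        const xform = transducer(transformForObserver(o));
        let done = false;

        return source.subscribe({
          next: x => {
            if (done) { return; }
            try {
              const res = xform["@@transducer/step"](o, x);
              // A reduced value signals early termination: flush and complete.
              if (res && res["@@transducer/reduced"]) {
                done = true;
                xform["@@transducer/result"](res["@@transducer/value"]);
              }
            } catch (err) {
              console.error(`Error occurred in transducer/step!`, err);
              o.error(err);
            }
          },
          error: err => {
            console.error(`Error occurred in observable passed to Rx transduce fn!`, err);
            o.error(err);
          },
          // Let the transducer chain flush before completing the observer.
          complete: () => {
            if (!done) { xform["@@transducer/result"](o); }
          }
        });
      });
    };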
    

    I haven't tested it yet; I will post more about it if there is interest.

    Update: I forked jlongster's transducers.js library and included such transducers in it. The fork is https://github.com/brucou/transducers.js, and the function is transduceLazyObservable. See the tests for an example of use.