
Merge implemented as flatMap

In theory, it should be possible to implement any RxJS operator (except just() and flatMap() themselves) in terms of flatMap(). For instance, map() can be implemented as:

function map(source, selector) {
  return source.flatMap(x => Rx.Observable.just(selector(x)));
}

How can merge() be implemented through flatMap() (avoiding mergeAll() too, of course)?


  • It looks possible if you take advantage of the fact that the flatMap selector can also return an array, not just an observable:

    Rx.Observable.prototype.merge = function(other) {
      var source = this;
      return Rx.Observable.just([source, other])
               //Flattens the array into observable of observables
               .flatMap(function(arr) { return arr; })
               //Flatten out the observables
               .flatMap(function(x) { return x; });
    };

    EDIT 1

    Using RxJS 6 and the pipe syntax

    import {of} from 'rxjs'
    import {flatMap} from 'rxjs/operators'
    function merge (other) {
      return source => of([source, other]).pipe(
               //Flattens the array into observable of observables
               flatMap(arr => arr),
               //Flatten out the observables
               flatMap(x => x)
      );
    }

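    // Assumes the RxJS 6 UMD bundle is loaded, exposing the global `rxjs`.
    const {timestamp, map, flatMap, take} = rxjs.operators;
    const {interval, of: just} = rxjs;
    const source1 = interval(2000).pipe(
      timestamp(), // wraps each value as {value, timestamp}
      map(x => "Interval 1 at " + x.timestamp + " w/ " + x.value)
    );
    const source2 = interval(3000).pipe(
      timestamp(),
      map(x => "Interval 2 at " + x.timestamp + " w/ " + x.value)
    );
    function mergeFromFlatMap (other) {
      return source => just([source, other]).pipe(
        flatMap(arr => arr),
        flatMap(seq => seq)
      );
    }
    source1.pipe(mergeFromFlatMap(source2), take(20))
      .subscribe(x => console.log(x));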
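To see why the two chained flatMap calls behave like merge, it may help to strip the idea down to a dependency-free sketch. Everything below is hypothetical and is not RxJS: `ToyObservable`, its `just` helper, and `subject` are minimal stand-ins invented for illustration, with no completion, error, or unsubscribe handling. The toy `flatMap` accepts either an array or another `ToyObservable` from its selector, mirroring the behaviour the answer relies on.

```javascript
// Toy push-based observable (an assumption for illustration, NOT the RxJS API).
class ToyObservable {
  constructor(subscribe) {
    this.subscribe = subscribe; // subscribe(next): registers a value callback
  }

  // Emits a single value synchronously on subscription.
  static just(value) {
    return new ToyObservable(next => next(value));
  }

  // The selector may return an array or a ToyObservable;
  // every inner value is forwarded to the same downstream callback.
  flatMap(selector) {
    return new ToyObservable(next => {
      this.subscribe(x => {
        const inner = selector(x);
        if (Array.isArray(inner)) inner.forEach(v => next(v));
        else inner.subscribe(next);
      });
    });
  }
}

// map() expressed through flatMap(), as in the question.
function map(source, selector) {
  return source.flatMap(x => ToyObservable.just(selector(x)));
}

// merge() expressed through flatMap(), as in the answer.
function merge(source, other) {
  return ToyObservable.just([source, other])
    .flatMap(arr => arr)  // array -> stream of observables
    .flatMap(seq => seq); // subscribe to each observable, forward its values
}

// Hypothetical helper: a source we can push values into by hand.
function subject() {
  const listeners = [];
  const obs = new ToyObservable(next => { listeners.push(next); });
  return { obs, emit: v => listeners.forEach(l => l(v)) };
}

const a = subject();
const b = subject();
const seen = [];
merge(a.obs, b.obs).subscribe(v => seen.push(v));

a.emit('a1');
b.emit('b1');
a.emit('a2');
console.log(seen); // values in arrival order: a1, b1, a2

const mapped = [];
map(ToyObservable.just(2), x => x * 10).subscribe(v => mapped.push(v));
console.log(mapped); // contains the single value 20
```

The second flatMap subscribes to both inner sources as soon as they arrive, so values are forwarded in whatever order they are emitted. That interleaving is what distinguishes merge from concat.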