Combine multiple Observables with different actions/operations


I'm building an Angular2 app, so I'm getting used to Observables and Reactive Extensions as a whole. I'm using TypeScript and rxjs.

I have an observable (a stream, if you will) that emits an array of objects, say Person objects. I also have two other streams that emit single Person objects, and I want to combine all three into one stream that is always up to date:

var people$ = getPeople();                  // Observable<Person[]>
var personAdded$ = eventHub.personAdded;     // Observable<Person>
var personRemoved$ = eventHub.personRemoved; // Observable<Person>

var alwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);

If the people stream emits an array of, say, 5 people and the personAdded stream then emits a person, the combined stream should emit an array of 6. If the personRemoved stream emits a person, the combined stream should emit the array without the person just emitted by the personRemoved stream.

Is there a way built into rxjs to get this behaviour?


Solution

  • My suggestion is that you wrap the idea of an action into a stream which can then be merged and applied directly to the Array.

    The first step is to define some functions that describe your actions:

    function add(people, person) {
      return people.concat([person]);
    }
    
    function remove(people, person) {
      const index = people.indexOf(person);
      // filter returns a new array; splice would mutate and return the removed items
      return index < 0 ? people : people.filter((_, i) => i !== index);
    }
    

    Note: we avoid mutating the Array in place because it can have unforeseen side effects. Purity demands that we create a copy of the array instead.
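
A small check of that property, as a sketch: the helpers below are standalone, non-mutating versions (the names `addPerson` and `removePerson` and the sample names are illustrative, not from the original). The input array is frozen, so a mutating method such as `splice` or `push` would throw a TypeError instead of silently changing it.

```javascript
// Hypothetical stand-ins for the add/remove helpers; both return a new array.
function addPerson(people, person) {
  return people.concat([person]);
}

function removePerson(people, person) {
  const index = people.indexOf(person);
  // filter builds a new array and leaves the input untouched
  return index < 0 ? people : people.filter((_, i) => i !== index);
}

// Freezing the input makes in-place mutation fail loudly.
const original = Object.freeze(["Ann", "Bob"]);

const afterAdd = addPerson(original, "Cy");
const afterRemove = removePerson(afterAdd, "Ann");

console.log(original);    // [ 'Ann', 'Bob' ]
console.log(afterAdd);    // [ 'Ann', 'Bob', 'Cy' ]
console.log(afterRemove); // [ 'Bob', 'Cy' ]
```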

    Now we can use these functions and lift them into the stream to create an Observable that emits functions:

    const added$ = eventHub.personAdded.map(person => people => add(people, person));
    const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));
    

    Each event is now a function of the form people => people, whose input and output are both arrays of people (simplified in this example to arrays of strings).
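
To make that shape concrete, here is a synchronous sketch with no RxJS involved: each event is a function from one people array to the next, and `Array.prototype.reduce` stands in for folding the events over a starting array. The sample names and the `ops`, `start`, and `states` variables are illustrative only, and `remove` is simplified here to drop every matching entry.

```javascript
// Curried factories: each call produces a function of the form people => people.
const add = person => people => people.concat([person]);
// Simplification: removes every entry equal to person.
const remove = person => people => people.filter(p => p !== person);

// A hypothetical sequence of events, in the order they would arrive.
const ops = [add("Cy"), remove("Ann"), add("Di")];

// Fold the events over a starting array, keeping every intermediate state.
const start = ["Ann", "Bob"];
const states = ops.reduce(
  (history, op) => history.concat([op(history[history.length - 1])]),
  [start]
);

console.log(states);
// [ [ 'Ann', 'Bob' ],
//   [ 'Ann', 'Bob', 'Cy' ],
//   [ 'Bob', 'Cy' ],
//   [ 'Bob', 'Cy', 'Di' ] ]
```

Each entry in `states` corresponds to one array that a subscriber would see: the starting array first, then one new array per event.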

    Now how would we wire this up? We only care about applying these add and remove events once we have an array to apply them to:

    const currentPeople = 
    
      // Resets this stream if a new set of people comes in
      people$.switchMap(peopleArray => 
    
        // Merge the actions together 
        Rx.Observable.merge(added$, removed$)
    
          // Pass in the starting Array and apply each action as it comes in
          .scan((current, op) => op(current), peopleArray)
    
          // Always emit the starting array first
          .startWith(peopleArray)
      )
      // This just makes sure that every new subscription doesn't restart the stream
      // and every subscriber always gets the latest value
      .shareReplay(1);
    

    There are several possible optimizations of this technique depending on your needs (e.g. avoiding the function currying, or using a binary search), but I find the above relatively elegant for the generic case.
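
As one possible reading of "avoiding the function currying", the events could be plain action objects handled by a single reducer instead of functions. This is a sketch under that assumption: the `{ type, person }` shape and the `reducePeople` name are made up for illustration, and `Array.prototype.reduce` stands in for `scan` so the example runs without RxJS.

```javascript
// Hypothetical action shape: { type: "add" | "remove", person }
function reducePeople(people, action) {
  switch (action.type) {
    case "add":
      return people.concat([action.person]);
    case "remove": {
      const index = people.indexOf(action.person);
      return index < 0 ? people : people.filter((_, i) => i !== index);
    }
    default:
      return people; // unknown actions leave the state unchanged
  }
}

const actions = [
  { type: "add", person: "Cy" },
  { type: "remove", person: "Ann" },
  { type: "remove", person: "Zed" } // not present: no change
];

const result = actions.reduce(reducePeople, ["Ann", "Bob"]);
console.log(result); // [ 'Bob', 'Cy' ]
```

With RxJS, the two event streams would be mapped to such objects and the reducer passed to `scan` in place of `(current, op) => op(current)`.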