Merge events from a changing list of Observables


I'm using rxjs.

I have a Browser that's responsible for a number of Page objects. Each page has an Observable<Event> that yields a stream of events.

Page objects are opened and closed at various times. I want to create one observable, called TheOneObservable, that merges the events from all currently active Page objects and also merges in custom events from the Browser object itself.

When a Page is closed, the subscription to its events should be released as well, so that it doesn't prevent the Page from being garbage-collected.

My problem is that Pages can be closed at any time, which means that the number of Observables being merged is always changing. I've thought of using an Observable of Pages and using mergeMap, but there are problems with this. For example, a subscriber will only receive events of Pages that are opened after it subscribes.
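
To make the late-subscriber problem concrete, here is a minimal sketch of the mergeMap idea. It assumes RxJS 6 or later (pipeable operators), uses plain strings in place of Event, and adds a hypothetical `emit` helper to Page purely so the demo can push events.

```typescript
import { Observable, Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

class Page {
  private _events = new Subject<string>();
  get events(): Observable<string> {
    return this._events.asObservable();
  }
  // Hypothetical helper, not part of the original Page class.
  emit(e: string): void {
    this._events.next(e);
  }
}

// An Observable of Pages: each opened page is pushed through this subject.
const pageOpened = new Subject<Page>();
const merged = pageOpened.pipe(mergeMap(page => page.events));

const early = new Page();
const late = new Page();

pageOpened.next(early); // opened before anyone subscribes, so it is lost

const seen: string[] = [];
const sub = merged.subscribe(e => seen.push(e));

pageOpened.next(late); // opened after subscribing, so it is merged in

early.emit('from early page'); // never reaches the subscriber
late.emit('from late page');
sub.unsubscribe();

console.log(seen); // → ['from late page']
```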


Note that this question has been answered here for .NET, but using an ObservableCollection that isn't available in rxjs.


Here is some code to illustrate the problem:

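// RxJS 5 style: importing from 'rxjs/Rx' patches the chained operators used below.
import { Observable, Subject } from 'rxjs/Rx';

class Page {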
    private _events = new Subject<Event>();

    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}

class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();

    addPage(page : Page) {
        this.pages.push(page);
    }

    removePage(page : Page) {
        let ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }

    get oneObservable() {
        // This won't work: the pages array is only read at subscription time,
        // so pages added or removed afterwards are never picked up.
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}

It's in TypeScript, but it should be understandable.


Solution

  • You can switchMap() on a Subject that emits whenever the pages array changes, so that each change replaces the merged observable with a fresh one built from the current pages.

    pagesChanged = new Rx.Subject();
    
    addPage(page : Page) {
      this.pages.push(page);
      this.pagesChanged.next();
    }
    
    removePage(page : Page) {
      let ixPage = this.pages.indexOf(page);
      if (ixPage < 0) return;
      this.pages.splice(ixPage, 1);
      this.pagesChanged.next();
    }
    
    get oneObservable() {
      // Rebuild the merged stream from the current pages on every change.
      return this.pagesChanged
        .switchMap(changeEvent =>
          Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
        );
    }
    

    Testing:

    const page1 = { events: Rx.Observable.of('page1Event') }
    const page2 = { events: Rx.Observable.of('page2Event') }
    
    let pages = []; 
    const pagesChanged = new Rx.Subject();
    const addPage = (page) => { 
      pages.push(page); 
      pagesChanged.next(); 
    }
    const removePage = (page) => { 
      let ixPage = pages.indexOf(page);
      if (ixPage < 0) return;
      pages.splice(ixPage, 1);
      pagesChanged.next(); 
    }
    
    const _ownEvents = Rx.Observable.of('ownEvent')
    
    const oneObservable = 
      pagesChanged
        .switchMap(pp => 
          Rx.Observable.from(pages)
            .mergeMap(x => x.events)
            .merge(_ownEvents)
        )
    
    oneObservable.subscribe(x => console.log('subscribe', x))
    
    console.log('adding 1')
    addPage(page1)
    console.log('adding 2')
    addPage(page2)
    console.log('removing 1')
    removePage(page1)

    The test runs against RxJS 5.5.6, loaded as the global Rx from https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js.
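
For RxJS 6 or later, where operators are piped rather than chained, the same idea might look like the sketch below. It is an adaptation under a few assumptions, not the code tested above: `PageEvent`, `emit` and `emitOwn` are hypothetical names added for the demo, `startWith` is added so that a new subscriber immediately sees pages that are already open, and the browser's own events are merged outside the `switchMap` so they are not resubscribed on every change.

```typescript
import { Observable, Subject, from, merge } from 'rxjs';
import { mergeMap, startWith, switchMap } from 'rxjs/operators';

type PageEvent = string; // stand-in for the question's Event type

class Page {
  private _events = new Subject<PageEvent>();
  get events(): Observable<PageEvent> {
    return this._events.asObservable();
  }
  // Hypothetical helper so the demo can push events.
  emit(e: PageEvent): void {
    this._events.next(e);
  }
}

class Browser {
  private pages: Page[] = [];
  private pagesChanged = new Subject<void>();
  private _ownEvents = new Subject<PageEvent>();

  addPage(page: Page): void {
    this.pages.push(page);
    this.pagesChanged.next();
  }

  removePage(page: Page): void {
    const ixPage = this.pages.indexOf(page);
    if (ixPage < 0) return;
    this.pages.splice(ixPage, 1);
    this.pagesChanged.next();
  }

  // Hypothetical helper so the demo can push a browser-level event.
  emitOwn(e: PageEvent): void {
    this._ownEvents.next(e);
  }

  get oneObservable(): Observable<PageEvent> {
    const pageEvents = this.pagesChanged.pipe(
      startWith(undefined), // build the first merged stream right at subscription
      // On every change, drop the old page subscriptions and merge the current pages.
      switchMap(() => from([...this.pages]).pipe(mergeMap(p => p.events))),
    );
    return merge(this._ownEvents, pageEvents);
  }
}

const browser = new Browser();
const page1 = new Page();
const page2 = new Page();
browser.addPage(page1); // opened before anyone subscribes

const received: PageEvent[] = [];
const sub = browser.oneObservable.subscribe(e => received.push(e));

page1.emit('page1: a'); // received, because startWith picked up the existing page
browser.addPage(page2);
page2.emit('page2: a');
browser.removePage(page1);
page1.emit('page1: b'); // dropped: page1 is no longer part of the merge
browser.emitOwn('browser: a');
sub.unsubscribe();

console.log(received); // → ['page1: a', 'page2: a', 'browser: a']
```

Because `switchMap` unsubscribes from the previous inner observable on each change, a removed page's event stream is released as soon as `removePage` runs. The trade-off is that every change also resubscribes to the pages that stayed open, which is harmless for Subject-backed streams like these but would replay events from cold observables.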