Search code examples
javascriptrxjsrxjs5reactivex

Collecting the currently incomplete observables in a higher-order rxjs observable


For example, I have an observable that emits a 10-second timer every 5 seconds. I can use scan to create an observable that emits an array of all the inner observables emitted so far:

tick = 5000
tock = 1000
timers = Observable.interval(tick).scan( (acc, next) => {
    let timer = Observable.interval(tock).take(10);
    return acc.concat([timer]);
}, []);

But what if I want to emit it to emit an array of all "live" timers (i.e. those that haven't called complete yet)?

Here's a crappy ascii marble diagram of what I'm thinking of:

-A--B-----
  \  \
   \  \
    \  x
     x
 |  |    |
[A][A,B] []

Is there a way to do this using the standard operators, or is this just not an idiomatic thing to try to do with rxjs?


Solution

  • You should be able to use the "active" method described in this answer to achieve what you want:

    Rx: a zip-like operator that continues after one of the streams ended?