Search code examples
rxjsrx-javareactive-programmingrx-java2rx-java3

Build "Heartbeat" Observable from Unpredictable Source Observable


I have an Observable, source, that may emit items at unpredictable times. I'm trying to use it to build another Observable that reliably emits its values every 500ms.

Let's say that source emits values at these times:

  • 100ms - first item
  • 980ms - second item
  • 1020ms - third item
  • 1300ms - fourth item, etc.

I'd like to "smooth" this stream, so that I get outputs like:

  • 500ms - first item
  • 1000ms - second item
  • 1500ms - third item
  • 2000ms - fourth item

A naive approach might be to just add a delay in between emissions of source items. But, that won't create evenly spaced intervals, like I want.

I've tried various combinations of .timer(), .interval(), and .flatMap(), but nothing promising, yet.


Solution

  • For a source emitting faster than your interval

    zip your source with an interval of the required time span.

    zip(source, interval(500)).pipe(
      map(([value, _]) => value)  // only emit the source value
    )
    

    enter image description here

    zip emits the 1st item from source with the 1st item from interval, then the 2nd item from source with the 2nd item from interval and so on. If the output observable should only emit when interval emits, the Nth value from source has to arrive before the Nth value from interval.

    Potential Problem: If your source emits slower than interval at some point (i.e. the Nth value from source arrives after the Nth value from interval) then zip will emit directly without waiting for the next time interval emits.

    // the 5th/6th value from source arrive after the 5th/6th value from interval
                                                  v    v
    source:       -1--------2-3---4---------------5----6-----
    interval:     -----1-----2-----3-----4-----5-----6-----7-
    zip output:   -----1-----2-----3-----4--------5----6-----
                       ✓     ✓     ✓     ✓        ⚠️    ⚠️
    // emits 5 and 6 don't happen when interval emits
    

    For a source emitting at any rate

    function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
      return (source: Observable<T>) =>
        defer(() => {
          let sourceCompleted = false;
          const queue = source.pipe(
            tap({ complete: () => (sourceCompleted = true) }),
            scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
          );
          return interval(period).pipe(
            withLatestFrom(queue), // combine with the latest buffer
            takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
            filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
            map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
          );
        });
    }
    
    source.pipe(
      emitOnInterval(500)
    )
    
    // the 5th/6th value from source arrive after the 5th/6th value from interval
                                                  v    v
    source:       -1--------2-3---4---------------5----6-----
    interval:     -----1-----2-----3-----4-----5-----6-----7-
    output:       -----1-----2-----3-----4-----------5-----6-
                       ✓     ✓     ✓     ✓           ✓     ✓   
    // all output emits happen when interval emits
    

    https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts