Search code examples
observable · reactive-extensions-js · rxjs

Create an Observable that delays the next value


I'm trying to create an observable using RxJS that does what is pictured.

Expected observable mapping

  • Grabs a value and waits a fixed period of time before getting the next one.
  • The next one will be the last value emitted in the period of the wait, skipping the rest.
  • If a wait interval goes by in which no value was emitted, the next one should be grabbed immediately, as the last example of the image depicts.

Solution

  • This should do the trick.

    // RxJS entry point.
    var Rx = require('rx');

    // Demo input: a tick every 10 ms, limited to the first 100 ticks.
    var source = Rx.Observable.interval(10).take(100);

    // Console logger bound to `console` so it can be passed around as a bare callback.
    var log = console.log.bind(console);
    
    // Builds an observable on top of `source` that emits a value and then waits
    // WAIT_MS before emitting again. When a wait ends, the most recent value that
    // arrived during it (if any) is emitted and a new wait starts; earlier values
    // from that wait are dropped. If a wait ends with nothing pending, the next
    // source value is emitted as soon as it arrives.
    Rx.Observable.create(function (observer) {

        var WAIT_MS  = 1000,  // length of the wait that follows every emission
            delaying = false, // true while a wait is in progress
            hasValue = false, // true when `value` arrived during the current wait
            complete = false, // true once the source completed with a value still pending
            timer    = null,  // handle of the pending setTimeout, or null when none
            value;            // latest value received from the source

        // Source handler: remember the value; emit right away unless we are waiting.
        function onNext (x) {
          value = x;
          if (delaying) {
            hasValue = true;
          } else {
            sendValue();
          }
        }

        // Emits the remembered value, then either finishes (the source already
        // completed) or starts the next wait.
        function sendValue () {
          observer.onNext(value);
          if (complete) {
            observer.onCompleted();
          } else {
            // A scheduler could replace the raw timer; the handle is kept so the
            // wait can be cancelled on disposal.
            timer = setTimeout(callback, WAIT_MS);
          }
          delaying = true;
        }

        // Runs when a wait ends: flush the pending value, or go back to idle.
        function callback () {
          timer = null;
          if (hasValue) {
            hasValue = false;
            sendValue();
          } else {
            delaying = false;
          }
        }

        var subscription = source.subscribe(
            onNext,
            observer.onError.bind(observer),
            function () {
              if (hasValue) {
                // A value is still waiting for its timer; complete after it is sent.
                complete = true;
              } else {
                observer.onCompleted();
              }
            }
          );

        // Tear down both the source subscription and any pending timer, so that
        // nothing fires (or keeps the process alive) after the subscriber is gone.
        return new Rx.CompositeDisposable(
            subscription,
            Rx.Disposable.create(function () {
              if (timer !== null) {
                clearTimeout(timer);
                timer = null;
              }
            })
          );
      })
      .subscribe(log);