I'm trying to create an observable using RxJS that does what is pictured.
This should do the trick.
// RxJS library entry point.
var Rx = require('rx');

// Demo input stream: interval(10) ticks every 10 ms, take(100) stops it after 100 values.
var source = Rx.Observable.interval(10).take(100);

// console.log pre-bound to console so it can be handed around as a bare callback.
var log = console.log.bind(console);
/*
 * Rate-limited view of `source`.
 *
 * - The first value is forwarded immediately and opens a quiet window of
 *   DELAY_MS milliseconds.
 * - Values arriving while the window is open overwrite each other; only the
 *   latest one is kept.
 * - When the window ends, the kept value (if any) is forwarded and a new
 *   window opens; if nothing arrived, the window simply closes.
 * - If the source completes while a value is still being held, completion is
 *   postponed until that value has been forwarded; otherwise it is forwarded
 *   right away.
 * - Errors are forwarded immediately; a held value is discarded.
 *
 * The pending window timer is cancelled on error and on disposal so that it
 * neither keeps the process alive nor fires against a finished observer.
 */
Rx.Observable.create(function (observer) {
  var DELAY_MS = 1000;  // length of the quiet window that follows every emission
  var delaying = false; // true while a quiet window is open
  var hasValue = false; // true when `value` arrived during the window and is still unsent
  var complete = false; // true when the source completed while a value was still held
  var disposed = false; // true once the subscriber has disposed of this subscription
  var timer = null;     // handle of the pending window timer, or null when none is armed
  var value;            // most recent value received from the source

  // Cancels the pending window timer, if there is one.
  function cancelTimer() {
    if (timer !== null) {
      clearTimeout(timer);
      timer = null;
    }
  }

  // Source handler: remember the value, and forward it at once unless a window is open.
  function onNext(x) {
    value = x;
    if (delaying) {
      hasValue = true;
    } else {
      sendValue();
    }
  }

  // Forwards the current value, then either finishes the stream or opens a new window.
  function sendValue() {
    observer.onNext(value);
    if (disposed) {
      // The subscriber disposed synchronously while handling the value:
      // do not arm a timer that nobody would ever cancel.
      return;
    }
    if (complete) {
      observer.onCompleted();
    } else {
      timer = setTimeout(callback, DELAY_MS);
    }
    delaying = true;
  }

  // Window expiry: flush the held value (which opens a new window) or close the window.
  function callback() {
    timer = null;
    if (hasValue) {
      hasValue = false;
      sendValue();
    } else {
      delaying = false;
    }
  }

  var subscription = source.subscribe(
    onNext,
    function (err) {
      // Nothing may be emitted after an error, so the pending flush is pointless.
      cancelTimer();
      observer.onError(err);
    },
    function () {
      if (hasValue) {
        // Let the pending window flush the held value first; sendValue() completes afterwards.
        complete = true;
      } else {
        observer.onCompleted();
      }
    }
  );

  // Disposal must tear down both the upstream subscription and the window timer.
  return new Rx.CompositeDisposable(
    subscription,
    Rx.Disposable.create(function () {
      disposed = true;
      cancelTimer();
    })
  );
})
  .subscribe(log);