I have an observable that wraps a socket.io stream of events from the server (call it source). Each message from socket.io is emitted on the observable. This observable is filtered and mapped to several subscriptions based on the content of the socket.io messages.
For example:
var filtered = source.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handleEventName(msg) { /* do something with msg */ });
Sometimes these subscriptions trigger long animations in my app. I want to pause the source observable when this happens, buffer new events until the animation plays out, and then resume the observable.
I've got all of this working as expected using pausableBuffered:
var pausable = source.pausableBuffered();
var filtered = pausable.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handleEventName(msg) {
    pausable.pause();
    /**
     * do something async, like animation, then when done call
     * pausable.resume();
     */
});
So far so good.
However, let's assume that while the observable is paused, five messages are buffered. The third message is one that needs to pause the stream again, and it has a subscription set up to do so. As soon as the source observable is un-paused, though, it immediately empties its buffer of all five events, all of which get handled and passed to their subscriptions. Only then does the third message's subscription finally pause the original stream.
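I understand why this is happening, but what I really want instead is for the buffer to drain one message at a time: the first two messages are handled, the third pauses the stream again, and the fourth and fifth stay buffered until the stream is resumed.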
It seems that every way that I use pausableBuffered ends up dumping the entire buffer to all of its subscriptions. How can I achieve what I'm looking for?
You could try a controlled observable. It gives you pretty much complete control, because items are held back until you explicitly ask for them with request(n).
For example:
var source = Rx.Observable.interval(300).take(10);
var controlled = source.controlled();
var sourceSub = source.subscribe(
    function (x) {
        console.log('Next source: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
var controlledSub = controlled.subscribe(
    function (x) {
        console.log('Next controlled: ' + x.toString());
        if (x === 3) {
            setTimeout(function () {
                controlled.request(1);
            }, 2000);
        } else {
            controlled.request(1);
        }
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
controlled.request(1);
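If it helps to see the idea without Rx in the way, here is a rough plain-JavaScript sketch of the same "ask for one item at a time" pattern, applied to the five-buffered-messages case from the question. Note that createGate, the done callback, and the 'Animate' / 'Other' message names are all made up for illustration. This is not how Rx implements controlled(), just a small model of the behaviour you get by calling controlled.request(1) only after each handler has finished.

```javascript
// Hypothetical helper (not part of RxJS): delivers queued messages one at a
// time and waits for the handler to call done() before delivering the next.
function createGate(handler) {
    var queue = [];    // messages waiting to be delivered
    var busy = false;  // true while a handler has not yet called done()

    function drain() {
        if (busy || queue.length === 0) { return; }
        busy = true;
        var msg = queue.shift();
        handler(msg, function done() {
            busy = false;
            drain();   // plays the role of controlled.request(1)
        });
    }

    return {
        push: function (msg) { queue.push(msg); drain(); },
        pending: function () { return queue.length; }
    };
}

// Usage: message 3 starts an "animation" and holds everything behind it.
var delivered = [];
var finishAnimation = null;

var gate = createGate(function (msg, done) {
    delivered.push(msg.id);
    if (msg.name === 'Animate') {
        finishAnimation = done;  // call this when the animation ends
    } else {
        done();                  // nothing to wait for, ask for the next one
    }
});

[1, 2, 3, 4, 5].forEach(function (id) {
    gate.push({ id: id, name: id === 3 ? 'Animate' : 'Other' });
});

// Messages 4 and 5 are still queued at this point.
var pendingWhileAnimating = gate.pending();

// Later, when the animation is done, delivery picks up where it stopped.
finishAnimation();
```

The point is that the consumer decides when the next message is released, so a handler that needs to block the stream simply delays its done() call (or, with Rx, its request(1) call) instead of trying to pause a buffer that has already been flushed.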