I need to proxy all the different event streams through one subject.
I came up with this code:
var mySubject;
var getObservable;

/**
 * Builds a per-event view over the shared subject.
 *
 * Messages are first filtered by `EventName`; each remaining message is then
 * turned into an inner observable according to its `Type` and merged into the
 * result with `flatMap`:
 *   - 'onNext'      -> a single-value observable carrying `Data`
 *   - 'onError'     -> an observable that fails with `Data`
 *   - anything else -> an empty observable (adds no values to the result)
 *
 * @param {Object} subject - Source exposing `asObservable()`.
 * @param {string} eventName - Only messages whose `EventName` strictly equals
 *   this value are considered.
 * @returns {Object} The filtered and flat-mapped observable.
 */
getObservable = function (subject, eventName) {
  const isForThisEvent = (message) => message.EventName === eventName;

  const toInnerObservable = (message) => {
    switch (message.Type) {
      case 'onNext':
        return Rx.Observable.return(message.Data);
      case 'onError':
        return Rx.Observable.throw(message.Data);
      default:
        return Rx.Observable.empty();
    }
  };

  const source = subject.asObservable();
  return source.filter(isForThisEvent).flatMap(toInnerObservable);
};
// One shared subject carries the messages for every event name.
mySubject = new Rx.Subject();

// Log everything delivered on the 'foo' stream.
getObservable(mySubject, 'foo').subscribe(
  (x) => {
    console.log('foo onNext ' + x);
  },
  (x) => {
    console.log('foo onError ' + x);
  },
  () => {
    console.log('foo onComplete');
  }
);

// Log everything delivered on the 'bar' stream.
getObservable(mySubject, 'bar').subscribe(
  (x) => {
    console.log('bar onNext ' + x);
  },
  (x) => {
    console.log('bar onError ' + x);
  },
  () => {
    console.log('bar onComplete');
  }
);

// Push the sample messages through the subject, in order.
[
  { Type: 'onNext', EventName: 'foo', Data: 5 },
  { Type: 'onCompleted', EventName: 'foo' },
  { Type: 'onNext', EventName: 'bar', Data: 5 },
  { Type: 'onError', EventName: 'bar', Data: 'Error message' },
].forEach((message) => {
  mySubject.onNext(message);
});
Got output:
foo onNext 5
bar onNext 5
bar onError Error message
Expected output:
foo onNext 5
foo onComplete
bar onNext 5
bar onError Error message
For the `bar` event this works like a charm: `onNext` values are propagated, and as soon as an error is raised the `onError` handler is called and the event stream finishes. However, I can't get it to work for `onComplete`.
Whenever a completion notification arrives, I can see that `Rx.Observable.empty()` gets called, but that doesn't cause the subscriber's `onComplete` handler to be called. Instead, it's calling its `onNext` handler.
The `getObservable` function returns an observable that subscribes to the `eventName` events sent through a `subject`.
/**
 * Returns an observable exposing the events named `eventName` that are
 * proxied through `subject`.
 *
 * Each message sent through the subject is expected to look like
 * `{ Type, EventName, Data }`. Messages for other event names are ignored;
 * for matching messages `Type` selects the notification to replay:
 *   - 'onNext'      -> observer.onNext(Data)
 *   - 'onError'     -> observer.onError(Data)
 *   - 'onCompleted' -> observer.onCompleted()
 * Any other `Type` is ignored.
 *
 * @param {Object} subject - Subject (or observable) carrying the messages.
 * @param {string} eventName - Event name to select.
 * @returns {Object} Observable of the `Data` payloads for that event.
 */
const getObservable = function (subject, eventName) {
  return Rx.Observable.create(function (observer) {
    // Return the inner subscription so that disposing the derived observable
    // also detaches from the subject instead of leaking the subscription.
    return subject
      .asObservable()
      .filter(function (message) {
        return message.EventName === eventName;
      })
      .subscribe(
        function (message) {
          switch (message.Type) {
            case 'onNext':
              observer.onNext(message.Data);
              break;
            case 'onError':
              observer.onError(message.Data);
              break;
            case 'onCompleted':
              observer.onCompleted();
              break;
            default:
              // Unknown message types are ignored.
              break;
          }
        },
        // Forward the subject's own termination: once it has ended no further
        // messages can arrive, and an unhandled error would otherwise be
        // rethrown by the default error handler.
        function (error) {
          observer.onError(error);
        },
        function () {
          observer.onCompleted();
        }
      );
  });
};
This is a working example using data from the original question:
var mySubject;
var getObservable;

/**
 * Returns an observable exposing the events named `eventName` that are
 * proxied through `subject`.
 *
 * Each message sent through the subject is expected to look like
 * `{ Type, EventName, Data }`. Messages for other event names are ignored;
 * for matching messages `Type` selects the notification to replay:
 *   - 'onNext'      -> observer.onNext(Data)
 *   - 'onError'     -> observer.onError(Data)
 *   - 'onCompleted' -> observer.onCompleted()
 * Any other `Type` is ignored.
 *
 * @param {Object} subject - Subject (or observable) carrying the messages.
 * @param {string} eventName - Event name to select.
 * @returns {Object} Observable of the `Data` payloads for that event.
 */
getObservable = function (subject, eventName) {
  return Rx.Observable.create(function (observer) {
    // Return the inner subscription so that disposing the derived observable
    // also detaches from the subject instead of leaking the subscription.
    return subject
      .asObservable()
      .filter(function (message) {
        return message.EventName === eventName;
      })
      .subscribe(
        function (message) {
          switch (message.Type) {
            case 'onNext':
              observer.onNext(message.Data);
              break;
            case 'onError':
              observer.onError(message.Data);
              break;
            case 'onCompleted':
              observer.onCompleted();
              break;
            default:
              // Unknown message types are ignored.
              break;
          }
        },
        // Forward the subject's own termination: once it has ended no further
        // messages can arrive, and an unhandled error would otherwise be
        // rethrown by the default error handler.
        function (error) {
          observer.onError(error);
        },
        function () {
          observer.onCompleted();
        }
      );
  });
};
// One shared subject carries the messages for every event name.
mySubject = new Rx.Subject();

// Log everything delivered on the 'foo' stream.
getObservable(mySubject, 'foo').subscribe(
  (x) => {
    console.log('SomethingHappened onNext ' + x);
  },
  (x) => {
    console.log('SomethingHappened onError ' + x);
  },
  () => {
    console.log('SomethingHappened onComplete');
  }
);

// Log everything delivered on the 'bar' stream.
getObservable(mySubject, 'bar').subscribe(
  (x) => {
    console.log('DataUpdated onNext ' + x);
  },
  (x) => {
    console.log('DataUpdated onError ' + x);
  },
  () => {
    console.log('DataUpdated onComplete');
  }
);

// Push the sample messages from the question through the subject, in order.
[
  { Type: 'onNext', EventName: 'foo', Data: 5 },
  { Type: 'onCompleted', EventName: 'foo' },
  { Type: 'onNext', EventName: 'bar', Data: 5 },
  { Type: 'onError', EventName: 'bar', Data: 'Error message' },
].forEach((message) => {
  mySubject.onNext(message);
});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>