Search code examples
system.reactivereactive-extensions-jsrxjs

Using one subject to propagate completly different event streams through


I need to proxy all the different event streams through one subject.

I came up with this code:

var mySubject,
    getObservable;

getObservable = function (subject, eventName) {
    return subject
        .asObservable()
        .filter(function (x) {
            return x.EventName === eventName;
        })
        .flatMap(function (x) {
            if (x.Type === 'onNext') {
                return Rx.Observable.return(x.Data);
            }

            if (x.Type === 'onError') {
                return Rx.Observable.throw(x.Data);
            }

            return Rx.Observable.empty();
        });
};

mySubject = new Rx.Subject();

getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('foo onNext ' + x); 
    }, function(x){ 
        console.log('foo onError ' + x); 
    }, function(){ 
        console.log('foo onComplete');
    });

getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('bar onNext ' + x); 
    }, function(x){ 
        console.log('bar onError ' + x); 
    }, function(){ 
        console.log('bar onComplete');
    });

mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});

mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});

Got output:

foo onNext 5

bar onNext 5
bar onError Error message

Expected output:

foo onNext 5  
foo onCompleted

bar onNext 5
bar onError Error message

For the bar event, that works like a charm: onNext will be propagated and as soon as the error raises the onError function gets called and the event stream finishes. However, I can't get it to work for the onComplete.

Whenever a complete notification raises I do see that Rx.Observable.empty() gets called but that doesn't cause the subscribers onComplete handler to be called. Instead, its calling its onNext handler.


Solution

  • The getObservable function returns an observable that subscribes to a eventName event sent through a subject.

    let getObservable = function (subject, eventName) {
        return Rx.Observable.create(function (observer) {
            subject
                .asObservable()
                .filter(function(x) {
                    return x.EventName === eventName;
                })
                .map(function(x) {
                    if (x.Type === 'onNext') {
                        observer.onNext(x.Data);
                    }
    
                    if (x.Type === 'onError') {
                        observer.onError(x.Data);
                    }
    
                    if (x.Type === 'onCompleted') {
                        observer.onCompleted();
                    }
    
                    return x;
                })
                .subscribe();
        });
    };
    

    This is a working example using data from the original question:

    var mySubject,
        getObservable;
    
    getObservable = function (subject, eventName) {
        return Rx.Observable.create(function (observer) {
            subject
                .asObservable()
                .filter(function(x) {
                    return x.EventName === eventName;
                })
                .map(function(x) {
                    if (x.Type === 'onNext') {
                        observer.onNext(x.Data);
                    }
    
                    if (x.Type === 'onError') {
                        observer.onError(x.Data);
                    }
                    
                    if (x.Type === 'onCompleted') {
                        observer.onCompleted();
                    }
                    
                    return x;
                })
                .subscribe();
        });
    };
    
    mySubject = new Rx.Subject();
    
    getObservable(mySubject, 'foo')
        .subscribe(function(x){ 
            console.log('SomethingHappened onNext ' + x); 
        }, function(x){ 
            console.log('SomethingHappened onError ' + x); 
        }, function(){ 
            console.log('SomethingHappened onComplete');
        });
    
    
    getObservable(mySubject, 'bar')
        .subscribe(function(x){ 
            console.log('DataUpdated onNext ' + x); 
        }, function(x){ 
            console.log('DataUpdated onError ' + x); 
        }, function(){ 
            console.log('DataUpdated onComplete');
        });
    
    mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
    mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});
    
    mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
    mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
    <script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>