Search code examples
javascript observable frp rxjs

Chaining promises with RxJS


I'm new to RxJS and FRP in general. I had the idea of converting an existing promise chain in my ExpressJS application to be an observable for practice. I am aware that this probably isn't the best example but maybe someone can help shed some light.

What I'm trying to do:

  1. I have two promises - prom1 and prom2
  2. I want prom1 to run before prom2
  3. If prom1 sends a reject(err), I want to cancel prom2 before it starts.
  4. I want the error message prom1 returns to be available to the onError method on the observer.

// The executor function runs immediately when the promise is constructed.
var prom1 = new Promise(function(resolve, reject) {
    // The condition is hard-coded to true, so this promise always rejects with 'reason'.
    if (true) {
       reject('reason');
    }
    // Still reached after the reject above, but a promise settles only once,
    // so this call has no effect.
    resolve(true);
});

// Resolves with true as soon as it is constructed; nothing here refers to prom1.
var prom2 = new Promise(function(resolve, reject) {
    resolve(true);
});

// What do I do here? This is what I've tried so far...
// Wrap the already-created prom1 in an observable.
var source1 = Rx.Observable.fromPromise(prom1);
// NOTE(review): flatMap is given an observable built from the existing prom2,
// not a function that would create the second promise on demand.
var source2 = source1.flatMap(Rx.Observable.fromPromise(prom2));

// Subscribe with next / error / completed handlers that log to the console.
var subscription = source2.subscribe(
    function (result) { console.log('Next: ' + result); },

    // I want my error 'reason' to be made available here
    function (err) { console.log('Error: ' + err); },

    function () { console.log('Completed'); });

Solution

  • If I understand what you are trying to do, you need to create two deferred observables from functions that return promises and concatenate them:

    // Toggle read by action1 on every call; when truthy, action1 rejects.
    var shouldFail = false;

    /**
     * Starts the first action.
     *
     * Logs 'start action1' synchronously, then settles the returned promise:
     * rejected with the string 'reason' when shouldFail is truthy, otherwise
     * resolved with true.
     *
     * @returns {Promise<boolean>} promise for the outcome of the first action
     */
    function action1() {
        var executor = function (fulfill, fail) {
            console.log('start action1');
            // Guard clause: the success path settles and leaves immediately.
            if (!shouldFail) {
                fulfill(true);
                return;
            }
            fail('reason');
        };
        return new Promise(executor);
    }
    
    /**
     * Starts the second action.
     *
     * Logs 'start action2' synchronously and returns a promise that is
     * resolved with true.
     *
     * @returns {Promise<boolean>} promise resolved with true
     */
    function action2() {
        var executor = function (fulfill) {
            console.log('start action2');
            fulfill(true);
        };
        return new Promise(executor);
    }
    
    // Pass the functions themselves (not promises) to defer, so each action
    // is only invoked when its observable is subscribed to.
    var source1 = Rx.Observable.defer(action1);
    var source2 = Rx.Observable.defer(action2);

    // Run source1 first, then source2.
    var combination = Rx.Observable.concat(source1, source2);

    // Observer that logs every notification to the console. The handlers are
    // kept inside an IIFE so they do not add extra names to the outer scope.
    var logObserver = (function () {
        function handleNext(value) {
            console.log('Next: ' + value);
        }

        function handleError(reason) {
            console.log('Error: ' + reason);
        }

        function handleCompleted() {
            console.log('Completed');
        }

        return Rx.Observer.create(handleNext, handleError, handleCompleted);
    }());
    

    Then, for the normal case:

    // Subscribing starts the chain; the lines below are the expected console output.
    combination.subscribe(logObserver);
    // start action1
    // Next: true
    // start action2
    // Next: true
    // Completed
    

    And the case where the first promise fails:

    // Flip the flag before subscribing again so that action1 rejects this time.
    shouldFail = true;
    combination.subscribe(logObserver);
    // Expected console output: action2 is never started and 'Completed' is not logged.
    // start action1
    // Error: reason
    

    http://jsfiddle.net/cL37tgva/