Search code examples
angularrxjsobservablerxjs-observables

How to chain observables like using Promises or async/await?


Let's imagine I have this code using async/await that's working very well:

async function getMeSomething() : Promise<Something> {
  // some async code
}

async function getMeStuff() : Promise<Stuff> {
  // some async code
}

async function getMeSomethingWithStuff: Promise<SomethingWithStuff> {
  const something: Something = await getMeSomething();
  const stuff: Stuff = await getMeStuff();
  return new SomethingWithStuff(something, stuff);
}

This code could effectively be written with promises, like any code using async/await (it's just longer):

// this is strictly equivalent to previous example, including exception management
function getMeSomethingWithStuff: Promise<SomethingWithStuff> {
  let something;
  return getMeSomething().then((res) => {
    something = res;
    return getMeStuff();
  }).then((stuff) => {
    return new SomethingWithStuff(something, stuff);
  });
}

When calling the third function it will call the first one and second one, combine the results and return me the final result in a way that the caller of the third function has no idea which underlying functions were called:

const somethingWithStuff: SomethingWithStuff = getMeSomethingWithStuff();
// we have the result, we don't care how we got it, the logic is completely encapsulated

How could we write an equivalent code using only rxjs ?

// the following two functions are already defined and we can't redefine them
function getMeSomething() : Observable<Something> {
  // some async code
}

function getMeStuff() : Observable<Stuff> {
  // some async code
}

// we want to define this function that should return an observable
function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
  // what's the code ?
}

The rules:

  • No cheating with promises, async/await or conversion between observables and promises, it must only use rxjs
  • Exceptions must be correctly handled

It may seem a trivial question but I couldn't find an answer by myself despite reading carefully the documentation of rxjs and a lot of tutorials.


Solution

  • You'd need to use function like forkJoin to trigger multiple observables in parallel and one of the higher order mapping operators like switchMap to map from one observable to another.

    See here for brief descriptions of forkJoin function and switchMap operator.

    Option 1: Parellel

    // the following two functions are already defined and we can't redefine them
    function getMeSomething() : Observable<Something> {
      // some async code
    }
    
    function getMeStuff() : Observable<Stuff> {
      // some async code
    }
    
    function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
      return forkJoin({
        something: getMeSomething(),
        stuff: getMeStuff()
      }).pipe(
        switchMap((res: any) => 
          new SomethingWithStuff(res.something, res.stuff)
        )
      );
    }
    

    Option 2: Serial

    Use multiple higher order mapping operators

    function getMeSomethingWithStuff: Observable<SomethingWithStuff> {
      return getMeSomething().pipe(
        switchMap(something => getMeStuff().pipe(
          map(stuff => ({ something, stuff })
        ),
        switchMap((res: any) => 
          new SomethingWithStuff(res.something, res.stuff)
        )
      );
    }
    

    Update: add serial observable trigger