javascript rxjs es6-promise

Resolving multiple promises inside an observable not working


I'm using Firebase Storage and I'm trying to load all assets via a function call. The only way to get an asset's URL is to call getDownloadURL, which returns a promise. I need to call this for every asset, but for some reason I can't make it wait for all the promises to resolve before continuing.

I thought returning a promise from mergeMap would make it wait for all of them, but that doesn't seem to be the case.

I've looked at a number of questions regarding promises and RxJS, but I can't figure out what's wrong with the code.

getAssets() {

    return this.authService.user$.pipe(
      first(),
      switchMap(user => defer(() => from(this.afs.storage.ref(`${user.uid}/assets`).listAll()))),
      switchMap(assets => from(assets.items).pipe(
        mergeMap(async (asset) => {
        
          return new Promise((res, rej) => {
          
            asset.getDownloadURL().then(url => {
              
              const _asset = {
                name: asset.name,
                url,
              };
  
              this.assets.push(_asset);

              res(_asset);
            })
            .catch((e) => rej(e));
          });
        }),
      )),
      map(() => this.assets),
    );
  }

  ...

  this.getAssets().subscribe(assets => console.log(assets)); // this runs before all of the assets' URLs have been resolved

Solution

  • Overview

    mergeMap doesn't wait for all inner observables. It subscribes to one inner observable per source value, runs them concurrently, and forwards every inner value downstream (to your subscribe callback in this case) as a separate emission. That is why this.getAssets().subscribe(assets => console.log(assets)) runs before all the getDownloadURL calls have finished: mergeMap emits each result as soon as it arrives instead of waiting for the rest, and map(() => this.assets) then passes along the array while it is still being filled. If you want to wait for n observables to complete and receive their results together, use forkJoin.
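
The one-emission-per-inner-observable behavior described above can be seen in a minimal sketch. It assumes RxJS 6 or later; `fakeDownloadUrl` and the `example.invalid` URLs are hypothetical stand-ins for `asset.getDownloadURL()`, not part of the Firebase API, and they resolve synchronously to keep the example short.

```typescript
import { from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for asset.getDownloadURL(): one inner observable per asset.
const fakeDownloadUrl = (name: string) => of(`https://example.invalid/${name}`);

// Collect everything the subscriber receives.
const emissions: string[] = [];

from(['a.png', 'b.png', 'c.png'])
  .pipe(mergeMap(name => fakeDownloadUrl(name)))
  .subscribe(url => emissions.push(url));

// The subscriber ran once per inner observable, not once for the whole batch.
console.log(emissions.length); // → 3
```

With real promises the three callbacks would fire at different times, which is why the subscriber in the question sees a partially filled list.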


    Fork Join

    forkJoin is best used when you have a group of observables and only care about the final emitted value of each. One common use case for this is if you wish to issue multiple requests on page load (or some other event) and only want to take action when a response has been received for all. In this way it is similar to how you might use Promise.all.
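
As a rough illustration of the "final emitted value of each" behavior, here is a small sketch assuming RxJS 6 or later. The sources are synchronous `of(...)` observables chosen only for demonstration; the `log` array and `noSources` are hypothetical names.

```typescript
import { Observable, forkJoin, of } from 'rxjs';

// Record everything the subscribers see, in order.
const log: string[] = [];

// Three sources that emit different numbers of values before completing.
forkJoin([of(1, 2, 3), of(10), of(100, 200)]).subscribe({
  next: values => log.push(`next ${JSON.stringify(values)}`),
  complete: () => log.push('complete'),
});

// Edge case: with an empty list there is nothing to wait for, so forkJoin
// completes without emitting any value.
const noSources: Observable<number>[] = [];
forkJoin(noSources).subscribe({
  next: values => log.push(`empty next ${JSON.stringify(values)}`),
  complete: () => log.push('empty complete'),
});

console.log(log);
// → ['next [3,10,200]', 'complete', 'empty complete']
```

Only the last value of each source survives, and they arrive together in source order. The empty-list case matters if a user may have no assets at all: a guard that emits an empty list in that case is one possible workaround.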


    Solution

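    // Imports this snippet relies on (RxJS 6+ paths):
    // import { Observable, forkJoin, from } from 'rxjs';
    // import { first, map, switchMap, tap } from 'rxjs/operators';
    getAssets(): Observable<Asset[]> {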
      return this.authService.user$.pipe(
        // first() will deliver an EmptyError to the observer's error callback if the
        // observable completes before any next notification was sent. If you don't
        // want this behavior, use take(1) instead.
        first(),
        // Switch to the user's Firebase asset listing.
        switchMap(user => {
          // listAll() returns a promise of a list result; its `items` array
          // holds the file references. Tweak this if your SDK version differs.
          return from(this.afs.storage.ref(`${user.uid}/assets`).listAll());
        }),
        // Extract the file references, each of which exposes getDownloadURL().
        map(firebaseAssetMetadata => firebaseAssetMetadata?.items ?? []),
        // Switch to the parallel getDownloadURL streams.
        switchMap(assets => {
          // Array.prototype.map, not the RxJS operator: builds one observable per asset.
          const parallelGetAssetUrlPipes = assets.map(asset => {
            return from(asset.getDownloadURL()).pipe(
              // Parentheses are required so the braces are parsed as an object literal.
              map(url => ({ name: asset.name, url }))
            );
          });
          // 1) Subscribe to all parallel pipes.
          // 2) Wait until they've all completed.
          // 3) Collect the last value of each into a list.
          // 4) Then emit that list down the pipe.
          // Note: forkJoin([]) completes without emitting, so an empty asset
          // list produces no emission here.
          return forkJoin(parallelGetAssetUrlPipes);
        }),
        // At this point all URLs arrive as a single emission in list form.
        // Store the user's asset data in the local variable.
        tap(assetObjects => this.assets = assetObjects)
      );
    }
    
    // Outputs the list of user asset data.
    this.getAssets().subscribe(console.log);
    

    Good luck out there, and enjoy your Swedish meatballs!