Search code examples
typescriptrxjsangularfirerxjs-observables

Mapping an Observable in array items of another Observable, flattening the result


This title probably needs more explanation.

Basically what I get from the backend is an Observable with an array of racedrivers, and to each of the array items, I want to map another property isOnTrack, which consists of another Observable (simple boolean) I retrieve from the backend. I want to flatten the end result so I don't have an Observable within an Observable. I've tried many of the rxjs operators but I cannot get it to work.

Code that doesn't work:

this.drivers$ = this.db.list('users').valueChanges().pipe(
  map(arr => arr.map( (driver:any) => {
    driver.isOnTrack = this.db.object(`telemetry/${driver.uid}/values/IsOnTrack`).valueChanges();
     return driver
  })),
  mergeAll()
);

This successfully maps the isOnTrack observable to the array items but I can't get it flattened.

Project is on RxJS 6

Update 1

After Jonathan's answer I believe I should have used the word unpacked instead of flattening

The Observable after transformations that I would be looking for should deliver something similar to

of([
  {id: 1, name: 'foo', isOnTrack: true},
  {id: 2, name: 'bar', isOnTrack: true},
  {id: 3, name: 'baz', isOnTrack: false},
])

and after one IsOnTrack is changed in the backend it should emit the complete array again.

of([
  {id: 1, name: 'foo', isOnTrack: false},
  {id: 2, name: 'bar', isOnTrack: true},
  {id: 3, name: 'baz', isOnTrack: false},
])

Solution

  • Mock db functions

    // this.db.list('users').valueChanges()
    const requestIsOnTrack$ = (id: number): Observable<boolean> => interval(1000).pipe(
      take(3),
      map(() => Math.random() >= 0.5)
    )
    
    // this.db.object(`telemetry/${driver.uid}/values/IsOnTrack`).valueChanges()
    const requestDrivers$ = () => of([
      {id: 1, name: 'foo'},
      {id: 2, name: 'bar'},
      {id: 3, name: 'baz'},
    ])
    

    Implementation

    const drivers$ = requestDrivers$().pipe(
      map(drivers => drivers.map(driver => requestIsOnTrack$(driver.id).pipe(
        take(1),
        map(isOnTrack => ({
          ...driver,
          isOnTrack
        }))
      ))),
      mergeAll(),
      combineAll()
    )
    

    Explanation

    The object type living in the observables is just <T> for the ease of not using interfaces

    • request all drivers from your db: requestDrivers$() => Observable
    • map the request isOnTrack to each driver requestIsOnTrack$(id) => Observable<Observable<T>[]>
    • limit your requestOnTrack updates to 1 by using take(1) => Observable<Observable<T>[]>
    • map the previous values into the new object ({...driver, isOnTrack}) => Observable<Observable<T>[]>
    • mergeAll to split up your array into several emits mergeAll() => Observable<Observable<T>>
    • combineAll to unbox the observables and bind all mapped values to an array combineAll() => Observable<T[]>

    Here is a running stackblitz