Tags: angular, rxjs, rxjs-observables

Is this a correct way of using forkJoin and mergeMap?


I need to make all requests in parallel whenever it's possible. Here is the simplified version of my code:

const arr = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
];

let observables = [];
for (let url of arr) {
  observables.push(this.http.get(url));
}

const obs$ = this.http.get('http://localhost:3000/users').pipe(
  map((data: any) => {
    //an array of urls like https://jsonplaceholder.typicode.com/users/5
    let urls = data.data;
    let observables = [];
    for (let url of urls) {
      observables.push(this.http.get(url));
    }
    return observables;
  }),
  mergeMap((data) => forkJoin([...data]))
);

forkJoin([...observables, obs$]).subscribe({
  next: (data) => console.log(data, 'from fork'),
  error:(err)=>console.log(err)
});

So, I have in this case:

  1. An array of observables (observables).
  2. An observable (obs$) that fetches some urls. Once the urls arrive, I need to make requests to those urls in parallel as well.

I don't care about the results; I only care whether something errors out. This example code seems to work properly. However, I have a couple of questions, since I'm still trying to wrap my head around RxJS.

  1. Is this a proper way of doing things in order to achieve goals I described above?
  2. mergeMap((data) => forkJoin([...data])) -> mergeMap here basically only helps me get an Observable out of the nested Observable, right? So I could technically use concatMap or switchMap with the same effect?

Solution

  • You don't actually need forkJoin. You can simply create an observable that emits individual urls, then use mergeMap to make the request for each url and emit the results.

    Since you have two different url sources, we can define those separately, then just use merge to create a single observable from both sources:

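    import { Observable, from, merge } from 'rxjs';
    import { mergeMap } from 'rxjs/operators';

    // Emits each url in arr individually.
    const urls_1$: Observable<string> = from(arr);

    // Typing the response lets mergeMap flatten response.data (a string[]) into single urls.
    const urls_2$: Observable<string> = this.http.get<{ data: string[] }>('http://localhost:3000/users').pipe(
      mergeMap(response => response.data)
    );

    // Each url from either source becomes a request; mergeMap runs them concurrently.
    const response$ = merge(urls_1$, urls_2$).pipe(
      mergeMap(url => this.http.get(url))
    );

    response$.subscribe({
      next: data => console.log(data),
      error: err => console.log(err)
    });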
    
    • urls_1$: we use from to create an observable that emits the array elements individually.

    • urls_2$: we make the http call, then use mergeMap to emit the received array elements individually. This has the same effect as from above (in fact, mergeMap converts the returned array into an observable internally, just as from does). You are correct that any of the higher-order mapping operators (mergeMap, concatMap, switchMap) would work here, because the http call emits only once.

    • response$: here we use merge to create a single observable from both of our sources. This means that whenever urls_1$ or urls_2$ emits a value, the "merge observable" will emit it. We then use mergeMap to make the http call and emit its result. Unlike the previous mergeMap, this one cannot be replaced by switchMap or concatMap, because you want the requests to be made in parallel: concatMap would run them one at a time, and switchMap would cancel the in-flight request whenever a new url arrives. The result is that our response$ observable emits the response of each request.

      • If you want to limit the number of concurrent requests, you can pass a second argument (the concurrency limit) to mergeMap:
    mergeMap(url => this.http.get(url), 5) // <-- limit to 5 active requests
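
To see the merge, the flattening, and the concurrency limit at work without a real backend, here is a minimal sketch that can run outside Angular. It rests on some assumptions that are not part of the original answer: fakeGet and finish are hypothetical helpers standing in for this.http.get, the urls are shortened to 'u1' to 'u4', and the limit is set to 2 so the effect is visible with only four urls. Each fake request stays open until finish(url) is called, which makes it possible to check which requests are in flight at any moment.

```typescript
import { Observable, Subject, defer, from, merge, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(url): each "request" stays open
// until finish(url) is called, so we can observe which ones are in flight.
const started: string[] = [];
const pending = new Map<string, Subject<string>>();

function fakeGet(url: string): Observable<string> {
  // defer runs this factory only when mergeMap actually subscribes.
  return defer(() => {
    started.push(url);
    const response = new Subject<string>();
    pending.set(url, response);
    return response;
  });
}

function finish(url: string): void {
  const response = pending.get(url);
  if (response) {
    response.next(`response for ${url}`);
    response.complete();
  }
}

// Two url sources, mirroring urls_1$ and urls_2$ from the answer.
const urls_1$: Observable<string> = from(['u1', 'u2']);
const urls_2$: Observable<string> = of({ data: ['u3', 'u4'] }).pipe(
  mergeMap(response => response.data) // flattens the array into single urls
);

const received: string[] = [];
merge(urls_1$, urls_2$).pipe(
  mergeMap(url => fakeGet(url), 2) // at most 2 requests in flight
).subscribe(response => received.push(response));

// All four urls have been emitted, but only two requests were started.
const startedAtFirst = [...started]; // ['u1', 'u2']

// Completing one request frees a slot, so the next buffered url starts.
finish('u2');
const startedAfterOne = [...started]; // ['u1', 'u2', 'u3']

finish('u1');
finish('u3');
finish('u4');
console.log(received);
```

Responses arrive in completion order rather than url order, which is fine when only errors matter. With the limit removed, all four fake requests would start immediately.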