Search code examples
typescriptrxjsconcatenationangular-httpclientfork-join

RxJS and Angular HttpClient: use of forkJoin for multiple sequential requests


In RxJS (ES6) I'm trying to get the result in a sequentially series of operations in a single Observable.

I'm not sure If I have to use a forkJoin (but I would like the operations to be executed sequentially) or a concat operator (but I would like to just be notified at the end when all of them are executed).

I TRIED:

forkJoin

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return forkJoin(batch);
        })
    );
  }

concat

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return concat(batch);
        })
    );
  }

In both cases I can't ever see the observables coming from the HttpClient doing their job (no http request sent), but I can see the logging.

The called method in the batch it's the following:

  updateProduct(product: Product) {
      console.log('calling...');
      const options = this.getDefaultRequestOptions();
      return this.http.post('update_product', product, options);
  }

I call the sync() function like the following:

this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));

Output:

(8) calling...
sync done

but no HTTP request starting.

If I do the same outside the forkJoin/concat in the pipe map I can see the HTTP request sent.

sync(): Observable<any> {

    const product = new Product(null, 'Title', 'Desc.', 'CODE');
    return this.api.updateProduct(product);

}

What am I missing?

--- UPDATE - SOLUTION ---

sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        flatMap(products => {

            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            console.log(batch);

            return concat(...batch);
            // return forkJoin(batch);
        })
    );

Solution

  • Try

    sync(): Observable<any> {
      return from(this.db.getProducts()).pipe(
        map(products => {
          const batch: Observable<any>[] = [];
          for (const product of products) {
            batch.push(this.api.updateProduct(product));
          }
         return batch;
       }),
       switchMap(batch => forkJoin(batch)),
      )
    }
    

    This is assuming this.db.getProducts() is synchronous static data and not an observable/asynchronous data.

    Then you can try doing

      this.productService.sync().subscribe(() => { console.log('sync done'); });
    

    And see if any API calls are being made.