Tags: reactjs, rxjs, subscription, redux-observable

React Actions using RxJS for parallel async calls


I have an existing action that calls the API with a list of IDs as a query-string parameter and fetches the thumbnail response for those IDs. In some cases there can be 150-200 IDs, so I am trying to split the API call into batches and run them in parallel using RxJS's forkJoin and subscribe. I'm facing the following 3 issues with the implementation.

  1. I am able to get the batch-based response in the next method of subscribe. However, the action I call in this function with the response is not getting dispatched.

  2. maxParallelQueries doesn't seem to be working as intended. All the fetchThumbnailObservables are executed at once and not in batches of 3 as set by the constant.

  3. How do I handle the return of the observable so that mergeMap does not give a syntax error? Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)

Here is what my current code looks like. Any help would be great.

const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
    action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
        mergeMap( ({ payload }) => {
            
            const assetIds = Object.keys(payload);
            const meta = payload;

            const batchSize = 10;
            const maxParallelQueries = 3;
            
            const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];

            for (let count = 0; count < assetIds.length; count += batchSize) {
                fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)));
            }

             forkJoin(fetchThumbnailObservables)
            .pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, maxParallelQueries)) // Issue 2: maxParallelQueries limit is not working
            .subscribe({ 
                complete: () => { }, 
                error: () => { },
                next: (response) => { 
                     // Issue 1: the action below doesn't seem to get invoked
                    actions.AssetActions.receivedThumbnailsAction(response, meta) 
                },     
             });
    
            // Issue 3: how to handle the return of the observable
            // Added the code below to prevent the error raised by mergeMap( ({ payload }) => {
            return new Observable<any>();
        }),
        catchError((error) => of(actions.Common.errorOccured({ error })))
    );

Solution

    1. I am able to get the batch-based response in the next method of subscribe. However, the action I call in this function with the response is not getting dispatched.

    You mustn't call an action imperatively in redux-observable. Instead you need to queue it. actions.AssetActions.receivedThumbnailsAction(response, meta) only returns a plain object of the shape {type: "MY_ACTION", payload: ...}; it does not dispatch anything. What you want is to return that object wrapped in an observable from inside mergeMap, so that the action is added to the queue. (see solution below)
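To make that point concrete, here is a minimal sketch without any library. The action shape, the creator body, and the `handToStore` stand-in are assumptions for illustration only; the real `receivedThumbnailsAction` and store will look different. It only shows that calling an action creator and discarding the result has no effect.

```typescript
// Hypothetical action shape and creator, standing in for the real ones.
interface ThumbnailAction {
  type: string;
  payload: { response: string[]; meta: Record<string, unknown> };
}

const receivedThumbnailsAction = (
  response: string[],
  meta: Record<string, unknown>
): ThumbnailAction => ({
  type: "RECEIVED_THUMBNAILS",
  payload: { response, meta },
});

// Hypothetical stand-in for the store: it only sees actions handed to it.
const seenByStore: ThumbnailAction[] = [];
const handToStore = (action: ThumbnailAction): void => {
  seenByStore.push(action);
};

// Calling the creator and discarding the result never reaches the store.
receivedThumbnailsAction(["thumb-1"], { a1: {} });
const countAfterBareCall = seenByStore.length;

// The object only matters once it is passed on. In an epic, "passing on"
// means emitting it from the observable that the epic returns.
handToStore(receivedThumbnailsAction(["thumb-1"], { a1: {} }));
const countAfterHandOff = seenByStore.length;
```

In the epic, the role of `handToStore` is played by redux-observable itself, which dispatches whatever the returned observable emits.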

    2. maxParallelQueries doesn't seem to be working as intended. All the fetchThumbnailObservables are executed at once and not in batches of 3 as set by the constant.

    First guess: What is the return type of AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize))? If it is a Promise, then maxParallelQueries has no effect, because promises are eager: they start as soon as they are created, which is before mergeMap has a chance to control the requests. So make sure that fetchThumbnailData returns an observable. If you need to convert a Promise to an Observable, use defer rather than from, so that the promise is not fired prematurely inside fetchThumbnailData.
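The eagerness of promises can be seen in a small sketch without RxJS. `fakeFetchThumbnails` is a made-up stand-in for a Promise-based fetchThumbnailData, not the real service. Wrapping the call in a factory function is what delays the work; RxJS's defer accepts such a factory and calls it only on subscription.

```typescript
// Records which "requests" have actually started.
const started: string[] = [];

// Hypothetical Promise-based request. The executor runs synchronously,
// as soon as the Promise is constructed.
const fakeFetchThumbnails = (batch: string): Promise<string> =>
  new Promise<string>((resolve) => {
    started.push(batch);
    resolve(`thumbnails for ${batch}`);
  });

// Eager: merely building the array has already started both requests.
const eagerRequests = [
  fakeFetchThumbnails("batch-1"),
  fakeFetchThumbnails("batch-2"),
];
const startedAfterEager = started.length;

// Lazy: factories describe the requests without starting them.
const lazyBatch3 = (): Promise<string> => fakeFetchThumbnails("batch-3");
const lazyBatch4 = (): Promise<string> => fakeFetchThumbnails("batch-4");
const lazyRequests = [lazyBatch3, lazyBatch4];
const startedAfterLazySetup = started.length;

// Only invoking a factory starts its request.
void lazyBatch3();
const startedAfterOneCall = started.length;
```

Here `eagerRequests` behaves like an array of already-running calls, which is why a concurrency limit applied afterwards cannot hold anything back, while `lazyRequests` leaves the decision of when to start to the caller.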

    Second guess: forkJoin subscribes to all the requests at once, so the mergeMap after it never gets the chance to restrict the number of parallel queries; by then it only receives the array of completed responses. I'm surprised that TypeScript doesn't warn you here about the incorrect return type in mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, ...). My solution below replaces forkJoin with from and should enable your maxParallelQueries.

    3. How do I handle the return of the observable so that mergeMap does not give a syntax error? Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput'. Type 'void' is not assignable to type 'ObservableInput'.ts(2345)

    This is not a syntax error but a type error: the callback passed to mergeMap returns nothing, while mergeMap expects an observable. In an epic you can either return an empty observable (return empty(), or the EMPTY constant) if no subsequent actions are intended, or dispatch another action by returning an observable of type IAction. Here is my (untested) attempt to meet your requirements:

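    // Library imports; ActionTypes, actions, AssetDataService and the I* types come from your own app.
    import { from, of, Observable } from 'rxjs';
    import { catchError, map, mergeMap } from 'rxjs/operators';
    import { Epic } from 'redux-observable';

    const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>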
      action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
        mergeMap( ({ payload }) => {
                
          const assetIds = Object.keys(payload);
          const meta = payload;
    
          const batchSize = 10;
          const maxParallelQueries = 3;
                
          const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
    
          for (let count = 0; count < assetIds.length; count += batchSize) {
            fetchThumbnailObservables.push(
              AssetDataService.fetchThumbnailData(
                 assetIds.slice(count, count + batchSize)
              )
            );
          }
    
          return from(fetchThumbnailObservables)
            .pipe(
              mergeMap(
                fetchThumbnailObservable => fetchThumbnailObservable,
                maxParallelQueries
              ),
              map((response) =>
                // dispatch action for each response
                actions.AssetActions.receivedThumbnailsAction(response, meta)
              ),
              // catch errors here so the epic keeps running after a failure
              catchError((error) => of(actions.Common.errorOccured({ error })))
            );
        })
      );
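
The batching loop in the epic above could also be factored into a small helper. This is only a sketch: `chunk` is a hypothetical name, not part of RxJS or of the original code, and the asset IDs are made up.

```typescript
// Hypothetical helper: split a list into consecutive batches of at most
// `size` items, using the same slice logic as the for loop in the epic.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}

// 25 made-up asset IDs split into batches of 10.
const exampleIds = Array.from({ length: 25 }, (_, i) => `asset-${i + 1}`);
const exampleBatches = chunk(exampleIds, 10);
const batchSizes = exampleBatches.map((batch) => batch.length); // [10, 10, 5]
```

With such a helper, the array of request observables would be built by mapping each batch through AssetDataService.fetchThumbnailData instead of pushing inside a loop.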
    

    Please notice that I moved catchError closer to the request (inside the inner from observable) so that the epic is not terminated when an error occurs (see "Error Handling" in the redux-observable docs). The reason is that when catchError fires, it replaces the failed observable with the one it returns, and I assume you don't want to replace the whole epic in case of an error.

    One more thing: redux-observable manages the subscriptions for you, so you won't need to call .subscribe or .unsubscribe anywhere in your application. You only need to care about the lifecycle of your observables. The most important things to know are when an observable starts, when it completes, what happens when it completes, and what happens when it throws.