I am having an existing action that calls the api with list of IDs as querystring parameter and fetches the thumbnail response for the passed IDs. In some cases, count of IDs can be 150-200 records. Hence, I am working on making this APIs call batch based and run in parallel using forkJoin
and subscribe
methods of RxJs. I'm facing following 3 issues regarding the implementation.
I am able to get the batch based response in next method of subscribe. However, the action invoked in this function along with response is not getting invoked.
maxParallelQueries dont seems to be working as intended. All the fetchThumbnailObservables() are executed at once and not in the batches of 3 as set as constant/
How to handle return of observable so that mergeMap do not give syntax error . Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)
Here is my current code looks like. Any help would be great on this.
const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)));
}
forkJoin(fetchThumbnailObservables)
.pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, maxParallelQueries)) // Issue 1 : maxParallelQueries limit is not working
.subscribe({
complete: () => { },
error: () => { },
next: (response) => {
// Issue 2 : below action dont seems to be getting invoked
actions.AssetActions.receivedThumbnailsAction(response, meta)
},
});
// Issue 3 : How to handle return of observable
// Added below code for prevennting error raised by mergeMap( ({ payload }) => {
return new Observable<any>();
}),
catchError((error) => of(actions.Common.errorOccured({ error })))
);
```
- I am able to get the batch based response in next method of subscribe. However, the action invoked in this function along with response is not getting invoked.
You mustn't call an action imperatively in redux-observable. Instead you need to queue them.
actions.AssetActions.receivedThumbnailsAction(response, meta)
would return an plain object of the shape {type: "MY_ACTION", payload: ...}
instead of dispatching an action. You rather want to return that object wrapped inside an observable inside mergeMap
in order to add the next action to the queue. (see solution below)
- maxParallelQueries dont seems to be working as intended. All the fetchThumbnailObservables() are executed at once and not in the batches of 3 as set as constant/
First guess: What is the return type of AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize))
? If it is of type Promise
then maxParallelQueries
has no effect because promises are eager and start immediately when created thus before mergeMap
has the chance to control the request. So make sure that fetchThumbnailData
returns an observable. Use defer
rather than from
if you need to convert a Promise to an Observable to make sure that a promise is not being fired prematurely inside fetchThumbnailData
.
Second guess: forkJoin
is firing all the request so mergeMap
doesn't even have the chance to restrict the number of parallel queries. I'm surprised that typescript doesn't warn you here about the incorrect return type in mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, ...)
. My solution below replaces forJoin
with from
and should enable your maxParallelQueries
.
- How to handle return of observable so that mergeMap do not give syntax error . Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput'. Type 'void' is not assignable to type 'ObservableInput'.ts(2345)
This is not a syntax error. In an epic you have either the option to return an empty observable return empty()
if no subsequent actions are intended or dispatch another action by returning an observable of type IAction
. Here my (untested) attempt to meet your requirements:
const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(
AssetDataService.fetchThumbnailData(
assetIds.slice(count, count + batchSize)
)
);
}
return from(fetchThumbnailObservables)
.pipe(
mergeMap(
fetchThumbnailObservable => fetchThumbnailObservable,
maxParallelQueries
),
map((response) =>
// dispatch action for each response
actions.AssetActions.receivedThumbnailsAction(response, meta)
),
// catch error early so epic continue on error
catchError((error) => of(actions.Common.errorOccured({ error })))
);
})
);
Please notice that I moved catchError
closer to the request (inside the inner forkJoin Observable) so that the epic does not get terminated when an error occurs. Error Handling - redux observable This is so because when catchError
is active it replaces the failed observable with the one that is returned by catchError
. And I assume you don't want to replace the whole epic in case of an error?
And one thing: redux-observable handles the subscriptions for you so won't need to call .subscribe
or .unsubscribe
anywhere in your application. You just need to care a about the lifecycle of observables. Most important to know here is when does an observable start, what happens to it when it completes, when does it complete or what happens when an observable throws.