I am running several HTTP GET requests in parallel using an RxJS pipe and the mergeMap operator.
As soon as one request fails (let's imagine /urlnotexists returns a 404 error), all the other requests are stopped.
I want it to keep querying all the remaining URLs, while skipping the subsequent mergeMap steps for the failed request only.
I tried using throwError and catchError from RxJS, but without success.
index.js
const { from } = require('rxjs');
const { mergeMap, scan } = require('rxjs/operators');
/**
 * Stand-in for an HTTP client: each call settles after a one-second timer.
 */
const request = {
  /**
   * @param {string} url - The url to "fetch".
   * @returns {Promise<string>} Resolves with the url itself, or rejects with
   *   an Error whose message is the url when it equals '/urlnotexists'.
   */
  get: (url) =>
    new Promise((resolve, reject) => {
      setTimeout(() => {
        if (url === '/urlnotexists') {
          reject(new Error(url));
        } else {
          resolve(url);
        }
      }, 1000);
    }),
};
// Starts every request concurrently, sends each result through a second async
// step, and accumulates all values into one array that is logged at the end.
(async function() {
  const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3'];

  // Stage 1: perform the request. A failure is logged and not rethrown, so
  // this stage yields undefined for the failing url.
  const fetchUrl = async (url) => {
    try {
      console.log('mergeMap 1:', url);
      return await request.get(url);
    } catch (failure) {
      console.log('err:', failure.message);
      // a throw here prevents all remaining request.get() from being tried
    }
  };

  // Stage 2: should not be reached when the preceding request.get() failed.
  const logAndForward = async (val) => {
    console.log('mergeMap 2:', val);
    return val;
  };

  // Stage 3: should not be reached when the preceding request.get() failed.
  // Appends to (and returns) the same accumulator array on every emission.
  const collect = (collected, val) => {
    collected.push(val);
    return collected;
  };

  // The promise is created outside the try block on purpose: only rejections
  // of the promise itself (and errors in the logging below) are handled.
  const pending = from(urls)
    .pipe(
      mergeMap(fetchUrl),
      mergeMap(logAndForward),
      scan(collect, []),
    )
    .toPromise();

  try {
    const merged = await pending;
    // should have merged /urlexists, /urlexists2 and /urlexists3
    // even if /urlnotexists failed
    console.log('merged:', merged);
  } catch (err) {
    console.log('catched err:', err);
  }
})();
$ node index.js
mergeMap 1: /urlexists
mergeMap 1: /urlnotexists
mergeMap 1: /urlexists2
mergeMap 1: /urlexists3
err: /urlnotexists
mergeMap 2: /urlexists
mergeMap 2: undefined <- I didn't want this mergeMap to be called
mergeMap 2: /urlexists2
mergeMap 2: /urlexists3
merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]
I expect to make concurrent GET requests and reduce their respective values into one object at the end.
But if some errors occur, I want them to be logged without interrupting my pipe.
Any advice?
If you want to use RxJS, you should add error handling with catchError, along with any additional per-request tasks, to each single request before you execute all your requests concurrently with forkJoin.
const { of, from, forkJoin } = rxjs;
const { catchError, tap } = rxjs.operators;
// Promise factory with the same behavior as in the question: settles after a
// one-second timer, rejecting only for '/urlnotexists'.
const request = {
  /**
   * @param {string} url - The url to "fetch".
   * @returns {Promise<string>} Resolves with the url, or rejects with an
   *   Error carrying the url as its message.
   */
  get(url) {
    return new Promise((resolve, reject) => {
      const settle = () => {
        if (url === '/urlnotexists') {
          reject(new Error(url));
          return;
        }
        resolve(url);
      };
      setTimeout(settle, 1000);
    });
  },
};
/**
 * Builds the observable for a single request, with its own error handling.
 *
 * @param {string} url - The url passed to request.get().
 * @returns {Observable} Emits the resolved value; when the request fails, the
 *   error message is logged and undefined is emitted instead.
 */
const fetch$ = (url) => {
  console.log('before:', url);
  const response$ = from(request.get(url));

  // Operators applied to each individual request; add any further
  // per-request operators to the pipe below.
  const logValue = tap((val) => console.log('after:', val));
  const recover = catchError((error) => {
    console.log('err:', error.message);
    return of(undefined);
  });

  return response$.pipe(logValue, recover);
};
// Build one observable per url and run them all concurrently; the combined
// result is logged by the subscriber.
const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3'];
forkJoin(urls.map(fetch$)).subscribe((merged) => {
  console.log('merged:', merged);
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>