I have some code that performs uploads of a large file in smaller chunks. Here is what I have:
upload(project: Project): Observable<any> {
var chunks = this.chunkFiles(project);
var totalPercent = project.file
.map(file => Math.ceil(file.size / this.chunkSize))
.reduce((sum, curr) => sum + curr) * 100;
var finishedPercent = 0;
var currentPercent = 0;
return from(chunks)
.pipe(
concatMap(request =>
{
return this.http.request(request)
.pipe(
retryWhen(error =>
{
return interval(1000)
.pipe(
flatMap(count =>
{
if (count == 2)
{
return throwError(error);
}
return of(count);
})
);
}));
}),
map(event =>
{
if (event.type === HttpEventType.UploadProgress)
{
var progress = event as HttpProgressEvent;
currentPercent = Math.round(100 * progress.loaded / progress.total);
return {
type: event.type,
loaded: finishedPercent + currentPercent,
total: totalPercent
};
}
else if (event instanceof HttpResponse)
{
finishedPercent += 100;
if (finishedPercent == totalPercent)
{
return event;
}
}
}),
filter(response => response !== undefined)
);
}
The intention is that it starts uploading chunks of the file and once one of the chunk uploads fails, it should stop and the error should propagate up to the code that is calling my upload function. That particular code is as follows:
onStartUpload = this.actions$
.ofType(forRoot.START_UPLOAD).pipe(
switchMap((action: forRoot.StartUpload) =>
this.uploadService
.upload(action.project).pipe(
map((event) => {
if (event.type === HttpEventType.UploadProgress) {
const progress = event as HttpProgressEvent;
const percentDone = Math.round(100 * progress.loaded / progress.total);
return new forRoot.UploadProgress(percentDone);
} else if (event instanceof HttpResponse) {
return new forRoot.UploadSucceeded();
} else {
console.log(event);
}
}),
filter(a => a !== undefined),
catchError(error => {
if (error instanceof HttpErrorResponse && error.status === 400) {
const message = this.getMessage(error);
return of(new forRoot.UploadFailed(message || "Bad request"));
} else {
return of(new forRoot.UploadFailed("Network error"));
}
})))
);
The problem is that I only get a "Network error" message, whereas I'm expecting something with error status 400 (so I should be getting either the specific message or "Bad request".
I'm assuming I'm not properly handling the error within the retryWhen, but I've tried a few different ideas and none seem to work. What am I missing?
Edit to show working code without retry:
Here is code that has correct error message handling, however it has no retry:
return from(chunks)
.pipe(
concatMap(request =>
{
return this.http.request(request)
}),
map(event =>
{
if (event.type === HttpEventType.UploadProgress)
{
var progress = event as HttpProgressEvent;
currentPercent = Math.round(100 * progress.loaded / progress.total);
return {
type: event.type,
loaded: finishedPercent + currentPercent,
total: totalPercent
};
}
else if (event instanceof HttpResponse)
{
finishedPercent += 100;
if (finishedPercent == totalPercent)
{
return event;
}
}
}),
filter(response => response !== undefined)
);
I've been assuming there is something I can add to the http.request call to retry a specified number of times with a delay in between retries, and then if they all fail, throw an error just like http.request does. What seems to happen is that it proceeds past the concatMap code to the map code. I can add console.log(event) calls and see the error message returned from the server. I'm still pretty new at rxjs, so maybe I don't understand, but I was expecting once the error happened that it wouldn't execute the map code.
The issue is here:
retryWhen(error => { // << error is a stream
return interval(1000)
.pipe(
flatMap(count => {
if (count == 2){
return throwError(error); // so here a stream is thrown
}
return of(count);
})
);
})
error
argument is not a single error, but actually an Observable of all errors happening.
To fix this you should return observable derived from error
, e.g.:
retryWhen(errors$ =>
errors$.pipe(
delay(1000)
)
)
Updated example of limited error handling
retryWhen(errors =>
errors.pipe(
// switchMap to add limiting logic
switchMap((error, index) =>
index == 3 // if it is a 4th error
? throwError(error) // -- we rethrow it upstream
: of(error).pipe( delay(1000) ) // -- delay a retry otherwise
)
)
)
Heres a retryWhen example with exponential backoff.
And an article on error handling in rxjs (with a note about retryWhen
early completion).
Hope this helps