I have a list of Observables, each of which sends a web request to an API endpoint.
let myWebRequests: Observable<any>[];
For now I execute them all at the same time:
combineLatest(myWebRequests).subscribe(result =>
    this.messageService.add({
      severity: "success",
      summary: this.translateService.instant("StoryVersionEdit.PageTitle"),
      detail: this.translateService.instant("StoryVersionEdit.StoryVersionSaved"),
      closable: false
    }),
  error => {
    console.error(error);
  });
But when the list is large (e.g. 80 web requests), the API server is flooded with simultaneous requests and some of them fail.
I would like to split this large list into batches of 10 requests and wait for each batch to complete before sending the next one.
How can I do that?
Edit 1: Here is how myWebRequests is constructed:
...
const myWebRequests: Observable<any>[] = this.updateAllStoryVersionsBy(null, 'test', 'test', 'test');
...
updateAllStoryVersionsBy(
  assetHoldingNumber: string,
  assetHoldingLocation: string,
  originalValue_LegacyDataFormatSupport: string,
  originalValue_AssetHoldingEstablishment: string
): Observable<StoryVersion>[] {
  var storyVersionObservables: Observable<StoryVersion>[] = [];
  // Validations to only call the web server when relevant
  if (!this.storyVersion.assetHoldingNumber && !this.storyVersion.assetHoldingLocation)
    return storyVersionObservables;
  const changedProperties = this.getChangedProperties();
  if (changedProperties.indexOf('assetHoldingNumber') < 0 &&
    changedProperties.indexOf('assetHoldingLocation') < 0 &&
    changedProperties.indexOf('assetHoldingEstablishment') < 0 &&
    changedProperties.indexOf('assetHoldingRoom') < 0 &&
    changedProperties.indexOf('assetHoldingNotes') < 0 &&
    changedProperties.indexOf('recordingType') < 0 &&
    changedProperties.indexOf('legacyItemHoldingType') < 0 &&
    changedProperties.indexOf('legacyDataFormatSupport') < 0 &&
    changedProperties.indexOf('cbcVideoResolution') < 0
  )
    return storyVersionObservables;
  // Find story versions to update
  var allStoryVersionsToUpdate: Observable<IGuid[]>;
  if (assetHoldingNumber) {
    allStoryVersionsToUpdate = this.storyVersionService.findStoryVersionsByAssetHoldingNumber(assetHoldingNumber);
  }
  else if (assetHoldingLocation) {
    var queryParameters: StoryVersionGetByAssetHoldingRequest = {
      isUniqueOnly: false,
      assetHoldingLocation: assetHoldingLocation,
      assetHoldingEstablishment: originalValue_AssetHoldingEstablishment,
      legacyItemHoldingType: originalValue_LegacyDataFormatSupport,
      assetHoldingNumber: null,
      responsible: null,
      select: null,
      offset: 0,
      limit: 100
    };
    allStoryVersionsToUpdate = this.storyVersionService.findStoryVersionsByAssetHoldingLocation(queryParameters);
  }
  storyVersionObservables = <Observable<StoryVersion>[]><unknown>allStoryVersionsToUpdate
    .pipe(
      concatMap(data => {
        const items$ = data.map(g => {
          var svToUpdate: any = {
            guid: g.guid
          };
          // Only assign fields that were edited by the user
          if (changedProperties.indexOf('assetHoldingNumber') >= 0)
            svToUpdate.assetHoldingNumber = this.storyVersionEditForm.value.assetHoldingNumber;
          if (changedProperties.indexOf('assetHoldingLocation') >= 0)
            svToUpdate.assetHoldingLocation = this.storyVersionEditForm.value.assetHoldingLocation;
          if (changedProperties.indexOf('assetHoldingEstablishment') >= 0)
            svToUpdate.assetHoldingEstablishment = this.storyVersionEditForm.value.assetHoldingEstablishment;
          if (changedProperties.indexOf('assetHoldingRoom') >= 0)
            svToUpdate.assetHoldingRoom = this.storyVersionEditForm.value.assetHoldingRoom;
          if (changedProperties.indexOf('assetHoldingNotes') >= 0)
            svToUpdate.assetHoldingNotes = this.storyVersionEditForm.value.assetHoldingNotes;
          if (changedProperties.indexOf('recordingType') >= 0)
            svToUpdate.recordingType = this.storyVersionEditForm.value.recordingType;
          if (changedProperties.indexOf('legacyItemHoldingType') >= 0)
            svToUpdate.legacyItemHoldingType = this.storyVersionEditForm.value.legacyItemHoldingType;
          if (changedProperties.indexOf('legacyDataFormatSupport') >= 0)
            svToUpdate.legacyDataFormatSupport = this.storyVersionEditForm.value.legacyDataFormatSupport;
          if (changedProperties.indexOf('cbcVideoResolution') >= 0)
            svToUpdate.cbcVideoResolution = this.storyVersionEditForm.value.cbcVideoResolution;
          // The actual web request to fire for each!
          return this.storyVersionService.updateStoryVersion(svToUpdate);
        });
        return forkJoin(...items$);
      }),
    )
  return storyVersionObservables;
}
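You can use mergeMap and pass its concurrent argument, which caps how many inner Observables are subscribed at the same time. As soon as one request completes, the next one starts: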
from(myWebRequests).pipe(
mergeMap(req$ => req$, 5) // <--- limit to 5 concurrent requests
).subscribe(
result => this.messageService.add(...),
error => console.error(error)
);
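The mergeMap approach keeps a fixed number of requests in flight rather than waiting for a whole group to finish. If strict batches are required, as the question describes, one possible sketch combines bufferCount, concatMap and forkJoin. It assumes myWebRequests really is an array of cold Observables (such as Angular HttpClient calls, which only fire on subscription). The helper name runInBatches and the stand-in requests built with of() are hypothetical and used only for illustration:

```typescript
import { Observable, forkJoin, from, of } from 'rxjs';
import { bufferCount, concatMap, toArray } from 'rxjs/operators';

// Hypothetical helper (not part of RxJS): subscribes to `requests` in groups of
// `batchSize` and starts a group only after the previous group has completed.
function runInBatches<T>(requests: Observable<T>[], batchSize: number): Observable<T[]> {
  return from(requests).pipe(
    bufferCount(batchSize),              // group the Observables into arrays of `batchSize`
    concatMap(batch => forkJoin(batch))  // run one group in parallel, then move to the next
  );
}

// Stand-in requests; in the question these would be the HTTP Observables.
const fakeRequests = [of(1), of(2), of(3), of(4), of(5)];

// Emits one array of results per batch.
const batches: number[][] = [];
runInBatches(fakeRequests, 2).subscribe(batch => batches.push(batch));

// toArray() collects every batch, so the callback fires once when all are done.
let allResults: number[][] = [];
runInBatches(fakeRequests, 2).pipe(toArray()).subscribe(all => (allResults = all));
```

With the real list this would be called as runInBatches(myWebRequests, 10). Adding toArray() before subscribing means the success message is shown once at the end instead of once per batch. Keep in mind that forkJoin errors as soon as any request in the batch errors, so a single failure stops the remaining batches unless each request handles its own error (for example with catchError).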