Search code examples
javascripttypescriptrxjsobservable

How to execute a large list of Observables (web requests) in smaller batches?


I have a list of Observables that are each sending a web request to an API endpoint.

Observable<any>[] myWebRequests;

For now I execute them all at the same time:

combineLatest(myWebRequests).subscribe(result =>
    this.messageService.add({
        severity: "success",
        summary: this.translateService.instant("StoryVersionEdit.PageTitle"),
        detail: this.translateService.instant("StoryVersionEdit.StoryVersionSaved"),
        closable: false
    }),
    error => {
        console.error(error);
    });

But when the list is large (e.g. 80 web requests), the API server gets flooded by all the requests at the same time and some end up in error.

I would like to batch this large list of requests and send them 10 at a time, wait for these to complete before sending the next 10.

How can I do that?


Edit 1: Here is how the myWebRequests is constructed:

...
Observable<any>[] myWebRequests = updateAllStoryVersionsBy(null, 'test', 'test', 'test');
...

updateAllStoryVersionsBy(
    assetHoldingNumber: string,
    assetHoldingLocation: string,
    originalValue_LegacyDataFormatSupport: string,
    originalValue_AssetHoldingEstablishment: string
): Observable<StoryVersion>[] {
    var storyVersionObservables: Observable<StoryVersion>[] = [];

    // Validations to only call the web server when relevant
    if (!this.storyVersion.assetHoldingNumber && !this.storyVersion.assetHoldingLocation)
        return storyVersionObservables;

    const changedProperties = this.getChangedProperties();

    if (changedProperties.indexOf('assetHoldingNumber') < 0 &&
        changedProperties.indexOf('assetHoldingLocation') < 0 &&
        changedProperties.indexOf('assetHoldingEstablishment') < 0 &&
        changedProperties.indexOf('assetHoldingRoom') < 0 &&
        changedProperties.indexOf('assetHoldingNotes') < 0 &&
        changedProperties.indexOf('recordingType') < 0 &&
        changedProperties.indexOf('legacyItemHoldingType') < 0 &&
        changedProperties.indexOf('legacyDataFormatSupport') < 0 &&
        changedProperties.indexOf('cbcVideoResolution') < 0
    )
        return storyVersionObservables;

    // Find story versions to update
    var allStoryVersionsToUpdate: Observable<IGuid[]>;
    if (assetHoldingNumber) {
        allStoryVersionsToUpdate = this.storyVersionService.findStoryVersionsByAssetHoldingNumber(assetHoldingNumber);
    }
    else if (assetHoldingLocation) {
        var queryParameters: StoryVersionGetByAssetHoldingRequest = {
            isUniqueOnly: false,
            assetHoldingLocation: assetHoldingLocation,
            assetHoldingEstablishment: originalValue_AssetHoldingEstablishment,
            legacyItemHoldingType: originalValue_LegacyDataFormatSupport,
            assetHoldingNumber: null,
            responsible: null,
            select: null,
            offset: 0,
            limit: 100
        };
        allStoryVersionsToUpdate = this.storyVersionService.findStoryVersionsByAssetHoldingLocation(queryParameters);
    }

    storyVersionObservables = <Observable<StoryVersion>[]><unknown>allStoryVersionsToUpdate
        .pipe(
            concatMap(data => {
                const items$ = data.map(g => {
                    var svToUpdate: any = {
                        guid: g.guid
                    };

                    // Only assign fields that were edited by the user

                    if (changedProperties.indexOf('assetHoldingNumber') >= 0)
                        svToUpdate.assetHoldingNumber = this.storyVersionEditForm.value.assetHoldingNumber;

                    if (changedProperties.indexOf('assetHoldingLocation') >= 0)
                        svToUpdate.assetHoldingLocation = this.storyVersionEditForm.value.assetHoldingLocation;

                    if (changedProperties.indexOf('assetHoldingEstablishment') >= 0)
                        svToUpdate.assetHoldingEstablishment = this.storyVersionEditForm.value.assetHoldingEstablishment;

                    if (changedProperties.indexOf('assetHoldingRoom') >= 0)
                        svToUpdate.assetHoldingRoom = this.storyVersionEditForm.value.assetHoldingRoom;

                    if (changedProperties.indexOf('assetHoldingNotes') >= 0)
                        svToUpdate.assetHoldingNotes = this.storyVersionEditForm.value.assetHoldingNotes;

                    if (changedProperties.indexOf('recordingType') >= 0)
                        svToUpdate.recordingType = this.storyVersionEditForm.value.recordingType;

                    if (changedProperties.indexOf('legacyItemHoldingType') >= 0)
                        svToUpdate.legacyItemHoldingType = this.storyVersionEditForm.value.legacyItemHoldingType;

                    if (changedProperties.indexOf('legacyDataFormatSupport') >= 0)
                        svToUpdate.legacyDataFormatSupport = this.storyVersionEditForm.value.legacyDataFormatSupport;

                    if (changedProperties.indexOf('cbcVideoResolution') >= 0)
                        svToUpdate.cbcVideoResolution = this.storyVersionEditForm.value.cbcVideoResolution;

                    // The actual web request to fire for each!
                    return this.storyVersionService.updateStoryVersion(svToUpdate);
                });
                return forkJoin(...items$);
            }),
        )

    return storyVersionObservables;
}

Solution

  • You can use mergeMap and provide the concurrency argument:

    from(myWebRequests).pipe(
        mergeMap(req$ => req$, 5) // <--- limit to 5 concurrent requests
    ).subscribe(
        result => this.messageService.add(...),
        error  => console.error(error)
    );