Search code examples
rxjsobservable

Create a concurrency limited queue using Rx.js


I want to create a queue which when filled up executes concurrently upto, say 3, items at most asynchronously.

I currently have the concurrency part sorted, but how do I an observable queue into which I can push observables and subscribe to the output.

Here's the code for static concurrency. This only lets 3 promises resolve at most at any time.

    import { defer, from } from 'rxjs';
    import { mergeAll } from 'rxjs/operators';

    async function getData(x: string) { // Returns original value after 1s
        return new Promise((resolve) => {
            setTimeout(() => resolve(x), 1000);
        });
    }

    const ids = [...Array(20).keys()]; // 1,2,3, ... 18,19,20

    const observables = ids.map((x) => defer(() => getData('John ' + x)));

    from(observables)
        .pipe(mergeAll(3))
        .subscribe((d) => {
            data.push({
                name: d as string
            });
        });

    let data = [
        {
            name: 'Jon start'
        }
    ];


Solution

  • Thanks to @ruth & @maxime1992 in the replies, I was able to arrive on the solution:

        async function getData(x: string) { // Returns original value after 1s
            return new Promise((resolve) => {
                setTimeout(() => resolve(x), 1000);
            });
        }
    
        const queue$: Subject<Observable<string | undefined>> = new Subject();
        // terminate$ is used as a stop signal
        const terminate$ = new Subject();
    
        // 3 is concurrency limit
        const throttledQueue$ = queue$.pipe(mergeAll(3));
    
        throttledQueue$
            .pipe(takeUntil(terminate$))
            .subscribe((data) => console.log('Combined Stream Output:', data));
    
        // To insert something into the queue:
        queue$.next(defer(() => getData('Jon Don'));
    
        // To unsubscribe and stop processing:
        terminate$.next(null);