I want to create a queue which when filled up executes concurrently upto, say 3, items at most asynchronously.
I currently have the concurrency part sorted, but how do I an observable queue into which I can push observables and subscribe to the output.
Here's the code for static concurrency. This only lets 3 promises resolve at most at any time.
import { defer, from } from 'rxjs';
import { mergeAll } from 'rxjs/operators';
async function getData(x: string) { // Returns original value after 1s
return new Promise((resolve) => {
setTimeout(() => resolve(x), 1000);
});
}
const ids = [...Array(20).keys()]; // 1,2,3, ... 18,19,20
const observables = ids.map((x) => defer(() => getData('John ' + x)));
from(observables)
.pipe(mergeAll(3))
.subscribe((d) => {
data.push({
name: d as string
});
});
let data = [
{
name: 'Jon start'
}
];
Thanks to @ruth & @maxime1992 in the replies, I was able to arrive on the solution:
async function getData(x: string) { // Returns original value after 1s
return new Promise((resolve) => {
setTimeout(() => resolve(x), 1000);
});
}
const queue$: Subject<Observable<string | undefined>> = new Subject();
// terminate$ is used as a stop signal
const terminate$ = new Subject();
// 3 is concurrency limit
const throttledQueue$ = queue$.pipe(mergeAll(3));
throttledQueue$
.pipe(takeUntil(terminate$))
.subscribe((data) => console.log('Combined Stream Output:', data));
// To insert something into the queue:
queue$.next(defer(() => getData('Jon Don'));
// To unsubscribe and stop processing:
terminate$.next(null);