I'm implementing a query engine that mass-fetches and processes requests, using async/await.
Right now the flow of execution runs in a hierarchy: there is a list of items, each item contains queries, and each query has a fetch.
What I am trying to do is bundle the items in groups of n, so that even if each item has m queries with fetches inside, only n*m requests run simultaneously; and, in particular, only one request at a time is made to the same domain.
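One possible way to get the "one request per domain at a time" behaviour is to chain each request onto the previous one for the same hostname. The sketch below assumes a hypothetical `DomainLimiter` class that the fetch wrapper would call; it is not part of the code shown in this question, just an illustration of the idea.

```javascript
// Hypothetical helper (not part of the question's code): runs at most one
// task at a time per hostname by chaining each task onto the previous one.
class DomainLimiter {
  constructor() {
    // hostname -> promise that settles when the last queued task for it settles
    this.tails = new Map();
  }

  run(url, task) {
    const host = new URL(url).hostname;
    const previous = this.tails.get(host) || Promise.resolve();
    // Start this task only once the previous task for the same host has settled.
    const result = previous.then(() => task());
    // The stored tail never rejects, so one failed request does not block the queue.
    const tail = result.catch(() => {});
    this.tails.set(host, tail);
    tail.then(() => {
      // Forget the host once nothing else has been queued behind this task.
      if (this.tails.get(host) === tail) this.tails.delete(host);
    });
    return result;
  }
}
```

The fetch wrapper would then call something like `limiter.run(url, () => fetch(url))`: requests to different hosts still run in parallel, while requests to the same host queue up behind each other.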
The problem is that when I await the execution of the items at the outer level (in a `while` loop that groups the items and should pause each iteration until their promises resolve), those promises resolve as soon as an inner query's execution is suspended by its inner `await` on the fetch.
That causes my queuing loop to pause only momentarily, instead of waiting for the inner promises to resolve too.
This is the outer queuing class:
```javascript
class AsyncItemQueue {
  constructor(items, concurrency) {
    this.items = items;
    this.concurrency = concurrency;
  }

  run = async () => {
    let itemPromises = [];
    const bundles = Math.ceil(this.items.length / this.concurrency);
    let currentBundle = 0;
    while (currentBundle < bundles) {
      console.log(`<--------- FETCHING ITEM BUNDLE ${currentBundle} OF ${bundles} --------->`);
      const lowerRange = currentBundle * this.concurrency;
      const upperRange = (currentBundle + 1) * this.concurrency;
      itemPromises.push(
        this.items.slice(lowerRange, upperRange).map(item => item.run())
      );
      await Promise.all(itemPromises);
      currentBundle++;
    }
  };
}

export default AsyncItemQueue;
```
This is the simple `Item` class that the queue runs. I'm omitting superfluous code.
```javascript
class Item {
  // ...
  run = async () => {
    console.log('Item RUN', this, this.name);
    return await Promise.all(this.queries.map(query => {
      const itemPromise = query.run(this.name);
      return itemPromise;
    }));
  }
}
```
And these are the queries contained inside items. Every item has a list of queries. Again, some code is removed because it's not relevant.
```javascript
class Query {
  // ...
  run = async (item) => {
    // Step 1: If requisites, await.
    if (this.requires) {
      await this.savedData[this.requires];
    }
    // Step 2: Resolve URL.
    this.resolveUrl(item);
    // Step 3: If provides, create promise in savedData.
    const fetchPromise = this.fetch();
    if (this.saveData) {
      this.saveData.forEach(sd => (this.savedData[sd] = fetchPromise));
    }
    // Step 4: Fetch.
    const document = await fetchPromise;
    // ...
  }
}
```
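The `requires` / `saveData` handshake in steps 1 and 3 can be isolated into a small runnable sketch. Assumptions: `runQuery` is a hypothetical, flattened version of `Query.run`, `fetchDocument` stands in for the real fetch wrapper, and URL resolution (step 2) is left out.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Shared between queries: key -> promise of the fetch that provides it.
const savedData = {};
const log = [];

// Hypothetical stand-in for the real fetch wrapper.
async function fetchDocument(name) {
  await sleep(20);
  return `${name} document`;
}

// Hypothetical, simplified version of Query.run.
async function runQuery(name, { requires, saveData = [] } = {}) {
  // Step 1: wait for the query that provides our prerequisite.
  if (requires) {
    await savedData[requires];
  }
  log.push(`${name} start`);
  // Step 3: register the fetch promise *before* awaiting it.
  const fetchPromise = fetchDocument(name);
  saveData.forEach((key) => {
    savedData[key] = fetchPromise;
  });
  // Step 4: fetch.
  const document = await fetchPromise;
  log.push(`${name} done`);
  return document;
}
```

Running `runQuery('provider', { saveData: ['token'] })` and then `runQuery('consumer', { requires: 'token' })` inside one `Promise.all` should log the consumer's start only after the provider is done. This relies on the providing query reaching step 3 before the dependent one reaches step 1: if `savedData[requires]` is still `undefined`, `await` does not wait for anything.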
The `while` in `AsyncItemQueue` is stopping correctly, but only until the execution flow reaches step 3 in `Query`. As soon as it reaches that fetch, which is a wrapper for the standard `fetch` function, the outer promise resolves, and I end up with all the requests being performed at the same time.
I suspect the problem is somewhere in the Query class, but I am stumped as to how to avoid the resolution of the outer promise.
I tried making the `Query` class's `run` function return `document`, just in case, but to no avail.
Any idea or guidance would be greatly appreciated. I'll try to answer any questions about the code or provide more if needed.
Thanks!
PS: Here is a CodeSandbox with a runnable example: https://codesandbox.io/s/goofy-tesla-iwzem
As you can see in the console output, the `while` loop moves on to the next iteration before the fetches finish, and they are all performed at the same time.
I've solved it.
The problem was in the `AsyncItemQueue` class. Specifically:

```javascript
itemPromises.push(
  this.items.slice(lowerRange, upperRange).map(item => item.run())
);
```
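That was pushing an array of promises into `itemPromises` as a single element, so later on

```javascript
await Promise.all(itemPromises);
```

did not find any promises to wait for: each element of `itemPromises` was itself an array (with the promises inside it), and `Promise.all` treats any value that is not a promise (or thenable) as already resolved.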
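A minimal, self-contained reproduction of the same mistake, using a hypothetical `sleep` helper in place of the real fetches:

```javascript
const sleep = (ms, value) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// Compares awaiting a nested array (the original bug) with awaiting a flat one.
async function demo() {
  const events = [];
  const bundle = [
    sleep(50, 'a').then((value) => {
      events.push(`item ${value} finished`);
      return value;
    }),
  ];

  // Buggy shape: the only element is an array, which is not a promise,
  // so Promise.all treats it as an already-resolved value.
  const nested = await Promise.all([bundle]);
  events.push('nested Promise.all resolved');

  // Fixed shape: a flat array of promises; this waits for the item.
  const flat = await Promise.all(bundle);
  events.push('flat Promise.all resolved');

  return { events, nested, flat };
}
```

Calling `demo()` records `nested Promise.all resolved` before `item a finished`, which is the early resolution seen in the question; the nested result even still contains the pending promise. Flattening the accumulated array (`Promise.all(itemPromises.flat())`) would also have waited, although `itemPromises` kept every earlier bundle as well.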
The solution was to change the code to:
```javascript
await Promise.all(this.items.slice(lowerRange, upperRange).map(item => item.run()));
```
Now it works perfectly. Items run in batches of n, and a new batch does not start until the previous one has finished.
I'm not sure this will help anyone but me, but I'll leave it here in case somebody runs into a similar problem someday. Thanks for the help.
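For reference, here is a self-contained sketch of the batching loop with the fix applied. It is a simplified rewrite rather than the exact class from the question: `run` is a regular async method, it returns the collected results, and `makeItem` is a hypothetical stand-in for `Item` that only counts how many items are running at once.

```javascript
// Sketch of the batching loop after the fix: every bundle is fully awaited
// before the next one starts.
class AsyncItemQueue {
  constructor(items, concurrency) {
    this.items = items;
    this.concurrency = concurrency;
  }

  async run() {
    const results = [];
    for (let lower = 0; lower < this.items.length; lower += this.concurrency) {
      const bundle = this.items.slice(lower, lower + this.concurrency);
      // Promise.all receives a flat array of promises, so it waits for all of them.
      const bundleResults = await Promise.all(bundle.map((item) => item.run()));
      results.push(...bundleResults);
    }
    return results;
  }
}

// Hypothetical stand-in for Item: records how many items run at the same time.
let active = 0;
let maxActive = 0;

function makeItem(name) {
  return {
    async run() {
      active++;
      maxActive = Math.max(maxActive, active);
      await new Promise((resolve) => setTimeout(resolve, 10));
      active--;
      return name;
    },
  };
}
```

With five items and a concurrency of 2, `new AsyncItemQueue(['a', 'b', 'c', 'd', 'e'].map(makeItem), 2).run()` should resolve to all five names in order, and `maxActive` should never exceed 2.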