I'm trying to use the neat async generator syntax with Babel (I'm stuck with Node 8), and I'm wondering how you would cleanly convert an event emitter to an async generator.
What I have so far looks like this:
const { EventEmitter } = require('events')

// defer function for resolving promises out of scope
const Defer = () => {
  let resolve
  let reject
  const promise = new Promise((a, b) => {
    resolve = a
    reject = b
  })
  return {
    promise,
    reject,
    resolve
  }
}

// my iterator function
function readEvents(emitter, channel) {
  // buffer always ends with one unresolved deferred for the next event
  const buffer = [Defer()]
  // index of the next unresolved deferred in buffer
  let subId = 0
  emitter.on(channel, x => {
    const deferred = buffer[subId]
    subId++
    buffer.push(Defer())
    deferred.resolve(x)
  })
  const gen = async function*() {
    while (true) {
      const val = await buffer[0].promise
      buffer.shift()
      subId--
      yield val
    }
  }
  return gen()
}

async function main () {
  const emitter = new EventEmitter()
  const iterator = readEvents(emitter, 'data')
  // this part generates events
  let i = 0
  setInterval(() => {
    emitter.emit('data', i++)
  }, 1000)
  // this part reads events
  for await (let val of iterator) {
    console.log(val)
  }
}

main()
This is unwieldy. Can it be simplified?
I came up with this:
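async *stream<TRecord extends object = Record<string, any>>(query: SqlFrag): AsyncGenerator<TRecord> {
  const sql = query.toSqlString();
  let results: TRecord[] = [];
  let resolve: () => void;
  let promise = new Promise<void>(r => resolve = r);
  let done = false;
  let error: unknown;

  this.pool.query(sql)
    .on('error', err => {
      // Throwing inside an event handler would never reach the consumer,
      // so record the error and rethrow it from the generator below.
      error = err;
      done = true;
      resolve();
    })
    .on('result', row => {
      results.push(row);
      resolve();
      promise = new Promise<void>(r => resolve = r);
    })
    .on('end', () => {
      done = true;
      // Wake the loop so it does not wait forever for another row.
      resolve();
    });

  while (!done) {
    await promise;
    yield* results;
    results = [];
  }
  if (error !== undefined) throw error;
}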
Seems to be working so far.
The idea: you create a dummy promise, as in Khanh's solution, so that you can wait for the first result. Because many results might come in all at once, you push them into an array and reset the promise to wait for the next result (or batch of results). It doesn't matter if this promise gets overwritten dozens of times before it's ever awaited.
Then we can yield all the results at once with yield* and flush the array for the next batch.
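For the plain EventEmitter case from the question, the same batch-and-reset idea might look roughly like the sketch below. This is only an illustration under some assumptions: `readEvents` is rewritten here rather than taken from either post, and `endChannel` is a hypothetical event name used to signal that no more events will arrive (EventEmitter has no built-in end signal). Listeners are attached eagerly, as in the question, so events emitted before iteration starts are buffered rather than lost.

```javascript
const { EventEmitter } = require('events');

// Hypothetical adapter: one event channel -> async generator.
// `endChannel` is an assumed convention, not part of EventEmitter itself.
function readEvents(emitter, channel, endChannel = 'end') {
  let batch = [];      // values received but not yet yielded
  let done = false;    // set once endChannel fires
  let wake;            // resolves the current `pending` promise
  let pending = new Promise(resolve => { wake = resolve; });

  const onEvent = value => {
    batch.push(value);
    wake();
    // Reset the promise; overwriting it many times before it is awaited is fine.
    pending = new Promise(resolve => { wake = resolve; });
  };
  const onEnd = () => {
    done = true;
    wake(); // wake the loop so it can finish
  };

  // Subscribe immediately, before the generator body starts running.
  emitter.on(channel, onEvent);
  emitter.once(endChannel, onEnd);

  return (async function* () {
    try {
      while (true) {
        if (batch.length === 0) {
          if (done) return;
          await pending; // nothing buffered: wait for an event or the end
        }
        yield* batch;    // also picks up values pushed while yielding
        batch = [];
      }
    } finally {
      // Runs on normal completion and when the consumer breaks out early.
      emitter.removeListener(channel, onEvent);
      emitter.removeListener(endChannel, onEnd);
    }
  })();
}
```

Consuming it works the same way as in the question: `for await (const val of readEvents(emitter, 'data')) { ... }`. Breaking out of the loop triggers the `finally` block, which detaches both listeners.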