I have a stream and I need to convert it to a generator, so an uploader can consume the generic generator.
This means turning:
stream.on('data', chunk => ...);
to:
generator = streamGenerator(stream);
chunk = await generator.next()
...
better yet:
chunk = yield streamGenerator;
Overall my best attempt requires leaking the resolve from a promise and I'd like to avoid that:
function streamToIterable(chunkSize, stream) {
let collector = [];
let value = [];
let done = false;
let _resolve;
let promise = new Promise(resolve => _resolve = resolve);
stream.on('data', chunk => {
collector = collector.concat(chunk);
if (value.length >= chunkSize) {
value = collector.splice(0, chunkSize);
_resolve(value);
stream.pause();
}
});
stream.on('end', () => {
_resolve(collection);
// With done set to true, the next iteration well ignore 'value' and end the loop
done = true;
});
stream.resume();
return {
next: () => ({
value: promise.then(() => {
stream.resume();
promise = new Promise(resolve => _resolve = resolve);
}),
done,
}),
};
}
function* streamToGenerator(stream) {
const iterator = streamToIterable(stream);
let next = iterator.next();
while (!next.done) {
yield next.value;
}
};
Usage in a generator for uploading chunks:
for (const chunkData of generator()) {
let result = yield uploadPost(url, formData, onChunkProgress(chunkIndex));
This is in a redux-saga, so "next()" isn't called on the generator until the return promise is resolved.
You cannot avoid storing the resolve
function in a mutable variable if you want to use a single event listener that resolves different promises. You could simplify the promise creation by using the once
method similar to the following:
function streamToIterator(stream) {
let done = false;
const end = new Promise(resolve => {
stream.once('end', resolve);
}).then(e => {
done = true;
});
return {
[Symbol.iterator]() { return this; }
next() {
const promise = new Promise(resolve => {
stream.once('data', value => {
resolve(value);
stream.pause();
});
stream.resume();
});
return {
value: Promise.race([promise, end]),
done,
};
}),
};
}
Of course, you are doing the racing between end
and data
yourself, you resume the stream before next
is called the first time and most importantly you do the chunking yourself, so this might to be applicable to your situation.
Apart from that, I'd recommend to check out the buffering internals of node.js streams, it might be easier to read chunks of certain sizes using a lower-level API than data
events.
Also you definitely should have a look at the asynchronous iteration proposal for es-next. The iterable interface you're trying to implement is very similar, and surely they either already have or really would welcome an example of making a node readablestream iterable.