Tags: javascript, generator

How to convert a stream into a generator without leaking resolve from a promise


I have a stream and I need to convert it to a generator, so that an uploader can consume a generic generator.

This means turning:

stream.on('data', chunk => ...);

to:

generator = streamGenerator(stream);
chunk = await generator.next()
...

better yet:

chunk = yield streamGenerator;

Overall my best attempt requires leaking the resolve from a promise and I'd like to avoid that:

function streamToIterable(chunkSize, stream) {
    let collector = [];
    let value = [];
    let done = false;
    let _resolve;
    let promise = new Promise(resolve => _resolve = resolve);
    stream.on('data', chunk => {
        collector = collector.concat(chunk);
        if (collector.length >= chunkSize) {
            value = collector.splice(0, chunkSize);
            _resolve(value);
            stream.pause();
        }
    });
    stream.on('end', () => {
        _resolve(collector);

        // With done set to true, the next iteration will ignore 'value' and end the loop
        done = true;
    });
    stream.resume();

    return {
        next: () => ({
            value: promise.then(chunk => {
                stream.resume();
                promise = new Promise(resolve => _resolve = resolve);
                return chunk; // pass the chunk through, or the promise resolves to undefined
            }),
            done,
        }),
    };
}

function* streamToGenerator(chunkSize, stream) {
    const iterator = streamToIterable(chunkSize, stream);
    let next = iterator.next();
    while (!next.done) {
        yield next.value;
        next = iterator.next(); // advance, otherwise this loops on the same chunk forever
    }
}

Usage in a generator for uploading chunks:

for (const chunkData of streamToGenerator(chunkSize, stream)) {
    let result = yield uploadPost(url, formData, onChunkProgress(chunkIndex));
    // ...
}

This is in a redux-saga, so "next()" isn't called on the generator until the returned promise resolves.
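
For reference, here is a minimal sketch of how a saga might drive the iterator above, assuming the streamToIterable from the snippet; CHUNK_SIZE and the FormData payload shape are placeholders, and uploadPost/onChunkProgress are the helpers already used above:

import { call } from 'redux-saga/effects';

function* uploadSaga(url, stream) {
    const iterator = streamToIterable(CHUNK_SIZE, stream); // CHUNK_SIZE is a placeholder
    let chunkIndex = 0;
    let next = iterator.next();
    while (!next.done) {
        // next.value is a promise; yielding it makes the saga middleware
        // wait for the chunk data before starting that chunk's upload.
        const chunkData = yield next.value;
        const formData = new FormData();
        formData.append('chunk', chunkData); // hypothetical payload shape
        yield call(uploadPost, url, formData, onChunkProgress(chunkIndex));
        chunkIndex += 1;
        next = iterator.next();
    }
}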


Solution

  • You cannot avoid storing the resolve function in a mutable variable if you want a single event listener to resolve different promises. You could simplify the promise creation by using the once method, similar to the following:

    function streamToIterator(stream) {
        let done = false;
        const end = new Promise(resolve => {
            stream.once('end', resolve);
        }).then(e => {
            done = true;
        });
    
        return {
            [Symbol.iterator]() { return this; },
            next() {
                const promise = new Promise(resolve => {
                    stream.once('data', value => {
                        resolve(value);
                        stream.pause();
                    });
                    stream.resume();
                });

                return {
                    value: Promise.race([promise, end]),
                    done,
                };
            },
        };
    }
    

    Of course, in your version you are doing the racing between end and data yourself, you resume the stream before next is called the first time, and most importantly you do the chunking yourself, so this might not be directly applicable to your situation.

    Apart from that, I'd recommend checking out the buffering internals of Node.js streams: it might be easier to read chunks of a certain size using a lower-level API than data events.
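
    For instance, here is a sketch of that lower-level approach using paused mode, the 'readable' event, and read(size), which lets the stream's internal buffer do the chunking; readChunk is a made-up helper name, and a binary (Buffer) stream is assumed:

    function readChunk(stream, chunkSize) {
        return new Promise((resolve, reject) => {
            const cleanup = () => {
                stream.removeListener('readable', tryRead);
                stream.removeListener('end', onEnd);
                stream.removeListener('error', onError);
            };
            const tryRead = () => {
                // read(size) returns null until chunkSize bytes are buffered;
                // once the stream has ended, it returns whatever remains.
                const chunk = stream.read(chunkSize);
                if (chunk !== null) {
                    cleanup();
                    resolve(chunk);
                }
            };
            const onEnd = () => { cleanup(); resolve(null); }; // null marks end-of-stream
            const onError = err => { cleanup(); reject(err); };
            stream.on('readable', tryRead);
            stream.on('end', onEnd);
            stream.on('error', onError);
            tryRead(); // data may already be buffered
        });
    }

    Awaiting readChunk(stream, chunkSize) repeatedly then yields fixed-size chunks until it resolves to null.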

    Also, you should definitely have a look at the asynchronous iteration proposal for ES.next. The iterable interface you're trying to implement is very similar, and surely they either already have, or would certainly welcome, an example of making a Node.js readable stream iterable.
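
    For illustration, with that proposal (and on current Node.js versions, where readable streams implement Symbol.asyncIterator natively), the consuming side collapses to a for await loop. The regrouping into fixed-size chunks below is a sketch added to match your chunkSize requirement, again assuming a binary (Buffer) stream:

    async function* streamToChunks(stream, chunkSize) {
        let buffer = Buffer.alloc(0);
        for await (const data of stream) {
            // Accumulate until at least one full chunk is available.
            buffer = Buffer.concat([buffer, data]);
            while (buffer.length >= chunkSize) {
                yield buffer.slice(0, chunkSize);
                buffer = buffer.slice(chunkSize);
            }
        }
        if (buffer.length > 0) yield buffer; // trailing partial chunk
    }

    // Usage: for await (const chunk of streamToChunks(stream, 1024)) { ... }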