Is there a pattern for making a stream iterable using ES6 generators?
See 'MakeStreamIterable' below.
import {createReadStream} from 'fs'
let fileName = 'largeFile.txt'
let readStream = createReadStream(fileName, {
  encoding: 'utf8',
  highWaterMark: 1024
})
let myIterableAsyncStream = MakeStreamIterable(readStream)
for (let data of myIterableAsyncStream) {
let str = data.toString('utf8')
console.log(str)
}
I'm not interested in co or bluebird's coroutine, or in blocking with deasync.
The goal is that MakeStreamIterable should be a valid function.
Is there a pattern for making a stream iterable using ES6 generators?
No, this cannot be achieved, because generators are synchronous: they must know what they are yielding and when. Iterating an asynchronous data source can currently only be achieved with some kind of callback-based implementation. So there is no way to make MakeStreamIterable 'a valid function', if what you mean by that is 'a valid function whose result can be given to a for-of loop'.
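To see why in miniature, here is a hedged sketch (the generator and its values are purely illustrative): a generator answers each next() call synchronously, so by the time a stream's 'data' event fires, the for-of loop has already demanded and received its answer.

```javascript
// A generator must decide synchronously what to yield; it cannot
// suspend a next() call while waiting for an asynchronous 'data' event.
function* gen () {
  yield 1
  yield 2
}

const it = gen()
console.log(it.next()) // { value: 1, done: false }
console.log(it.next()) // { value: 2, done: false }
console.log(it.next()) // { value: undefined, done: true }
```

Each next() call returns immediately with a plain object; there is no way for the generator body to say "wait until the stream emits before answering".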
A stream represents a potentially infinite amount of data received asynchronously over a potentially infinite amount of time. If we take a look at the definition of an iterator on MDN we can define in more detail what it is about a stream that makes it 'uniterable':
An object is an iterator when it knows how to access items from a collection one at a time, while keeping track of its current position within that sequence. In JavaScript an iterator is an object that provides a next() method which returns the next item in the sequence. This method returns an object with two properties:
done and value.
(Emphasis is my own.)
Let's pick out the properties of an iterator from this definition. The object must:

1. know how to access items from a collection one at a time
2. keep track of its current position within that sequence
3. provide a next method, which retrieves an object with a value property that holds the next value in the sequence, or notifies (via done) that iteration is done

A stream doesn't meet any of the above criteria because, among other things, it does not provide a next method which a for-of loop can utilise.
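For contrast, here is a minimal hand-rolled iterator that does satisfy those criteria (makeIterator is an illustrative name, not part of any API): it holds its whole collection up front and tracks its position, which is exactly what a stream cannot do.

```javascript
// A hand-rolled iterator over a fixed collection: the items are all
// known in advance, and `index` tracks the current position.
function makeIterator (collection) {
  let index = 0
  return {
    next () {
      return index < collection.length
        ? { value: collection[index++], done: false }
        : { value: undefined, done: true }
    },
    // Returning `this` from Symbol.iterator makes it consumable by for-of.
    [Symbol.iterator] () { return this }
  }
}

for (const item of makeIterator(['a', 'b'])) {
  console.log(item)
}
```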
We can't actually iterate the data received from a stream (definitely not with a for-of loop); however, we can build an interface that pretends to, by using Promises (yay!) and abstracting the stream's event handlers away inside a closure.
// MakeStreamIterable.js
export default function MakeStreamIterable (stream) {
  let collection = []
  let index = 0
  let callback
  let resolve, reject

  // Create the promise first, so resolve/reject are bound before any
  // stream event can fire.
  let promise = new Promise((res, rej) => {
    resolve = res
    reject = rej
  })

  stream
    .on('error', err => reject(err))
    .on('end', () => resolve(collection))
    .on('data', data => {
      collection.push(data)
      try {
        callback && callback(data, index++)
      } catch (err) {
        // `this` is not the stream inside an arrow function, so refer
        // to the stream directly.
        stream.destroy()
        reject(err)
      }
    })

  function each (cb) {
    if (callback) {
      return promise
    }
    callback = (typeof cb === 'function') ? cb : null
    if (callback && collection.length) {
      // Replay everything received before each() was called.
      collection.forEach(callback)
      index = collection.length
    }
    return promise
  }

  promise.each = each
  return promise
}
And we can use it like this:
import MakeStreamIterable from './MakeStreamIterable'
let myIterableAsyncStream = MakeStreamIterable(readStream)
myIterableAsyncStream
.each((data, i) => {
let str = data.toString('utf8')
console.log(i, str)
})
.then(() => console.log('completed'))
.catch(err => console.log(err))
Things to note about this implementation:
- You don't have to call each immediately on the 'iterable stream'.
- When each is called, all values received prior to its call are passed to the callback one by one, forEach-style. Afterwards, all subsequent data are passed to the callback immediately.
- The promise resolves with the collection of data when the stream ends, meaning we actually don't have to call each at all if the method of iteration provided by each isn't satisfactory.
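That last point can be sketched without a real stream. In this hedged example, makeCollector is a hypothetical stand-in for the wrapper above: deliver() plays the role of a 'data' event and finish() the role of 'end', and we never register a per-item callback; we just wait for the resolved collection.

```javascript
// Hypothetical stand-in for the promise-wrapped stream: collect every
// delivered chunk, and resolve with the whole collection at the end.
function makeCollector () {
  let resolve
  const collection = []
  const promise = new Promise(res => { resolve = res })
  return {
    deliver (data) { collection.push(data) }, // like a 'data' event
    finish () { resolve(collection) },        // like the 'end' event
    collection,
    promise
  }
}

const collector = makeCollector()
collector.deliver('chunk1')
collector.deliver('chunk2')
collector.finish()

collector.promise.then(collection => {
  // Once everything has arrived, the collection is a plain array,
  // so ordinary for-of iteration works here.
  for (const chunk of collection) {
    console.log(chunk)
  }
})
```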