I'm trying to understand how the following zip function (esp. the invoke function) can be made more functional. The issue I've got is that the invoke method has to wait for both the left and right side to be filled before it can dispatch the values. The values have to be called in order so that the correct values are zipped, otherwise I would consider a curry/partial function to fulfill this.
Is there anything that I could use that could remove this hinderance.
function zip(state, a, b) {
var left = [];
var right = [];
function invoke() {
if (left.length > 0 && right.length > 0) {
state([left.shift(), right.shift()]);
}
}
a.foreach(function(v) {
left.push(v);
invoke();
});
b.foreach(function(v) {
right.push(v);
invoke();
});
}
Bellow is a simple example of what satisfies the zip function.
function Stream() {
var env = this;
env.subs = [];
env.id = setInterval(function() {
env.subs.forEach(function(f) {
f(Math.random());
});
}, ((Math.random() * 100) + 500) | 0);
}
Stream.prototype.foreach = function(f) {
this.subs.push(f);
}
zip(function(v) {
console.log(v);
}, new Stream(), new Stream());
Bonus: Removing mutable array.
A more functional approach would be possible if the Stream
had some kind of iterator
interface, that divides the list in the first element and its successors (like Haskell lists are built, you seem to know them).
I know this code is more complex (and at least longer) at first, but using the structures gets more convenient:
function Promise(resolver) {
// you know better promise libs of course
// this one is not even monadic
var subs = [],
res = null;
resolver(function resolve() {
res = arguments;
while (subs.length) subs.shift().apply(null, res);
});
this.onData = function(f) {
if (res)
f.apply(null, res);
else
subs.push(f);
return this;
};
}
Promise.all = function() {
var ps = Array.prototype.concat.apply([], arguments);
return new Promise(function(resolve) {
var res = [],
l = ps.length;
ps.forEach(function(p, i) {
p.onData(function() {
while(res.length < arguments.length) res.push([]);
for (var j=0; j<arguments.length; j++)
res[j][i] = arguments[j];
if (--l == 0)
resolve.apply(null, res);
});
});
});
};
function Stream() {
// an asynchronous (random) list
var that = this,
interval = (Math.random() * 100 + 500) | 0;
this.first = new Promise(function create(resolve) {
that.id = setTimeout(function() {
resolve(Math.random(), new Promise(create));
}, interval);
});
}
// this is how to consume a stream:
Stream.prototype.forEach = function(f) {
this.first.onData(function fire(res, next) {
f(res);
next.onData(fire);
});
return this;
};
Stream.prototype.end = function() { clearTimeout(this.id); return this; };
But zipping them is easy now:
function zip() {
var res = Object.create(Stream.prototype); // inherit the Stream interface
res.first = (function create(firsts) {
return new Promise(function(resolve) {
Promise.all(firsts).onData(function(results, nexts) {
resolve(results, create(nexts));
});
});
})(Array.prototype.map.call(arguments, function(stream) {
return stream.first;
}));
return res;
}
zip(new Stream, new Stream).forEach(console.log.bind(console));
Basically I've generalized your waiting for the first items into the Promise pattern, where Promise.all
features parallel waiting, and your mutable arrays of results into nested lists of promises. And I've avoided code duplication (for left
and right
) by making all functions work with any number of arguments.