Search code examples
javascriptstreampromisemocha.jsbacon.js

Bacon zipped Stream not ending when matches complete causing test suite to hang


I have been trying to use Bacon's zip...() operations to create a Mocha test-suite, where a sequence of values from actual asynchronous networked events could be compared to a sequence of target values to verify correct behaviour.

However, the zipped stream in the test case never terminates.

I have a successful Bacon test case where two streams are 'faked' by creating both streams from arrays. This test case successfully terminates, with toPromise() for the stream returning without error. https://gist.github.com/cefn/7fd3504a0b29575f3b6f

However, in the case where one of the streams is actually wired to the asynchronous networked events, the toPromise() call never completes and the test suite hangs, even though I can tell through the V8 debugger that the final array entry in the target sequence is indeed matched.

I don't understand what causes the streams to end in one case and not end in the other. Is there a configuration change that I can make, or an explicit declaration of the number of events in the stream, or something else, to force the stream end event to be fired from the zipped stream.

            it("Sends branch keys then leaf values", function(){

                var sniffer = mqtt.connect(conf.mqttWsAddr);

                var subscribeAndSendPromise = util.createSubscriptionPromise(sniffer, '#')
                    .then(function(){
                        writer.setItem('oldmacdonald/farm', {
                            sheep:"Baa",
                            pig:"Oink",
                            cow:"Moo",
                            duck:"Quack",
                        });
                    });

                var actualStream = Bacon.fromEvent(sniffer, "message", function(topic,payload,packet){
                    packet.message = payload.toString();
                    return packet;
                });

                var targetStream = Bacon.sequentially(0,[
                    ["the_workshop/oldmacdonald", "[]" ] , //originally empty
                    // ["the_workshop/oldmacdonald", JSON.stringify(['farm']) , // TODO somehow missing
                    // ["the_workshop/oldmacdonald/farm", JSON.stringify([]) ] , // TODO somehow missing
                    ["the_workshop/oldmacdonald/farm", JSON.stringify(['sheep','pig','cow','duck']) ] , //n.b. JSON array
                    ["the_workshop/oldmacdonald/farm" + "/sheep", JSON.stringify("Baa")] , //n.b. quoted JSON strings
                    ["the_workshop/oldmacdonald/farm" + "/pig", JSON.stringify("Oink")] ,
                    ["the_workshop/oldmacdonald/farm" + "/cow", JSON.stringify("Moo")] ,
                    ["the_workshop/oldmacdonald/farm" + "/duck", JSON.stringify("Quack")] ,
                ]);

                var deepEqual = function(targetValues, actualPacket){
                    var actualTopic = actualPacket.topic;
                    var actualMessage = actualPacket.message;
                    var targetTopic = targetValues[0];
                    var targetMessage = targetValues[1];
                    assert(actualTopic == targetTopic);
                    assert(actualMessage == targetMessage);
                    return true;
                };

                var zippedStream = Bacon.zipAsArray(targetStream, actualStream);
                zippedStream.onValues(deepEqual);

                var receivePromise = zippedStream.toPromise();
                receivePromise.finally(function(){
                    sniffer.end();
                });

                return Q.all([subscribeAndSendPromise, receivePromise]);

            })


Solution

  • The problem is that the actualStream never ends: it is created with fromEvent and listens to the EventEmitter forever.

    The zippedStream only ends when both targetStream and actualStream have ended.

    You could add a timeout to the actualStream with:

    var actualStreamWithTimeout = actualStream.takeUntil(Bacon.later(1000))