Tags: node.js, mongodb, node.js-stream

MongoDB streaming query piped to through2-spy never emits "end"


I have a streaming query from MongoDB that I am piping to a through2-spy stream, which I use as a writable sink. With a small collection of 5 documents everything works, including the "end" callback. However, with a larger collection of 344 documents only the first 15 come through; then it hangs forever and the "end" event never fires. Here is a minimal, complete, verifiable example (MCVE):

var spy = require("through2-spy").obj;
var MongoClient = require("mongodb").MongoClient;

/**
 * Create a through2-spy stream that logs a running count of the chunks that
 * pass through it.
 *
 * The spy is a transform stream: every chunk written to it is also pushed to
 * its readable side. If nothing reads that side, its internal buffer fills up
 * to the highWaterMark, the stream stops accepting writes, and backpressure
 * stalls whatever is piped into it (so the source never emits "end").
 * Calling resume() puts the readable side into flowing mode and discards its
 * output, so the spy behaves as a pure sink regardless of how many chunks
 * arrive.
 *
 * @returns {Object} the spy stream; its `total` property counts the chunks seen.
 */
function getStream() {
  var stream = spy(function() {
    console.log("@bug counting", stream.total++);
  });
  stream.total = 0;
  // Drain the readable side; without this the pipeline stalls once the buffer is full.
  stream.resume();
  return stream;
}

/**
 * Report that the source stream has finished emitting documents.
 */
function onEnd() {
  var message = "ended";
  console.log(message);
}

// Connect to the MongoDB URL given as the first CLI argument, stream every
// document of the collection named by the second argument, and pipe it into
// a counting spy stream.
MongoClient.connect(process.argv[2], function(error, db) {
  if (error) {
    console.error(error);
    return;
  }

  var closed = false;
  // Close the connection at most once, whether the stream ends or errors.
  function closeDb() {
    if (closed) {
      return;
    }
    closed = true;
    db.close();
  }

  // Log the error and release the connection; without this a stream error
  // would leave the connection open, because only "end" closed it.
  function onStreamError(streamError) {
    console.error(streamError);
    closeDb();
  }

  var stream = db.collection(process.argv[3]).find().stream();
  stream
    // behavior is the same with the following line commented out or not
    .on("end", closeDb)
    .on("error", onStreamError)
    .on("end", onEnd)
    .pipe(getStream())
    // pipe() does not forward errors from the destination, so handle them here too.
    .on("error", onStreamError);
});

Solution

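  • The problem is that through2-spy defaults to a highWaterMark of 16 (in object mode, 16 objects). To handle flow control, streams maintain an internal buffer that is drained only when data is consumed from them. The stream returned by getStream is a transform stream, so every document it sees is also pushed to its readable side. Because nothing consumes that readable side, its buffer fills up to the highWaterMark. At that point the transform stops accepting writes and backpressure propagates to the MongoDB cursor stream. No more documents are read, and "end" is never emitted. Increasing the highWaterMark above the number of documents avoids the stall for this collection, although a larger result set would stall again: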

    // Caveat: 350 is sized for the 344-document collection in the question;
    // a result set larger than this buffer would fill it and stall again.
    var options = { highWaterMark: 350 };
    var stream = spy(options, function () {
      console.log("@bug counting", stream.total++);
    });
    

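    Another, non-standard, alternative is to reset the length of the transform stream's internal readable state. This relies on the private `_readableState` property, which is not part of the public stream API: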

    var stream = spy(function () {
      var readableState = this._readableState;
      console.log("@bug counting", stream.total++);
      // Non-standard: _readableState is a private Node.js internal, not public API.
      // Zeroing its length counter makes the stream treat its read buffer as
      // empty, so it keeps accepting writes even though nothing reads its output.
      // NOTE(review): this only overwrites the counter; whether the buffered
      // chunks are actually released depends on Node internals — confirm.
      readableState.length = 0;
    });