Tags: node.js, event-stream, jsonstream

Node heap exhausted when piping JSONStream.parse() data through es.map() and JSONStream.stringify() to a file stream


I'm trying to pipe an input stream (created from a huge GeoJSON file) through JSONStream.parse() to break the stream into objects, then through event-stream's map() so that I can transform each object, then through JSONStream.stringify() to turn it back into a string, and finally into a writable output stream. As the process runs, node's memory footprint keeps growing until it eventually exhausts the heap. Here's the simplest script (test.js) that recreates the problem:

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

const out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb) { 
        cb(null, data);
        return;
    } ))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)

A little bash script (barf.sh) that spews an endless stream of JSON into node's process.stdin will cause node's heap to gradually grow:

#!/bin/bash

echo '{"type":"FeatureCollection","features":['
while :
do
    echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done

by running it as so:

./barf.sh | node test.js

There are a couple of curious ways to sidestep the issue:

  • Remove the fs.createWriteStream() and change the last pipe stage from ".pipe(out)" to ".pipe(process.stdout)" and then pipe node's stdout to /dev/null
  • Change the asynchronous es.map() to the synchronous es.mapSync()

Either of these changes allows the script to run forever, with node's memory footprint low and unchanging. I'm using node v6.3.1, event-stream v3.3.4, and JSONStream v1.1.4 on an eight-core machine with 8 GB of RAM running Ubuntu 16.04.

I hope someone can help me correct what I'm sure is an obvious error on my part.


Solution

  • JSONStream is not a streams2 stream, so it does not support backpressure control. (There is a brief summary about streams2 here.)

    That means the data is going to come out of the parse stream in data events and that the stream is going to keep pumping them out regardless of whether the consuming stream is ready for them. If there is some discrepancy somewhere in the pipeline between how fast something can be read and written, there will be buffering - which is what you are seeing.

    Your barf.sh harness pumps features in via stdin. If, instead, you were reading a massive file, you should be able to manage the flow by pausing the file's read stream. So if you insert some pause/resume logic into your map callback, you should be able to process a massive file; it would just take a little longer. I'd experiment with something like this:

    const fs = require("fs");
    const es = require("event-stream");
    const js = require("JSONStream");

    const input = fs.createReadStream("/some/massive/file");
    const out = fs.createWriteStream("/dev/null");
    input
        .pipe(js.parse("features.*"))
        .pipe(es.map(function (data, cb) {
            // This is just an example; a 10-millisecond wait per feature would be very slow.
            if (!input.isPaused()) {
                // Stop reading the file briefly so the downstream stages can catch up.
                input.pause();
                setTimeout(function () { input.resume(); }, 10);
            }
            cb(null, data);
        }))
        .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
        .pipe(out);
    

    Incidentally, using mapSync makes little-to-no difference on my computer (which is old and slow). However, unless you have some asynchronous operation to perform in map, I'd go with mapSync.
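
To see what backpressure control looks like when every stage supports it, here is a minimal sketch that uses only Node's core stream module rather than JSONStream or event-stream. It is an illustration, not a drop-in fix for the script above. It assumes a reasonably recent Node (writableLength is not available on v6), and runPipeline, the highWaterMark of 4, and the seen property are names and values made up for the example.

```javascript
const { Readable, Transform, Writable } = require("stream");

// Hypothetical helper (not part of JSONStream or event-stream): pushes `total`
// feature objects through a synchronous map stage into a deliberately slow
// writer, then reports the most objects the writer ever held at once.
function runPipeline(total, done) {
    let produced = 0;
    let written = 0;
    let maxBuffered = 0;

    // Source: read() is only called when downstream has asked for more data.
    const source = new Readable({
        objectMode: true,
        highWaterMark: 4,
        read() {
            produced += 1;
            this.push(produced <= total ? { type: "Feature", id: produced } : null);
        }
    });

    // Map stage: a synchronous per-object transform, similar in spirit to mapSync.
    const map = new Transform({
        objectMode: true,
        highWaterMark: 4,
        transform(feature, encoding, callback) {
            callback(null, Object.assign({}, feature, { seen: true }));
        }
    });

    // Sink: acknowledges each write on a later tick to simulate a slow consumer.
    const sink = new Writable({
        objectMode: true,
        highWaterMark: 4,
        write(feature, encoding, callback) {
            written += 1;
            maxBuffered = Math.max(maxBuffered, this.writableLength);
            setImmediate(callback);
        }
    });

    sink.on("finish", function () {
        done({ written: written, maxBuffered: maxBuffered });
    });

    source.pipe(map).pipe(sink);
}

runPipeline(1000, function (result) {
    console.log("written:", result.written, "max buffered in sink:", result.maxBuffered);
});
```

If the stages honor backpressure as expected, the reported maximum should stay close to the sink's highWaterMark however large total is, rather than growing with the input.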