Search code examples
node.jspiperace-conditionfile-descriptorchild-process

Data sometimes not passed to second child process when using the stdout of a child process as stdin of another


When using the stdout of one child process as stdin for another, it seems that sometimes data is not passed to the next child:

var spawn = require('child_process').spawn;
var pipeId = 0;

var launchProcess = function(cmd, args, stdin){
  return spawn(cmd, args, {
    stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
  });
};

var launch = function(){
  var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger']);
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);

  pipeId++;

  task1.on('exit', launch);
};

launch();

Some files are empty:

ls -lhS /tmp/body-pipeline-*

I also tried passing the file descriptor as a positive integer by accessing task0.stdout._handle.fd and the problem persists.

To my knowledge this is how shell pipes work: the same file descriptor for the stdout of one process is used as stdin of another. I'm trying to avoid passing all data through the NodeJS process because it causes high CPU loads when the child processes output a lot of data.

Update: When pipes are used for both stdin and stdout everything works as expected (using cat here to test with longer text):

var spawn = require('child_process').spawn;
var pipeId = 0;

var launchProcess = function(cmd, args, stdin){
  return spawn(cmd, args, {
    stdio: [stdin ? stdin : 'pipe', 'pipe', 'pipe']
  });
};

var launch = function(){
  var task0 = launchProcess('cat');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId]);

  task0.stdout.pipe(task1.stdin)

  task0.stdin.write(JSON.stringify(process.env).split(',').join('\n'))
  task0.stdin.end();

  pipeId++;

  task1.on('exit', launch);
};

launch();

Update2: When using task0.stdout.pipe(task1.stdin) the script uses 50% CPU (compared to 0% when passing stdout of task0 as stdin of task1):

var spawn = require('child_process').spawn;
var pipeId = 0;

var launchProcess = function(cmd, args, stdin, stdout, stderr){
  return spawn(cmd, args, {
    stdio: [stdin, stdout, stderr]
  });
};

var launch = function(){
  var task0 = launchProcess('yes', ['lala'], 'ignore', 'pipe', 'ignore');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore', 'ignore');
  // var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore', 'ignore');


  task0.stdout.pipe(task1.stdin);

  pipeId++;

  task1.on('exit', launch);
};

launch();

Update3: This better illustrates my problem. I tried to simplify it in the original code but I think it was too simplified. Larry Turtis offered a workaround for the simplified case but that doesn't apply to mine:

var spawn = require('child_process').spawn;

var pipeId = 0;
var pipeSlots = 6;

var launchProcess = function(cmd, args, stdin, stdout){
  return spawn(cmd, args, {
    stdio: [stdin, stdout, 'ignore']
  });
};

var launch = function(){
  var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', 'pipe');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore');

  task0.on('error', function(err){
    console.log('Error while processing task0:' + err.stack);
  });
  task1.on('error', function(err){
    console.log('Error while processing task1:' + err.stack);
  });

  pipeId++;
};

// Simulating message queue
setInterval(function(){
  // Simulating how many messages we get from the messaging queue
  var mqMessageCount = Math.floor(Math.random() * (pipeSlots + 1));

  for(var i = 0; i < mqMessageCount; i++){
    launch();
  }
}, 250); // For this test we assume that pipes finish under 250ms

Solution

  • This is now a known NodeJS issue: https://github.com/nodejs/node/issues/9413

    TLDR; A colleague of mine had a great idea that fixed this:

    var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore');
    var task0 = launchProcess('echo', ['how\nare\nyou\ndear\nstranger'], 'ignore', task1.stdin);
    

    The idea is to launch the receiving task before launching the sending task !