node.js stream es6-promise child-process asynchronous-javascript

How to implement back pressure manually


I have a child process, and in it I am piping a stream to the parent.

In child.js:

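  let stream = readdirp(pathname);
  // Assumption: the stage missing from the original line serialized each entry
  // as one line of JSON (the parent calls JSON.parse), e.g. es.stringify().
  stream.pipe(es.stringify()).pipe(process.stdout);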

In parent.js:

let file = child => {
  let estream = es.map((data, next) => {
    _this.s3MultiUpload(JSON.parse(data), data, next);
    // I am uploading these files to S3.
  });
  child.on("end", (code, signal) => {
    console.log("stream ended"); // `here is my problem`
    child.kill();
  });
  child.on("exit", (code, signal) => {
    console.log(code);
    console.log(signal);
    child.kill();
  });
  return estream;
};
child = fork(filePath, { silent: true });
child.stdout.pipe(this.file(child));

My problem is that the stream ends before I have uploaded all the files to S3. I have read about backpressure, but I don't understand how to implement it here.

I think I need to add a callback or something to the process.stdout pipe, but I don't know how.

Can you please help me?


Solution

  • The approach is unnecessarily complicated. Since I/O operations aren't CPU bound, we are better off using Promises together with JavaScript's async/await and * syntax to perform the parallel file uploading. Building our own synchronization mechanisms is complex, and many overlapping language- and library-level concepts arise [1].

    Based on the readdirp documentation, but noting my unfamiliarity with the specific upload API, I'd suggest something along these lines:

    const readdirp = require('readdirp');
    const util = require('util');
    const fs = require('fs');
    
    const readFile = util.promisify(fs.readFile);
    
    (async function () {
      // Use streams to achieve small RAM & CPU footprint.
      // Collect the paths with for-await. Node.js 10+ only.
      const paths = [];
      for await (const {fullPath} of readdirp('pending-uploads')) {
        // fullPath is absolute; the entry's `path` is relative to the root.
        paths.push(fullPath);
      }
    
      // Read, parse, and upload every file in parallel.
      const uploadPromises = paths.map(async path => {
        const data = JSON.parse(await readFile(path, 'utf8'));
        // s3MultiUpload is the asker's function, assumed here to return a promise.
        return s3MultiUpload(data);
      });
    
      await Promise.all(uploadPromises);
    }());
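
One caveat with the Promise.all version above is that it starts every upload at once. If that turns out to be too aggressive, a small worker pool gives a manual form of back pressure: a new entry is pulled from the source only when an upload slot is free. The following is a minimal sketch, not a definitive implementation. `forEachLimited`, `fakeEntries` and `makeFakeUploader` are hypothetical names introduced for illustration; the fake generator stands in for `readdirp(...)` and the fake uploader for the real S3 call, which is assumed to return a promise.

```javascript
// Hypothetical helper: run `worker` on every item of an async (or sync)
// iterable while keeping at most `limit` calls in flight.
async function forEachLimited(source, limit, worker) {
  const iterator = source[Symbol.asyncIterator]
    ? source[Symbol.asyncIterator]()
    : source[Symbol.iterator]();

  // Each lane pulls the next item only after its previous upload finished.
  // That waiting is the back pressure on the source.
  async function lane() {
    while (true) {
      const { value, done } = await iterator.next();
      if (done) return;
      await worker(value);
    }
  }

  // Start `limit` lanes and wait until all of them have drained the source.
  await Promise.all(Array.from({ length: limit }, () => lane()));
}

// Stand-in for readdirp(...): any async iterable of entries works.
async function* fakeEntries(count) {
  for (let i = 0; i < count; i++) {
    yield { fullPath: `/pending-uploads/file-${i}.json` };
  }
}

// Stand-in for the real upload; it records how many uploads overlap.
function makeFakeUploader() {
  const stats = { active: 0, maxActive: 0, uploaded: [] };
  async function upload(entry) {
    stats.active++;
    stats.maxActive = Math.max(stats.maxActive, stats.active);
    await new Promise(resolve => setTimeout(resolve, 5)); // simulated network time
    stats.active--;
    stats.uploaded.push(entry.fullPath);
  }
  return { upload, stats };
}

async function demo() {
  const { upload, stats } = makeFakeUploader();
  await forEachLimited(fakeEntries(10), 3, upload);
  return stats;
}
```

With the real libraries, the call would look like `await forEachLimited(readdirp('pending-uploads'), 3, uploadOneFile)`, where `uploadOneFile` is whatever promise-returning function reads, parses, and uploads a single entry.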
    

    [1] Back pressure is one of these concepts; it arose from the process of porting the Reactive Extensions library to the JVM and implementing it in Java. Just for the sake of argument (sanity?), consider what Erik Meijer says regarding backpressure.
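
For completeness, if the piped design from the question is kept, Node's own streams can apply the back pressure. A Writable calls its `write` callback only after the upload has settled, so the pipe stops feeding it in the meantime, and `pipeline` reports completion only after the last upload has finished. This is a rough sketch under assumptions: `createUploadSink` and `uploadAll` are hypothetical names, the upload function is assumed to return a promise, and the incoming chunks are assumed to be already split into one parsed record each (the raw `child.stdout` bytes would need a line-splitting and parsing stage first).

```javascript
const { Writable, Readable, pipeline } = require('stream');

// Hypothetical factory: wraps a promise-returning upload function in a Writable.
function createUploadSink(upload) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1, // buffer at most one pending record
    write(entry, _encoding, callback) {
      // Signal readiness for the next record only after this upload settles.
      // Passing an error to `callback` destroys the stream and fails the pipeline.
      upload(entry).then(() => callback(), callback);
    },
  });
}

// Resolves once every record has been uploaded, rejects on the first error.
function uploadAll(source, upload) {
  return new Promise((resolve, reject) => {
    pipeline(source, createUploadSink(upload), err => (err ? reject(err) : resolve()));
  });
}

// Small demonstration with an in-memory source and a fake upload.
async function demoSink() {
  const seen = [];
  let active = 0;
  let maxActive = 0;
  const fakeUpload = async entry => {
    active++;
    maxActive = Math.max(maxActive, active);
    await new Promise(resolve => setTimeout(resolve, 5)); // simulated network time
    active--;
    seen.push(entry.name);
  };
  const source = Readable.from([{ name: 'a' }, { name: 'b' }, { name: 'c' }]);
  await uploadAll(source, fakeUpload);
  return { seen, maxActive };
}
```

In this arrangement the parent would wait for the promise returned by `uploadAll` before treating the work as done, rather than reacting to the child's exit.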