node.js typescript fastify node-streams

Control nodejs stream data through multiple streams


I am building a Fastify server to upload and download files to and from S3. When I download a file, I get it as a Node stream and return it, which avoids loading the whole file into memory. I would like to do something like:

  • get the file web stream from the bucket
  • scan it using ClamAV, streaming the data into the AV
  • return the file stream to the client only if the file is not infected

I have tried with pipelines, but I need to check when the scan is done. After I send the stream from S3 to the AV, the original stream is consumed, so I cannot send it to the client. Do you have any suggestions, or is there a gotcha here?

The code I have right now:

const stream = body.transformToWebStream();

const antivirusPassthrough = app.avClient.passthrough();
const downloadPassthrough = new PassThrough();


antivirusPassthrough.once("error", (err) => {
  throw err
});
antivirusPassthrough.once("scan-complete", (result) => {
  const { isInfected } = result;
  if (isInfected) {
    throw new Error("File is infected");
  }
});


pipeline(
  stream,
  antivirusPassthrough,
  downloadPassthrough,
  (err) => {
    if (err) {
      app.log.error(err);
    }
  }
);
return reply.send(downloadPassthrough);


Solution

  • To achieve your goal, you need to find a trade-off that saves resources.

    You may use a PromiseTransform class in your Fastify server. This class allows you to wait for a promise (like your virus scan) to resolve before completing the stream.

    Below is a demo of how to implement this using a random promise to simulate the virus scan:

    const fs = require('fs');
    const { Transform, PassThrough } = require('stream');
    const { pipeline } = require('stream/promises');
    
    // The class to wait for the promise to resolve before finishing the stream
    class PromiseTransform extends Transform {
      constructor (aPromise, options = {}) {
        super(options);
        this.aPromise = aPromise;
      }
    
      _transform (chunk, encoding, callback) {
        this.push(chunk);
        callback();
      }
    
      _flush (callback) {
        // https://nodejs.org/api/stream.html#transform_flushcallback
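        // Call callback() with no arguments on success: a resolved value
        // passed as the first argument would be treated as an error.
        this.aPromise.then(
          () => callback(),
          (err) => callback(err)
        );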
      }
    }
    
    const app = require('fastify')({ logger: true });
    app.get('/', async (request, reply) => {
      // The S3 object is a stream
      const sourceFile = fs.createReadStream(__filename);
    
      // This is a random promise to simulate the virus scan
      const slowRandomPromise = new Promise((resolve, reject) => {
        setTimeout(() => {
          if (Math.random() < 0.5) {
            console.log('random promise failed');
            reject(new Error('bad luck'));
            return;
          }
          console.log('random promise success');
          resolve();
        }, 1_500);
      });
      const promiseTransform = new PromiseTransform(slowRandomPromise);
    
      const downloadPassthrough = new PassThrough();
    
      pipeline(
        sourceFile, //
        promiseTransform, //
        downloadPassthrough,
      ).catch(() => {
        console.log('pipeline failed, do the cleanup (delete s3 object etc..)');
      });
    
      reply.header('Content-Disposition', 'attachment; filename="qwe.js"');
      return downloadPassthrough;
    });
    
    app.listen({ port: 8080 });
    

    The _flush method in the PromiseTransform class is called when there are no more chunks to be transformed. It provides a way to perform any final processing before the stream is ended. In this case, the _flush method is used to wait for the provided promise (this.aPromise) to resolve before signaling that the stream has finished.
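    To make the role of _flush concrete, here is a minimal sketch that records the order of events for one transfer. It uses a condensed copy of PromiseTransform with one addition (a no-op catch, so a promise that rejects before _flush runs is not reported as unhandled). The runDemo helper and the event labels are hypothetical names used only for illustration.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Condensed copy of PromiseTransform: forwards chunks, settles in _flush.
class PromiseTransform extends Transform {
  constructor (aPromise, options = {}) {
    super(options);
    this.aPromise = aPromise;
    // Mark a possible rejection as handled until _flush subscribes to it.
    this.aPromise.catch(() => {});
  }

  _transform (chunk, encoding, callback) {
    callback(null, chunk);
  }

  _flush (callback) {
    this.aPromise.then(() => callback(), (err) => callback(err));
  }
}

// Hypothetical helper: runs one transfer and returns the ordered event log.
async function runDemo (scanPromise) {
  const events = [];
  const sink = new Writable({
    write (chunk, encoding, callback) {
      events.push(`chunk:${chunk.toString()}`);
      callback();
    }
  });
  scanPromise.then(
    () => events.push('scan:ok'),
    () => events.push('scan:failed')
  );
  try {
    await pipeline(
      Readable.from(['a', 'b']),
      new PromiseTransform(scanPromise),
      sink
    );
    events.push('pipeline:ok');
  } catch (err) {
    events.push(`pipeline:error:${err.message}`);
  }
  return events;
}
```

    With a promise that settles after the source has ended, both chunks are logged before the scan result, and the pipeline settles only after the promise does. That is the behavior the answer relies on: data flows immediately, while the end of the stream waits for the verdict.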

    Note that the stream is sent to the client (the stream is flowing so we are not wasting server resources) BUT the response is destroyed if the promise rejects (the virus scan fails).
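    In the question's setup the promise would come from the scanner rather than from a timer. The sketch below shows one possible way to derive it, assuming the scanner stream emits a 'scan-complete' event with an { isInfected } result and an 'error' event on failure, as the question's code suggests. The scanVerdict name is hypothetical.

```javascript
const { once } = require('events');

// Hypothetical helper: resolves when the scanner reports a clean file,
// rejects when the file is infected or the scanner emits 'error'.
function scanVerdict (avStream) {
  // events.once() rejects if 'error' is emitted while waiting.
  const verdict = once(avStream, 'scan-complete').then(([result]) => {
    if (result.isInfected) {
      throw new Error('File is infected');
    }
  });
  // Mark a possible rejection as handled until a consumer subscribes.
  verdict.catch(() => {});
  return verdict;
}

// Possible wiring (assumption, using the names from the question):
//   const antivirusPassthrough = app.avClient.passthrough();
//   const gate = new PromiseTransform(scanVerdict(antivirusPassthrough));
//   pipeline(stream, antivirusPassthrough, gate, downloadPassthrough)
```

    The verdict must be created before data starts flowing, otherwise the 'scan-complete' event could fire before anyone listens for it.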

    As a result, a Node.js client starts receiving the file right away and gets a response error if the virus scan fails:

    const http = require('http');
    
    function makeHttpRequest (url) {
      http.get(url, (response) => {
        const { statusCode } = response;
        console.log('statusCode:', statusCode);
    
        response.setEncoding('utf8');
        response.on('data', (data) => {
          // This is called whether the virus scan fails or not
          console.log('data:', data);
        });
        response.on('error', (err) => {
          // This is called if the virus scan fails
          console.log('Response error:', err);
        });
      });
    }
    
    makeHttpRequest('http://localhost:8080/');
    

    The browser will delete the downloaded file if the virus scan fails:

    [Image: failed download]
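    A Node.js client that saves the file has to do this cleanup itself. The following is a minimal sketch of one way to do it: write to a temporary file and rename it only when the response ends cleanly. The downloadOrDiscard name and the ".part" suffix are hypothetical choices, not part of the original demo.

```javascript
const fs = require('fs');
const http = require('http');
const { pipeline } = require('stream/promises');

// Hypothetical helper: saves `url` to `destPath`, discarding the partial
// file if the server destroys the response (for example on a failed scan).
async function downloadOrDiscard (url, destPath) {
  const partPath = `${destPath}.part`;
  const response = await new Promise((resolve, reject) => {
    http.get(url, resolve).on('error', reject);
  });
  try {
    if (response.statusCode !== 200) {
      response.resume(); // drain the body so the socket is released
      throw new Error(`unexpected status ${response.statusCode}`);
    }
    // Rejects if the response is aborted before it ends.
    await pipeline(response, fs.createWriteStream(partPath));
    await fs.promises.rename(partPath, destPath);
  } catch (err) {
    // Remove whatever was written so far; ignore a missing file.
    await fs.promises.rm(partPath, { force: true });
    throw err;
  }
}

// Possible usage against the demo server above:
//   downloadOrDiscard('http://localhost:8080/', './qwe.js')
//     .catch((err) => console.log('download discarded:', err.message));
```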

    This approach:

    • saves server resources
    • saves S3 bandwidth: you read the object once
    • wastes client bandwidth: you send the file to the client, but the client will delete it if the virus scan fails. Here you may improve the PromiseTransform so that it does not send the last data chunks, which prevents the file from being finalized on the client.

    I think this is a good trade-off because, IMHO, a failed virus scan is rare.
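    The improvement mentioned in the last bullet of the list could look roughly like the sketch below: a variation that always keeps the most recent chunk in memory and releases it only once the promise resolves. The HoldLastChunkTransform name is hypothetical, and this is only one way to hold data back.

```javascript
const { Transform } = require('stream');

// Hypothetical variation of PromiseTransform: the final chunk is sent
// only after the promise resolves, so the client never gets a full file
// when the promise rejects.
class HoldLastChunkTransform extends Transform {
  constructor (aPromise, options = {}) {
    super(options);
    this.aPromise = aPromise;
    this.heldChunk = null;
    // Mark a possible rejection as handled until _flush subscribes to it.
    this.aPromise.catch(() => {});
  }

  _transform (chunk, encoding, callback) {
    // Forward the chunk held so far and hold the new one instead.
    if (this.heldChunk !== null) {
      this.push(this.heldChunk);
    }
    this.heldChunk = chunk;
    callback();
  }

  _flush (callback) {
    this.aPromise.then(
      // On success, emit the held chunk (if any) and end the stream.
      () => callback(null, this.heldChunk),
      // On failure, destroy the stream without emitting the held chunk.
      (err) => callback(err)
    );
  }
}
```

    It can be used in place of PromiseTransform in the pipeline. The response is still aborted when the promise rejects, so the client must still treat the download as failed. A file small enough to arrive as a single chunk is held entirely until the promise settles.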