node.js cassandra stream node-streams

Stream data from Cassandra to file considering backpressure


I have a Node app that collects vote submissions and stores them in Cassandra. The votes are stored as base64-encoded encrypted strings. The API has an endpoint called /export that should fetch all of these vote strings (possibly more than 1 million), convert them to binary, and append them one after another to a votes.egd file. That file should then be zipped and sent to the client. My idea is to stream the rows from Cassandra, converting each vote string to binary and writing it to a WriteStream. I want to wrap this functionality in a Promise for easy use. I have the following:

streamVotesToFile(query, validVotesFileBasename) {
  return new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);

    writeStream.on('error', (err) => {
      logger.error(`Writestream ${validVotesFileBasename}.egd error`);
      reject(err);
    });

    writeStream.on('drain', () => {
      logger.info(`Writestream ${validVotesFileBasename}.egd drained`);
    });

    db.client.stream(query)
    .on('readable', function() {
      let row = this.read();
      while (row) {
        const envelope = new Buffer(row.vote, 'base64');
        if(!writeStream.write(envelope + '\n')) {
          logger.error(`Couldn't write vote`);
        }
        row = this.read()
      }
    })
    .on('end', () => { // No more rows from Cassandra
      writeStream.end();
      writeStream.on('finish', () => {
        logger.info(`Stream done writing`);
        resolve();
      });
    })
    .on('error', (err) => { // err is a response error from Cassandra
      reject(err);
    });
  });
}

When I run this, it appends all the votes to a file and the download works fine. But I have a number of problems/questions:

  1. If I make a request to the /export endpoint and this function runs, all other requests to the app are extremely slow or don't finish until the export request is done. I'm guessing this is because the event loop is being hogged by all of these events from the Cassandra stream (thousands per second)?

  2. All the votes seem to be written to the file fine, yet I get false for almost every writeStream.write() call and see the corresponding logged message (see code). Why?

  3. I understand that I need to consider backpressure and the 'drain' event for the WritableStream, so ideally I would use pipe() to pipe the votes to a file, because that has built-in backpressure support (right?). But since I need to process each row (convert it to binary and possibly add other data from other row fields in the future), how would I do that with pipe()?


Solution

  • This is the perfect use case for a Transform stream:

    const { Transform } = require('stream');

    const myTransform = new Transform({
      // Input is row objects; output is raw bytes for the file stream
      writableObjectMode: true,
      transform(row, encoding, callback) {
        // Decode the base64 vote string into its binary form
        const item = Buffer.from(row['vote'], 'base64');
        callback(null, item);
      }
    });

    client.stream(query, params, { prepare: true })
      .pipe(myTransform)
      .pipe(fileStream);
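
On the second question: a false return from write() does not mean the write failed. The chunk is still accepted; false only signals that the stream's internal buffer is full and the producer should wait for 'drain' before writing more. pipe() does this waiting for you. As a rough sketch of that bookkeeping done by hand (the helper name `writeWithBackpressure` is made up for illustration, and error handling is left out for brevity):

```javascript
const { once } = require('events');

// Hypothetical helper: copies every chunk from `source` to `dest`,
// pausing whenever dest.write() reports a full buffer.
async function writeWithBackpressure(source, dest) {
  let pauses = 0;
  for await (const chunk of source) {
    // false means "buffer is full", not "write failed": the chunk is queued
    if (!dest.write(chunk)) {
      pauses += 1;
      // Stop pulling from the source until the buffer has been flushed
      await once(dest, 'drain');
    }
  }
  dest.end();
  // Resolve only after everything has been handed to the underlying resource
  await once(dest, 'finish');
  return pauses;
}
```

Because the loop stops reading while it waits for 'drain', the source is not asked for more rows than the file can absorb, which is the part the original 'readable' loop was missing.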
    

    See the Node.js API docs for more information on how to implement a Transform stream.
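
Since the question asks for a Promise wrapper, one possible sketch uses `pipeline` from `stream/promises` (assuming Node.js 15 or later). Unlike a bare pipe() chain, `pipeline` forwards an error from any stage and destroys the other streams. The function names and the signature below are illustrative, not taken from the question: here `streamVotesToFile` receives the row stream itself, for example the result of `db.client.stream(query)`.

```javascript
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises'); // assumes Node.js 15+

// Hypothetical factory: row objects in, decoded vote bytes out
function createVoteDecoder() {
  return new Transform({
    writableObjectMode: true, // the writable side receives row objects
    transform(row, _encoding, callback) {
      callback(null, Buffer.from(row.vote, 'base64'));
    },
  });
}

// Resolves once the file has been completely written;
// rejects if the row stream, the transform, or the file stream errors.
function streamVotesToFile(rowStream, filePath) {
  return pipeline(
    rowStream,
    createVoteDecoder(),
    fs.createWriteStream(filePath)
  );
}
```

It could then be called as `await streamVotesToFile(db.client.stream(query), 'votes.egd')` before zipping the file and sending it to the client.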